File size: 4,832 Bytes
b152010 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
from pydub import AudioSegment
from tqdm import tqdm
from .utils import run_command
from .logging_setup import logger
import numpy as np
class Mixer:
def __init__(self):
self.parts = []
def __len__(self):
parts = self._sync()
seg = parts[0][1]
frame_count = max(offset + seg.frame_count() for offset, seg in parts)
return int(1000.0 * frame_count / seg.frame_rate)
def overlay(self, sound, position=0):
self.parts.append((position, sound))
return self
def _sync(self):
positions, segs = zip(*self.parts)
frame_rate = segs[0].frame_rate
array_type = segs[0].array_type # noqa
offsets = [int(frame_rate * pos / 1000.0) for pos in positions]
segs = AudioSegment.empty()._sync(*segs)
return list(zip(offsets, segs))
def append(self, sound):
self.overlay(sound, position=len(self))
def to_audio_segment(self):
parts = self._sync()
seg = parts[0][1]
channels = seg.channels
frame_count = max(offset + seg.frame_count() for offset, seg in parts)
sample_count = int(frame_count * seg.channels)
output = np.zeros(sample_count, dtype="int32")
for offset, seg in parts:
sample_offset = offset * channels
samples = np.frombuffer(seg.get_array_of_samples(), dtype="int32")
samples = np.int16(samples/np.max(np.abs(samples)) * 32767)
start = sample_offset
end = start + len(samples)
output[start:end] += samples
return seg._spawn(
output, overrides={"sample_width": 4}).normalize(headroom=0.0)
def create_translated_audio(
result_diarize, audio_files, final_file, concat=False, avoid_overlap=False,
):
total_duration = result_diarize["segments"][-1]["end"] # in seconds
if concat:
"""
file .\audio\1.ogg
file .\audio\2.ogg
file .\audio\3.ogg
file .\audio\4.ogg
...
"""
# Write the file paths to list.txt
with open("list.txt", "w") as file:
for i, audio_file in enumerate(audio_files):
if i == len(audio_files) - 1: # Check if it's the last item
file.write(f"file {audio_file}")
else:
file.write(f"file {audio_file}\n")
# command = f"ffmpeg -f concat -safe 0 -i list.txt {final_file}"
command = (
f"ffmpeg -f concat -safe 0 -i list.txt -c:a pcm_s16le {final_file}"
)
run_command(command)
else:
# silent audio with total_duration
base_audio = AudioSegment.silent(
duration=int(total_duration * 1000), frame_rate=41000
)
combined_audio = Mixer()
combined_audio.overlay(base_audio)
logger.debug(
f"Audio duration: {total_duration // 60} "
f"minutes and {int(total_duration % 60)} seconds"
)
last_end_time = 0
previous_speaker = ""
for line, audio_file in tqdm(
zip(result_diarize["segments"], audio_files)
):
start = float(line["start"])
# Overlay each audio at the corresponding time
try:
audio = AudioSegment.from_file(audio_file)
# audio_a = audio.speedup(playback_speed=1.5)
if avoid_overlap:
speaker = line["speaker"]
if (last_end_time - 0.500) > start:
overlap_time = last_end_time - start
if previous_speaker and previous_speaker != speaker:
start = (last_end_time - 0.500)
else:
start = (last_end_time - 0.200)
if overlap_time > 2.5:
start = start - 0.3
logger.info(
f"Avoid overlap for {str(audio_file)} "
f"with {str(start)}"
)
previous_speaker = speaker
duration_tts_seconds = len(audio) / 1000.0 # to sec
last_end_time = (start + duration_tts_seconds)
start_time = start * 1000 # to ms
combined_audio = combined_audio.overlay(
audio, position=start_time
)
except Exception as error:
logger.debug(str(error))
logger.error(f"Error audio file {audio_file}")
# combined audio as a file
combined_audio_data = combined_audio.to_audio_segment()
combined_audio_data.export(
final_file, format="wav"
) # best than ogg, change if the audio is anomalous
|