r3gm's picture
v0.5.0
b152010
from pydub import AudioSegment
from tqdm import tqdm
from .utils import run_command
from .logging_setup import logger
import numpy as np
class Mixer:
def __init__(self):
self.parts = []
def __len__(self):
parts = self._sync()
seg = parts[0][1]
frame_count = max(offset + seg.frame_count() for offset, seg in parts)
return int(1000.0 * frame_count / seg.frame_rate)
def overlay(self, sound, position=0):
self.parts.append((position, sound))
return self
def _sync(self):
positions, segs = zip(*self.parts)
frame_rate = segs[0].frame_rate
array_type = segs[0].array_type # noqa
offsets = [int(frame_rate * pos / 1000.0) for pos in positions]
segs = AudioSegment.empty()._sync(*segs)
return list(zip(offsets, segs))
def append(self, sound):
self.overlay(sound, position=len(self))
def to_audio_segment(self):
parts = self._sync()
seg = parts[0][1]
channels = seg.channels
frame_count = max(offset + seg.frame_count() for offset, seg in parts)
sample_count = int(frame_count * seg.channels)
output = np.zeros(sample_count, dtype="int32")
for offset, seg in parts:
sample_offset = offset * channels
samples = np.frombuffer(seg.get_array_of_samples(), dtype="int32")
samples = np.int16(samples/np.max(np.abs(samples)) * 32767)
start = sample_offset
end = start + len(samples)
output[start:end] += samples
return seg._spawn(
output, overrides={"sample_width": 4}).normalize(headroom=0.0)
def create_translated_audio(
result_diarize, audio_files, final_file, concat=False, avoid_overlap=False,
):
total_duration = result_diarize["segments"][-1]["end"] # in seconds
if concat:
"""
file .\audio\1.ogg
file .\audio\2.ogg
file .\audio\3.ogg
file .\audio\4.ogg
...
"""
# Write the file paths to list.txt
with open("list.txt", "w") as file:
for i, audio_file in enumerate(audio_files):
if i == len(audio_files) - 1: # Check if it's the last item
file.write(f"file {audio_file}")
else:
file.write(f"file {audio_file}\n")
# command = f"ffmpeg -f concat -safe 0 -i list.txt {final_file}"
command = (
f"ffmpeg -f concat -safe 0 -i list.txt -c:a pcm_s16le {final_file}"
)
run_command(command)
else:
# silent audio with total_duration
base_audio = AudioSegment.silent(
duration=int(total_duration * 1000), frame_rate=41000
)
combined_audio = Mixer()
combined_audio.overlay(base_audio)
logger.debug(
f"Audio duration: {total_duration // 60} "
f"minutes and {int(total_duration % 60)} seconds"
)
last_end_time = 0
previous_speaker = ""
for line, audio_file in tqdm(
zip(result_diarize["segments"], audio_files)
):
start = float(line["start"])
# Overlay each audio at the corresponding time
try:
audio = AudioSegment.from_file(audio_file)
# audio_a = audio.speedup(playback_speed=1.5)
if avoid_overlap:
speaker = line["speaker"]
if (last_end_time - 0.500) > start:
overlap_time = last_end_time - start
if previous_speaker and previous_speaker != speaker:
start = (last_end_time - 0.500)
else:
start = (last_end_time - 0.200)
if overlap_time > 2.5:
start = start - 0.3
logger.info(
f"Avoid overlap for {str(audio_file)} "
f"with {str(start)}"
)
previous_speaker = speaker
duration_tts_seconds = len(audio) / 1000.0 # to sec
last_end_time = (start + duration_tts_seconds)
start_time = start * 1000 # to ms
combined_audio = combined_audio.overlay(
audio, position=start_time
)
except Exception as error:
logger.debug(str(error))
logger.error(f"Error audio file {audio_file}")
# combined audio as a file
combined_audio_data = combined_audio.to_audio_segment()
combined_audio_data.export(
final_file, format="wav"
) # best than ogg, change if the audio is anomalous