Spaces:
Sleeping
Sleeping
File size: 3,341 Bytes
f0ceee4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
from pyannote.audio import Pipeline
from pydub import AudioSegment
import os
import re
import torch
def perform_diarization(audio_file_path, translated_file_path, output_dir='./audio/diarization'):
# Initialize diarization pipeline
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1")
# Send pipeline to GPU (when available)
pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
# Load audio file
audio = AudioSegment.from_wav(audio_file_path)
# Apply pretrained pipeline
diarization = pipeline(audio_file_path)
os.makedirs(output_dir, exist_ok=True)
# Process and save each speaker's audio segments
speaker_segments_audio = {}
for turn, _, speaker in diarization.itertracks(yield_label=True):
start_ms = int(turn.start * 1000) # Convert to milliseconds
end_ms = int(turn.end * 1000) # Convert to milliseconds
segment = audio[start_ms:end_ms]
if speaker in speaker_segments_audio:
speaker_segments_audio[speaker] += segment
else:
speaker_segments_audio[speaker] = segment
# Save audio segments
for speaker, segment in speaker_segments_audio.items():
output_path = os.path.join(output_dir, f"{speaker}.wav")
segment.export(output_path, format="wav")
print(f"Combined audio for speaker {speaker} saved in {output_path}")
# Load translated text
with open(translated_file_path, "r") as file:
translated_lines = file.readlines()
# Process and align translated text with diarization data
last_speaker = None
aligned_text = []
timestamp_pattern = re.compile(r'\[(\d+\.\d+)\-(\d+\.\d+)\]')
for line in translated_lines:
match = timestamp_pattern.match(line)
if match:
start_time = float(match.group(1))
end_time = float(match.group(2))
text = line[match.end():].strip() # Extract text part
speaker_found = False
# Find corresponding speaker
for turn, _, speaker in diarization.itertracks(yield_label=True):
speaker_start = turn.start
speaker_end = turn.end
# Check for overlap between speaker segment and line timestamp
if max(speaker_start, start_time) < min(speaker_end, end_time):
aligned_text.append(f"[{speaker}] [{start_time}-{end_time}] {text}")
speaker_found = True
last_speaker = speaker
break
# If no speaker found, use the last speaker
if not speaker_found:
if last_speaker is not None:
aligned_text.append(f"[{last_speaker}] [{start_time}-{end_time}] {text}")
else:
aligned_text.append(f"[Unknown Speaker] [{start_time}-{end_time}] {text}")
# Save aligned text to a single file
aligned_text_output_path = os.path.join(output_dir, "aligned_text.txt")
with open(aligned_text_output_path, "w") as aligned_text_file:
aligned_text_file.write('\n'.join(aligned_text))
print(f"Aligned text saved in {aligned_text_output_path}")
# The rest of your script, if any
|