File size: 3,417 Bytes
f0ceee4
 
 
 
 
 
 
 
2a3563d
 
6e84375
 
f0ceee4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
from pyannote.audio import Pipeline
from pydub import AudioSegment
import os
import re
import torch

# Matches a leading "[start-end]" stamp; the fractional part is optional so
# that both "[1.50-3.25]" and "[1-3]" are recognised.
_TIMESTAMP_RE = re.compile(r'\[(\d+(?:\.\d+)?)\-(\d+(?:\.\d+)?)\]')


def _dominant_speaker(turns, start_time, end_time):
    """Return the speaker overlapping [start_time, end_time] the most, or None."""
    overlap_by_speaker = {}
    for turn_start, turn_end, speaker in turns:
        overlap = min(turn_end, end_time) - max(turn_start, start_time)
        if overlap > 0:
            overlap_by_speaker[speaker] = overlap_by_speaker.get(speaker, 0.0) + overlap
    if not overlap_by_speaker:
        return None
    # Dicts keep insertion order and max() returns the first maximum, so a tie
    # is resolved in favour of the speaker whose turn comes first.
    return max(overlap_by_speaker, key=overlap_by_speaker.get)


def _align_lines(lines, turns):
    """Prefix every timestamped line with the speaker talking during it."""
    aligned_text = []
    last_speaker = None
    for line in lines:
        match = _TIMESTAMP_RE.match(line)
        if not match:
            # Lines without a leading timestamp carry no alignment info and
            # are skipped.
            continue

        start_time = float(match.group(1))
        end_time = float(match.group(2))
        text = line[match.end():].strip()

        speaker = _dominant_speaker(turns, start_time, end_time)
        if speaker is not None:
            last_speaker = speaker
            label = speaker
        elif last_speaker is not None:
            # No turn overlaps this line: assume the previous speaker continues.
            label = last_speaker
        else:
            label = "Unknown Speaker"
        aligned_text.append(f"[{label}] [{start_time}-{end_time}] {text}")
    return aligned_text


def perform_diarization(audio_file_path, translated_file_path, output_dir='./audio/diarization'):
    """Diarize a WAV file, export per-speaker audio and label a transcript.

    Runs the ``pyannote/speaker-diarization-3.1`` pipeline on
    ``audio_file_path``, writes one ``<speaker>.wav`` per detected speaker
    (all of that speaker's turns concatenated) into ``output_dir``, then reads
    ``translated_file_path`` and writes ``aligned_text.txt`` to ``output_dir``
    with each timestamped line prefixed by its speaker label.

    Args:
        audio_file_path: Path to the WAV file to diarize.
        translated_file_path: Path to a text file whose lines start with a
            ``[start-end]`` timestamp in seconds, followed by the text.
        output_dir: Directory for the generated files; created if missing.

    Raises:
        KeyError: If the ``Diarization`` environment variable (used as the
            pipeline's auth token) is not set.
        RuntimeError: If the diarization pipeline could not be loaded.
    """
    # Initialize diarization pipeline
    accesstoken = os.environ['Diarization']
    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=accesstoken)
    if pipeline is None:
        # pyannote signals a failed/unauthorised download by returning None
        # instead of raising; fail here with a clear message rather than with
        # an AttributeError on the next line.
        raise RuntimeError(
            "Could not load 'pyannote/speaker-diarization-3.1'; check that the "
            "'Diarization' token is valid and has access to the pipeline."
        )

    # Send pipeline to GPU (when available)
    pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

    # Load audio file
    audio = AudioSegment.from_wav(audio_file_path)

    # Apply pretrained pipeline
    diarization = pipeline(audio_file_path)

    os.makedirs(output_dir, exist_ok=True)

    # Materialise the speaker turns once (in seconds); they are reused for the
    # audio export and for every transcript line below.
    turns = [
        (turn.start, turn.end, speaker)
        for turn, _, speaker in diarization.itertracks(yield_label=True)
    ]

    # Concatenate each speaker's audio segments
    speaker_segments_audio = {}
    for turn_start, turn_end, speaker in turns:
        # pydub slices by milliseconds
        segment = audio[int(turn_start * 1000):int(turn_end * 1000)]
        if speaker in speaker_segments_audio:
            speaker_segments_audio[speaker] += segment
        else:
            speaker_segments_audio[speaker] = segment

    # Save audio segments
    for speaker, segment in speaker_segments_audio.items():
        output_path = os.path.join(output_dir, f"{speaker}.wav")
        segment.export(output_path, format="wav")
        print(f"Combined audio for speaker {speaker} saved in {output_path}")

    # Load translated text and align it with the diarization data
    with open(translated_file_path, "r") as file:
        aligned_text = _align_lines(file, turns)

    # Save aligned text to a single file
    aligned_text_output_path = os.path.join(output_dir, "aligned_text.txt")
    with open(aligned_text_output_path, "w") as aligned_text_file:
        aligned_text_file.write('\n'.join(aligned_text))
    print(f"Aligned text saved in {aligned_text_output_path}")

# The rest of your script, if any