RobCaamano committed on
Commit 84a138e
1 Parent(s): a96698f

Update diarization.py

Files changed (1)
  1. diarization.py +81 -83
diarization.py CHANGED
@@ -1,83 +1,81 @@
-from pyannote.audio import Pipeline
-from pydub import AudioSegment
-import os
-import re
-import torch
-
-def perform_diarization(audio_file_path, translated_file_path, output_dir='./audio/diarization'):
-
-    # Initialize diarization pipeline
-    accesstoken = os.environ['Diarization']
-    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=accesstoken)
-
-    # Send pipeline to GPU (when available)
-    pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
-
-    # Load audio file
-    audio = AudioSegment.from_wav(audio_file_path)
-
-    # Apply pretrained pipeline
-    diarization = pipeline(audio_file_path)
-
-    os.makedirs(output_dir, exist_ok=True)
-
-    # Process and save each speaker's audio segments
-    speaker_segments_audio = {}
-    for turn, _, speaker in diarization.itertracks(yield_label=True):
-        start_ms = int(turn.start * 1000)  # Convert to milliseconds
-        end_ms = int(turn.end * 1000)  # Convert to milliseconds
-        segment = audio[start_ms:end_ms]
-
-        if speaker in speaker_segments_audio:
-            speaker_segments_audio[speaker] += segment
-        else:
-            speaker_segments_audio[speaker] = segment
-
-    # Save audio segments
-    for speaker, segment in speaker_segments_audio.items():
-        output_path = os.path.join(output_dir, f"{speaker}.wav")
-        segment.export(output_path, format="wav")
-        print(f"Combined audio for speaker {speaker} saved in {output_path}")
-
-    # Load translated text
-    with open(translated_file_path, "r") as file:
-        translated_lines = file.readlines()
-
-    # Process and align translated text with diarization data
-    last_speaker = None
-    aligned_text = []
-    timestamp_pattern = re.compile(r'\[(\d+\.\d+)\-(\d+\.\d+)\]')
-    for line in translated_lines:
-        match = timestamp_pattern.match(line)
-
-        if match:
-            start_time = float(match.group(1))
-            end_time = float(match.group(2))
-            text = line[match.end():].strip()  # Extract text part
-
-            speaker_found = False
-            # Find corresponding speaker
-            for turn, _, speaker in diarization.itertracks(yield_label=True):
-                speaker_start = turn.start
-                speaker_end = turn.end
-                # Check for overlap between speaker segment and line timestamp
-                if max(speaker_start, start_time) < min(speaker_end, end_time):
-                    aligned_text.append(f"[{speaker}] [{start_time}-{end_time}] {text}")
-                    speaker_found = True
-                    last_speaker = speaker
-                    break
-
-            # If no speaker found, use the last speaker
-            if not speaker_found:
-                if last_speaker is not None:
-                    aligned_text.append(f"[{last_speaker}] [{start_time}-{end_time}] {text}")
-                else:
-                    aligned_text.append(f"[Unknown Speaker] [{start_time}-{end_time}] {text}")
-
-    # Save aligned text to a single file
-    aligned_text_output_path = os.path.join(output_dir, "aligned_text.txt")
-    with open(aligned_text_output_path, "w") as aligned_text_file:
-        aligned_text_file.write('\n'.join(aligned_text))
-    print(f"Aligned text saved in {aligned_text_output_path}")
-
-# The rest of your script, if any
+from pyannote.audio import Pipeline
+from pydub import AudioSegment
+import os
+import re
+import torch
+
+def perform_diarization(audio_file_path, translated_file_path, output_dir='./audio/diarization'):
+
+    # Initialize diarization pipeline
+    accesstoken = os.environ['Diarization']
+    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1", use_auth_token=accesstoken)
+
+    # Send pipeline to GPU (when available)
+    pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
+
+    # Load audio file
+    audio = AudioSegment.from_wav(audio_file_path)
+
+    # Apply pretrained pipeline
+    diarization = pipeline(audio_file_path)
+
+    os.makedirs(output_dir, exist_ok=True)
+
+    # Process and save each speaker's audio segments
+    speaker_segments_audio = {}
+    for turn, _, speaker in diarization.itertracks(yield_label=True):
+        start_ms = int(turn.start * 1000)  # Convert to milliseconds
+        end_ms = int(turn.end * 1000)  # Convert to milliseconds
+        segment = audio[start_ms:end_ms]
+
+        if speaker in speaker_segments_audio:
+            speaker_segments_audio[speaker] += segment
+        else:
+            speaker_segments_audio[speaker] = segment
+
+    # Save audio segments
+    for speaker, segment in speaker_segments_audio.items():
+        output_path = os.path.join(output_dir, f"{speaker}.wav")
+        segment.export(output_path, format="wav")
+        print(f"Combined audio for speaker {speaker} saved in {output_path}")
+
+    # Load translated text
+    with open(translated_file_path, "r") as file:
+        translated_lines = file.readlines()
+
+    # Process and align translated text with diarization data
+    last_speaker = None
+    aligned_text = []
+    timestamp_pattern = re.compile(r'\[(\d+\.\d+)\-(\d+\.\d+)\]')
+    for line in translated_lines:
+        match = timestamp_pattern.match(line)
+
+        if match:
+            start_time = float(match.group(1))
+            end_time = float(match.group(2))
+            text = line[match.end():].strip()  # Extract text part
+
+            speaker_found = False
+            # Find corresponding speaker
+            for turn, _, speaker in diarization.itertracks(yield_label=True):
+                speaker_start = turn.start
+                speaker_end = turn.end
+                # Check for overlap between speaker segment and line timestamp
+                if max(speaker_start, start_time) < min(speaker_end, end_time):
+                    aligned_text.append(f"[{speaker}] [{start_time}-{end_time}] {text}")
+                    speaker_found = True
+                    last_speaker = speaker
+                    break
+
+            # If no speaker found, use the last speaker
+            if not speaker_found:
+                if last_speaker is not None:
+                    aligned_text.append(f"[{last_speaker}] [{start_time}-{end_time}] {text}")
+                else:
+                    aligned_text.append(f"[Unknown Speaker] [{start_time}-{end_time}] {text}")
+
+    # Save aligned text to a single file
+    aligned_text_output_path = os.path.join(output_dir, "aligned_text.txt")
+    with open(aligned_text_output_path, "w") as aligned_text_file:
+        aligned_text_file.write('\n'.join(aligned_text))
+    print(f"Aligned text saved in {aligned_text_output_path}")
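
Below is a minimal usage sketch for the perform_diarization function committed above. The token value, file paths, and sample transcript line are placeholders for illustration only; the script itself only assumes a WAV input file, a 'Diarization' environment variable holding a Hugging Face access token, and transcript lines that begin with a [start-end] timestamp the regex can match.

import os

# Placeholder token; the script reads it from the 'Diarization' environment variable.
os.environ["Diarization"] = "hf_xxxxxxxxxxxxxxxx"

from diarization import perform_diarization

# Example transcript line that timestamp_pattern would match:
#   [0.00-3.52] Hello, how are you today?
perform_diarization(
    audio_file_path="./audio/input.wav",            # placeholder path to the source WAV
    translated_file_path="./text/translated.txt",   # placeholder path to the timestamped transcript
    output_dir="./audio/diarization",               # default value from the function signature
)
# Expected outputs in output_dir: one combined WAV per detected speaker
# (pyannote typically labels them SPEAKER_00, SPEAKER_01, ...) and aligned_text.txt.

Speaker assignment uses the usual interval-overlap test, max(speaker_start, start_time) < min(speaker_end, end_time). For example, a transcript line stamped [4.2-6.0] overlaps a speaker turn spanning 5.1-8.3 because max(4.2, 5.1) = 5.1 < min(6.0, 8.3) = 6.0, so that turn's speaker label is attached to the line; lines with no overlapping turn fall back to the most recently assigned speaker.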