QLWD commited on
Commit
d579519
1 Parent(s): e4591db

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +49 -40
app.py CHANGED
@@ -70,25 +70,10 @@ def diarize_audio(temp_file):
70
  # 返回 diarization 类对象
71
  return diarization
72
 
73
- # 将时间戳转换为秒
74
- def timestamp_to_seconds(timestamp):
75
- try:
76
- h, m, s = map(float, timestamp.split(':'))
77
- return 3600 * h + 60 * m + s
78
- except ValueError as e:
79
- print(f"转换时间戳时出错: '{timestamp}'. 错误: {e}")
80
- return None
81
-
82
- # 计算时间段的重叠部分(单位:秒)
83
- def calculate_overlap(start1, end1, start2, end2):
84
- overlap_start = max(start1, start2)
85
- overlap_end = min(end1, end2)
86
- overlap_duration = max(0, overlap_end - overlap_start)
87
- return overlap_duration
88
-
89
- # 获取所有说话人时间段(排除目标录音时间段)
90
- def get_all_speaker_segments(diarization_output, target_start_time, target_end_time, final_audio_length):
91
- speaker_segments = {}
92
 
93
  # 使用 itertracks 获取每个说话人的信息
94
  for speech_turn in diarization_output.itertracks(yield_label=True):
@@ -96,19 +81,34 @@ def get_all_speaker_segments(diarization_output, target_start_time, target_end_t
96
  end_seconds = speech_turn[0].end
97
  label = speech_turn[1]
98
 
99
- # 检查时间段是否与目标录音重叠
100
- if start_seconds < target_end_time and end_seconds > target_start_time:
101
- # 如果时间段与目标音频有重叠,调整结束时间
102
- end_seconds = min(end_seconds, final_audio_length)
103
-
104
- # 存储说话人的时间段
105
- if label not in speaker_segments:
106
- speaker_segments[label] = []
107
 
108
- # 添加处理后的时间段
109
- speaker_segments[label].append((start_seconds, end_seconds))
110
-
111
- return speaker_segments
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
113
  # 处理音频文件并返回输出
114
  def process_audio(target_audio, mixed_audio):
@@ -118,29 +118,38 @@ def process_audio(target_audio, mixed_audio):
118
  # 进行音频拼接并返回目标音频的起始和结束时间(作为字典)
119
  time_dict = combine_audio_with_time(target_audio, mixed_audio)
120
 
 
 
 
 
121
  # 执行说话人分离
122
  diarization_result = diarize_audio("final_output.wav")
123
 
124
  if isinstance(diarization_result, str) and diarization_result.startswith("错误"):
125
- return diarization_result, None # 出错时返回错误信息
126
  else:
127
  # 获取拼接后的音频长度
128
  final_audio_length = len(AudioSegment.from_wav("final_output.wav")) / 1000 # 秒为单位
129
 
130
- # 获取所有说话人的时间段
131
- speaker_segments = get_all_speaker_segments(diarization_result, time_dict['start_time'], time_dict['end_time'], final_audio_length)
 
 
 
 
 
132
 
133
- if speaker_segments:
134
- # 返回所有说话人的时间段
135
- return speaker_segments
136
  else:
137
- return "没有找到任何说话人的时间段。"
138
 
139
  # Gradio 接口
140
  with gr.Blocks() as demo:
141
  gr.Markdown("""
142
  # 🗣️ 音频拼接与说话人分类 🗣️
143
- 上传目标音频和混合音频,拼接并进行说话人分类。结果包括所有说话人的时间段(排除目标录音时间段)。
144
  """)
145
 
146
  mixed_audio_input = gr.Audio(type="filepath", label="上传混合音频")
@@ -158,4 +167,4 @@ with gr.Blocks() as demo:
158
  outputs=[diarization_output]
159
  )
160
 
161
- demo.launch(share=True)
 
70
  # 返回 diarization 类对象
71
  return diarization
72
 
73
+ # 获取目标录音所在时间范围最大的说话人及其时间段
74
+ def get_most_matched_speaker_segments(diarization_output, target_start_time, target_end_time, final_audio_length):
75
+ # 用于存储说话人与目标音频重叠时间的字典
76
+ speaker_overlaps = {}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
77
 
78
  # 使用 itertracks 获取每个说话人的信息
79
  for speech_turn in diarization_output.itertracks(yield_label=True):
 
81
  end_seconds = speech_turn[0].end
82
  label = speech_turn[1]
83
 
84
+ # 计算目标音频与当前说话人时间段的重叠时间
85
+ overlap_start = max(start_seconds, target_start_time)
86
+ overlap_end = min(end_seconds, target_end_time)
87
+ overlap_duration = max(0, overlap_end - overlap_start)
 
 
 
 
88
 
89
+ # 如果有重叠,记录重叠时间
90
+ if overlap_duration > 0:
91
+ if label not in speaker_overlaps:
92
+ speaker_overlaps[label] = {
93
+ 'total_overlap': overlap_duration,
94
+ 'segments': []
95
+ }
96
+ else:
97
+ speaker_overlaps[label]['total_overlap'] += overlap_duration
98
+
99
+ # 记录该说话人的原始时间段(排除目标音频时间段)
100
+ if start_seconds < target_start_time:
101
+ speaker_overlaps[label]['segments'].append((start_seconds, min(end_seconds, target_start_time)))
102
+
103
+ if end_seconds > target_end_time:
104
+ speaker_overlaps[label]['segments'].append((max(start_seconds, target_end_time), end_seconds))
105
+
106
+ # 找到重叠时间最长的说话人
107
+ if speaker_overlaps:
108
+ most_matched_speaker = max(speaker_overlaps, key=lambda k: speaker_overlaps[k]['total_overlap'])
109
+ return {most_matched_speaker: speaker_overlaps[most_matched_speaker]['segments']}
110
+
111
+ return {}
112
 
113
  # 处理音频文件并返回输出
114
  def process_audio(target_audio, mixed_audio):
 
118
  # 进行音频拼接并返回目标音频的起始和结束时间(作为字典)
119
  time_dict = combine_audio_with_time(target_audio, mixed_audio)
120
 
121
+ # 如果音频拼接出错,返回错误信息
122
+ if isinstance(time_dict, str):
123
+ return time_dict
124
+
125
  # 执行说话人分离
126
  diarization_result = diarize_audio("final_output.wav")
127
 
128
  if isinstance(diarization_result, str) and diarization_result.startswith("错误"):
129
+ return diarization_result # 出错时返回错误信息
130
  else:
131
  # 获取拼接后的音频长度
132
  final_audio_length = len(AudioSegment.from_wav("final_output.wav")) / 1000 # 秒为单位
133
 
134
+ # 获取目标录音所在时间范围最大的说话人时间段
135
+ most_matched_speaker_segments = get_most_matched_speaker_segments(
136
+ diarization_result,
137
+ time_dict['start_time'],
138
+ time_dict['end_time'],
139
+ final_audio_length
140
+ )
141
 
142
+ if most_matched_speaker_segments:
143
+ # 返回目标录音所在时间范围最大的说话人的时间段(排除目标音频时间段)
144
+ return most_matched_speaker_segments
145
  else:
146
+ return "没有找到与目标录音重叠的说话人时间段。"
147
 
148
  # Gradio 接口
149
  with gr.Blocks() as demo:
150
  gr.Markdown("""
151
  # 🗣️ 音频拼接与说话人分类 🗣️
152
+ 上传目标音频和混合音频,拼接并进行说话人分类。结果包括与目标录音重叠时间最长的说话人的时间段(排除目标录音时间段)。
153
  """)
154
 
155
  mixed_audio_input = gr.Audio(type="filepath", label="上传混合音频")
 
167
  outputs=[diarization_output]
168
  )
169
 
170
+ demo.launch(share=True)