|
import gradio as gr |
|
import numpy as np |
|
import os, time, librosa, torch |
|
from pyannote.audio import Pipeline |
|
from transformers import pipeline |
|
from utils import second_to_timecode, download_from_youtube |
|
|
|
# --- Configuration ---------------------------------------------------------
MODEL_NAME = 'openai/whisper-medium'  # checkpoint loaded by the ASR pipeline below
lang = 'en'  # language used to build the forced decoder prompt

chunk_length_s = 9  # chunk length (seconds) handed to the ASR pipeline
# Speaker turns longer than this many seconds are re-segmented with the VAD
# model inside `generator` before being transcribed.
vad_activation_min_duration = 9
device = 0 if torch.cuda.is_available() else "cpu"
SAMPLE_RATE = 16_000  # sampling rate (Hz) audio is resampled to when loaded

# --- Credentials -----------------------------------------------------------
# The Hugging Face access token is read from the environment rather than being
# committed to the source file. Set HF_TOKEN (or HUGGING_FACE_HUB_TOKEN) before
# starting the app. When neither is set, None is passed through to
# `from_pretrained`.
# NOTE(review): any token that was previously committed here should be revoked.
HF_AUTH_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGING_FACE_HUB_TOKEN")

# --- Models ----------------------------------------------------------------
dia_model = Pipeline.from_pretrained("pyannote/speaker-diarization", use_auth_token=HF_AUTH_TOKEN)
vad_model = Pipeline.from_pretrained("pyannote/voice-activity-detection", use_auth_token=HF_AUTH_TOKEN)
pipe = pipeline(task="automatic-speech-recognition", model=MODEL_NAME, chunk_length_s=chunk_length_s, device=device)
# Pin the decoder prompt to the configured language with the "transcribe" task.
pipe.model.config.forced_decoder_ids = pipe.tokenizer.get_decoder_prompt_ids(language=lang, task="transcribe")

print("----------> Loaded models <-----------")
|
|
|
def generator(youtube_link, microphone, file_upload, num_speakers, max_duration, history):
    """Diarize and transcribe one audio source, streaming the transcript.

    Exactly one of ``youtube_link``, ``microphone`` and ``file_upload`` must be
    provided. The audio is loaded at ``SAMPLE_RATE``, split into speaker turns
    by ``dia_model``, and each turn is transcribed with ``pipe``. Turns longer
    than ``vad_activation_min_duration`` seconds are first split into speech
    regions by ``vad_model`` and each region is transcribed separately.

    Args:
        youtube_link: URL string; ``None``, empty or whitespace-only means
            "not provided".
        microphone: Path of a recorded audio file, or ``None``.
        file_upload: Path of an uploaded audio file, or ``None``.
        num_speakers: Speaker count forwarded to the diarization pipeline. It is
            coerced to ``int`` (the UI's Number widget presumably supplies a
            float - verify against the installed Gradio); an empty or zero
            value is forwarded as ``None``.
        max_duration: Maximum number of seconds of audio to load (forwarded to
            ``librosa.load`` as ``duration``).
        history: Previously accumulated transcript text, or ``None``.

    Yields:
        ``(history, history, None)`` after each transcribed segment, then
        ``(history, history, 'transcript.sbv')`` once the transcript file has
        been written.

    Raises:
        ValueError: If zero or more than one audio source is provided.
    """
    # Count and select sources with the same truthiness rule, so a source that
    # passes validation is always the one that ends up bound to `path`.
    link = youtube_link.strip() if isinstance(youtube_link, str) else ""
    if int(bool(link)) + int(bool(microphone)) + int(bool(file_upload)) != 1:
        raise ValueError(f"Only one of the source should be given youtube_link={youtube_link}, microphone={microphone}, file_upload={file_upload}")

    history = history or ""

    if microphone:
        path = microphone
    elif file_upload:
        path = file_upload
    else:
        path = download_from_youtube(link)

    # An integer speaker count is forwarded; empty/0 becomes None.
    num_speakers = int(num_speakers) if num_speakers else None

    waveform, sampling_rate = librosa.load(path, sr=SAMPLE_RATE, mono=True, duration=max_duration)

    print(waveform.shape, sampling_rate)
    # Add a leading axis of size 1 in front of the loaded samples.
    waveform_tensor = torch.unsqueeze(torch.tensor(waveform), 0).to(device)

    dia_result = dia_model({
        "waveform": waveform_tensor,
        "sample_rate": sampling_rate,
    }, num_speakers=num_speakers)

    for speech_turn, track, speaker in dia_result.itertracks(yield_label=True):
        print(f"{speech_turn.start:4.1f} {speech_turn.end:4.1f} {speaker}")
        # Convert the turn boundaries from seconds to sample indices.
        _start = int(sampling_rate * speech_turn.start)
        _end = int(sampling_rate * speech_turn.end)

        if speech_turn.end - speech_turn.start > vad_activation_min_duration:
            print(f'audio duration {speech_turn.end - speech_turn.start} sec ----> activating VAD')
            vad_output = vad_model({
                'waveform': waveform_tensor[:, _start:_end],
                'sample_rate': sampling_rate})
            for vad_turn in vad_output.get_timeline().support():
                # VAD times are relative to the slice, so offset them by the
                # start of the enclosing speaker turn.
                vad_start = _start + int(sampling_rate * vad_turn.start)
                vad_end = _start + int(sampling_rate * vad_turn.end)
                prediction = pipe(waveform[vad_start: vad_end])['text']
                history += f"{second_to_timecode(speech_turn.start + vad_turn.start)},{second_to_timecode(speech_turn.start + vad_turn.end)}\n" + \
                           f"{prediction}\n\n"

                yield history, history, None
        else:
            prediction = pipe(waveform[_start: _end])['text']
            history += f"{second_to_timecode(speech_turn.start)},{second_to_timecode(speech_turn.end)}\n" + \
                       f"{prediction}\n\n"

            yield history, history, None

    file_name = 'transcript.sbv'
    # Explicit UTF-8 so non-ASCII transcript text is written the same way on
    # every platform.
    with open(file_name, 'w', encoding='utf-8') as fp:
        fp.write(history)

    yield history, history, file_name
|
|
|
# Input components, listed in the positional order of `generator`'s parameters:
# youtube_link, microphone, file_upload, num_speakers, max_duration, history.
_demo_inputs = [
    gr.inputs.Textbox(
        lines=1,
        placeholder="Paste the URL to a YouTube video here",
        label="YouTube URL",
        optional=True,
    ),
    gr.inputs.Audio(source="microphone", type="filepath", optional=True),
    gr.inputs.Audio(source="upload", type="filepath", optional=True),
    gr.Number(value=1, label="Number of Speakers"),
    gr.Number(value=120, label="Maximum Duration (Seconds)"),
    'state',
]

# The three outputs line up with the tuples `generator` yields:
# transcript text, state value, transcript file.
demo = gr.Interface(
    fn=generator,
    inputs=_demo_inputs,
    outputs=['text', 'state', 'file'],
    layout="horizontal",
    theme="huggingface",
    allow_flagging="never",
)

# Queueing is enabled before launch; `generator` is a generator function that
# yields partial results.
demo.queue()
demo.launch()