import subprocess
import time

import gradio as gr
import librosa
import pytube as pt

from models import asr, processor
from utils import format_timestamp
from vad import SpeechTimestampsMap, collect_chunks, get_speech_timestamps

# details: https://huggingface.co/docs/diffusers/optimization/fp16#automatic-mixed-precision-amp
# from torch import autocast

apply_vad = True
vad_parameters = {}
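# A minimal sketch of tunable VAD settings. NOTE: these keys are assumptions
# modeled on Silero-style VAD helpers; check the get_speech_timestamps()
# signature in vad.py for the parameters it actually accepts before using them.
# vad_parameters = {
#     "threshold": 0.5,                 # assumed: speech-probability cutoff
#     "min_silence_duration_ms": 2000,  # assumed: gap length treated as silence
# }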
# task = "transcribe"  # transcribe or translate
# language = "bn"
# asr.model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
# asr.model.config.max_new_tokens = 448  # default is 448


def _preprocess(filename):
    """Convert `filename` to 16 kHz mono 16-bit PCM WAV via ffmpeg and return the output path."""
    audio_name = "audio.wav"
    subprocess.call(
        [
            "ffmpeg",
            "-y",
            "-i",
            filename,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-loglevel",
            "quiet",
            audio_name,
        ]
    )
    return audio_name
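# Usage sketch for _preprocess(), assuming ffmpeg is on PATH and a
# hypothetical "sample.mp3" exists in the working directory:
# wav_path = _preprocess("sample.mp3")  # writes and returns "audio.wav"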


def transcribe(microphone, file_upload):
    warn_output = ""
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )
    elif (microphone is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"

    file = microphone if microphone is not None else file_upload
    print(f"\n\nFile is: {file}\n\n")

    # _preprocess() is only needed because we pass an ndarray (from librosa.load())
    # to the asr pipeline below; when a filename string is passed instead, the
    # pipeline invokes ffmpeg automatically and no manual preprocessing is required.
    start_time = time.time()
    print("Starting Preprocessing")
    # speech_array = _preprocess(filename=file)
    filename = _preprocess(filename=file)
    speech_array, sample_rate = librosa.load(filename, sr=16_000)
    if apply_vad:
        duration = speech_array.shape[0] / sample_rate
        print(f"Processing audio with duration: {format_timestamp(duration)}")
        speech_chunks = get_speech_timestamps(speech_array, **vad_parameters)
        speech_array = collect_chunks(speech_array, speech_chunks)
        print(f"VAD filter removed {format_timestamp(duration - (speech_array.shape[0] / sample_rate))}")
        remaining_segments = ", ".join(
            f'[{format_timestamp(chunk["start"] / sample_rate)} -> {format_timestamp(chunk["end"] / sample_rate)}]'
            for chunk in speech_chunks
        )
        print(f"VAD filter kept the following audio segments: {remaining_segments}")
        if not remaining_segments:
            return "ERROR: No speech detected in the audio file"
print(f"\n Preprocessing COMPLETED in {round(time.time()-start_time, 2)}s \n") | |
start_time = time.time() | |
print("Starting Inference") | |
text = asr(speech_array)["text"] | |
# text = asr(file)["text"] | |
# with autocast("cuda"): | |
# text = asr(speech_array)["text"] | |
print(f"\n Inference COMPLETED in {round(time.time()-start_time, 2)}s \n") | |
return warn_output + text | |
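# Usage sketch for transcribe(), with a hypothetical local file standing in
# for the Gradio microphone/upload filepaths:
# print(transcribe(None, "sample.wav"))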


def _return_yt_html_embed(yt_url):
    if "?v=" in yt_url:
        video_id = yt_url.split("?v=")[-1].split("&")[0]
    else:
        video_id = yt_url.split("/")[-1].split("?feature=")[0]
    print(f"\n\nYT ID is: {video_id}\n\n")
    return f'<center><iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe> </center>'
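# Example (hypothetical video id): both common URL shapes map to the same embed:
# _return_yt_html_embed("https://www.youtube.com/watch?v=VIDEO_ID")
# _return_yt_html_embed("https://youtu.be/VIDEO_ID")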


def yt_transcribe(yt_url):
    start_time = time.time()
    yt = pt.YouTube(yt_url)
    html_embed_str = _return_yt_html_embed(yt_url)
    stream = yt.streams.filter(only_audio=True)[0]
    filename = "audio.mp3"
    stream.download(filename=filename)
    print(f"\n YT Audio Downloaded in {round(time.time() - start_time, 2)}s \n")

    # Manual preprocessing is skipped here: the filename string is handed
    # directly to the asr pipeline, which invokes ffmpeg automatically. It
    # would only be needed for an ndarray input (e.g. from librosa.load()).
    # start_time = time.time()
    # print("Starting Preprocessing")
    # speech_array = _preprocess(filename=filename)
    # filename = _preprocess(filename=filename)
    # speech_array, sample_rate = librosa.load(filename, sr=16_000)
    # print(f"\n Preprocessing COMPLETED in {round(time.time() - start_time, 2)}s \n")

    start_time = time.time()
    print("Starting Inference")
    text = asr(filename)["text"]
    # with autocast("cuda"):
    #     text = asr(speech_array)["text"]
    print(f"\n Inference COMPLETED in {round(time.time() - start_time, 2)}s \n")

    return html_embed_str, text
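# Usage sketch for yt_transcribe() (hypothetical URL; needs network access
# and a working pytube installation):
# html, text = yt_transcribe("https://www.youtube.com/watch?v=VIDEO_ID")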


mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(source="microphone", type="filepath", label="Microphone"),
        gr.Audio(source="upload", type="filepath", label="Upload File"),
    ],
    outputs="text",
    title="Bangla Demo: Transcribe Audio",
    description=(
        "Transcribe long-form microphone or audio inputs in BANGLA with the click of a button!"
    ),
    allow_flagging="never",
)

yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(
            lines=1,
            placeholder="Paste the URL to a Bangla language YouTube video here",
            label="YouTube URL",
        )
    ],
    outputs=["html", "text"],
    title="Bangla Demo: Transcribe YouTube",
    description=(
        "Transcribe long-form YouTube videos in BANGLA with the click of a button!"
    ),
    allow_flagging="never",
)
# def transcribe2(audio, state=""): | |
# text = "text" | |
# state += text + " " | |
# return state, state | |
# Set the starting state to an empty string | |
# real_transcribe = gr.Interface( | |
# fn=transcribe2, | |
# inputs=[ | |
# gr.Audio(source="microphone", type="filepath", streaming=True), | |
# "state" | |
# ], | |
# outputs=[ | |
# "textbox", | |
# "state" | |
# ], | |
# live=True) | |
# demo = gr.TabbedInterface([mf_transcribe, yt_transcribe,real_transcribe], ["Transcribe Bangla Audio", "Transcribe Bangla YouTube Video","real time"]) | |
demo = gr.TabbedInterface(
    [mf_transcribe, yt_transcribe],
    ["Transcribe Bangla Audio", "Transcribe Bangla YouTube Video"],
)

if __name__ == "__main__":
    demo.queue()
    demo.launch(share=True)  # share expects a bool, not the string "True"
    # demo.launch(share=True, server_name="0.0.0.0", server_port=8080)