import subprocess
import time

import gradio as gr
import librosa
import pytube as pt

from models import asr, processor
from utils import format_timestamp
from vad import SpeechTimestampsMap, collect_chunks, get_speech_timestamps

## details: https://huggingface.co/docs/diffusers/optimization/fp16#automatic-mixed-precision-amp
# from torch import autocast

apply_vad = True
vad_parameters = {}

# task = "transcribe"  # transcribe or translate
# language = "bn"
# asr.model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
# asr.model.config.max_new_tokens = 448  # default is 448


def _preprocess(filename):
    # Convert the input to 16 kHz, mono, 16-bit PCM WAV so the model gets a uniform format.
    audio_name = "audio.wav"
    subprocess.call(
        [
            "ffmpeg",
            "-y",
            "-i",
            filename,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-loglevel",
            "quiet",
            audio_name,
        ]
    )
    return audio_name


def transcribe(microphone, file_upload):
    warn_output = ""
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )
    elif (microphone is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"

    file = microphone if microphone is not None else file_upload
    print(f"\n\nFile is: {file}\n\n")

    # _preprocess() is only needed when an ndarray (e.g. loaded with librosa.load()) is
    # passed to the asr pipeline; if a filename string is passed, the pipeline invokes
    # ffmpeg itself.
    start_time = time.time()
    print("Starting Preprocessing")
    # speech_array = _preprocess(filename=file)
    filename = _preprocess(filename=file)
    speech_array, sample_rate = librosa.load(filename, sr=16_000)

    if apply_vad:
        duration = speech_array.shape[0] / sample_rate
        print(f"Processing audio with duration: {format_timestamp(duration)}")
        speech_chunks = get_speech_timestamps(speech_array, **vad_parameters)
        speech_array = collect_chunks(speech_array, speech_chunks)
        print(
            f"VAD filter removed {format_timestamp(duration - (speech_array.shape[0] / sample_rate))}"
        )
        remaining_segments = ", ".join(
            f'[{format_timestamp(chunk["start"] / sample_rate)} -> {format_timestamp(chunk["end"] / sample_rate)}]'
            for chunk in speech_chunks
        )
        print(f"VAD filter kept the following audio segments: {remaining_segments}")
        if not remaining_segments:
            return "ERROR: No speech detected in the audio file"

    print(f"\n Preprocessing COMPLETED in {round(time.time() - start_time, 2)}s \n")

    start_time = time.time()
    print("Starting Inference")
    text = asr(speech_array)["text"]
    # text = asr(file)["text"]
    # with autocast("cuda"):
    #     text = asr(speech_array)["text"]
    print(f"\n Inference COMPLETED in {round(time.time() - start_time, 2)}s \n")

    return warn_output + text


def _return_yt_html_embed(yt_url):
    if "?v=" in yt_url:
        video_id = yt_url.split("?v=")[-1].split("&")[0]
    else:
        video_id = yt_url.split("/")[-1].split("?feature=")[0]
    print(f"\n\nYT ID is: {video_id}\n\n")
    return f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe> </center>'
def yt_transcribe(yt_url):
    start_time = time.time()
    yt = pt.YouTube(yt_url)
    html_embed_str = _return_yt_html_embed(yt_url)
    stream = yt.streams.filter(only_audio=True)[0]
    filename = "audio.mp3"
    stream.download(filename=filename)
    print(f"\n YT Audio Downloaded in {round(time.time() - start_time, 2)}s \n")

    # _preprocess() is not needed here: the filename string is passed straight to the asr
    # pipeline, which invokes ffmpeg itself. It would only be required if an ndarray
    # (e.g. loaded with librosa.load()) were passed instead.
    start_time = time.time()
    # print("Starting Preprocessing")
    # speech_array = _preprocess(filename=filename)
    # filename = _preprocess(filename=filename)
    # speech_array, sample_rate = librosa.load(filename, sr=16_000)
    # print(f"\n Preprocessing COMPLETED in {round(time.time() - start_time, 2)}s \n")

    start_time = time.time()
    print("Starting Inference")
    text = asr(filename)["text"]
    # with autocast("cuda"):
    #     text = asr(speech_array)["text"]
    print(f"\n Inference COMPLETED in {round(time.time() - start_time, 2)}s \n")

    return html_embed_str, text


mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(source="microphone", type="filepath", label="Microphone"),
        gr.Audio(source="upload", type="filepath", label="Upload File"),
    ],
    outputs="text",
    title="Bangla Demo: Transcribe Audio",
    description=(
        "Transcribe long-form microphone or audio inputs in BANGLA with the click of a button!"
    ),
    allow_flagging="never",
)

yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(
            lines=1,
            placeholder="Paste the URL to a Bangla language YouTube video here",
            label="YouTube URL",
        )
    ],
    outputs=["html", "text"],
    title="Bangla Demo: Transcribe YouTube",
    description=(
        "Transcribe long-form YouTube videos in BANGLA with the click of a button!"
    ),
    allow_flagging="never",
)

# def transcribe2(audio, state=""):
#     text = "text"
#     state += text + " "
#     return state, state


# Set the starting state to an empty string
# real_transcribe = gr.Interface(
#     fn=transcribe2,
#     inputs=[
#         gr.Audio(source="microphone", type="filepath", streaming=True),
#         "state",
#     ],
#     outputs=[
#         "textbox",
#         "state",
#     ],
#     live=True,
# )
# demo = gr.TabbedInterface(
#     [mf_transcribe, yt_transcribe, real_transcribe],
#     ["Transcribe Bangla Audio", "Transcribe Bangla YouTube Video", "real time"],
# )

demo = gr.TabbedInterface(
    [mf_transcribe, yt_transcribe],
    ["Transcribe Bangla Audio", "Transcribe Bangla YouTube Video"],
)

if __name__ == "__main__":
    demo.queue()
    # demo.launch(share=True)
    demo.launch()
    # demo.launch(share=True, server_name="0.0.0.0", server_port=8080)