import subprocess
import time

import gradio as gr
import librosa
import pytube as pt

from models import asr, processor
from utils import format_timestamp
from vad import SpeechTimestampsMap, collect_chunks, get_speech_timestamps

## details: https://huggingface.co/docs/diffusers/optimization/fp16#automatic-mixed-precision-amp
# from torch import autocast

apply_vad = True
vad_parameters = {}

# task = "transcribe"  # transcribe or translate
# language = "bn"
# asr.model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
# asr.model.config.max_new_tokens = 448  # default is 448


def _preprocess(filename):
    # Convert the input file to 16 kHz mono 16-bit PCM WAV with ffmpeg.
    audio_name = "audio.wav"
    subprocess.call(
        [
            "ffmpeg",
            "-y",
            "-i",
            filename,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-loglevel",
            "quiet",
            audio_name,
        ]
    )
    return audio_name


def transcribe(microphone, file_upload):
    warn_output = ""
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )

    elif (microphone is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"

    file = microphone if microphone is not None else file_upload
    print(f"\n\nFile is: {file}\n\n")

    # _preprocess() is only needed because we load the audio as an ndarray with librosa.load().
    # If the filename string were passed directly to the asr pipeline, it would run ffmpeg automatically.
    start_time = time.time()
    print("Starting Preprocessing")
    # speech_array = _preprocess(filename=file)
    filename = _preprocess(filename=file)
    speech_array, sample_rate = librosa.load(f"{filename}", sr=16_000)

    if apply_vad:
        duration = speech_array.shape[0] / sample_rate
        print(f"Processing audio with duration: {format_timestamp(duration)}")

        speech_chunks = get_speech_timestamps(speech_array, **vad_parameters)
        speech_array = collect_chunks(speech_array, speech_chunks)
        print(f"VAD filter removed {format_timestamp(duration - (speech_array.shape[0] / sample_rate))}")

        remaining_segments = ", ".join(
            f'[{format_timestamp(chunk["start"] / sample_rate)} -> {format_timestamp(chunk["end"] / sample_rate)}]'
            for chunk in speech_chunks
        )
        print(f"VAD filter kept the following audio segments: {remaining_segments}")

        if not remaining_segments:
            return "ERROR: No speech detected in the audio file"

    print(f"\n Preprocessing COMPLETED in {round(time.time() - start_time, 2)}s \n")

    start_time = time.time()
    print("Starting Inference")
    text = asr(speech_array)["text"]
    # text = asr(file)["text"]
    # with autocast("cuda"):
    #     text = asr(speech_array)["text"]
    print(f"\n Inference COMPLETED in {round(time.time() - start_time, 2)}s \n")

    return warn_output + text


def _return_yt_html_embed(yt_url):
    if "?v=" in yt_url:
        video_id = yt_url.split("?v=")[-1].split("&")[0]
    else:
        video_id = yt_url.split("/")[-1].split("?feature=")[0]
    print(f"\n\nYT ID is: {video_id}\n\n")
    return f'