# Bangla_ASR_Demo / app.py
import subprocess
import time
import gradio as gr
import librosa
import pytube as pt
from models import asr, processor
from utils import format_timestamp
from vad import SpeechTimestampsMap, collect_chunks, get_speech_timestamps

# details: https://huggingface.co/docs/diffusers/optimization/fp16#automatic-mixed-precision-amp
# from torch import autocast

apply_vad = True
vad_parameters = {}

# task = "transcribe"  # transcribe or translate
# language = "bn"
# asr.model.config.forced_decoder_ids = processor.get_decoder_prompt_ids(language=language, task=task)
# asr.model.config.max_new_tokens = 448  # default is 448
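
# `vad_parameters` is forwarded to get_speech_timestamps() as keyword
# arguments. A hypothetical example, assuming vad.py exposes Silero-VAD-style
# options (these keys are an assumption, not values shipped with this repo):
# vad_parameters = {"threshold": 0.5, "min_silence_duration_ms": 2000}
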
def _preprocess(filename):
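    """Re-encode `filename` to 16 kHz mono 16-bit PCM WAV with ffmpeg and
    return the path of the converted file (always "audio.wav")."""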
    audio_name = "audio.wav"
    subprocess.call(
        [
            "ffmpeg",
            "-y",
            "-i",
            filename,
            "-acodec",
            "pcm_s16le",
            "-ar",
            "16000",
            "-ac",
            "1",
            "-loglevel",
            "quiet",
            audio_name,
        ]
    )
    return audio_name

def transcribe(microphone, file_upload):
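    """Transcribe a recorded or uploaded audio file.

    Prefers the microphone recording when both inputs are given, optionally
    strips non-speech regions with VAD, and returns the transcript (prefixed
    with a warning when an input was discarded).
    """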
    warn_output = ""
    if (microphone is not None) and (file_upload is not None):
        warn_output = (
            "WARNING: You've uploaded an audio file and used the microphone. "
            "The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
        )
    elif (microphone is None) and (file_upload is None):
        return "ERROR: You have to either use the microphone or upload an audio file"

    file = microphone if microphone is not None else file_upload
    print(f"\n\nFile is: {file}\n\n")

    # _preprocess() is only needed because we load the audio as an ndarray
    # with librosa.load() below; if a filename string is passed to the asr
    # pipeline directly, the pipeline decodes it with ffmpeg automatically.
    start_time = time.time()
    print("Starting Preprocessing")
    # speech_array = _preprocess(filename=file)
    filename = _preprocess(filename=file)
    speech_array, sample_rate = librosa.load(filename, sr=16_000)
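
    # Optionally drop non-speech regions before inference. Note that
    # SpeechTimestampsMap (imported above) could map VAD-filtered timestamps
    # back to the original audio, but this demo only needs the transcript.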
    if apply_vad:
        duration = speech_array.shape[0] / sample_rate
        print(f"Processing audio with duration: {format_timestamp(duration)}")
        speech_chunks = get_speech_timestamps(speech_array, **vad_parameters)
        # Bail out before collect_chunks() so we never try to concatenate an
        # empty list of segments.
        if not speech_chunks:
            return "ERROR: No speech detected in the audio file"
        speech_array = collect_chunks(speech_array, speech_chunks)
        print(f"VAD filter removed {format_timestamp(duration - (speech_array.shape[0] / sample_rate))}")
        remaining_segments = ", ".join(
            f'[{format_timestamp(chunk["start"] / sample_rate)} -> {format_timestamp(chunk["end"] / sample_rate)}]'
            for chunk in speech_chunks
        )
        print(f"VAD filter kept the following audio segments: {remaining_segments}")
print(f"\n Preprocessing COMPLETED in {round(time.time()-start_time, 2)}s \n")
start_time = time.time()
print("Starting Inference")
    text = asr(speech_array)["text"]
    # text = asr(file)["text"]
    # with autocast("cuda"):
    #     text = asr(speech_array)["text"]
    print(f"\n Inference COMPLETED in {round(time.time() - start_time, 2)}s \n")
    return warn_output + text

def _return_yt_html_embed(yt_url):
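    """Extract the video id from a watch or short-link URL and return an
    HTML iframe embedding the video."""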
if "?v=" in yt_url:
video_id = yt_url.split("?v=")[-1].split("&")[0]
else:
video_id = yt_url.split("/")[-1].split("?feature=")[0]
print(f"\n\nYT ID is: {video_id}\n\n")
return f'<center><iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe> </center>'
def yt_transcribe(yt_url):
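    """Download the audio track of a YouTube video with pytube and transcribe
    it. Returns (embed HTML, transcript)."""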
    start_time = time.time()
    yt = pt.YouTube(yt_url)
    html_embed_str = _return_yt_html_embed(yt_url)
    stream = yt.streams.filter(only_audio=True)[0]
    filename = "audio.mp3"
    stream.download(filename=filename)
    print(f"\n YT Audio Downloaded in {round(time.time() - start_time, 2)}s \n")

    # _preprocess() is not needed here: the filename string is passed straight
    # to the asr pipeline, which decodes it with ffmpeg automatically. It would
    # only be required if an ndarray from librosa.load() were used instead.
    start_time = time.time()
    # print("Starting Preprocessing")
    # speech_array = _preprocess(filename=filename)
    # filename = _preprocess(filename=filename)
    # speech_array, sample_rate = librosa.load(f"{filename}", sr=16_000)
    # print(f"\n Preprocessing COMPLETED in {round(time.time()-start_time, 2)}s \n")
    start_time = time.time()
    print("Starting Inference")
    text = asr(filename)["text"]
    # with autocast("cuda"):
    #     text = asr(speech_array)["text"]
    print(f"\n Inference COMPLETED in {round(time.time() - start_time, 2)}s \n")
    return html_embed_str, text

mf_transcribe = gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(source="microphone", type="filepath", label="Microphone"),
        gr.Audio(source="upload", type="filepath", label="Upload File"),
    ],
    outputs="text",
    title="Bangla Demo: Transcribe Audio",
    description=(
        "Transcribe long-form microphone or audio inputs in BANGLA with the click of a button!"
    ),
    allow_flagging="never",
)

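# NOTE: the next assignment rebinds the name `yt_transcribe` from the function
# to its gr.Interface; this is harmless because gr.Interface captures the
# original function through `fn=` before the rebinding happens.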
yt_transcribe = gr.Interface(
    fn=yt_transcribe,
    inputs=[
        gr.Textbox(
            lines=1,
            placeholder="Paste the URL to a Bangla language YouTube video here",
            label="YouTube URL",
        )
    ],
    outputs=["html", "text"],
    title="Bangla Demo: Transcribe YouTube",
    description=(
        "Transcribe long-form YouTube videos in BANGLA with the click of a button!"
    ),
    allow_flagging="never",
)

# def transcribe2(audio, state=""):
#     text = "text"
#     state += text + " "
#     return state, state

# Set the starting state to an empty string
# real_transcribe = gr.Interface(
#     fn=transcribe2,
#     inputs=[
#         gr.Audio(source="microphone", type="filepath", streaming=True),
#         "state",
#     ],
#     outputs=[
#         "textbox",
#         "state",
#     ],
#     live=True,
# )
# demo = gr.TabbedInterface([mf_transcribe, yt_transcribe, real_transcribe], ["Transcribe Bangla Audio", "Transcribe Bangla YouTube Video", "real time"])
demo = gr.TabbedInterface(
    [mf_transcribe, yt_transcribe],
    ["Transcribe Bangla Audio", "Transcribe Bangla YouTube Video"],
)

if __name__ == "__main__":
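    # queue() makes Gradio process requests through a queue, which keeps
    # long-running transcriptions from hitting request timeouts.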
    demo.queue()
    # demo.launch(share=True)
    demo.launch()
    # demo.launch(share=True, server_name="0.0.0.0", server_port=8080)