|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
import os |
|
import time |
|
from datetime import datetime |
|
|
|
import gradio as gr |
|
import torchaudio |
|
|
|
from model import get_pretrained_model, language_to_models, sample_rate |
|
|
|
# Alphabetically ordered names of every language that has at least one model.
# Iterating the mapping yields its keys, so no explicit .keys() call is needed.
languages = sorted(language_to_models)
|
|
|
|
|
def convert_to_wav(in_filename: str) -> str:
    """Convert the input audio file to a wave file.

    Runs ``ffmpeg -hide_banner -i <in_filename> <in_filename>.wav``.

    Args:
        in_filename: Path of the audio file to convert.

    Returns:
        The output path, i.e. ``in_filename + ".wav"``. The path is returned
        even when ffmpeg cannot be started or exits with an error; such
        failures are logged instead of raised, so the conversion stays
        best-effort as before.
    """
    # Local import so this function does not rely on the file's import block.
    import subprocess

    out_filename = in_filename + ".wav"
    logging.info(f"Converting '{in_filename}' to '{out_filename}'")

    # Pass the arguments as a list, without a shell: the file name may come
    # from a user upload, and a quote or ';' in it must never be interpreted
    # as shell syntax.
    command = ["ffmpeg", "-hide_banner", "-i", in_filename, out_filename]
    try:
        completed = subprocess.run(command, check=False)
    except OSError as e:
        # Typically raised when the ffmpeg executable is not installed.
        logging.error(f"Failed to run ffmpeg: {e}")
    else:
        if completed.returncode != 0:
            logging.error(
                f"ffmpeg exited with status {completed.returncode} "
                f"while converting '{in_filename}'"
            )

    return out_filename
|
|
|
|
|
def process(
    in_filename: str,
    language: str,
    repo_id: str,
    decoding_method: str,
    num_active_paths: int,
) -> str:
    """Recognize the speech in an audio file and return the transcript.

    The input is first converted to a wave file with ``convert_to_wav``,
    loaded with torchaudio, resampled to ``sample_rate`` if needed, reduced
    to its first channel and decoded with the model selected by ``repo_id``.
    Timing information (elapsed time, audio duration, real-time factor) is
    logged.

    Args:
        in_filename: Path of the audio file to recognize.
        language: Selected language; only logged here.
        repo_id: Identifier passed to ``get_pretrained_model``.
        decoding_method: Forwarded to ``decode_waves``.
        num_active_paths: Forwarded to ``decode_waves``.

    Returns:
        The first hypothesis returned by ``decode_waves``.
    """
    logging.info(f"in_filename: {in_filename}")
    logging.info(f"language: {language}")
    logging.info(f"repo_id: {repo_id}")
    logging.info(f"decoding_method: {decoding_method}")
    logging.info(f"num_active_paths: {num_active_paths}")

    filename = convert_to_wav(in_filename)

    time_format = "%Y-%m-%d %H:%M:%S.%f"
    logging.info(f"Started at {datetime.now().strftime(time_format)}")

    start = time.time()
    wave, wave_sample_rate = torchaudio.load(filename)

    if wave_sample_rate != sample_rate:
        logging.info(
            f"Expected sample rate: {sample_rate}. Given: {wave_sample_rate}. "
            f"Resampling to {sample_rate}."
        )

        wave = torchaudio.functional.resample(
            wave,
            orig_freq=wave_sample_rate,
            new_freq=sample_rate,
        )
    # Keep only the first channel. This must happen whether or not the audio
    # was resampled, because wave.shape[0] is used below as the sample count.
    wave = wave[0]

    hyp = get_pretrained_model(repo_id).decode_waves(
        [wave],
        decoding_method=decoding_method,
        num_active_paths=num_active_paths,
    )[0]

    end = time.time()
    # Take a fresh timestamp: re-formatting the one captured before decoding
    # would log the start time as the finish time.
    date_time = datetime.now().strftime(time_format)

    elapsed = end - start
    duration = wave.shape[0] / sample_rate
    # Zero-length audio must not discard the decoded result with a
    # ZeroDivisionError; report an infinite real-time factor instead.
    rtf = elapsed / duration if duration > 0 else float("inf")

    logging.info(f"Finished at {date_time} s. Elapsed: {elapsed: .3f} s")
    logging.info(f"Duration {duration: .3f} s")
    logging.info(f"RTF {rtf: .3f}")
    logging.info(f"hyp:\n{hyp}")

    return hyp
|
|
|
|
|
# Markdown heading shown at the top of the demo page.
title = "# Automatic Speech Recognition with Next-gen Kaldi"

# Markdown blurb shown at the bottom of the demo page. Written one line per
# literal so the blank lines and the leading/trailing newline are explicit.
description = (
    "\n"
    "This space shows how to do automatic speech recognition with Next-gen Kaldi.\n"
    "\n"
    "See more information by visiting the following links:\n"
    "\n"
    "- <https://github.com/k2-fsa/icefall>\n"
    "- <https://github.com/k2-fsa/sherpa>\n"
    "- <https://github.com/k2-fsa/k2>\n"
    "- <https://github.com/lhotse-speech/lhotse>\n"
)
|
|
|
|
|
def update_model_dropdown(language: str):
    """Build the dropdown update for the models of the chosen language.

    Args:
        language: A key of ``language_to_models``.

    Returns:
        A ``gr.Dropdown.update`` whose choices are the models registered for
        ``language`` and whose selected value is the first of them.

    Raises:
        ValueError: If ``language`` is not a key of ``language_to_models``.
    """
    # Reject unknown languages up front so the happy path stays unindented.
    if language not in language_to_models:
        raise ValueError(f"Unsupported language: {language}")

    models = language_to_models[language]
    return gr.Dropdown.update(choices=models, value=models[0])
|
|
|
|
|
# Build the whole UI inside one Blocks context. Components are created in the
# same order as before, since creation order determines the page layout.
with gr.Blocks() as demo:
    gr.Markdown(title)
    language_choices = list(language_to_models)

    # The first language and its first model are the initial selections.
    _initial_language = language_choices[0]
    _initial_models = language_to_models[_initial_language]

    language_radio = gr.Radio(
        label="Language",
        choices=language_choices,
        value=_initial_language,
    )
    model_dropdown = gr.Dropdown(
        choices=_initial_models,
        label="Select a model",
        value=_initial_models[0],
    )

    # Switching the language repopulates the model dropdown.
    language_radio.change(
        update_model_dropdown,
        inputs=language_radio,
        outputs=model_dropdown,
    )

    decoding_method_radio = gr.Radio(
        label="Decoding method",
        choices=["greedy_search", "modified_beam_search"],
        value="greedy_search",
    )

    num_active_paths_slider = gr.Slider(
        minimum=1,
        value=4,
        step=1,
        label="Number of active paths for modified_beam_search",
    )

    with gr.Tabs():
        with gr.TabItem("Upload from disk"):
            uploaded_file = gr.Audio(
                source="upload",
                type="filepath",
                optional=False,
                label="Upload from disk",
            )
            upload_button = gr.Button("Submit for recognition")
            uploaded_output = gr.Textbox(label="Recognized speech from uploaded file")

        with gr.TabItem("Record from microphone"):
            microphone = gr.Audio(
                source="microphone",
                type="filepath",
                optional=False,
                label="Record from microphone",
            )

            record_button = gr.Button("Submit for recognition")
            recorded_output = gr.Textbox(label="Recognized speech from recordings")

    # Both submit buttons call process() with the same settings; only the
    # audio source (first input) and the output textbox differ.
    _shared_settings = [
        language_radio,
        model_dropdown,
        decoding_method_radio,
        num_active_paths_slider,
    ]

    upload_button.click(
        process,
        inputs=[uploaded_file, *_shared_settings],
        outputs=uploaded_output,
    )
    record_button.click(
        process,
        inputs=[microphone, *_shared_settings],
        outputs=recorded_output,
    )
    gr.Markdown(description)
|
|
|
if __name__ == "__main__":
    # Configure INFO-level logging with timestamp and source location, then
    # start serving the demo.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s",
        level=logging.INFO,
    )

    demo.launch()
|
|