#!/usr/bin/env python3
#
# Copyright 2022-2023 Xiaomi Corp. (authors: Fangjun Kuang)
#
# See LICENSE for clarification regarding multiple authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# References:
# https://gradio.app/docs/#dropdown

import logging
import os
from typing import List, Optional, Tuple

import gradio as gr
import numpy as np
import torch
import torchaudio

from model import create_recognizer, language_to_models

title = "Next-gen Kaldi: Real-time streaming speech recognition"

description = """
This space shows how to do **real-time** streaming speech recognition
with **Next-gen Kaldi**.

Please visit for non-streaming speech recognition with **Next-gen Kaldi**.

It is running on CPU within a docker container provided by Hugging Face.

**Caution**: You may see a **significant delay**, since Hugging Face sends
your recorded audio in chunks and the interval between chunks is unknown,
e.g., it may be 2 seconds.
"""

article = """
See more information by visiting the following links:

-
-
-
-

If you want to deploy it locally, please see

Usage instructions:

(1) Select a language and a model from the dropdown box
(2) Click the Record button to start
(3) Speak
(4) Click the Stop Recording button to stop
(5) **Remember to click the Clear button before you re-click the Record button**
"""


def convert_to_wav(in_filename: str) -> str:
    """Convert the input audio file to a 16 kHz wave file."""
    out_filename = in_filename + ".wav"
    # logging.info(f"Converting '{in_filename}' to '{out_filename}'")
    _ = os.system(
        f"ffmpeg -hide_banner -loglevel error -i '{in_filename}' -ar 16000 '{out_filename}'"
    )
    return out_filename


def get_language_and_model() -> List[str]:
    """Return a list of strings; each entry has the format: language | repo_id"""
    ans = []
    for language, repo_id_list in language_to_models.items():
        for repo_id in repo_id_list:
            ans.append(f"{language} | {repo_id}")
    return ans


language_model_list = get_language_and_model()


def process(
    language_and_repo_id: str,
    audio: Optional[Tuple[int, np.ndarray]],
    state=None,
):
    """
    Args:
      language_and_repo_id:
        A string of the form "language | repo_id".
      audio:
        A tuple (sample_rate, samples) produced by gr.Audio with
        type="numpy", or None when recording has stopped.
      state:
        If not None, it is a list containing:
          - error message, if any
          - language_and_repo_id
          - recognizer
          - stream
          - wasOk
    """
    logging.info("begin one chunk")
    language, repo_id = language_and_repo_id.split("|")
    language = language.strip()
    repo_id = repo_id.strip()

    if state is None:
        logging.info(f"language: {language}")
        logging.info(f"repo_id: {repo_id}")
        recognizer = create_recognizer(repo_id)
        stream = recognizer.create_stream()
        state = ["", language_and_repo_id, recognizer, stream, True]

    # A previous chunk hit an error; keep reporting it.
    if not state[-1]:
        return state[0], state

    # audio is None after the user clicks "Stop Recording".
    if audio is None:
        if "Error" in state[0]:
            return state[0], state

        recognizer = state[2]
        stream = state[3]
        return recognizer.get_result(stream).text.lower(), state

    if state[1] != language_and_repo_id:
        state[0] = (
            "Error: Please don't change the language and model during "
            "recognition, or press the Clear button before you re-click "
            "Record or re-select the language and model.\n\n\n"
            "Hint: Click Stop Recording and then press Clear to fix this error."
        )
        state[-1] = False
        return state[0], state

    # filename = convert_to_wav(audio)

    # gr.Audio with type="numpy" gives int16 samples; normalize to [-1, 1]
    samples = torch.from_numpy(audio[1])
    samples = samples / np.iinfo(np.int16).max

    assert audio[0] == 16000, (audio[0], 16000)

    # samples, sample_rate = torchaudio.load(filename)
    # assert sample_rate == 16000, (sample_rate, 16000)

    logging.info(f"samples shape: {samples.shape}")

    # samples = samples.squeeze(0)
    duration = samples.numel() / 16000
    # logging.info(f"duration: {duration} s")

    recognizer = state[2]
    stream = state[3]

    stream.accept_waveform(16000, samples)
    while recognizer.is_ready(stream):
        recognizer.decode_stream(stream)

    text = recognizer.get_result(stream).text.lower()
    logging.info(text)

    return text, state


language_dropdown = gr.inputs.Dropdown(
    label="Select a language and a model",
    choices=language_model_list,
    default=language_model_list[0],
)

itf1 = gr.Interface(
    title=title,
    description=description,
    article=article,
    fn=process,
    inputs=[
        language_dropdown,
        gr.Audio(
            source="microphone",
            type="numpy",
            label="Press me to start recognition",
            # streaming=True,
        ),
        "state",
    ],
    outputs=[
        gr.outputs.Textbox(type="str", label="result"),
        gr.outputs.State(label=""),
    ],
    # live=True,
)

if __name__ == "__main__":
    formatter = "%(asctime)s %(levelname)s [%(filename)s:%(lineno)d] %(message)s"
    logging.basicConfig(format=formatter, level=logging.INFO)

    itf1.launch()
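

# ---------------------------------------------------------------------------
# Optional sanity check: a sketch only, not used by the Gradio app above.
# Assumptions: `filename` is a 16 kHz mono wave file that you supply, and the
# first entry of `language_model_list` refers to a model that
# create_recognizer() can download.  It builds the same
# (sample_rate, int16 numpy array) tuple that gr.Audio(type="numpy") passes
# to process() and returns the decoded text.  Because it is defined after the
# __main__ guard, call it by importing this module from a Python shell rather
# than by running the script directly.
# ---------------------------------------------------------------------------
def debug_decode(filename: str) -> str:
    samples, sample_rate = torchaudio.load(filename)
    assert sample_rate == 16000, (sample_rate, 16000)

    # Convert float samples in [-1, 1] to int16, as the microphone widget does
    int16_samples = (
        samples.squeeze(0).numpy() * np.iinfo(np.int16).max
    ).astype(np.int16)

    text, _state = process(
        language_and_repo_id=language_model_list[0],
        audio=(sample_rate, int16_samples),
        state=None,
    )
    return text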