# imports import os import sys import gradio as gr import whisper import torch import traceback import shutil import yaml import re from pydub import AudioSegment from huggingface_hub import snapshot_download import json import requests import wave from pynvml import * import time import mRASPloader torch.cuda.empty_cache() # TTS header and url headers = {"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiYTI5NDFhMmEtYzA5ZS00YTcyLWI5ZGItODM5ODEzZDIwMGEwIiwidHlwZSI6ImFwaV90b2tlbiJ9.StBap5nQtNqjh1BMz9DledR5tg5FTWdUMVBrDwY6DjY"} url ="https://api.edenai.run/v2/audio/text_to_speech" # the model we are using for ASR, options are small, medium, large and largev2 (large and largev2 don't fit on huggingface cpu) model = whisper.load_model("medium") # A table to look up all the languages language_id_lookup = { "Arabic" : "ar", "English" : "en", "Chinese" : "zh", "Spanish" : "es", "Russian" : "ru", "French" : "fr", "German" : "de", "Italian" : "it", "Netherlands": "nl", "Portuguese": "pt", "Romanian" : "ro", } # A lookup table for ConST LANG_GEN_SETUPS = { "de": {"beam": 10, "lenpen": 0.7}, "es": {"beam": 10, "lenpen": 0.1}, "fr": {"beam": 10, "lenpen": 1.0}, "it": {"beam": 10, "lenpen": 0.5}, "nl": {"beam": 10, "lenpen": 0.4}, "pt": {"beam": 10, "lenpen": 0.9}, "ro": {"beam": 10, "lenpen": 1.0}, "ru": {"beam": 10, "lenpen": 0.3}, } # A lookup table for TTS (edenai) lang2voice = { "Arabic" : ["ar-XA", "MALE"], "English" : ["en-US", "FEMALE"], "Chinese" : ["cmn-TW", "MALE"], "Spanish" : ["es-ES","MALE"], "Russian" : ["ru-RU,", "FEMALE"], "French" : ["fr-FR", "FEMALE"], "German" : ["de-DE", "MALE"], "Italian" : ["it-IT", "FEMALE"], "Netherlands": ["nl-NL", "MALE"], "Portuguese": ["pt-BR", "FEMALE"], "Romanian" : ["ro-RO", "MALE"], } # load whisper os.system("pip install git+https://github.com/openai/whisper.git") # load mRASP2 # load ConST #os.system("git clone https://github.com/ReneeYe/ConST") #os.system("mv ConST ConST_git") #os.system('mv -n ConST_git/* ./') #os.system("rm -rf ConST_git") #os.system("pip3 install --editable ./") #os.system("mkdir -p data checkpoint") huggingface_model_dir = snapshot_download(repo_id="ReneeYe/ConST_en2x_models") print(huggingface_model_dir) def restrict_src_options(model_type): if model_type == 'Whisper+mRASP2': return gr.Dropdown.update(visible= True), gr.Dropdown.update(visible= True), gr.Dropdown.update(visible= False), gr.Button.update(visible= True) else: return gr.Dropdown.update(visible= False), gr.Dropdown.update(visible= False), gr.Dropdown.update(visible= True), gr.Button.update(visible= False) def switchLang(src_lang, tgt_lang): return tgt_lang, src_lang # The predict function. audio, language and mic_audio are all parameters directly passed by gradio # which means they are user inputted. They are specified in gr.inputs[] block at the bottom. The # gr.outputs[] block will specify the output type. def predict(audio, src_language, tgt_language_mRASP, tgt_language_ConST, model_type, mic_audio=None): # checks if mic_audio is used, otherwise feeds model uploaded audio start_predict = time.time() if mic_audio is not None: input_audio = mic_audio elif audio is not None: input_audio = audio else: return "(please provide audio)" transcript = "Undefined" translation = "Undefined" if model_type == 'Whisper+mRASP2': transcript, translation = predictWithmRASP2(input_audio, src_language, tgt_language_mRASP) language = tgt_language_mRASP elif model_type == 'ConST': predictWithConST(input_audio, tgt_language_ConST) language = tgt_language_ConST start_tts = time.time() payload={ "providers": "google", "language": lang2voice[language][0], "option": lang2voice[language][1], "text": translation, } response = requests.post(url, json=payload, headers=headers) result = json.loads(response.text) os.system('wget -O output.wav "{}"'.format(result['google']['audio_resource_url'])) tts_time = time.time() - start_tts print(f"Took {tts_time} to do text to speech") total_time = time.time() - start_predict print(f"Took {total_time} to do entire prediction") return transcript, translation, "output.wav" def predictWithmRASP2(input_audio, src_language, tgt_language): print("Called predictWithmRASP2") # Uses the model's preprocessing methods to preprocess audio asr_start = time.time() audio = whisper.load_audio(input_audio) audio = whisper.pad_or_trim(audio) # Calculates the mel frequency spectogram mel = whisper.log_mel_spectrogram(audio).to(model.device) # if model is supposed to detect language, set outLanguage to None # otherwise set to specified language if(src_language == "Detect Language"): src_language = None else: src_language = language_id_lookup[src_language.split()[0]] tgt_language = language_id_lookup[tgt_language.split()[0]] # Runs the audio through the whisper model and gets the DecodingResult object, which has the features: # audio_features (Tensor), language, language_probs, tokens, text, avg_logprob, no_speech_prob, temperature, compression_ratio # asr options = whisper.DecodingOptions(fp16 = True, language = src_language) result = whisper.decode(model, mel, options) if src_language is None: src_language = result.language transcript = result.text asr_time = time.time() - asr_start mt_start_time = time.time() # mt with open("input." + src_language, 'w') as w: w.write(result.text) with open("input." + tgt_language, 'w') as w: w.write('LANG_TOK_' + src_language.upper()) #os.system("python3 fairseq/fairseq_cli/preprocess.py --dataset-impl raw \ # --srcdict bpe_vocab --tgtdict bpe_vocab --testpref input -s {} -t {}".format( \ # src_language, tgt_language)) #previous way of doing it old_way = """os.system("python3 fairseq/fairseq_cli/interactive.py ./data-bin \ --user-dir mcolt \ -s zh \ -t en \ --skip-invalid-size-inputs-valid-test \ --path {} \ --max-tokens 1024 \ --task translation_w_langtok \ --lang-prefix-tok \"LANG_TOK_{}\" \ --max-source-positions 1024 \ --max-target-positions 1024 \ --nbest 1 \ --bpe subword_nmt \ --bpe-codes codes.bpe.32000 \ --post-process --tokenizer moses \ --input input.{} | grep -E '[D]-[0-9]+' > output".format( model_name, tgt_language.upper(), src_language))""" translation = mRASPloader.infer(cfg, models, task, max_positions, tokenizer, bpe, use_cuda, generator, src_dict, tgt_dict, align_dict, start_time, start_id, src_language, tgt_language) translation = (' '.join(translation.split(' ')[1:])).strip() mt_time = time.time() - mt_start_time print(f"Took {mt_time} to do Machine Translation") #print(model_name) #with open("output", 'r') as r: # translation = "Undefined" # translation = (' '.join(r.readline().split(' ')[1:])).strip() # print(translation) # Returns the text print("returning transcript: " + transcript + " and the translation: " + translation) return transcript, translation # Helper methods for ConST (as written in https://huggingface.co/spaces/ReneeYe/ConST-speech2text-translator/blob/main/app.py) def convert_audio_to_16k_wav(audio_input): sound = AudioSegment.from_file(audio_input) sample_rate = sound.frame_rate num_channels = sound.channels num_frames = int(sound.frame_count()) filename = audio_input.split("/")[-1] print("original file is at:", audio_input) if (num_channels > 1) or (sample_rate != 16000): # convert to mono-channel 16k wav if num_channels > 1: sound = sound.set_channels(1) if sample_rate != 16000: sound = sound.set_frame_rate(16000) num_frames = int(sound.frame_count()) filename = filename.replace(".wav", "") + "_16k.wav" sound.export(f"data/{filename}", format="wav") else: shutil.copy(audio_input, f'data/{filename}') return filename, num_frames def prepare_tsv(file_name, n_frame, language, task="ST"): tgt_lang = language_id_lookup[language] with open("data/test_case.tsv", "w") as f: f.write("id\taudio\tn_frames\ttgt_text\tspeaker\tsrc_lang\ttgt_lang\tsrc_text\n") f.write(f"sample\t{file_name}\t{n_frame}\tThis is in {tgt_lang}.\tspk.1\ten\t{tgt_lang}\tThis is English.\n") def get_vocab_and_yaml(language): tgt_lang = language_id_lookup[language] # get: spm_ende.model and spm_ende.txt, and save to data/xxx # if exist, no need to download shutil.copy(os.path.join(huggingface_model_dir, f"vocabulary/spm_en{tgt_lang}.model"), "./data") shutil.copy(os.path.join(huggingface_model_dir, f"vocabulary/spm_en{tgt_lang}.txt"), "./data") # write yaml file abs_path = os.popen("pwd").read().strip() yaml_dict = LANG_GEN_SETUPS[tgt_lang] yaml_dict["input_channels"] = 1 yaml_dict["use_audio_input"] = True yaml_dict["prepend_tgt_lang_tag"] = True yaml_dict["prepend_src_lang_tag"] = True yaml_dict["audio_root"] = os.path.join(abs_path, "data") yaml_dict["vocab_filename"] = f"spm_en{tgt_lang}.txt" yaml_dict["bpe_tokenizer"] = {"bpe": "sentencepiece", "sentencepiece_model": os.path.join(abs_path, f"data/spm_en{tgt_lang}.model")} with open("data/config.yaml", "w") as f: yaml.dump(yaml_dict, f) def get_model(language): # download models to checkpoint/xxx return os.path.join(huggingface_model_dir, f"models/const_en{language_id_lookup[language]}.pt") def generate(model_path): os.system(f"python3 fairseq/fairseq_cli/generate.py data/ --gen-subset test_case --task speech_to_text --prefix-size 1 \ --max-source-positions 4000000 \ --config-yaml config.yaml --path {model_path} | tee temp.txt") print("No problem with 1st line") output = os.popen("grep ^D temp.txt | sort -n -k 2 -t '-' | cut -f 3") return output.read().strip() def post_processing(raw_sentence): output_sentence = raw_sentence if ":" in raw_sentence: splited_sent = raw_sentence.split(":") if len(splited_sent) == 2: prefix = splited_sent[0].strip() if len(prefix) <= 3: output_sentence = splited_sent[1].strip() elif ("(" in prefix) and (")" in prefix): bgm = re.findall(r"\(.*?\)", prefix)[0] if len(prefix.replace(bgm, "").strip()) <= 3: output_sentence = splited_sent[1].strip() elif len(splited_sent[1].strip()) > 8: output_sentence = splited_sent[1].strip() elif ("(" in raw_sentence) and (")" in raw_sentence): bgm_list = re.findall(r"\(.*?\)", raw_sentence) for bgm in bgm_list: if len(raw_sentence.replace(bgm, "").strip()) > 5: output_sentence = output_sentence.replace(bgm, "").strip() if len(output_sentence) <= 5: output_sentence = raw_sentence return output_sentence def remove_temp_files(audio_file): os.remove("temp.txt") os.remove("data/test_case.tsv") os.remove(f"data/{audio_file}") def error_output(language): return f"Fail to translate the audio into {language}, you may use the examples I provide." # Predicting the translation with ConST def predictWithConST(audio_file, language): try: converted_audio_file, n_frame = convert_audio_to_16k_wav(audio_file) prepare_tsv(converted_audio_file, n_frame, language) get_vocab_and_yaml(language) model_path = get_model(language) print("This is the model path: " + model_path) generate_model_path = generate(model_path) print("No problem generating model path") generated_output = post_processing(generate_model_path) print("No problem generating output") remove_temp_files(converted_audio_file) print("No problem removing_temp") return generated_output except: traceback.print_exc() return error_output(language) title = "Demo for Speech Translation (Whisper+mRASP2 and ConST)" description = """ How to use: Upload an audio file or record using the microphone. The audio is either processed by being inputted into the openai whisper model for transcription and then mRASP2 for translation, or by ConST, which directly takes the audio input and produces text in the desired language. When using Whisper+mRASP2, you can ask the model to detect a language, it will tell you what language it detected. ConST only supports translating from English to another language. """ # The gradio block cfg = mRASPloader.createCFG() print(cfg) models, task, max_positions, tokenizer, bpe, use_cuda, generator, src_dict, tgt_dict, align_dict, start_time, start_id = mRASPloader.loadmRASP2(cfg) demo = gr.Blocks() with demo: gr.Markdown("# " + title) gr.Markdown("###" + description) with gr.Row(): with gr.Column(): model_type = gr.Dropdown(['Whisper+mRASP2', 'ConST'], type = "value", value = 'Whisper+mRASP2', label = "Select the model you want to use.") audio_file = gr.Audio(label="Upload Speech", source="upload", type="filepath") src_language = gr.Dropdown(['Arabic', 'Chinese', 'English', 'Spanish', 'Russian', 'French', 'Detect Language'], value = 'English', label="Select the language of input") tgt_language_mRASP = gr.Dropdown(['Arabic', 'Chinese', 'English', 'Spanish', 'Russian', 'French'], type="value", value='English', label="Select the language of output") tgt_language_ConST = gr.Dropdown(['German', 'Spanish', 'French', 'Italian', 'Netherlands', 'Portugese', 'Romanian', 'Russian'], type = 'value', value='German', label="Select the language of output", visible= False) switch_lang_button = gr.Button("Switch input and output languages") mic_audio = gr.Audio(label="Record Speech", source="microphone", type="filepath") model_type.change(fn = restrict_src_options, inputs=[model_type], outputs=[src_language, tgt_language_mRASP, tgt_language_ConST, switch_lang_button]) submit_button = gr.Button("Submit") with gr.Column(): transcript = gr.Text(label= "Transcription") translate = gr.Text(label= "Translation") translated_speech = gr.Audio(label="Translation Speech") submit_button.click(fn = predict, inputs=[audio_file, src_language, tgt_language_mRASP, tgt_language_ConST, model_type, mic_audio], outputs=[transcript, translate, translated_speech]) switch_lang_button.click(switchLang, [src_language, tgt_language_mRASP], [src_language, tgt_language_mRASP]) demo.launch(share= True)