# imports
import os
import sys
import gradio as gr
import whisper
import torch
import traceback
import shutil
import yaml
import re
from pydub import AudioSegment
from huggingface_hub import snapshot_download
import json
import requests
import wave
from pynvml import *
import time
import mRASPloader

torch.cuda.empty_cache()

# TTS header and url
headers = {"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiYTI5NDFhMmEtYzA5ZS00YTcyLWI5ZGItODM5ODEzZDIwMGEwIiwidHlwZSI6ImFwaV90b2tlbiJ9.StBap5nQtNqjh1BMz9DledR5tg5FTWdUMVBrDwY6DjY"}
url = "https://api.edenai.run/v2/audio/text_to_speech"
# The model we are using for ASR. Options are small, medium, large and large-v2
# (large and large-v2 do not fit on the Hugging Face CPU tier).
model = whisper.load_model("medium")

# A table to look up all the languages
language_id_lookup = {
    "Arabic"     : "ar",
    "English"    : "en",
    "Chinese"    : "zh",
    "Spanish"    : "es",
    "Russian"    : "ru",
    "French"     : "fr",
    "German"     : "de",
    "Italian"    : "it",
    "Netherlands": "nl",
    "Portuguese" : "pt",
    "Romanian"   : "ro",
}
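# These UI language names feed three places: Whisper decoding (source language),
# mRASP2 language tokens (source/target), and ConST checkpoint/vocabulary selection (target).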

# A lookup table for ConST
LANG_GEN_SETUPS = {
    "de": {"beam": 10, "lenpen": 0.7},
    "es": {"beam": 10, "lenpen": 0.1},
    "fr": {"beam": 10, "lenpen": 1.0},
    "it": {"beam": 10, "lenpen": 0.5},
    "nl": {"beam": 10, "lenpen": 0.4},
    "pt": {"beam": 10, "lenpen": 0.9},
    "ro": {"beam": 10, "lenpen": 1.0},
    "ru": {"beam": 10, "lenpen": 0.3},
}
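# The per-language beam size and length penalty above are written into data/config.yaml
# by get_vocab_and_yaml() below and picked up by fairseq during generation.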

# A lookup table for TTS (edenai)
lang2voice = {
    "Arabic"     : ["ar-XA", "MALE"],
    "English"    : ["en-US", "FEMALE"],
    "Chinese"    : ["cmn-TW", "MALE"],
    "Spanish"    : ["es-ES", "MALE"],
    "Russian"    : ["ru-RU", "FEMALE"],
    "French"     : ["fr-FR", "FEMALE"],
    "German"     : ["de-DE", "MALE"],
    "Italian"    : ["it-IT", "FEMALE"],
    "Netherlands": ["nl-NL", "MALE"],
    "Portuguese" : ["pt-BR", "FEMALE"],
    "Romanian"   : ["ro-RO", "MALE"],
}
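# Each entry is [EdenAI language/locale code, voice gender]; predict() copies these into
# the "language" and "option" fields of the text-to-speech payload.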

# install whisper from source
os.system("pip install git+https://github.com/openai/whisper.git")

# load mRASP2
# load ConST
#os.system("git clone https://github.com/ReneeYe/ConST")
#os.system("mv ConST ConST_git")
#os.system('mv -n ConST_git/* ./')
#os.system("rm -rf ConST_git")
#os.system("pip3 install --editable ./")
#os.system("mkdir -p data checkpoint")
huggingface_model_dir = snapshot_download(repo_id="ReneeYe/ConST_en2x_models")
print(huggingface_model_dir)

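# Show the source/target dropdowns and the language-switch button for Whisper+mRASP2,
# or only the ConST target dropdown when ConST is selected.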
def restrict_src_options(model_type):
    if model_type == 'Whisper+mRASP2':
        return gr.Dropdown.update(visible=True), gr.Dropdown.update(visible=True), gr.Dropdown.update(visible=False), gr.Button.update(visible=True)
    else:
        return gr.Dropdown.update(visible=False), gr.Dropdown.update(visible=False), gr.Dropdown.update(visible=True), gr.Button.update(visible=False)


def switchLang(src_lang, tgt_lang):
    return tgt_lang, src_lang


# The predict function. audio, src_language, tgt_language_mRASP, tgt_language_ConST, model_type
# and mic_audio are all passed in directly by Gradio, i.e. they are user inputs. They are wired up
# in the gr.Blocks() layout at the bottom of this file, which also declares the three outputs
# (transcript, translation, and the synthesized speech file).
def predict(audio, src_language, tgt_language_mRASP, tgt_language_ConST, model_type, mic_audio=None):
    # use the microphone recording if present, otherwise fall back to the uploaded audio
    start_predict = time.time()
    if mic_audio is not None:
        input_audio = mic_audio
    elif audio is not None:
        input_audio = audio
    else:
        return "(please provide audio)", "", None

    transcript = "Undefined"
    translation = "Undefined"
    if model_type == 'Whisper+mRASP2':
        transcript, translation = predictWithmRASP2(input_audio, src_language, tgt_language_mRASP)
        language = tgt_language_mRASP
    elif model_type == 'ConST':
        translation = predictWithConST(input_audio, tgt_language_ConST)
        language = tgt_language_ConST

    # text-to-speech via the EdenAI API (Google voices); the response contains a URL to the audio
    start_tts = time.time()
    payload = {
        "providers": "google",
        "language": lang2voice[language][0],
        "option": lang2voice[language][1],
        "text": translation,
    }
    response = requests.post(url, json=payload, headers=headers)
    result = json.loads(response.text)
    os.system('wget -O output.wav "{}"'.format(result['google']['audio_resource_url']))
    tts_time = time.time() - start_tts
    print(f"Took {tts_time} seconds to do text to speech")
    total_time = time.time() - start_predict
    print(f"Took {total_time} seconds to do the entire prediction")
    return transcript, translation, "output.wav"
def predictWithmRASP2(input_audio, src_language, tgt_language):
    print("Called predictWithmRASP2")
    # Uses the model's preprocessing methods to preprocess the audio
    asr_start = time.time()
    audio = whisper.load_audio(input_audio)
    audio = whisper.pad_or_trim(audio)
    # Calculates the log-mel spectrogram
    mel = whisper.log_mel_spectrogram(audio).to(model.device)

    # If the model is supposed to detect the language, set src_language to None;
    # otherwise map the selected language names to language codes.
    if src_language == "Detect Language":
        src_language = None
    else:
        src_language = language_id_lookup[src_language.split()[0]]
    tgt_language = language_id_lookup[tgt_language.split()[0]]

    # Runs the audio through the whisper model and gets a DecodingResult object, which has the fields:
    # audio_features (Tensor), language, language_probs, tokens, text, avg_logprob, no_speech_prob, temperature, compression_ratio
    # asr
    options = whisper.DecodingOptions(fp16=True, language=src_language)
    result = whisper.decode(model, mel, options)
    if src_language is None:
        src_language = result.language
    transcript = result.text
    asr_time = time.time() - asr_start
    print(f"Took {asr_time} seconds to do ASR")

    # mt
    mt_start_time = time.time()
    with open("input." + src_language, 'w') as w:
        w.write(result.text)
    with open("input." + tgt_language, 'w') as w:
        w.write('LANG_TOK_' + src_language.upper())
    #os.system("python3 fairseq/fairseq_cli/preprocess.py --dataset-impl raw \
    #           --srcdict bpe_vocab --tgtdict bpe_vocab --testpref input -s {} -t {}".format( \
    #           src_language, tgt_language))
    # previous shell-based way of doing it, kept for reference
    old_way = """os.system("python3 fairseq/fairseq_cli/interactive.py ./data-bin \
        --user-dir mcolt \
        -s zh \
        -t en \
        --skip-invalid-size-inputs-valid-test \
        --path {} \
        --max-tokens 1024 \
        --task translation_w_langtok \
        --lang-prefix-tok \"LANG_TOK_{}\" \
        --max-source-positions 1024 \
        --max-target-positions 1024 \
        --nbest 1 \
        --bpe subword_nmt \
        --bpe-codes codes.bpe.32000 \
        --post-process --tokenizer moses \
        --input input.{} | grep -E '[D]-[0-9]+' > output".format(
        model_name, tgt_language.upper(), src_language))"""
    translation = mRASPloader.infer(cfg, models, task, max_positions, tokenizer, bpe, use_cuda, generator, src_dict, tgt_dict, align_dict, start_time, start_id, src_language, tgt_language)
    # drop the leading LANG_TOK_* token from the hypothesis
    translation = (' '.join(translation.split(' ')[1:])).strip()
    mt_time = time.time() - mt_start_time
    print(f"Took {mt_time} seconds to do Machine Translation")
    #print(model_name)
    #with open("output", 'r') as r:
    #    translation = "Undefined"
    #    translation = (' '.join(r.readline().split(' ')[1:])).strip()
    #    print(translation)

    # Returns the text
    print("returning transcript: " + transcript + " and the translation: " + translation)
    return transcript, translation

# Helper methods for ConST (as written in https://huggingface.co/spaces/ReneeYe/ConST-speech2text-translator/blob/main/app.py)
def convert_audio_to_16k_wav(audio_input):
    sound = AudioSegment.from_file(audio_input)
    sample_rate = sound.frame_rate
    num_channels = sound.channels
    num_frames = int(sound.frame_count())
    filename = audio_input.split("/")[-1]
    print("original file is at:", audio_input)
    if (num_channels > 1) or (sample_rate != 16000):  # convert to mono-channel 16k wav
        if num_channels > 1:
            sound = sound.set_channels(1)
        if sample_rate != 16000:
            sound = sound.set_frame_rate(16000)
        num_frames = int(sound.frame_count())
        filename = filename.replace(".wav", "") + "_16k.wav"
        sound.export(f"data/{filename}", format="wav")
    else:
        shutil.copy(audio_input, f'data/{filename}')
    return filename, num_frames

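# prepare_tsv writes the one-line test manifest that fairseq's speech_to_text task expects
# (id, audio, n_frames, tgt_text, speaker, src_lang, tgt_lang, src_text); the text columns
# are placeholders since we only run inference.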
def prepare_tsv(file_name, n_frame, language, task="ST"):
    tgt_lang = language_id_lookup[language]
    with open("data/test_case.tsv", "w") as f:
        f.write("id\taudio\tn_frames\ttgt_text\tspeaker\tsrc_lang\ttgt_lang\tsrc_text\n")
        f.write(f"sample\t{file_name}\t{n_frame}\tThis is in {tgt_lang}.\tspk.1\ten\t{tgt_lang}\tThis is English.\n")

def get_vocab_and_yaml(language):
    tgt_lang = language_id_lookup[language]
    # copy spm_en{tgt_lang}.model and spm_en{tgt_lang}.txt into data/
    shutil.copy(os.path.join(huggingface_model_dir, f"vocabulary/spm_en{tgt_lang}.model"), "./data")
    shutil.copy(os.path.join(huggingface_model_dir, f"vocabulary/spm_en{tgt_lang}.txt"), "./data")
    # write the yaml config file
    abs_path = os.popen("pwd").read().strip()
    yaml_dict = LANG_GEN_SETUPS[tgt_lang]
    yaml_dict["input_channels"] = 1
    yaml_dict["use_audio_input"] = True
    yaml_dict["prepend_tgt_lang_tag"] = True
    yaml_dict["prepend_src_lang_tag"] = True
    yaml_dict["audio_root"] = os.path.join(abs_path, "data")
    yaml_dict["vocab_filename"] = f"spm_en{tgt_lang}.txt"
    yaml_dict["bpe_tokenizer"] = {"bpe": "sentencepiece",
                                  "sentencepiece_model": os.path.join(abs_path, f"data/spm_en{tgt_lang}.model")}
    with open("data/config.yaml", "w") as f:
        yaml.dump(yaml_dict, f)


def get_model(language):
    # return the path to the ConST checkpoint for this target language (already downloaded via snapshot_download)
    return os.path.join(huggingface_model_dir, f"models/const_en{language_id_lookup[language]}.pt")

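# generate() shells out to fairseq's generate.py on the prepared manifest, then greps the
# detokenized hypothesis lines ("D-<id>") out of its log to recover the translation.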
def generate(model_path):
    os.system(f"python3 fairseq/fairseq_cli/generate.py data/ --gen-subset test_case --task speech_to_text --prefix-size 1 \
              --max-source-positions 4000000 \
              --config-yaml config.yaml --path {model_path} | tee temp.txt")
    print("fairseq generation finished")
    output = os.popen("grep ^D temp.txt | sort -n -k 2 -t '-' | cut -f 3")
    return output.read().strip()

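# post_processing heuristically cleans the raw hypothesis: it drops short speaker-style
# prefixes before a colon and removes parenthesised background annotations such as "(music)",
# falling back to the raw sentence if too much text would be stripped.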
def post_processing(raw_sentence):
    output_sentence = raw_sentence
    if ":" in raw_sentence:
        split_sent = raw_sentence.split(":")
        if len(split_sent) == 2:
            prefix = split_sent[0].strip()
            if len(prefix) <= 3:
                output_sentence = split_sent[1].strip()
            elif ("(" in prefix) and (")" in prefix):
                bgm = re.findall(r"\(.*?\)", prefix)[0]
                if len(prefix.replace(bgm, "").strip()) <= 3:
                    output_sentence = split_sent[1].strip()
            elif len(split_sent[1].strip()) > 8:
                output_sentence = split_sent[1].strip()
    elif ("(" in raw_sentence) and (")" in raw_sentence):
        bgm_list = re.findall(r"\(.*?\)", raw_sentence)
        for bgm in bgm_list:
            if len(raw_sentence.replace(bgm, "").strip()) > 5:
                output_sentence = output_sentence.replace(bgm, "").strip()
        if len(output_sentence) <= 5:
            output_sentence = raw_sentence
    return output_sentence

def remove_temp_files(audio_file):
    os.remove("temp.txt")
    os.remove("data/test_case.tsv")
    os.remove(f"data/{audio_file}")


def error_output(language):
    return f"Failed to translate the audio into {language}; you may try the provided examples."

# Predicting the translation with ConST
def predictWithConST(audio_file, language):
    try:
        converted_audio_file, n_frame = convert_audio_to_16k_wav(audio_file)
        prepare_tsv(converted_audio_file, n_frame, language)
        get_vocab_and_yaml(language)
        model_path = get_model(language)
        print("This is the model path: " + model_path)
        raw_output = generate(model_path)
        print("No problem generating the raw output")
        generated_output = post_processing(raw_output)
        print("No problem post-processing the output")
        remove_temp_files(converted_audio_file)
        print("No problem removing temp files")
        return generated_output
    except Exception:
        traceback.print_exc()
        return error_output(language)

title = "Demo for Speech Translation (Whisper+mRASP2 and ConST)"
description = """
<b>How to use:</b> Upload an audio file or record with the microphone. The audio is processed either by the OpenAI Whisper model for transcription
followed by mRASP2 for translation, or by ConST, which takes the audio directly and produces text in the desired language. When using Whisper+mRASP2,
you can ask the model to detect the input language, and it will report which language it detected. ConST only supports translating from English into another language.
"""
# Load mRASP2 once at startup, then build the Gradio interface
cfg = mRASPloader.createCFG()
print(cfg)
models, task, max_positions, tokenizer, bpe, use_cuda, generator, src_dict, tgt_dict, align_dict, start_time, start_id = mRASPloader.loadmRASP2(cfg)

demo = gr.Blocks()
with demo:
    gr.Markdown("# " + title)
    gr.Markdown("### " + description)
    with gr.Row():
        with gr.Column():
            model_type = gr.Dropdown(['Whisper+mRASP2', 'ConST'], type="value", value='Whisper+mRASP2', label="Select the model you want to use.")
            audio_file = gr.Audio(label="Upload Speech", source="upload", type="filepath")
            src_language = gr.Dropdown(['Arabic',
                                        'Chinese',
                                        'English',
                                        'Spanish',
                                        'Russian',
                                        'French',
                                        'Detect Language'], value='English', label="Select the language of input")
            tgt_language_mRASP = gr.Dropdown(['Arabic',
                                              'Chinese',
                                              'English',
                                              'Spanish',
                                              'Russian',
                                              'French'], type="value", value='English', label="Select the language of output")
            tgt_language_ConST = gr.Dropdown(['German',
                                              'Spanish',
                                              'French',
                                              'Italian',
                                              'Netherlands',
                                              'Portuguese',
                                              'Romanian',
                                              'Russian'], type="value", value='German', label="Select the language of output", visible=False)
            switch_lang_button = gr.Button("Switch input and output languages")
            mic_audio = gr.Audio(label="Record Speech", source="microphone", type="filepath")
            model_type.change(fn=restrict_src_options, inputs=[model_type], outputs=[src_language, tgt_language_mRASP, tgt_language_ConST, switch_lang_button])
            submit_button = gr.Button("Submit")
        with gr.Column():
            transcript = gr.Text(label="Transcription")
            translate = gr.Text(label="Translation")
            translated_speech = gr.Audio(label="Translation Speech")
    submit_button.click(fn=predict, inputs=[audio_file, src_language, tgt_language_mRASP, tgt_language_ConST, model_type, mic_audio], outputs=[transcript, translate, translated_speech])
    switch_lang_button.click(switchLang, [src_language, tgt_language_mRASP], [src_language, tgt_language_mRASP])

demo.launch(share=True)