Spaces:
Runtime error
Runtime error
import gradio as gr | |
import nltk | |
import librosa | |
from transformers import pipeline, TranslationPipeline, AutoTokenizer, TranslationPipeline | |
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, Wav2Vec2Tokenizer | |
from transformers.file_utils import cached_path, hf_bucket_url | |
import os, zipfile | |
from datasets import load_dataset | |
import torch | |
import kenlm | |
import torchaudio | |
from pyctcdecode import Alphabet, BeamSearchDecoderCTC, LanguageModel | |
"""Vietnamese speech2text""" | |
cache_dir = './cache/' | |
processor = Wav2Vec2Processor.from_pretrained("nguyenvulebinh/wav2vec2-base-vietnamese-250h", cache_dir=cache_dir) | |
vi_model = Wav2Vec2ForCTC.from_pretrained("nguyenvulebinh/wav2vec2-base-vietnamese-250h", cache_dir=cache_dir) | |
lm_file = hf_bucket_url("nguyenvulebinh/wav2vec2-base-vietnamese-250h", filename='vi_lm_4grams.bin.zip') | |
lm_file = cached_path(lm_file,cache_dir=cache_dir) | |
with zipfile.ZipFile(lm_file, 'r') as zip_ref: | |
zip_ref.extractall(cache_dir) | |
lm_file = cache_dir + 'vi_lm_4grams.bin' | |
def get_decoder_ngram_model(tokenizer, ngram_lm_path): | |
vocab_dict = tokenizer.get_vocab() | |
sort_vocab = sorted((value, key) for (key, value) in vocab_dict.items()) | |
vocab = [x[1] for x in sort_vocab][:-2] | |
vocab_list = vocab | |
# convert ctc blank character representation | |
vocab_list[tokenizer.pad_token_id] = "" | |
# replace special characters | |
vocab_list[tokenizer.unk_token_id] = "" | |
# vocab_list[tokenizer.bos_token_id] = "" | |
# vocab_list[tokenizer.eos_token_id] = "" | |
# convert space character representation | |
vocab_list[tokenizer.word_delimiter_token_id] = " " | |
# specify ctc blank char index, since conventially it is the last entry of the logit matrix | |
alphabet = Alphabet.build_alphabet(vocab_list, ctc_token_idx=tokenizer.pad_token_id) | |
lm_model = kenlm.Model(ngram_lm_path) | |
decoder = BeamSearchDecoderCTC(alphabet, | |
language_model=LanguageModel(lm_model)) | |
return decoder | |
ngram_lm_model = get_decoder_ngram_model(processor.tokenizer, lm_file) | |
# define function to read in sound file | |
def speech_file_to_array_fn(path, max_seconds=10): | |
batch = {"file": path} | |
speech_array, sampling_rate = torchaudio.load(batch["file"]) | |
if sampling_rate != 16000: | |
transform = torchaudio.transforms.Resample(orig_freq=sampling_rate, | |
new_freq=16000) | |
speech_array = transform(speech_array) | |
speech_array = speech_array[0] | |
if max_seconds > 0: | |
speech_array = speech_array[:max_seconds*16000] | |
batch["speech"] = speech_array.numpy() | |
batch["sampling_rate"] = 16000 | |
return batch | |
# tokenize | |
def speech2text_vi(audio): | |
# read in sound file | |
# load dummy dataset and read soundfiles | |
ds = speech_file_to_array_fn(audio.name) | |
# infer model | |
input_values = processor( | |
ds["speech"], | |
sampling_rate=ds["sampling_rate"], | |
return_tensors="pt" | |
).input_values | |
# decode ctc output | |
logits = vi_model(input_values).logits[0] | |
pred_ids = torch.argmax(logits, dim=-1) | |
greedy_search_output = processor.decode(pred_ids) | |
beam_search_output = ngram_lm_model.decode(logits.cpu().detach().numpy(), beam_width=500) | |
return beam_search_output | |
"""English speech2text""" | |
nltk.download("punkt") | |
# Loading the model and the tokenizer | |
model_name = "facebook/s2t-small-librispeech-asr" | |
eng_tokenizer = Wav2Vec2Tokenizer.from_pretrained(model_name) | |
eng_model = Wav2Vec2ForCTC.from_pretrained(model_name) | |
def load_data(input_file): | |
""" Function for resampling to ensure that the speech input is sampled at 16KHz. | |
""" | |
# read the file | |
speech, sample_rate = librosa.load(input_file) | |
# make it 1-D | |
if len(speech.shape) > 1: | |
speech = speech[:, 0] + speech[:, 1] | |
# Resampling at 16KHz since wav2vec2-base-960h is pretrained and fine-tuned on speech audio sampled at 16 KHz. | |
if sample_rate != 16000: | |
speech = librosa.resample(speech, sample_rate, 16000) | |
return speech | |
def correct_casing(input_sentence): | |
""" This function is for correcting the casing of the generated transcribed text | |
""" | |
sentences = nltk.sent_tokenize(input_sentence) | |
return (' '.join([s.replace(s[0], s[0].capitalize(), 1) for s in sentences])) | |
def speech2text_en(input_file): | |
"""This function generates transcripts for the provided audio input | |
""" | |
speech = load_data(input_file) | |
# Tokenize | |
input_values = eng_tokenizer(speech, return_tensors="pt").input_values | |
# Take logits | |
logits = eng_model(input_values).logits | |
# Take argmax | |
predicted_ids = torch.argmax(logits, dim=-1) | |
# Get the words from predicted word ids | |
transcription = eng_tokenizer.decode(predicted_ids[0]) | |
# Output is all upper case | |
transcription = correct_casing(transcription.lower()) | |
return transcription | |
"""Machine translation""" | |
vien_model_checkpoint = "datnth1709/finetuned_HelsinkiNLP-opus-mt-vi-en_PhoMT" | |
envi_model_checkpoint = "datnth1709/finetuned_HelsinkiNLP-opus-mt-en-vi_PhoMT" | |
vien_translator = pipeline("translation", model=vien_model_checkpoint) | |
envi_translator = pipeline("translation", model=envi_model_checkpoint) | |
def translate_vi2en(Vietnamese): | |
return vien_translator(Vietnamese)[0]['translation_text'] | |
def translate_en2vi(English): | |
return envi_translator(English)[0]['translation_text'] | |
""" Inference""" | |
def inference_vien(audio): | |
vi_text = speech2text_vi(audio) | |
en_text = translate_vi2en(vi_text) | |
return vi_text, en_text | |
def inference_envi(audio): | |
en_text = speech2text_en(audio) | |
vi_text = translate_en2vi(en_text) | |
return en_text, vi_text | |
def transcribe_vi(audio, state_vi="", state_en=""): | |
ds = speech_file_to_array_fn(audio.name) | |
# infer model | |
input_values = processor( | |
ds["speech"], | |
sampling_rate=ds["sampling_rate"], | |
return_tensors="pt" | |
).input_values | |
# decode ctc output | |
logits = vi_model(input_values).logits[0] | |
pred_ids = torch.argmax(logits, dim=-1) | |
greedy_search_output = processor.decode(pred_ids) | |
beam_search_output = ngram_lm_model.decode(logits.cpu().detach().numpy(), beam_width=500) | |
state_vi += beam_search_output + " " | |
en_text = translate_vi2en(beam_search_output) | |
state_en += en_text + " " | |
return state_vi, state_en | |
def transcribe_en(audio, state_en="", state_vi=""): | |
speech = load_data(audio) | |
# Tokenize | |
input_values = eng_tokenizer(speech, return_tensors="pt").input_values | |
# Take logits | |
logits = eng_model(input_values).logits | |
# Take argmax | |
predicted_ids = torch.argmax(logits, dim=-1) | |
# Get the words from predicted word ids | |
transcription = eng_tokenizer.decode(predicted_ids[0]) | |
# Output is all upper case | |
transcription = correct_casing(transcription.lower()) | |
state_en += transcription + "+" | |
vi_text = translate_en2vi(transcription) | |
state_vi += vi_text + "+" | |
return state_en, state_vi | |
def transcribe_vi_1(audio, state_en=""): | |
ds = speech_file_to_array_fn(audio.name) | |
# infer model | |
input_values = processor( | |
ds["speech"], | |
sampling_rate=ds["sampling_rate"], | |
return_tensors="pt" | |
).input_values | |
# decode ctc output | |
logits = vi_model(input_values).logits[0] | |
pred_ids = torch.argmax(logits, dim=-1) | |
greedy_search_output = processor.decode(pred_ids) | |
beam_search_output = ngram_lm_model.decode(logits.cpu().detach().numpy(), beam_width=500) | |
en_text = translate_vi2en(beam_search_output) | |
state_en += en_text + " " | |
return state_en, state_en | |
def transcribe_en_1(audio, state_vi=""): | |
speech = load_data(audio) | |
# Tokenize | |
input_values = eng_tokenizer(speech, return_tensors="pt").input_values | |
# Take logits | |
logits = eng_model(input_values).logits | |
# Take argmax | |
predicted_ids = torch.argmax(logits, dim=-1) | |
# Get the words from predicted word ids | |
transcription = eng_tokenizer.decode(predicted_ids[0]) | |
# Output is all upper case | |
transcription = correct_casing(transcription.lower()) | |
vi_text = translate_en2vi(transcription) | |
state_vi += vi_text + "+" | |
return state_vi, state_vi | |
"""Gradio demo""" | |
vi_example_text = ["Có phải bạn đang muốn tìm mua nhà ở ngoại ô thành phố Hồ Chí Minh không?", | |
"Ánh mắt ta chạm nhau. Chỉ muốn ngắm anh lâu thật lâu.", | |
"Nếu như một câu nói có thể khiến em vui."] | |
vi_example_voice =[['vi_speech_01.wav'], ['vi_speech_02.wav'], ['vi_speech_03.wav']] | |
en_example_text = ["According to a study by Statista, the global AI market is set to grow up to 54 percent every single year.", | |
"As one of the world's greatest cities, Air New Zealand is proud to add the Big Apple to its list of 29 international destinations.", | |
"And yet, earlier this month, I found myself at Halloween Horror Nights at Universal Orlando Resort, one of the most popular Halloween events in the US among hardcore horror buffs." | |
] | |
en_example_voice =[['en_speech_01.wav'], ['en_speech_02.wav'], ['en_speech_03.wav']] | |
with gr.Blocks() as demo: | |
with gr.Tabs(): | |
with gr.TabItem("Vi-En Realtime Translation"): | |
gr.Interface( | |
fn=transcribe_vi_1, | |
inputs=[ | |
gr.Audio(source="microphone", label="Input Vietnamese Audio", type="file", streaming=True), | |
"state", | |
], | |
outputs= [ | |
"text", | |
"state", | |
], | |
examples=vi_example_voice, | |
live=True).launch() | |
with gr.Tabs(): | |
with gr.TabItem("En-Vi Realtime Translation"): | |
gr.Interface( | |
fn=transcribe_en_1, | |
inputs=[ | |
gr.Audio(source="microphone", label="Input English Audio", type="filepath", streaming=True), | |
"state", | |
], | |
outputs= [ | |
"text", | |
"state", | |
], | |
examples=en_example_voice, | |
live=True).launch() | |
if __name__ == "__main__": | |
demo.launch() |