datnth1709's picture
update source
468a13d
raw
history blame
11 kB
import gradio as gr
import nltk
import librosa
from optimum.onnxruntime import ORTModelForSeq2SeqLM
from transformers import pipeline, TranslationPipeline, AutoTokenizer, TranslationPipeline
from transformers import Wav2Vec2Processor, Wav2Vec2ForCTC, Wav2Vec2Tokenizer
from transformers.file_utils import cached_path, hf_bucket_url
import os, zipfile
from datasets import load_dataset
import torch
import kenlm
import torchaudio
from pyctcdecode import Alphabet, BeamSearchDecoderCTC, LanguageModel
device = torch.device(0 if torch.cuda.is_available() else "cpu")
"""Vietnamese speech2text"""
cache_dir = './cache/'
processor = Wav2Vec2Processor.from_pretrained("nguyenvulebinh/wav2vec2-base-vietnamese-250h", cache_dir=cache_dir)
vi_model = Wav2Vec2ForCTC.from_pretrained("nguyenvulebinh/wav2vec2-base-vietnamese-250h", cache_dir=cache_dir)
lm_file = hf_bucket_url("nguyenvulebinh/wav2vec2-base-vietnamese-250h", filename='vi_lm_4grams.bin.zip')
lm_file = cached_path(lm_file,cache_dir=cache_dir)
with zipfile.ZipFile(lm_file, 'r') as zip_ref:
zip_ref.extractall(cache_dir)
lm_file = cache_dir + 'vi_lm_4grams.bin'
def get_decoder_ngram_model(tokenizer, ngram_lm_path):
vocab_dict = tokenizer.get_vocab()
sort_vocab = sorted((value, key) for (key, value) in vocab_dict.items())
vocab = [x[1] for x in sort_vocab][:-2]
vocab_list = vocab
# convert ctc blank character representation
vocab_list[tokenizer.pad_token_id] = ""
# replace special characters
vocab_list[tokenizer.unk_token_id] = ""
# vocab_list[tokenizer.bos_token_id] = ""
# vocab_list[tokenizer.eos_token_id] = ""
# convert space character representation
vocab_list[tokenizer.word_delimiter_token_id] = " "
# specify ctc blank char index, since conventially it is the last entry of the logit matrix
alphabet = Alphabet.build_alphabet(vocab_list, ctc_token_idx=tokenizer.pad_token_id)
lm_model = kenlm.Model(ngram_lm_path)
decoder = BeamSearchDecoderCTC(alphabet,
language_model=LanguageModel(lm_model))
return decoder
ngram_lm_model = get_decoder_ngram_model(processor.tokenizer, lm_file)
# define function to read in sound file
def speech_file_to_array_fn(path, max_seconds=10):
batch = {"file": path}
speech_array, sampling_rate = torchaudio.load(batch["file"])
if sampling_rate != 16000:
transform = torchaudio.transforms.Resample(orig_freq=sampling_rate,
new_freq=16000)
speech_array = transform(speech_array)
speech_array = speech_array[0]
if max_seconds > 0:
speech_array = speech_array[:max_seconds*16000]
batch["speech"] = speech_array.numpy()
batch["sampling_rate"] = 16000
return batch
# tokenize
def speech2text_vi(audio):
# read in sound file
# load dummy dataset and read soundfiles
ds = speech_file_to_array_fn(audio.name)
# infer model
input_values = processor(
ds["speech"],
sampling_rate=ds["sampling_rate"],
return_tensors="pt"
).input_values
# decode ctc output
logits = vi_model(input_values).logits[0]
pred_ids = torch.argmax(logits, dim=-1)
greedy_search_output = processor.decode(pred_ids)
beam_search_output = ngram_lm_model.decode(logits.cpu().detach().numpy(), beam_width=500)
return beam_search_output
"""English speech2text"""
nltk.download("punkt")
# Loading the model and the tokenizer
model_name = "facebook/wav2vec2-base-960h"
eng_tokenizer = Wav2Vec2Tokenizer.from_pretrained(model_name)
eng_model = Wav2Vec2ForCTC.from_pretrained(model_name)
def load_data(input_file):
""" Function for resampling to ensure that the speech input is sampled at 16KHz.
"""
# read the file
speech, sample_rate = librosa.load(input_file)
# make it 1-D
if len(speech.shape) > 1:
speech = speech[:, 0] + speech[:, 1]
# Resampling at 16KHz since wav2vec2-base-960h is pretrained and fine-tuned on speech audio sampled at 16 KHz.
if sample_rate != 16000:
speech = librosa.resample(speech, sample_rate, 16000)
return speech
def correct_casing(input_sentence):
""" This function is for correcting the casing of the generated transcribed text
"""
sentences = nltk.sent_tokenize(input_sentence)
return (' '.join([s.replace(s[0], s[0].capitalize(), 1) for s in sentences]))
def speech2text_en(input_file):
"""This function generates transcripts for the provided audio input
"""
speech = load_data(input_file)
# Tokenize
input_values = eng_tokenizer(speech, return_tensors="pt").input_values
# Take logits
logits = eng_model(input_values).logits
# Take argmax
predicted_ids = torch.argmax(logits, dim=-1)
# Get the words from predicted word ids
transcription = eng_tokenizer.decode(predicted_ids[0])
# Output is all upper case
transcription = correct_casing(transcription.lower())
return transcription
"""Machine translation"""
vien_model_checkpoint = "datnth1709/finetuned_HelsinkiNLP-opus-mt-vi-en_PhoMT"
envi_model_checkpoint = "datnth1709/finetuned_HelsinkiNLP-opus-mt-en-vi_PhoMT"
# vien_translator = pipeline("translation", model=vien_model_checkpoint)
# envi_translator = pipeline("translation", model=envi_model_checkpoint)
vien_tokenizer = AutoTokenizer.from_pretrained(vien_model_checkpoint, return_tensors="pt")
vien_model = ORTModelForSeq2SeqLM.from_pretrained(vien_model_checkpoint)
vien_translator = TranslationPipeline(model=vien_model, tokenizer=vien_tokenizer,clean_up_tokenization_spaces=True, device=device)
envi_tokenizer = AutoTokenizer.from_pretrained(envi_model_checkpoint, return_tensors="pt")
envi_model = ORTModelForSeq2SeqLM.from_pretrained(envi_model_checkpoint)
envi_translator = TranslationPipeline(model=envi_model, tokenizer=envi_tokenizer,clean_up_tokenization_spaces=True, device=device)
def translate_vi2en(Vietnamese):
return vien_translator(Vietnamese)[0]['translation_text']
def translate_en2vi(English):
return envi_translator(English)[0]['translation_text']
""" Inference"""
def inference_vien(audio):
vi_text = speech2text_vi(audio)
en_text = translate_vi2en(vi_text)
return vi_text, en_text
def inference_envi(audio):
en_text = speech2text_en(audio)
vi_text = translate_en2vi(en_text)
return en_text, vi_text
def transcribe_vi(audio, state_vi="", state_en=""):
ds = speech_file_to_array_fn(audio.name)
# infer model
input_values = processor(
ds["speech"],
sampling_rate=ds["sampling_rate"],
return_tensors="pt"
).input_values
# decode ctc output
logits = vi_model(input_values).logits[0]
pred_ids = torch.argmax(logits, dim=-1)
greedy_search_output = processor.decode(pred_ids)
beam_search_output = ngram_lm_model.decode(logits.cpu().detach().numpy(), beam_width=500)
state_vi += beam_search_output + " "
en_text = translate_vi2en(beam_search_output)
state_en += en_text + " "
return state_vi, state_en
def transcribe_en(audio, state_en="", state_vi=""):
speech = load_data(audio)
# Tokenize
input_values = eng_tokenizer(speech, return_tensors="pt").input_values
# Take logits
logits = eng_model(input_values).logits
# Take argmax
predicted_ids = torch.argmax(logits, dim=-1)
# Get the words from predicted word ids
transcription = eng_tokenizer.decode(predicted_ids[0])
# Output is all upper case
transcription = correct_casing(transcription.lower())
state_en += transcription + "+"
vi_text = translate_en2vi(transcription)
state_vi += vi_text + "+"
return state_en, state_vi
def transcribe_vi_1(audio, state_en=""):
ds = speech_file_to_array_fn(audio.name)
# infer model
input_values = processor(
ds["speech"],
sampling_rate=ds["sampling_rate"],
return_tensors="pt"
).input_values
# decode ctc output
logits = vi_model(input_values).logits[0]
pred_ids = torch.argmax(logits, dim=-1)
greedy_search_output = processor.decode(pred_ids)
beam_search_output = ngram_lm_model.decode(logits.cpu().detach().numpy(), beam_width=500)
en_text = translate_vi2en(beam_search_output)
state_en += en_text + " "
return state_en, state_en
def transcribe_en_1(audio, state_vi=""):
speech = load_data(audio)
# Tokenize
input_values = eng_tokenizer(speech, return_tensors="pt").input_values
# Take logits
logits = eng_model(input_values).logits
# Take argmax
predicted_ids = torch.argmax(logits, dim=-1)
# Get the words from predicted word ids
transcription = eng_tokenizer.decode(predicted_ids[0])
# Output is all upper case
transcription = correct_casing(transcription.lower())
vi_text = translate_en2vi(transcription)
state_vi += vi_text + "+"
return state_vi, state_vi
"""Gradio demo"""
vi_example_text = ["Có phải bạn đang muốn tìm mua nhà ở ngoại ô thành phố Hồ Chí Minh không?",
"Ánh mắt ta chạm nhau. Chỉ muốn ngắm anh lâu thật lâu.",
"Nếu như một câu nói có thể khiến em vui."]
vi_example_voice =[['vi_speech_01.wav'], ['vi_speech_02.wav'], ['vi_speech_03.wav']]
en_example_text = ["According to a study by Statista, the global AI market is set to grow up to 54 percent every single year.",
"As one of the world's greatest cities, Air New Zealand is proud to add the Big Apple to its list of 29 international destinations.",
"And yet, earlier this month, I found myself at Halloween Horror Nights at Universal Orlando Resort, one of the most popular Halloween events in the US among hardcore horror buffs."
]
en_example_voice =[['en_speech_01.wav'], ['en_speech_02.wav'], ['en_speech_03.wav']]
with gr.Blocks() as demo:
with gr.Tabs():
with gr.TabItem("Vi-En Realtime Translation"):
gr.Interface(
fn=transcribe_vi_1,
inputs=[
gr.Audio(source="microphone", label="Input Vietnamese Audio", type="file", streaming=True),
"state",
],
outputs= [
"text",
"state",
],
examples=vi_example_voice,
live=True).launch()
with gr.Tabs():
with gr.TabItem("En-Vi Realtime Translation"):
gr.Interface(
fn=transcribe_en_1,
inputs=[
gr.Audio(source="microphone", label="Input English Audio", type="filepath", streaming=True),
"state",
],
outputs= [
"text",
"state",
],
examples=en_example_voice,
live=True).launch()
if __name__ == "__main__":
demo.launch()