|
import csv
import os
import tempfile
import time
import uuid
from datetime import datetime

import gradio as gr
import huggingface_hub
import librosa
import nemo.collections.asr as nemo_asr
import soundfile
import torch
from gradio import inputs, outputs
from huggingface_hub import Repository, hf_hub_download, upload_file
from transformers import BlenderbotTokenizer, BlenderbotForConditionalGeneration
|
|
|
|
|
|
|
|
|
|
|
# --- Dataset persistence configuration --------------------------------------
# Hugging Face dataset that can receive the transcription output.
DATASET_REPO_URL = "https://huggingface.co/datasets/awacke1/ASRLive.csv"
DATASET_REPO_ID = "awacke1/ASRLive.csv"
DATA_FILENAME = "ASRLive.csv"
# Local directory shared by the downloaded CSV and the cloned repository.
# (Previously referenced below without ever being defined.)
DATA_DIRNAME = "data"
DATA_FILE = os.path.join(DATA_DIRNAME, DATA_FILENAME)
# Access token taken from the environment; None when the variable is unset.
HF_TOKEN = os.environ.get("HF_TOKEN")

# Master switch: while False, nothing below contacts the Hub and transcribe()
# does not call store_message().
PersistToDataset = False

if PersistToDataset:
    # Best-effort fetch of the existing CSV: a failure is reported and
    # start-up continues.
    try:
        hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=DATA_FILENAME,
            cache_dir=DATA_DIRNAME,
            force_filename=DATA_FILENAME,
        )
    except Exception as exc:  # not a bare except: Ctrl-C / SystemExit still propagate
        print(f"file not found: {exc}")

    # Local clone of the dataset repo; store_message() pushes through it.
    repo = Repository(
        local_dir=DATA_DIRNAME, clone_from=DATASET_REPO_URL, use_auth_token=HF_TOKEN
    )
|
|
|
def store_message(name: str, message: str) -> str:
    """Append one record to the CSV dataset and return the file's contents.

    When both ``name`` and ``message`` are non-empty, a row holding the
    stripped values plus the current timestamp is appended to ``DATA_FILE``
    and the local repository is pushed to the Hub.  The file is then re-read
    with ``csv.DictReader`` and every row is rendered as one line of text.

    Args:
        name: Value for the ``name`` column.
        message: Value for the ``message`` column.

    Returns:
        One line per data row (cell values joined with ``", "``), each
        terminated by ``"\\r\\n"``; an empty string when there are no rows.
    """
    if name and message:
        # newline="" is what the csv module asks for on files it writes.
        with open(DATA_FILE, "a", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["name", "message", "time"])
            writer.writerow(
                {"name": name.strip(), "message": message.strip(), "time": str(datetime.now())}
            )
        # Publish the appended row.
        repo.push_to_hub()

    # DictReader yields dicts, which cannot be concatenated onto a str (the
    # earlier `ret += row` raised TypeError), so format each row's values.
    # NOTE(review): DictReader consumes the first line as the header, so that
    # line is never echoed back -- confirm the CSV actually starts with one.
    lines = []
    with open(DATA_FILE, "r", newline="") as csvfile:
        for row in csv.DictReader(csvfile):
            cells = ("" if value is None else str(value) for value in row.values())
            lines.append(", ".join(cells) + "\r\n")
    return "".join(lines)
|
|
|
|
|
# BlenderBot conversational checkpoint and its tokenizer.
# NOTE(review): the name `model` is rebound to the ASR model further down in
# this file, so nothing visible here uses this BlenderBot instance afterwards
# -- confirm whether loading it is still needed.
mname = "facebook/blenderbot-400M-distill"
model = BlenderbotForConditionalGeneration.from_pretrained(mname)
tokenizer = BlenderbotTokenizer.from_pretrained(mname)
|
|
|
def take_last_tokens(inputs, note_history, history, *, max_tokens=128):
    """Trim an over-long encoded conversation down to its most recent tokens.

    Nothing changes unless ``inputs['input_ids']`` has more than
    ``max_tokens`` columns.  When it does:

    * ``inputs['input_ids']`` and ``inputs['attention_mask']`` are replaced
      (in the same dict) by the last ``max_tokens`` columns of their first
      row, shaped ``(1, max_tokens)``;
    * the first two ``'</s> <s>'``-separated segments are dropped from
      ``note_history[0]``;
    * the oldest entry is dropped from ``history``.

    Args:
        inputs: Mapping with 2-D ``'input_ids'`` and ``'attention_mask'``
            tensors; updated in place.
        note_history: One-element list holding the joined conversation text.
        history: Sequence of past exchanges.
        max_tokens: Positive number of trailing tokens to keep.

    Returns:
        Tuple ``(inputs, note_history, history)`` after any trimming.
    """
    separator = '</s> <s>'
    if inputs['input_ids'].shape[1] > max_tokens:
        for key in ('input_ids', 'attention_mask'):
            # Slice instead of going through Python lists so the tensor keeps
            # its dtype and device; clone() so the long tensor can be freed.
            inputs[key] = inputs[key][:1, -max_tokens:].clone()
        note_history = [separator.join(note_history[0].split(separator)[2:])]
        history = history[1:]
    return inputs, note_history, history
|
|
|
def add_note_to_history(note, note_history):
    """Return the conversation text with ``note`` appended.

    The entries of ``note_history`` followed by ``note`` are joined with the
    ``'</s> <s>'`` separator.  The caller's list is left untouched (it used
    to be appended to as a side effect).

    Args:
        note: Text of the newest utterance.
        note_history: List of earlier utterance strings.

    Returns:
        A one-element list containing the joined text.
    """
    return ['</s> <s>'.join([*note_history, note])]
|
|
|
|
|
|
|
# Sample rate (Hz) that audio is converted to and written at before it is
# handed to the ASR model.
SAMPLE_RATE = 16000

# Pretrained NeMo RNN-T BPE speech-recognition model used by transcribe().
model = nemo_asr.models.EncDecRNNTBPEModel.from_pretrained("nvidia/stt_en_conformer_transducer_xlarge")
# NOTE(review): passing None presumably selects the model's default decoding
# configuration -- confirm against the NeMo documentation.
model.change_decoding_strategy(None)
# Inference only: switch the model to evaluation mode.
model.eval()
|
|
|
def process_audio_file(file):
    """Load an audio file as mono samples at ``SAMPLE_RATE``.

    ``librosa.load`` resamples to 22050 Hz unless told otherwise, so loading
    with the defaults and then resampling to ``SAMPLE_RATE`` converted the
    audio twice.  Asking for the target rate up front does it in one step,
    and ``mono=True`` makes the separate down-mix unnecessary.

    Args:
        file: Path (or file-like object) accepted by ``librosa.load``.

    Returns:
        1-D array of mono samples at ``SAMPLE_RATE`` Hz.
    """
    data, _ = librosa.load(file, sr=SAMPLE_RATE, mono=True)
    return data
|
|
|
|
|
def transcribe(audio, state=""):
    """Transcribe one audio clip and append the text to the running transcript.

    The clip is normalised with ``process_audio_file``, written to a
    temporary WAV file and passed to ``model.transcribe``.  When
    ``PersistToDataset`` is enabled the text is also stored through
    ``store_message`` and that function's return value is appended as well.

    Args:
        audio: Path of the recorded clip, or ``None`` when nothing was
            recorded for this call.
        state: Transcript accumulated so far; ``None`` is treated as ``""``.

    Returns:
        ``(state, state)`` -- the updated transcript twice, once for the
        textbox output and once for the state output of the interface.
    """
    if state is None:
        state = ""
    if audio is None:
        # Nothing to transcribe; hand the transcript back unchanged instead
        # of failing inside the audio loader.
        return state, state

    audio_data = process_audio_file(audio)
    with tempfile.TemporaryDirectory() as tmpdir:
        # Unique file name inside a directory that is removed on exit.
        audio_path = os.path.join(tmpdir, f'audio_{uuid.uuid4()}.wav')
        soundfile.write(audio_path, audio_data, SAMPLE_RATE)
        transcriptions = model.transcribe([audio_path])

    # The result may arrive as a 2-tuple; its first element is then used.
    if isinstance(transcriptions, tuple) and len(transcriptions) == 2:
        transcriptions = transcriptions[0]
    # Exactly one file was submitted, so take its single result.
    transcriptions = transcriptions[0]

    if PersistToDataset:
        ret = store_message(transcriptions, state)
        state = state + transcriptions + " " + ret
    else:
        state = state + transcriptions
    return state, state
|
|
|
# Build the live-transcription UI and start serving it.  Inputs are the
# streaming microphone clip plus the transcript state; outputs are the
# textbox and the updated state returned by transcribe().
gr.Interface(
    fn=transcribe,
    inputs=[
        gr.Audio(source="microphone", type='filepath', streaming=True),
        "state",
    ],
    outputs=[
        "textbox",
        "state",
    ],
    layout="horizontal",
    theme="huggingface",
    title="🗣️ASR-Live🧠Memory💾",
    description="Live Automatic Speech Recognition (ASR) with Memory💾 Dataset.",
    allow_flagging='never',
    live=True,
    article=f"Result Output Saved to Memory💾 Dataset: [{DATASET_REPO_URL}]({DATASET_REPO_URL})",
).launch(debug=True)