|
import torch |
|
import time |
|
import moviepy.editor as mp |
|
import psutil |
|
import gradio as gr |
|
import spaces |
|
from transformers import pipeline |
|
from transformers.pipelines.audio_utils import ffmpeg_read |
|
|
|
DEFAULT_MODEL_NAME = "distil-whisper/distil-large-v3" |
|
BATCH_SIZE = 8 |
|
|
|
print('start app') |
|
|
|
device = 0 if torch.cuda.is_available() else "cpu" |
|
if device == "cpu": |
|
DEFAULT_MODEL_NAME = "openai/whisper-tiny" |
|
|
|
def load_pipeline(model_name): |
|
return pipeline( |
|
task="automatic-speech-recognition", |
|
model=model_name, |
|
chunk_length_s=30, |
|
device=device, |
|
) |
|
|
|
pipe = load_pipeline(DEFAULT_MODEL_NAME) |
|
openai_pipe=load_pipeline("openai/whisper-large-v3") |
|
default_pipe = load_pipeline(DEFAULT_MODEL_NAME) |
|
|
|
|
|
|
|
|
|
from gpustat import GPUStatCollection |
|
|
|
def update_gpu_status(): |
|
if torch.cuda.is_available() == False: |
|
return "No Nvidia Device" |
|
try: |
|
gpu_stats = GPUStatCollection.new_query() |
|
for gpu in gpu_stats: |
|
|
|
gpu_id = gpu.index |
|
gpu_name = gpu.name |
|
gpu_utilization = gpu.utilization |
|
memory_used = gpu.memory_used |
|
memory_total = gpu.memory_total |
|
memory_utilization = (memory_used / memory_total) * 100 |
|
gpu_status=(f"GPU {gpu_id}: {gpu_name}, Utilization: {gpu_utilization}%, Memory Used: {memory_used}MB, Memory Total: {memory_total}MB, Memory Utilization: {memory_utilization:.2f}%") |
|
return gpu_status |
|
|
|
except Exception as e: |
|
print(f"Error getting GPU stats: {e}") |
|
return torch_update_gpu_status() |
|
|
|
def torch_update_gpu_status(): |
|
if torch.cuda.is_available(): |
|
gpu_info = torch.cuda.get_device_name(0) |
|
gpu_memory = torch.cuda.mem_get_info(0) |
|
total_memory = gpu_memory[1] / (1024 * 1024) |
|
free_memory=gpu_memory[0] /(1024 *1024) |
|
used_memory = (gpu_memory[1] - gpu_memory[0]) / (1024 * 1024) |
|
|
|
gpu_status = f"GPU: {gpu_info} Free Memory:{free_memory}MB Total Memory: {total_memory:.2f} MB Used Memory: {used_memory:.2f} MB" |
|
else: |
|
gpu_status = "No GPU available" |
|
return gpu_status |
|
|
|
def update_cpu_status(): |
|
import datetime |
|
|
|
current_time = datetime.datetime.now().time() |
|
|
|
time_str = current_time.strftime("%H:%M:%S") |
|
|
|
cpu_percent = psutil.cpu_percent() |
|
cpu_status = f"CPU Usage: {cpu_percent}% {time_str}" |
|
return cpu_status |
|
|
|
def update_status(): |
|
gpu_status = update_gpu_status() |
|
cpu_status = update_cpu_status() |
|
sys_status=gpu_status+"\n\n"+cpu_status |
|
return sys_status |
|
|
|
def refresh_status(): |
|
return update_status() |
|
|
|
|
|
@spaces.GPU |
|
def transcribe(audio_path, model_name): |
|
print(str(time.time())+' start transcribe ') |
|
|
|
if audio_path is None: |
|
raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") |
|
|
|
audio_path=audio_path.strip() |
|
model_name=model_name.strip() |
|
|
|
global pipe |
|
if model_name != pipe.model.name_or_path: |
|
print("old model is:"+ pipe.model.name_or_path ) |
|
if model_name=="openai/whisper-large-v3": |
|
pipe=openai_pipe |
|
print(str(time.time())+" use openai model " + pipe.model.name_or_path) |
|
elif model_name==DEFAULT_MODEL_NAME: |
|
pipe=default_pipe |
|
print(str(time.time())+" use default model " + pipe.model.name_or_path) |
|
else: |
|
print(str(time.time())+' start load model ' + model_name) |
|
pipe = load_pipeline(model_name) |
|
print(str(time.time())+' finished load model ' + model_name) |
|
|
|
start_time = time.time() |
|
print(str(time.time())+' start processing and set recording start time point') |
|
|
|
audio = mp.AudioFileClip(audio_path) |
|
audio_duration = audio.duration |
|
print(str(time.time())+' start pipe ') |
|
text = pipe(audio_path, batch_size=BATCH_SIZE, generate_kwargs={"task": "transcribe"}, return_timestamps=True)["text"] |
|
end_time = time.time() |
|
|
|
transcription_time = end_time - start_time |
|
|
|
|
|
transcription_time_output = ( |
|
f"Transcription Time: {transcription_time:.2f} seconds\n" |
|
f"Audio Duration: {audio_duration:.2f} seconds\n" |
|
f"Model Used: {model_name}\n" |
|
f"Device Used: {'GPU' if torch.cuda.is_available() else 'CPU'}" |
|
) |
|
|
|
print(str(time.time())+' return transcribe '+ text ) |
|
|
|
return text, transcription_time_output |
|
|
|
@spaces.GPU |
|
def handle_upload_audio(audio_path,model_name,old_transcription=''): |
|
print('old_trans:' + old_transcription) |
|
(text,transcription_time_output)=transcribe(audio_path,model_name) |
|
return text+'\n\n'+old_transcription, transcription_time_output |
|
|
|
graudio=gr.Audio(type="filepath",show_download_button=True) |
|
grmodel_textbox=gr.Textbox( |
|
label="Model Name", |
|
value=DEFAULT_MODEL_NAME, |
|
placeholder="Enter the model name", |
|
info="Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3", |
|
) |
|
groutputs=[gr.TextArea(label="Transcription",elem_id="transcription_textarea",interactive=True,lines=20,show_copy_button=True), |
|
gr.TextArea(label="Transcription Info",interactive=True,show_copy_button=True)] |
|
|
|
mf_transcribe = gr.Interface( |
|
fn=handle_upload_audio, |
|
inputs=[ |
|
graudio, |
|
|
|
grmodel_textbox, |
|
], |
|
outputs=groutputs, |
|
theme="huggingface", |
|
title="Whisper Transcription", |
|
description=( |
|
"Scroll to Bottom to show system status. " |
|
"Transcribe long-form microphone or audio file after uploaded audio! " |
|
), |
|
allow_flagging="never", |
|
) |
|
|
|
|
|
demo = gr.Blocks() |
|
|
|
|
|
with demo: |
|
gr.TabbedInterface([mf_transcribe, ], ["Audio",]) |
|
|
|
with gr.Row(): |
|
refresh_button = gr.Button("Refresh Status") |
|
|
|
sys_status_output = gr.Textbox(label="System Status", interactive=False) |
|
|
|
|
|
|
|
refresh_button.click(refresh_status, None, [sys_status_output]) |
|
|
|
|
|
demo.load(update_status, inputs=None, outputs=[sys_status_output], every=2, queue=False) |
|
|
|
graudio.stop_recording(handle_upload_audio,inputs=[graudio,grmodel_textbox,groutputs[0]],outputs=groutputs) |
|
graudio.upload(handle_upload_audio,inputs=[graudio,grmodel_textbox,groutputs[0]],outputs=groutputs) |
|
|
|
|
|
|
|
demo.launch(share=True) |
|
|
|
print('launched\n\n') |
|
|