# devilent2's picture
# Update app.py
# ecdd55f verified
import torch
import time
import moviepy.editor as mp
import psutil
import gradio as gr
import spaces
from transformers import pipeline
from transformers.pipelines.audio_utils import ffmpeg_read
import base64
import requests
# Checkpoint used when a CUDA device is available.
# ("distil-whisper/distil-large-v3" used to be assigned here and was then
# overwritten on the very next line; that dead assignment has been removed.)
DEFAULT_MODEL_NAME = "openai/whisper-large-v3"
# Batch size forwarded to the speech-recognition pipeline call.
BATCH_SIZE = 8
print('start app')
# Device handed to transformers.pipeline: CUDA index 0, or the string "cpu".
device = 0 if torch.cuda.is_available() else "cpu"
if device == "cpu":
    # Without a GPU, default to a much smaller checkpoint.
    DEFAULT_MODEL_NAME = "openai/whisper-tiny"
def load_pipeline(model_name):
    """Create an automatic-speech-recognition pipeline for ``model_name``.

    The pipeline is placed on the module-level ``device`` and is configured
    with ``chunk_length_s=30``.
    """
    pipeline_options = {
        "task": "automatic-speech-recognition",
        "model": model_name,
        "chunk_length_s": 30,
        "device": device,
    }
    return pipeline(**pipeline_options)
# Load each distinct checkpoint only once.  The default model used to be
# loaded twice (``pipe`` and ``default_pipe``) and, whenever the default was
# already "openai/whisper-large-v3", a third time for ``openai_pipe`` --
# identical weights held in memory several times over.
default_pipe = load_pipeline(DEFAULT_MODEL_NAME)
if DEFAULT_MODEL_NAME == "openai/whisper-large-v3":
    openai_pipe = default_pipe
else:
    openai_pipe = load_pipeline("openai/whisper-large-v3")
# ``pipe`` is the currently active pipeline; transcribe() rebinds it on demand.
pipe = default_pipe
from gpustat import GPUStatCollection
def update_gpu_status():
    """Return a one-line utilisation and memory summary for the first GPU.

    Returns:
        ``"No Nvidia Device"`` when CUDA is unavailable.  Otherwise a string
        describing the first GPU reported by gpustat.  When gpustat raises or
        reports no devices at all, the result of ``torch_update_gpu_status()``
        is returned instead, so the caller always receives a string.
    """
    if not torch.cuda.is_available():
        return "No Nvidia Device"
    try:
        gpu_stats = GPUStatCollection.new_query()
        # Only the first GPU is reported; None means gpustat listed no devices.
        gpu = next(iter(gpu_stats), None)
        if gpu is not None:
            gpu_id = gpu.index
            gpu_name = gpu.name
            gpu_utilization = gpu.utilization
            memory_used = gpu.memory_used
            memory_total = gpu.memory_total
            memory_utilization = (memory_used / memory_total) * 100
            return (f"GPU {gpu_id}: {gpu_name}, Utilization: {gpu_utilization}%, Memory Used: {memory_used}MB, Memory Total: {memory_total}MB, Memory Utilization: {memory_utilization:.2f}%")
    except Exception as e:
        # Status display is best-effort: log the failure and fall through to
        # the torch-based report rather than breaking the UI.
        print(f"Error getting GPU stats: {e}")
    return torch_update_gpu_status()
def torch_update_gpu_status():
    """Describe GPU 0 (name and free/total/used memory in MB) using torch only.

    Returns ``"No GPU available"`` when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return "No GPU available"

    bytes_per_mb = 1024 * 1024
    device_name = torch.cuda.get_device_name(0)
    # mem_get_info yields (free_bytes, total_bytes) for the device.
    free_bytes, total_bytes = torch.cuda.mem_get_info(0)
    total_mb = total_bytes / bytes_per_mb
    free_mb = free_bytes / bytes_per_mb
    used_mb = (total_bytes - free_bytes) / bytes_per_mb
    return f"GPU: {device_name} Free Memory:{free_mb}MB Total Memory: {total_mb:.2f} MB Used Memory: {used_mb:.2f} MB"
def update_cpu_status():
    """Return the current CPU usage percentage followed by the local time."""
    from datetime import datetime

    # The clock is read before CPU usage is sampled; format is HH:MM:SS.
    clock_text = datetime.now().time().strftime("%H:%M:%S")
    usage_percent = psutil.cpu_percent()
    return f"CPU Usage: {usage_percent}% {clock_text}"
@spaces.GPU
def update_status():
    """Return the GPU status and the CPU status, separated by a blank line."""
    status_parts = [update_gpu_status(), update_cpu_status()]
    return "\n\n".join(status_parts)
def refresh_status():
    """Click handler for the "Refresh Status" button; returns a fresh status report."""
    latest_status = update_status()
    return latest_status
def _select_pipeline(model_name):
    """Make the pipeline for ``model_name`` the active one and return it."""
    global pipe
    if model_name != pipe.model.name_or_path:
        print("old model is:" + pipe.model.name_or_path)
        if model_name == "openai/whisper-large-v3":
            pipe = openai_pipe
            print(str(time.time()) + " use openai model " + pipe.model.name_or_path)
        elif model_name == DEFAULT_MODEL_NAME:
            pipe = default_pipe
            print(str(time.time()) + " use default model " + pipe.model.name_or_path)
        else:
            # Not one of the preloaded pipelines: build it on demand.
            print(str(time.time()) + ' start load model ' + model_name)
            pipe = load_pipeline(model_name)
            print(str(time.time()) + ' finished load model ' + model_name)
    return pipe


@spaces.GPU
def transcribe(audio_path, model_name):
    """Transcribe the audio file at ``audio_path`` with the requested model.

    Args:
        audio_path: Path of the audio file; surrounding whitespace is stripped.
        model_name: Model identifier.  ``None`` or a blank string selects
            ``DEFAULT_MODEL_NAME``.

    Returns:
        A ``(text, info)`` tuple: the transcription, and a multi-line summary
        of transcription time, audio duration, model and device.

    Raises:
        gr.Error: If ``audio_path`` is ``None``.
    """
    print(str(time.time()) + ' start transcribe ')
    if audio_path is None:
        raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.")
    # A cleared model textbox arrives as an empty string; treat it like None
    # instead of trying to load a model with an empty name.
    if model_name is None or not model_name.strip():
        model_name = DEFAULT_MODEL_NAME
    audio_path = audio_path.strip()
    model_name = model_name.strip()

    active_pipe = _select_pipeline(model_name)

    start_time = time.time()  # Timing starts after the model has been selected.
    print(str(time.time()) + ' start processing and set recording start time point')
    # Open the clip only to read its duration, and always release it again so
    # the underlying reader is not leaked.
    audio = mp.AudioFileClip(audio_path)
    try:
        audio_duration = audio.duration
    finally:
        audio.close()
    print(str(time.time()) + ' start pipe ')
    text = active_pipe(audio_path, batch_size=BATCH_SIZE, generate_kwargs={"task": "transcribe"}, return_timestamps=True)["text"]
    end_time = time.time()
    transcription_time = end_time - start_time
    transcription_time_output = (
        f"Transcription Time: {transcription_time:.2f} seconds\n"
        f"Audio Duration: {audio_duration:.2f} seconds\n"
        f"Model Used: {model_name}\n"
        f"Device Used: {'GPU' if torch.cuda.is_available() else 'CPU'}"
    )
    print(str(time.time()) + ' return transcribe ' + text)
    return text, transcription_time_output
@spaces.GPU
def handle_upload_audio(audio_path,model_name,old_transcription=''):
    """Transcribe ``audio_path`` and place the new text above ``old_transcription``.

    Returns a ``(combined_text, info)`` tuple, where ``info`` is the timing
    summary produced by ``transcribe``.
    """
    print('old_trans:' + old_transcription)
    new_text, info = transcribe(audio_path, model_name)
    combined_text = '\n\n'.join([new_text, old_transcription])
    return combined_text, info
def handle_base64_audio(base64_data, model_name, old_transcription=''):
    """Transcribe audio supplied as base64 text or as a base64 data URL.

    Args:
        base64_data: Raw base64 text, or a ``data:<mime>;base64,<payload>``
            URL whose prefix is stripped before decoding.
        model_name: Model identifier forwarded to ``transcribe``.
        old_transcription: Earlier text appended below the new transcription.

    Returns:
        A ``(combined_text, info)`` tuple, where ``info`` is the timing
        summary produced by ``transcribe``.

    Raises:
        gr.Error: If the input is empty, is a data URL with no payload, or
            cannot be decoded as base64.
    """
    import os
    import tempfile

    if not base64_data or not base64_data.strip():
        raise gr.Error("No audio data submitted! Please paste base64 audio data before submitting your request.")
    payload = base64_data.strip()
    if payload.startswith("data:"):
        # Keep only what follows the first comma; otherwise the URL header
        # would be decoded as if it were audio bytes.
        _, separator, payload = payload.partition(",")
        if not separator or not payload:
            raise gr.Error("The data URL does not contain any base64 audio data.")
    try:
        binary_data = base64.b64decode(payload)
    except ValueError as err:  # binascii.Error is a ValueError subclass
        raise gr.Error("The submitted audio data is not valid base64.") from err

    # A unique temporary file per call, so concurrent requests cannot
    # overwrite each other's audio.
    file_descriptor, audio_path = tempfile.mkstemp(suffix=".wav")
    try:
        with os.fdopen(file_descriptor, "wb") as f:
            f.write(binary_data)
        (text, transcription_time_output) = transcribe(audio_path, model_name)
    finally:
        # Remove the temporary file even when transcription fails.
        os.remove(audio_path)
    return text + '\n\n' + old_transcription, transcription_time_output
# Widgets for the "Audio" tab.  They are created at module level because the
# layout code further down attaches extra event listeners to them.
graudio = gr.Audio(show_download_button=True, type="filepath")

_audio_tab_model_hint = "Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3"
grmodel_textbox = gr.Textbox(
    value=DEFAULT_MODEL_NAME,
    label="Model Name",
    info=_audio_tab_model_hint,
    placeholder="Enter the model name",
)

# Output pair: [transcription text, timing/model information].
_audio_tab_text_area = gr.TextArea(
    label="Transcription",
    elem_id="transcription_textarea",
    interactive=True,
    lines=20,
    show_copy_button=True,
)
_audio_tab_info_area = gr.TextArea(
    label="Transcription Info",
    interactive=True,
    show_copy_button=True,
)
groutputs = [_audio_tab_text_area, _audio_tab_info_area]
# Interface behind the "Audio" tab: audio file/recording plus a model name in,
# transcription plus run information out.
_audio_tab_inputs = [graudio, grmodel_textbox]
_audio_tab_description = (
    "Scroll to Bottom to show system status. "
    "Transcribe long-form microphone or audio file after uploaded audio! "
    "Notice: the space need some time to get a gpu to run, so there may be a delay "
)
mf_transcribe = gr.Interface(
    fn=handle_upload_audio,
    inputs=_audio_tab_inputs,
    outputs=groutputs,
    title="Whisper Transcription",
    description=_audio_tab_description,
    theme="huggingface",
    allow_flagging="never",
)
# Widgets for the "Base64 Audio" tab.
_base64_tab_model_hint = "Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3"
grmodel_textbox_64 = gr.Textbox(
    value=DEFAULT_MODEL_NAME,
    label="Model Name",
    info=_base64_tab_model_hint,
    placeholder="Enter the model name",
)

# Output pair: [transcription text, timing/model information].
_base64_tab_text_area = gr.TextArea(
    label="Transcription 64",
    elem_id="transcription_textarea_64",
    interactive=True,
    lines=20,
    show_copy_button=True,
)
_base64_tab_info_area = gr.TextArea(
    label="Transcription Info 64",
    interactive=True,
    show_copy_button=True,
)
groutputs_64 = [_base64_tab_text_area, _base64_tab_info_area]
# Interface behind the "Base64 Audio" tab: base64 text plus a model name in,
# transcription plus run information out.
_base64_audio_textbox = gr.Textbox(
    placeholder="Enter the base64 audio data URL",
    label="Base64 Audio Data URL",
)
base_transcribe = gr.Interface(
    fn=handle_base64_audio,
    inputs=[_base64_audio_textbox, grmodel_textbox_64],
    outputs=groutputs_64,
)

# Top-level container that hosts both tabs and the system-status row.
demo = gr.Blocks()
#@spaces.GPU
def onload():
    """Return the text placed in the status box when the page loads."""
    current_status = update_status()
    return "System Status: " + current_status
with demo:
    # Both interfaces are shown as tabs.
    tab_interfaces = [mf_transcribe, base_transcribe]
    tab_titles = ["Audio", "Base64 Audio"]
    tabbed_interface = gr.TabbedInterface(tab_interfaces, tab_titles)

    with gr.Row():
        refresh_button = gr.Button("Refresh Status")
        sys_status_output = gr.Textbox(label="System Status", interactive=False)

    # The button re-queries the system status on demand.
    refresh_button.click(fn=refresh_status, inputs=None, outputs=[sys_status_output])

    # Transcribe as soon as a recording stops or a file is uploaded, passing
    # the current transcription text in so the new text is placed above it.
    for register_audio_event in (graudio.stop_recording, graudio.upload):
        register_audio_event(
            handle_upload_audio,
            inputs=[graudio, grmodel_textbox, groutputs[0]],
            outputs=groutputs,
        )

    # Fill the status box once when the page loads.
    demo.load(onload, inputs=None, outputs=sys_status_output, queue=False)

# Start the Gradio app.
demo.launch(share=True)
print('launched\n\n')