import torch import time import moviepy.editor as mp import psutil import gradio as gr import spaces from transformers import pipeline from transformers.pipelines.audio_utils import ffmpeg_read import base64 import requests DEFAULT_MODEL_NAME = "distil-whisper/distil-large-v3" DEFAULT_MODEL_NAME = "openai/whisper-large-v3" BATCH_SIZE = 8 print('start app') device = 0 if torch.cuda.is_available() else "cpu" if device == "cpu": DEFAULT_MODEL_NAME = "openai/whisper-tiny" def load_pipeline(model_name): return pipeline( task="automatic-speech-recognition", model=model_name, chunk_length_s=30, device=device, ) pipe = load_pipeline(DEFAULT_MODEL_NAME) openai_pipe=load_pipeline("openai/whisper-large-v3") default_pipe = load_pipeline(DEFAULT_MODEL_NAME) #pipe = None from gpustat import GPUStatCollection def update_gpu_status(): if torch.cuda.is_available() == False: return "No Nvidia Device" try: gpu_stats = GPUStatCollection.new_query() for gpu in gpu_stats: # Assuming you want to monitor the first GPU, index 0 gpu_id = gpu.index gpu_name = gpu.name gpu_utilization = gpu.utilization memory_used = gpu.memory_used memory_total = gpu.memory_total memory_utilization = (memory_used / memory_total) * 100 gpu_status=(f"GPU {gpu_id}: {gpu_name}, Utilization: {gpu_utilization}%, Memory Used: {memory_used}MB, Memory Total: {memory_total}MB, Memory Utilization: {memory_utilization:.2f}%") return gpu_status except Exception as e: print(f"Error getting GPU stats: {e}") return torch_update_gpu_status() def torch_update_gpu_status(): if torch.cuda.is_available(): gpu_info = torch.cuda.get_device_name(0) gpu_memory = torch.cuda.mem_get_info(0) total_memory = gpu_memory[1] / (1024 * 1024) free_memory=gpu_memory[0] /(1024 *1024) used_memory = (gpu_memory[1] - gpu_memory[0]) / (1024 * 1024) gpu_status = f"GPU: {gpu_info} Free Memory:{free_memory}MB Total Memory: {total_memory:.2f} MB Used Memory: {used_memory:.2f} MB" else: gpu_status = "No GPU available" return gpu_status def update_cpu_status(): import datetime # Get the current time current_time = datetime.datetime.now().time() # Convert the time to a string time_str = current_time.strftime("%H:%M:%S") cpu_percent = psutil.cpu_percent() cpu_status = f"CPU Usage: {cpu_percent}% {time_str}" return cpu_status @spaces.GPU def update_status(): gpu_status = update_gpu_status() cpu_status = update_cpu_status() sys_status=gpu_status+"\n\n"+cpu_status return sys_status def refresh_status(): return update_status() @spaces.GPU def transcribe(audio_path, model_name): print(str(time.time())+' start transcribe ') if audio_path is None: raise gr.Error("No audio file submitted! Please upload or record an audio file before submitting your request.") if model_name is None: model_name=DEFAULT_MODEL_NAME audio_path=audio_path.strip() model_name=model_name.strip() global pipe if model_name != pipe.model.name_or_path: print("old model is:"+ pipe.model.name_or_path ) if model_name=="openai/whisper-large-v3": pipe=openai_pipe print(str(time.time())+" use openai model " + pipe.model.name_or_path) elif model_name==DEFAULT_MODEL_NAME: pipe=default_pipe print(str(time.time())+" use default model " + pipe.model.name_or_path) else: print(str(time.time())+' start load model ' + model_name) pipe = load_pipeline(model_name) print(str(time.time())+' finished load model ' + model_name) start_time = time.time() # Record the start time print(str(time.time())+' start processing and set recording start time point') # Load the audio file and calculate its duration audio = mp.AudioFileClip(audio_path) audio_duration = audio.duration print(str(time.time())+' start pipe ') text = pipe(audio_path, batch_size=BATCH_SIZE, generate_kwargs={"task": "transcribe"}, return_timestamps=True)["text"] end_time = time.time() # Record the end time transcription_time = end_time - start_time # Calculate the transcription time # Create the transcription time output with additional information transcription_time_output = ( f"Transcription Time: {transcription_time:.2f} seconds\n" f"Audio Duration: {audio_duration:.2f} seconds\n" f"Model Used: {model_name}\n" f"Device Used: {'GPU' if torch.cuda.is_available() else 'CPU'}" ) print(str(time.time())+' return transcribe '+ text ) return text, transcription_time_output @spaces.GPU def handle_upload_audio(audio_path,model_name,old_transcription=''): print('old_trans:' + old_transcription) (text,transcription_time_output)=transcribe(audio_path,model_name) return text+'\n\n'+old_transcription, transcription_time_output def handle_base64_audio(base64_data, model_name, old_transcription=''): # Decode base64 data and save it as a temporary audio file binary_data = base64.b64decode(base64_data) audio_path = "temp_audio.wav" with open(audio_path, "wb") as f: f.write(binary_data) # Transcribe the audio file (text, transcription_time_output) = transcribe(audio_path, model_name) # Remove the temporary audio file import os os.remove(audio_path) return text + '\n\n' + old_transcription, transcription_time_output graudio=gr.Audio(type="filepath",show_download_button=True) grmodel_textbox=gr.Textbox( label="Model Name", value=DEFAULT_MODEL_NAME, placeholder="Enter the model name", info="Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3", ) groutputs=[gr.TextArea(label="Transcription",elem_id="transcription_textarea",interactive=True,lines=20,show_copy_button=True), gr.TextArea(label="Transcription Info",interactive=True,show_copy_button=True)] mf_transcribe = gr.Interface( fn=handle_upload_audio, inputs=[ graudio, #"numpy" or filepath #gr.Radio(["transcribe", "translate"], label="Task", value="transcribe"), grmodel_textbox, ], outputs=groutputs, theme="huggingface", title="Whisper Transcription", description=( "Scroll to Bottom to show system status. " "Transcribe long-form microphone or audio file after uploaded audio! " "Notice: the space need some time to get a gpu to run, so there may be a delay " ), allow_flagging="never", ) grmodel_textbox_64=gr.Textbox( label="Model Name", value=DEFAULT_MODEL_NAME, placeholder="Enter the model name", info="Some available models: distil-whisper/distil-large-v3 distil-whisper/distil-medium.en Systran/faster-distil-whisper-large-v3 Systran/faster-whisper-large-v3 Systran/faster-whisper-medium openai/whisper-tiny, openai/whisper-base, openai/whisper-medium, openai/whisper-large-v3", ) groutputs_64=[gr.TextArea(label="Transcription 64",elem_id="transcription_textarea_64",interactive=True,lines=20,show_copy_button=True), gr.TextArea(label="Transcription Info 64",interactive=True,show_copy_button=True)] base_transcribe= gr.Interface( fn=handle_base64_audio, inputs=[ gr.Textbox(label="Base64 Audio Data URL", placeholder="Enter the base64 audio data URL"), grmodel_textbox_64, ], outputs=groutputs_64, ) demo = gr.Blocks() #@spaces.GPU def onload(): return "System Status: "+update_status(); with demo: tabbed_interface = gr.TabbedInterface( [ mf_transcribe, base_transcribe ], ["Audio", "Base64 Audio"], ) with gr.Row(): refresh_button = gr.Button("Refresh Status") sys_status_output = gr.Textbox(label="System Status", interactive=False) # Link the refresh button to the refresh_status function refresh_button.click(refresh_status, None, [sys_status_output]) graudio.stop_recording(handle_upload_audio, inputs=[graudio, grmodel_textbox, groutputs[0]], outputs=groutputs) graudio.upload(handle_upload_audio, inputs=[graudio, grmodel_textbox, groutputs[0]], outputs=groutputs) # Load the initial status using update_status function demo.load(onload, inputs=None, outputs=sys_status_output, queue=False) # Launch the Gradio app demo.launch(share=True) print('launched\n\n')