import gradio as gr
import whisper
import yt_dlp
import os
import traceback
from pydub import AudioSegment
from threading import Thread
from queue import Queue

# Global variable to store the selected model
selected_model = None
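
# Load the requested Whisper checkpoint and keep it in the module-level global.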
def load_whisper_model(model_name):
    global selected_model
    selected_model = whisper.load_model(model_name)
    return f"Loaded {model_name} model"
def chunk_audio(audio_file, chunk_size_ms=30000):
    audio = AudioSegment.from_file(audio_file)
    chunks = [audio[i:i + chunk_size_ms] for i in range(0, len(audio), chunk_size_ms)]
    return chunks
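
# Transcribe chunks on a background thread and stream results to the UI through a queue.
# Segment timestamps are offset by 30 s per chunk to match chunk_size_ms=30000.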
def stream_transcription(audio_file):
    segment_queue = Queue()

    def transcribe_worker():
        try:
            chunks = chunk_audio(audio_file)
            for i, chunk in enumerate(chunks):
                chunk_file = f"temp_chunk_{i}.wav"
                chunk.export(chunk_file, format="wav")
                result = selected_model.transcribe(chunk_file)
                os.remove(chunk_file)
                for segment in result['segments']:
                    segment_text = f"[{segment['start'] + i * 30:.2f}s -> {segment['end'] + i * 30:.2f}s] {segment['text']}\n"
                    segment_queue.put(segment_text)
            segment_queue.put(None)  # Signal end of transcription
        except Exception as e:
            segment_queue.put(f"Error: {str(e)}")
            segment_queue.put(None)

    Thread(target=transcribe_worker).start()

    full_transcript = ""
    while True:
        segment_text = segment_queue.get()
        if segment_text is None:
            break
        if segment_text.startswith("Error"):
            yield segment_text
            break
        full_transcript += segment_text
        yield full_transcript
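
# Download the best available audio track from YouTube and convert it to MP3 via yt-dlp's FFmpeg postprocessor.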
def download_youtube_audio(youtube_url):
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': 'temp_audio.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    return "temp_audio.mp3"
def process_input(model, input_type, youtube_url=None, audio_file=None):
    try:
        yield "Loading Whisper model..."
        load_whisper_model(model)
        yield f"Loaded {model} model. "
        if input_type == "YouTube URL":
            if youtube_url:
                yield "Downloading audio from YouTube..."
                audio_file = download_youtube_audio(youtube_url)
                yield "Download complete. Starting transcription...\n"
            else:
                yield "Please provide a valid YouTube URL."
                return
        elif input_type == "Audio File":
            if not audio_file:
                yield "Please upload an audio file."
                return
            else:
                yield "Starting transcription...\n"
        yield from stream_transcription(audio_file)
    except Exception as e:
        error_msg = f"An error occurred: {str(e)}\n"
        error_msg += traceback.format_exc()
        print(error_msg)
        yield f"Error: {str(e)}"
    finally:
        # Clean up the downloaded audio file, guarding against a failed download
        if input_type == "YouTube URL" and audio_file and os.path.exists(audio_file):
            os.remove(audio_file)

# Define the Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Whisper Transcription App")
    gr.Markdown("Transcribe YouTube videos or audio files using OpenAI's Whisper model. Large files and long videos can take a very long time to process.")
    with gr.Row():
        with gr.Column():
            model = gr.Radio(
                choices=["tiny", "base", "small", "medium", "large"],
                label="Whisper Model",
                value="base"
            )
            gr.Markdown("""
            - tiny: very fast, less accurate
            - base: medium speed and accuracy
            - small: balanced speed and accuracy
            - medium: more accurate, slower
            - large: most accurate, very slow
            """)
            input_type = gr.Radio(["YouTube URL", "Audio File"], label="Input Type")
            youtube_url = gr.Textbox(label="YouTube URL")
            audio_file = gr.Audio(label="Audio File", type="filepath")
            with gr.Row():
                submit_button = gr.Button("Submit")
                clear_button = gr.Button("Clear")
        with gr.Column():
            output = gr.Textbox(label="Transcription", lines=25)

    submit_button.click(
        fn=process_input,
        inputs=[model, input_type, youtube_url, audio_file],
        outputs=output,
        api_name="transcribe"
    )

    def clear_outputs():
        return {youtube_url: "", audio_file: None, output: ""}

    clear_button.click(
        fn=clear_outputs,
        inputs=[],
        outputs=[youtube_url, audio_file, output],
        api_name="clear"
    )

# Launch the interface
iface.queue().launch(share=True)