Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import whisper | |
| import yt_dlp | |
| import os | |
| import traceback | |
| from pydub import AudioSegment | |
| from threading import Thread | |
| from queue import Queue | |
# Module-level cache holding the most recently loaded Whisper model.
selected_model = None

def load_whisper_model(model_name):
    """Load the named Whisper model and cache it in the module global.

    Returns a short status string for display in the UI.
    """
    global selected_model
    loaded = whisper.load_model(model_name)
    selected_model = loaded
    return f"Loaded {model_name} model"
def chunk_audio(audio_file, chunk_size_ms=30000):
    """Split an audio file into consecutive fixed-length segments.

    The final segment may be shorter than chunk_size_ms.
    """
    audio = AudioSegment.from_file(audio_file)
    chunks = []
    for start_ms in range(0, len(audio), chunk_size_ms):
        chunks.append(audio[start_ms:start_ms + chunk_size_ms])
    return chunks
def stream_transcription(audio_file):
    """Transcribe audio_file chunk-by-chunk, yielding the growing transcript.

    A background thread splits the audio into fixed-size chunks, runs the
    globally loaded Whisper model on each, and pushes timestamped segment
    lines onto a queue; this generator drains the queue and yields the
    accumulated transcript after each segment. A None sentinel marks the end;
    a string starting with "Error" is yielded as-is and terminates the stream.
    """
    segment_queue = Queue()
    # One constant drives both the chunk length and the per-chunk timestamp
    # offset (was previously two independent magic numbers: 30000 ms vs 30 s).
    chunk_size_ms = 30000

    def transcribe_worker():
        try:
            chunks = chunk_audio(audio_file, chunk_size_ms)
            for i, chunk in enumerate(chunks):
                chunk_file = f"temp_chunk_{i}.wav"
                chunk.export(chunk_file, format="wav")
                try:
                    result = selected_model.transcribe(chunk_file)
                finally:
                    # Clean up even when transcription raises, so failed runs
                    # don't leave temp_chunk_*.wav files behind.
                    if os.path.exists(chunk_file):
                        os.remove(chunk_file)
                # Segment times are relative to the chunk; shift to absolute.
                offset_s = i * (chunk_size_ms / 1000)
                for segment in result['segments']:
                    segment_queue.put(
                        f"[{segment['start'] + offset_s:.2f}s -> "
                        f"{segment['end'] + offset_s:.2f}s] {segment['text']}\n"
                    )
            segment_queue.put(None)  # Signal end of transcription
        except Exception as e:
            segment_queue.put(f"Error: {str(e)}")
            segment_queue.put(None)

    # Daemon thread: don't keep the process alive if the UI is torn down.
    Thread(target=transcribe_worker, daemon=True).start()

    full_transcript = ""
    while True:
        segment_text = segment_queue.get()
        if segment_text is None:
            break
        if segment_text.startswith("Error"):
            yield segment_text
            break
        full_transcript += segment_text
        yield full_transcript
def download_youtube_audio(youtube_url):
    """Download the best available audio track from a YouTube URL.

    yt-dlp's ffmpeg postprocessor converts the result to MP3; the file is
    written as temp_audio.mp3 in the working directory.
    """
    extract_mp3 = {
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_mp3],
        'outtmpl': 'temp_audio.%(ext)s',
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    return "temp_audio.mp3"
def process_input(model, input_type, youtube_url=None, audio_file=None):
    """Gradio handler: load the model, resolve the input source, and stream
    the transcription as a growing string.

    Yields progress messages first, then the accumulating transcript from
    stream_transcription. Any exception is logged with a traceback and
    surfaced to the UI as a final "Error: ..." message.
    """
    # Track the file WE downloaded separately from the caller's audio_file:
    # the old code deleted audio_file whenever input_type was "YouTube URL",
    # which could remove a user-uploaded file (URL empty but upload present)
    # and raised FileNotFoundError in finally when the download had failed.
    downloaded_file = None
    try:
        yield "Loading Whisper model..."
        load_whisper_model(model)
        yield f"Loaded {model} model. "
        if input_type == "YouTube URL":
            if not youtube_url:
                yield "Please provide a valid YouTube URL."
                return
            yield "Downloading audio from YouTube..."
            downloaded_file = download_youtube_audio(youtube_url)
            audio_file = downloaded_file
            yield "Download complete. Starting transcription...\n"
        elif input_type == "Audio File":
            if not audio_file:
                yield "Please upload an audio file."
                return
            yield "Starting transcription...\n"
        yield from stream_transcription(audio_file)
    except Exception as e:
        error_msg = f"An error occurred: {str(e)}\n"
        error_msg += traceback.format_exc()
        print(error_msg)
        yield f"Error: {str(e)}"
    finally:
        # Remove only the temp file this function created, and only if the
        # download actually produced it.
        if downloaded_file and os.path.exists(downloaded_file):
            os.remove(downloaded_file)
# Define the Gradio interface
with gr.Blocks() as iface:
    gr.Markdown("# Whisper Transcription App")
    gr.Markdown("Transcribe YouTube videos or audio files using OpenAI's Whisper model. Large files and long videos can take a very long time to process.")
    with gr.Row():
        # Left column: model choice, input selection, and action buttons.
        with gr.Column():
            model = gr.Radio(
                choices=["tiny", "base", "small", "medium", "large"],
                label="Whisper Model",
                value="base"
            )
            gr.Markdown("""
            - tiny: very fast, less accurate
            - base: medium speed and accuracy
            - small: balanced speed and accuracy
            - medium: more accurate, slower
            - large: most accurate, very slow
            """)
            # Both input widgets are always visible; process_input picks the
            # one matching input_type and ignores the other.
            input_type = gr.Radio(["YouTube URL", "Audio File"], label="Input Type")
            youtube_url = gr.Textbox(label="YouTube URL")
            audio_file = gr.Audio(label="Audio File", type="filepath")
            with gr.Row():
                submit_button = gr.Button("Submit")
                clear_button = gr.Button("Clear")
        # Right column: streaming transcript output.
        with gr.Column():
            output = gr.Textbox(label="Transcription", lines=25)
    # process_input is a generator, so the output textbox updates as
    # transcript segments arrive.
    submit_button.click(
        fn=process_input,
        inputs=[model, input_type, youtube_url, audio_file],
        outputs=output,
        api_name="transcribe"
    )
    def clear_outputs():
        # Reset the two input widgets and the transcript box.
        return {youtube_url: "", audio_file: None, output: ""}
    clear_button.click(
        fn=clear_outputs,
        inputs=[],
        outputs=[youtube_url, audio_file, output],
        api_name="clear"
    )
# Launch the interface
iface.queue().launch(share=True)