import gradio as gr
from pydub import AudioSegment
import os
import tempfile
import speech_recognition as sr
import concurrent.futures


def split_audio(audio_path, chunk_length_ms=60000, overlap_ms=2000):
    """Split an audio file into overlapping chunks so words at chunk
    boundaries are less likely to be cut off mid-utterance."""
    if overlap_ms >= chunk_length_ms:
        raise ValueError("overlap_ms must be smaller than chunk_length_ms")
    audio = AudioSegment.from_file(audio_path)
    chunks = []
    step = chunk_length_ms - overlap_ms
    for i in range(0, len(audio), step):
        chunks.append(audio[i:i + chunk_length_ms])
    return chunks


def convert_audio_to_wav(input_path, output_path):
    """Normalize any supported input format to 16 kHz mono WAV, which the
    speech recognizer handles reliably."""
    audio = AudioSegment.from_file(input_path)
    audio = audio.set_frame_rate(16000).set_channels(1)
    audio.export(output_path, format="wav")


def transcribe_chunk_indexed(indexed_chunk_language):
    """Transcribe one chunk; returns (index, text) so results can be
    reassembled in order regardless of thread completion order."""
    index, chunk, language = indexed_chunk_language
    recognizer = sr.Recognizer()
    # Use a unique temp file per chunk so concurrent sessions don't
    # clobber each other's chunk files.
    tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
    tmp.close()
    chunk_path = tmp.name
    try:
        chunk.export(chunk_path, format="wav")
        with sr.AudioFile(chunk_path) as source:
            audio_data = recognizer.record(source)
        text = recognizer.recognize_google(audio_data, language=language)
        return index, text
    except sr.RequestError:
        return index, "[Error: API unavailable or unresponsive]"
    except sr.UnknownValueError:
        return index, "[Error: Unable to recognize speech]"
    except Exception as e:
        return index, f"[Error: {e}]"
    finally:
        # Clean up the temp file even when recognition fails.
        if os.path.exists(chunk_path):
            os.remove(chunk_path)


def transcribe_audio_with_google_parallel(audio_path, chunk_length_ms=60000,
                                          overlap_ms=2000, language="en-US"):
    """Transcribe chunks concurrently and stitch the results back together
    in chunk order."""
    chunks = split_audio(audio_path, chunk_length_ms, overlap_ms)
    indexed_chunks = [(i, chunk, language) for i, chunk in enumerate(chunks)]
    transcription = [""] * len(indexed_chunks)
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(transcribe_chunk_indexed, ic): ic[0]
                   for ic in indexed_chunks}
        for future in concurrent.futures.as_completed(futures):
            idx, text = future.result()
            transcription[idx] = text
    # Note: because chunks overlap, a few words may repeat at boundaries.
    return " ".join(transcription)


def transcribe(audio_file_path, language):
    if audio_file_path is None:
        return "Please upload an audio file."
    converted_path = audio_file_path + "_converted.wav"
    try:
        convert_audio_to_wav(audio_file_path, converted_path)
    except Exception as e:
        return f"Error processing audio: {e}"
    transcription = transcribe_audio_with_google_parallel(
        converted_path, chunk_length_ms=60000, overlap_ms=2000, language=language)
    try:
        os.remove(converted_path)
    except OSError:
        pass
    return transcription


language_options = {
    "English (US)": "en-US",
    "Dutch": "nl-NL",
    "English (UK)": "en-GB",
    "Spanish": "es-ES",
    "French": "fr-FR",
    "German": "de-DE",
    "Hindi": "hi-IN",
    "Chinese (Mandarin)": "zh-CN",
    "Arabic": "ar-SA",
    "Turkish": "tr-TR",
}

with gr.Blocks() as demo:
    gr.Markdown("# Audio to Text Transcription")
    gr.Markdown("Upload an audio file, and we'll transcribe it into text using chunk processing.")
    with gr.Row():
        audio_input = gr.Audio(type="filepath", label="Upload audio file (mp3, wav, m4a, ogg)")
        language_dropdown = gr.Dropdown(list(language_options.keys()),
                                        label="Select language", value="English (US)")
    transcribe_btn = gr.Button("Transcribe")
    output_text = gr.Textbox(label="Transcription Output", lines=15)

    def on_transcribe(audio_path, lang_name):
        # Map the human-readable dropdown choice to its BCP-47 language code.
        lang_code = language_options[lang_name]
        return transcribe(audio_path, lang_code)

    transcribe_btn.click(on_transcribe,
                         inputs=[audio_input, language_dropdown],
                         outputs=output_text)

demo.launch()