| | import os |
| | import tempfile |
| | import math |
| | import torch |
| | import soundfile as sf |
| | from transformers import pipeline |
| | import gradio as gr |
| | from pydub import AudioSegment |
| |
|
| | |
| | MODEL_CHOICES = { |
| | "Yoruba (EYEDOL/Yoruba-ASRNEW)": "EYEDOL/Yoruba-ASRNEW", |
| | "Naija English (EYEDOL/NAIJA_ENG-ASRNEW)": "EYEDOL/NAIJA_ENG-ASRNEW", |
| | } |
| |
|
| | |
| | DEVICE = 0 if torch.cuda.is_available() else -1 |
| |
|
| | |
| | PIPELINE_CACHE = {} |
| |
|
| | def get_asr_pipeline(model_id: str): |
| | """Return a cached pipeline for model_id or create a new one.""" |
| | if model_id in PIPELINE_CACHE: |
| | return PIPELINE_CACHE[model_id] |
| | |
| | asr = pipeline("automatic-speech-recognition", model=model_id, device=DEVICE) |
| | PIPELINE_CACHE[model_id] = asr |
| | return asr |
| |
|
| | |
| | def save_numpy_to_wav(np_tuple): |
| | samplerate, data = np_tuple |
| | tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") |
| | sf.write(tmp.name, data, samplerate) |
| | return tmp.name |
| |
|
| | def get_duration_seconds(path): |
| | try: |
| | info = sf.info(path) |
| | return info.duration |
| | except Exception: |
| | seg = AudioSegment.from_file(path) |
| | return len(seg) / 1000.0 |
| |
|
| | def split_audio_file(path, chunk_length_ms=25000, overlap_ms=500): |
| | audio = AudioSegment.from_file(path) |
| | duration_ms = len(audio) |
| | chunks = [] |
| | start = 0 |
| | while start < duration_ms: |
| | end = min(start + chunk_length_ms, duration_ms) |
| | chunk = audio[start:end] |
| | tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav") |
| | chunk.export(tmp.name, format="wav") |
| | chunks.append((tmp.name, start, end)) |
| | start += max(1, chunk_length_ms - overlap_ms) |
| | return chunks |
| |
|
| | def transcribe_file_with_pipeline(asr_pipeline, path, return_timestamps=False): |
| | |
| | if return_timestamps: |
| | return asr_pipeline(path, return_timestamps=True) |
| | else: |
| | return asr_pipeline(path) |
| |
|
| | def transcribe(audio_input, model_id, allow_longform_with_timestamps=False, chunk_length_seconds=25, overlap_seconds=0.5): |
| | """ |
| | audio_input: either (sr, numpy_array) from mic (type="numpy") or filepath from upload (type="filepath") |
| | model_id: Hugging Face model id string |
| | Returns dict: {"full_text": str, "segments": [{start_s,end_s,text}, ...]} |
| | """ |
| | if audio_input is None: |
| | return {"error": "No audio provided."} |
| |
|
| | |
| | created_tmp_input = False |
| | if isinstance(audio_input, tuple): |
| | audio_path = save_numpy_to_wav(audio_input) |
| | created_tmp_input = True |
| | else: |
| | audio_path = audio_input |
| |
|
| | duration_s = get_duration_seconds(audio_path) |
| | asr = get_asr_pipeline(model_id) |
| |
|
| | |
| | if duration_s <= 30: |
| | out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=False) |
| | text = out.get("text", out) if isinstance(out, dict) else str(out) |
| | segments = [{"start_s": 0.0, "end_s": duration_s, "text": text}] |
| | full_text = text |
| | if created_tmp_input: |
| | try: os.unlink(audio_path) |
| | except: pass |
| | return {"full_text": full_text, "segments": segments} |
| |
|
| | |
| | if allow_longform_with_timestamps: |
| | try: |
| | out = transcribe_file_with_pipeline(asr, audio_path, return_timestamps=True) |
| | |
| | full_text = out.get("text", None) if isinstance(out, dict) else str(out) |
| | segments = [] |
| |
|
| | if isinstance(out, dict): |
| | if "chunks" in out and isinstance(out["chunks"], list): |
| | for c in out["chunks"]: |
| | |
| | ts = c.get("timestamp", None) |
| | if isinstance(ts, list) and len(ts) == 2: |
| | start_s, end_s = ts[0], ts[1] |
| | else: |
| | start_s = c.get("start", None) |
| | end_s = c.get("end", None) |
| | segments.append({"start_s": start_s, "end_s": end_s, "text": c.get("text", "")}) |
| | elif "segments" in out and isinstance(out["segments"], list): |
| | for s in out["segments"]: |
| | segments.append({"start_s": s.get("start", None), "end_s": s.get("end", None), "text": s.get("text", "")}) |
| | elif "words" in out and isinstance(out["words"], list): |
| | for w in out["words"]: |
| | segments.append({"start_s": w.get("start", None), "end_s": w.get("end", None), "text": w.get("word", "")}) |
| | else: |
| | |
| | if full_text is None: |
| | full_text = str(out) |
| | segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}] |
| | else: |
| | |
| | full_text = str(out) |
| | segments = [{"start_s": 0.0, "end_s": duration_s, "text": full_text}] |
| |
|
| | if created_tmp_input: |
| | try: os.unlink(audio_path) |
| | except: pass |
| | return {"full_text": full_text, "segments": segments} |
| | except Exception as e: |
| | |
| | print("Long-form timestamps failed; falling back to chunking:", e) |
| |
|
| | |
| | chunk_length_ms = int(chunk_length_seconds * 1000) |
| | overlap_ms = int(overlap_seconds * 1000) |
| | chunks = split_audio_file(audio_path, chunk_length_ms=chunk_length_ms, overlap_ms=overlap_ms) |
| | stitched = [] |
| | segments = [] |
| | for chunk_path, start_ms, end_ms in chunks: |
| | try: |
| | out = transcribe_file_with_pipeline(asr, chunk_path, return_timestamps=False) |
| | text = out.get("text", out) if isinstance(out, dict) else str(out) |
| | except Exception as e: |
| | text = f"[ERROR on chunk: {e}]" |
| | start_s = start_ms / 1000.0 |
| | end_s = end_ms / 1000.0 |
| | segments.append({"start_s": start_s, "end_s": end_s, "text": text}) |
| | stitched.append(text) |
| | try: os.unlink(chunk_path) |
| | except: pass |
| |
|
| | if created_tmp_input: |
| | try: os.unlink(audio_path) |
| | except: pass |
| |
|
| | full_text = " ".join([s for s in stitched if s]) |
| | return {"full_text": full_text, "segments": segments} |
| |
|
| | |
| | with gr.Blocks(title="EYEDOL ASR — Multi-model (Yoruba + Naija English)") as demo: |
| | gr.Markdown("## EYEDOL ASR Demo\nSelect model, upload audio or use the microphone. Supports long audio via chunking or model long-form timestamps.") |
| |
|
| | with gr.Row(): |
| | with gr.Column(scale=2): |
| | model_choice = gr.Dropdown(list(MODEL_CHOICES.keys()), value=list(MODEL_CHOICES.keys())[0], label="Choose model") |
| | mic_input = gr.Audio(label="Record (click Record → Stop)", type="numpy") |
| | file_input = gr.Audio(label="Or upload audio file", type="filepath") |
| | source = gr.Radio(["Use microphone input", "Use uploaded file"], value="Use microphone input", label="Input source") |
| | longform = gr.Checkbox(label="Try model's built-in long-form timestamps (if supported)", value=False) |
| | chunk_len = gr.Slider(minimum=10, maximum=120, value=25, step=5, label="Chunk length (seconds)") |
| | overlap = gr.Slider(minimum=0.0, maximum=5.0, value=0.5, step=0.5, label="Chunk overlap (seconds)") |
| | transcribe_btn = gr.Button("Transcribe") |
| | gr.Markdown("**Note:** If a model is private add `HF_TOKEN` as a secret in Space settings. GPU recommended for best performance.") |
| | with gr.Column(scale=3): |
| | full_text_out = gr.Textbox(label="Full transcription", lines=8) |
| | segments_out = gr.JSON(label="Segments (start_s, end_s, text)") |
| |
|
| | def handle_transcription(mic_input, file_input, source_choice, model_label, use_longform, chunk_len_s, overlap_s): |
| | model_id = MODEL_CHOICES.get(model_label) |
| | audio_src = mic_input if source_choice == "Use microphone input" else file_input |
| | res = transcribe(audio_src, model_id=model_id, allow_longform_with_timestamps=use_longform, chunk_length_seconds=chunk_len_s, overlap_seconds=overlap_s) |
| | if "error" in res: |
| | return res["error"], [] |
| | return res["full_text"], res["segments"] |
| |
|
| | transcribe_btn.click( |
| | fn=handle_transcription, |
| | inputs=[mic_input, file_input, source, model_choice, longform, chunk_len, overlap], |
| | outputs=[full_text_out, segments_out], |
| | ) |
| |
|
| | if __name__ == "__main__": |
| | demo.launch() |
| |
|