# clearwave-ai / app.py
# (HuggingFace Spaces page residue preserved as comments:
#  "testingfaces's picture" / "Update app.py" / commit 05094c5 verified)
"""
ClearWave AI β€” HuggingFace Spaces
Gradio UI + FastAPI routes
BACKGROUND JOB SYSTEM:
- POST /api/process-url β†’ returns {jobId} instantly (no timeout)
- GET /api/job/{jobId} β†’ poll for progress / result
- Jobs run in background threads β€” handles 1hr+ audio safely
- Job results stored in memory for 1 hour then auto-cleaned
- Gradio UI uses same background thread approach
FIXES IN THIS VERSION:
- Microphone recording tab added (gr.Audio microphone)
- Checkbox boolean fix (Gradio passes None sometimes)
- Debug logging for checkbox values
- Both upload file and microphone supported
"""
import os
import uuid
import json
import base64
import tempfile
import logging
import subprocess
import threading
import time
import requests
import numpy as np
import gradio as gr
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from denoiser import Denoiser
from transcriber import Transcriber
from translator import Translator
# Model singletons — constructed once at import time so every request/job
# reuses the same loaded models (construction may be slow on cold start).
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()
# UI display name -> language code ("auto" means detect from audio).
LANGUAGES_DISPLAY = {
"Auto Detect": "auto",
"English": "en", "Telugu": "te",
"Hindi": "hi", "Tamil": "ta",
"Kannada": "kn", "Spanish": "es",
"French": "fr", "German": "de",
"Japanese": "ja", "Chinese": "zh",
"Arabic": "ar",
}
# Translation targets must be concrete, so "Auto Detect" is excluded.
OUT_LANGS = {k: v for k, v in LANGUAGES_DISPLAY.items() if k != "Auto Detect"}
# ══════════════════════════════════════════════════════════════════════
# JOB STORE β€” in-memory job registry
# ══════════════════════════════════════════════════════════════════════
_jobs: dict = {}  # job_id -> {status, step, message, result, created_at}
_jobs_lock = threading.Lock()  # guards every read/write of _jobs
JOB_TTL_SEC = 3600  # job records auto-expire after one hour (see _cleanup_loop)
def _new_job() -> str:
    """Create a fresh job record in the registry and return its id."""
    fresh_id = str(uuid.uuid4())
    record = {
        "status": "queued",
        "step": 0,
        "message": "Queued...",
        "result": None,
        "created_at": time.time(),
    }
    with _jobs_lock:
        _jobs[fresh_id] = record
    return fresh_id
def _update_job(job_id: str, **kwargs):
    """Merge *kwargs* into an existing job record; unknown ids are ignored."""
    with _jobs_lock:
        record = _jobs.get(job_id)
        if record is not None:
            record.update(kwargs)
def _get_job(job_id: str) -> dict:
    """Return a shallow copy of the job record ({} when the id is unknown)."""
    with _jobs_lock:
        record = _jobs.get(job_id, {})
        return dict(record)
def _cleanup_loop():
    """Daemon loop: every 5 minutes evict job records older than JOB_TTL_SEC."""
    while True:
        time.sleep(300)
        cutoff = time.time() - JOB_TTL_SEC
        with _jobs_lock:
            stale = [jid for jid, rec in _jobs.items()
                     if rec.get("created_at", 0) < cutoff]
            for jid in stale:
                del _jobs[jid]
            if stale:
                logger.info(f"[Jobs] Cleaned {len(stale)} expired jobs")
threading.Thread(target=_cleanup_loop, daemon=True).start()
# ══════════════════════════════════════════════════════════════════════
# AUDIO FORMAT CONVERTER
# ══════════════════════════════════════════════════════════════════════
def convert_to_wav(audio_path: str) -> str:
    """Best-effort conversion of *audio_path* to 16 kHz mono PCM WAV via ffmpeg.

    ``None`` and paths whose extension the pipeline already accepts
    (.wav/.mp3/.flac/.ogg/.aac) are returned unchanged. On any failure the
    ORIGINAL path is returned so downstream code can still try the raw file.
    """
    if audio_path is None:
        return audio_path
    ext = os.path.splitext(audio_path)[1].lower()
    # Formats the downstream readers handle natively — no transcoding needed.
    if ext in (".wav", ".mp3", ".flac", ".ogg", ".aac"):
        return audio_path
    try:
        converted = audio_path + "_converted.wav"
        result = subprocess.run([
            "ffmpeg", "-y", "-i", audio_path,
            "-ar", "16000", "-ac", "1", "-acodec", "pcm_s16le", converted
        ], capture_output=True)
        # FIX: also require a non-empty output — ffmpeg can exit 0 on some
        # inputs yet write an unusable zero-byte file.
        if (result.returncode == 0 and os.path.exists(converted)
                and os.path.getsize(converted) > 0):
            logger.info(f"Converted {ext} β†’ .wav")
            return converted
        # FIX: failures used to be silent — surface ffmpeg's stderr (tail only)
        # so broken uploads are diagnosable from the logs.
        logger.warning("ffmpeg failed (rc=%s): %s", result.returncode,
                       result.stderr.decode("utf-8", "replace")[-500:])
    except FileNotFoundError:
        # FIX: distinguish "ffmpeg not installed" from a bad input file.
        logger.warning("ffmpeg binary not found β€” returning original file")
    except Exception as e:
        logger.warning(f"Conversion error: {e}")
    return audio_path
# ══════════════════════════════════════════════════════════════════════
# CORE PIPELINE
# ══════════════════════════════════════════════════════════════════════
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True,
                 opt_breaths=True, opt_mouth=True, job_id=None):
    """Generator pipeline: denoise β†’ transcribe β†’ filler/stutter cut β†’ translate β†’ summarize.

    Yields progress dicts ({status, step, message}) after each stage and
    finally either a full result dict (status="done") or an error dict
    (status="error"). When *job_id* is given, every yielded state is also
    mirrored into the in-memory job store so HTTP pollers see the progress.
    """
    def progress(step, message):
        # Build one progress payload; mirror it to the job store when polling.
        update = {"status": "processing", "step": step, "message": message}
        if job_id:
            _update_job(job_id, **update)
        return update
    out_dir = tempfile.mkdtemp()
    try:
        yield progress(1, "⏳ Step 1/5 β€” Denoising...")
        # Pass 1: noise/silence/breath/mouth cleanup only. Filler & stutter
        # removal needs word timestamps, which only exist after transcription.
        denoise1 = denoiser.process(
            audio_path, out_dir,
            remove_fillers=False, remove_stutters=False,
            remove_silences=opt_silences, remove_breaths=opt_breaths,
            remove_mouth_sounds=opt_mouth, word_segments=None,
        )
        clean1 = denoise1['audio_path']
        stats = denoise1['stats']
        yield progress(2, "⏳ Step 2/5 β€” Transcribing...")
        transcript, detected_lang, t_method = transcriber.transcribe(clean1, src_lang)
        # NOTE(review): reads a private attribute for word timings — assumes
        # transcribe() populates _last_segments; confirm in transcriber.py.
        word_segs = transcriber._last_segments
        # Log for debugging
        print(f"[Pipeline] Options: fillers={opt_fillers}, stutters={opt_stutters}, "
              f"silences={opt_silences}, breaths={opt_breaths}, mouth={opt_mouth}")
        print(f"[Pipeline] Word segments available: {len(word_segs)}")
        if (opt_fillers or opt_stutters) and word_segs:
            yield progress(3, "⏳ Step 3/5 β€” Removing fillers & stutters...")
            import soundfile as sf
            audio_data, sr = sf.read(clean1)
            if audio_data.ndim == 2:
                # Down-mix stereo to mono before per-word editing.
                audio_data = audio_data.mean(axis=1)
            audio_data = audio_data.astype(np.float32)
            if opt_fillers:
                audio_data, n_f = denoiser._remove_fillers(audio_data, sr, word_segs)
                stats['fillers_removed'] = n_f
                transcript = denoiser.clean_transcript_fillers(transcript)
            if opt_stutters:
                audio_data, n_s = denoiser._remove_stutters(audio_data, sr, word_segs)
                stats['stutters_removed'] = n_s
            # Overwrite the pass-1 file in place with the edited audio.
            sf.write(clean1, audio_data, sr, subtype="PCM_24")
        else:
            stats['fillers_removed'] = 0
            stats['stutters_removed'] = 0
        translation = transcript
        tl_method = "same language"
        # Translate only when a concrete target differs from the detected one.
        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield progress(4, "⏳ Step 4/5 β€” Translating...")
            translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
        yield progress(5, "⏳ Step 5/5 β€” Summarizing...")
        summary = translator.summarize(transcript)
        # Embed the enhanced audio as base64 so API clients receive it inline.
        with open(clean1, "rb") as f:
            enhanced_b64 = base64.b64encode(f.read()).decode("utf-8")
        result = {
            "status": "done",
            "step": 5,
            "message": "βœ… Done!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "audioPath": clean1,
            "enhancedAudio": enhanced_b64,
            "stats": {
                "language": detected_lang.upper(),
                "noise_method": stats.get("noise_method", "noisereduce"),
                "fillers_removed": stats.get("fillers_removed", 0),
                "stutters_removed": stats.get("stutters_removed", 0),
                "silences_removed_sec": stats.get("silences_removed_sec", 0),
                "breaths_reduced": stats.get("breaths_reduced", False),
                "mouth_sounds_removed": stats.get("mouth_sounds_removed", 0),
                "transcription_method": t_method,
                "translation_method": tl_method,
                "processing_sec": stats.get("processing_sec", 0),
                "word_segments": len(word_segs),
                "transcript_words": len(transcript.split()),
            },
        }
        if job_id:
            _update_job(job_id, status="done", step=5,
                        message="βœ… Done!", result=result)
        yield result
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)
        err = {"status": "error", "message": f"❌ Error: {str(e)}"}
        if job_id:
            _update_job(job_id, **err)
        yield err
# ══════════════════════════════════════════════════════════════════════
# BACKGROUND WORKER
# ══════════════════════════════════════════════════════════════════════
def _run_job_in_background(job_id, audio_path, src_lang, tgt_lang,
                           opt_fillers, opt_stutters, opt_silences,
                           opt_breaths, opt_mouth):
    """Thread target: drain the pipeline generator, then delete the temp audio.

    Progress/result/error states land in the job store via *job_id*; any
    exception escaping the generator is recorded as a job error.
    """
    try:
        progress_stream = run_pipeline(
            audio_path, src_lang, tgt_lang,
            opt_fillers, opt_stutters, opt_silences,
            opt_breaths, opt_mouth, job_id=job_id
        )
        # The generator does all the work; we only need to exhaust it.
        for _unused in progress_stream:
            pass
    except Exception as e:
        _update_job(job_id, status="error", message=f"❌ {e}")
    finally:
        # Best-effort removal of the (possibly converted) input file.
        try:
            os.unlink(audio_path)
        except Exception:
            pass
# ══════════════════════════════════════════════════════════════════════
# GRADIO UI
# ══════════════════════════════════════════════════════════════════════
def process_audio_gradio(upload_file, mic_audio, in_lang_name, out_lang_name,
                         opt_fillers, opt_stutters, opt_silences,
                         opt_breaths, opt_mouth):
    """Gradio handler: start a background job and stream progress to the UI.

    Yields 6-tuples matching the outputs wired in the Blocks UI:
    (status, transcript, translation, audio_path, stats, summary).
    """
    # ── Resolve audio path β€” upload takes priority over mic ──────────
    audio_path = None
    if upload_file is not None:
        if isinstance(upload_file, dict):
            # NOTE(review): some Gradio versions pass uploads as a dict
            # payload with "name"/"path" keys — confirm against the
            # installed Gradio version.
            audio_path = upload_file.get("name") or upload_file.get("path", "")
        else:
            audio_path = upload_file
    elif mic_audio is not None:
        audio_path = mic_audio  # gr.Audio(type="filepath") returns path directly
    if audio_path is None:
        yield ("❌ Please upload a file or record via microphone.", "", "", None, "", "")
        return
    # ── FIX: Force proper booleans β€” Gradio can pass None or strings ──
    opt_fillers = bool(opt_fillers)
    opt_stutters = bool(opt_stutters)
    opt_silences = bool(opt_silences)
    opt_breaths = bool(opt_breaths)
    opt_mouth = bool(opt_mouth)
    print(f"[Gradio] Checkboxes: fillers={opt_fillers}, stutters={opt_stutters}, "
          f"silences={opt_silences}, breaths={opt_breaths}, mouth={opt_mouth}")
    # ── Convert format if needed ──────────────────────────────────────
    audio_path = convert_to_wav(audio_path)
    src_lang = LANGUAGES_DISPLAY.get(in_lang_name, "auto")
    tgt_lang = LANGUAGES_DISPLAY.get(out_lang_name, "te")
    # ── Start background job ──────────────────────────────────────────
    job_id = _new_job()
    threading.Thread(
        target=_run_job_in_background,
        args=(job_id, audio_path, src_lang, tgt_lang,
              opt_fillers, opt_stutters, opt_silences,
              opt_breaths, opt_mouth),
        daemon=True,
    ).start()
    # ── Poll job store and stream progress to Gradio UI ───────────────
    while True:
        time.sleep(2)  # 2s poll interval keeps the UI responsive without spamming
        job = _get_job(job_id)
        if not job:
            # Job evicted by the TTL cleaner (or never existed).
            yield ("❌ Job not found.", "", "", None, "", "")
            return
        status = job.get("status")
        message = job.get("message", "Processing...")
        if status in ("queued", "downloading", "processing"):
            yield (message, "", "", None, "", "")
        elif status == "done":
            result = job.get("result", {})
            s = result.get("stats", {})
            # Human-readable stats panel for the UI.
            stats_str = "\n".join([
                f"πŸŽ™οΈ Language : {s.get('language','?')}",
                f"πŸ”Š Noise method : {s.get('noise_method','?')}",
                f"πŸ—‘οΈ Fillers : {s.get('fillers_removed', 0)} removed",
                f"πŸ” Stutters : {s.get('stutters_removed', 0)} removed",
                f"🀫 Silences : {s.get('silences_removed_sec', 0):.1f}s removed",
                f"πŸ“ Transcription : {s.get('transcription_method','?')}",
                f"🌐 Translation : {s.get('translation_method','?')}",
                f"⏱️ Total time : {s.get('processing_sec', 0):.1f}s",
            ])
            yield (result.get("message", "βœ… Done!"),
                   result.get("transcript", ""),
                   result.get("translation", ""),
                   result.get("audioPath"),
                   stats_str,
                   result.get("summary", ""))
            return
        elif status == "error":
            yield (job.get("message", "❌ Error"), "", "", None, "Failed.", "")
            return
# ══════════════════════════════════════════════════════════════════════
# GRADIO BLOCKS UI
# ══════════════════════════════════════════════════════════════════════
# Gradio Blocks UI: left column = inputs/options, right column = results.
with gr.Blocks(title="ClearWave AI") as demo:
    gr.Markdown("# 🎡 ClearWave AI\n### Professional Audio Enhancement β€” handles 1hr+ audio!")
    with gr.Row():
        with gr.Column(scale=1):
            # ── Two tabs: Upload file OR Record microphone ────────────
            with gr.Tabs():
                with gr.Tab("πŸ“ Upload File"):
                    audio_in = gr.File(
                        label="Upload Audio (MP3, WAV, MPEG, MP4, AAC, OGG, FLAC, AMR...)",
                        file_types=[
                            ".mp3", ".wav", ".mpeg", ".mpg", ".mp4", ".m4a",
                            ".aac", ".ogg", ".flac", ".opus", ".webm", ".amr",
                            ".wma", ".aiff", ".aif", ".midi", ".mid",
                        ],
                    )
                with gr.Tab("πŸŽ™οΈ Record Microphone"):
                    mic_in = gr.Audio(
                        label="Record from Microphone",
                        sources=["microphone"],
                        type="filepath",
                    )
            # Language selection for transcription input and translation output.
            with gr.Row():
                in_lang = gr.Dropdown(label="Input Language",
                                      choices=list(LANGUAGES_DISPLAY.keys()),
                                      value="Auto Detect")
                out_lang = gr.Dropdown(label="Output Language",
                                       choices=list(OUT_LANGS.keys()),
                                       value="Telugu")
            gr.Markdown("### πŸŽ›οΈ Options")
            # Enhancement toggles — all on by default.
            with gr.Row():
                opt_fillers = gr.Checkbox(label="πŸ—‘οΈ Remove Fillers", value=True)
                opt_stutters = gr.Checkbox(label="πŸ” Remove Stutters", value=True)
            with gr.Row():
                opt_silences = gr.Checkbox(label="🀫 Remove Silences", value=True)
                opt_breaths = gr.Checkbox(label="πŸ’¨ Reduce Breaths", value=True)
            with gr.Row():
                opt_mouth = gr.Checkbox(label="πŸ‘„ Reduce Mouth Sounds", value=True)
            process_btn = gr.Button("πŸš€ Process Audio", variant="primary", size="lg")
            status_out = gr.Textbox(label="Status", interactive=False, lines=2)
        with gr.Column(scale=2):
            # Result tabs: text, audio player, stats, summary.
            with gr.Tabs():
                with gr.Tab("πŸ“ Text"):
                    with gr.Row():
                        transcript_out = gr.Textbox(label="Transcript", lines=14)
                        translation_out = gr.Textbox(label="Translation", lines=14)
                with gr.Tab("πŸ”Š Clean Audio"):
                    clean_audio_out = gr.Audio(label="Enhanced Audio", type="filepath")
                with gr.Tab("πŸ“Š Stats"):
                    stats_out = gr.Textbox(label="Stats", lines=12, interactive=False)
                with gr.Tab("πŸ“‹ Summary"):
                    summary_out = gr.Textbox(label="Summary", lines=14)
    # Wire the button to the streaming handler; output order must match
    # the 6-tuples yielded by process_audio_gradio.
    process_btn.click(
        fn=process_audio_gradio,
        inputs=[audio_in, mic_in, in_lang, out_lang,
                opt_fillers, opt_stutters, opt_silences,
                opt_breaths, opt_mouth],
        outputs=[status_out, transcript_out, translation_out,
                 clean_audio_out, stats_out, summary_out]
    )
# ══════════════════════════════════════════════════════════════════════
# API ROUTES
# ══════════════════════════════════════════════════════════════════════
import json as _json
from fastapi import Request as _Request
from fastapi.responses import JSONResponse as _JSONResponse
@demo.app.get("/api/health")
async def api_health():
    """Liveness probe: service identity plus the count of in-memory jobs."""
    payload = {
        "status": "ok",
        "service": "ClearWave AI on HuggingFace",
        "jobs_active": len(_jobs),
    }
    return _JSONResponse(payload)
@demo.app.post("/api/process-url")
async def api_process_url(request: _Request):
    """Start a background job for a remote audio URL.

    Returns {jobId, pollUrl, ...} immediately; the download + pipeline run in
    a daemon thread so the HTTP request never hits a platform timeout.
    """
    data = await request.json()
    # Some clients wrap the payload as {"data": {...}} — unwrap it.
    if "data" in data and isinstance(data["data"], dict):
        data = data["data"]
    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    # Coerce to real booleans — JSON clients may send null or truthy strings.
    opt_fillers = bool(data.get("optFillers", True))
    opt_stutters = bool(data.get("optStutters", True))
    opt_silences = bool(data.get("optSilences", True))
    opt_breaths = bool(data.get("optBreaths", True))
    opt_mouth = bool(data.get("optMouth", True))
    if not audio_url:
        return _JSONResponse({"error": "audioUrl is required"}, status_code=400)
    job_id = _new_job()
    _update_job(job_id, status="downloading", message="Downloading audio...")
    def _download_and_run():
        # Daemon-thread body: download β†’ convert β†’ pipeline β†’ temp cleanup.
        tmp_path = None
        audio_path = None
        try:
            resp = requests.get(audio_url, timeout=300, stream=True)
            resp.raise_for_status()
            # Guess a file suffix from the URL so converters get a format hint.
            url_lower = audio_url.lower()
            if "wav" in url_lower: suffix = ".wav"
            elif "mpeg" in url_lower: suffix = ".mpeg"
            elif "mp4" in url_lower: suffix = ".mp4"
            elif "m4a" in url_lower: suffix = ".m4a"
            else: suffix = ".mp3"
            tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
            tmp_path = tmp.name
            downloaded = 0
            total = int(resp.headers.get("content-length", 0))
            # Stream in 64 KiB chunks; report percent only when length is known.
            for chunk in resp.iter_content(chunk_size=65536):
                if chunk:
                    tmp.write(chunk)
                    downloaded += len(chunk)
                    if total:
                        pct = int(downloaded * 100 / total)
                        _update_job(job_id, status="downloading",
                                    message=f"Downloading... {pct}%")
            tmp.close()
            audio_path = convert_to_wav(tmp_path)
            # Drain the generator; progress/result land in the job store.
            for _ in run_pipeline(
                audio_path, src_lang, tgt_lang,
                opt_fillers, opt_stutters, opt_silences,
                opt_breaths, opt_mouth, job_id=job_id
            ):
                pass
            # Attach the caller's audioId to the finished result, if present.
            with _jobs_lock:
                if job_id in _jobs and _jobs[job_id].get("result"):
                    _jobs[job_id]["result"]["audioId"] = audio_id
        except Exception as e:
            logger.error(f"Job {job_id} failed: {e}", exc_info=True)
            _update_job(job_id, status="error", message=f"❌ Error: {str(e)}")
        finally:
            # Best-effort cleanup of the download and the converted copy.
            for p in [tmp_path, audio_path]:
                try:
                    if p and os.path.exists(p):
                        os.unlink(p)
                except Exception:
                    pass
    threading.Thread(target=_download_and_run, daemon=True).start()
    return _JSONResponse({
        "jobId": job_id,
        "audioId": audio_id,
        "status": "queued",
        "pollUrl": f"/api/job/{job_id}",
        "message": "Job started! Poll pollUrl for progress.",
    })
@demo.app.get("/api/job/{job_id}")
async def api_get_job(job_id: str):
    """Poll endpoint: status/step/message for a job, plus result once done."""
    snapshot = _get_job(job_id)
    if not snapshot:
        return _JSONResponse({"error": "Job not found"}, status_code=404)
    payload = {
        "jobId": job_id,
        "status": snapshot.get("status"),
        "step": snapshot.get("step", 0),
        "message": snapshot.get("message", ""),
    }
    # Include the full result only for finished jobs.
    if snapshot.get("status") == "done":
        payload["result"] = snapshot.get("result", {})
    return _JSONResponse(payload)
@demo.app.get("/api/jobs")
async def api_list_jobs():
    """Debug/admin view: compact status summary of every in-memory job."""
    with _jobs_lock:
        summary = {}
        for jid, rec in _jobs.items():
            summary[jid] = {
                "status": rec["status"],
                "step": rec.get("step", 0),
                "message": rec.get("message", ""),
            }
    return _JSONResponse({"jobs": summary, "total": len(summary)})
logger.info("βœ… Routes: /api/health, /api/process-url, /api/job/{id}, /api/jobs")
if __name__ == "__main__":
    # Launch the Gradio server (blocks until shutdown); the FastAPI routes
    # above are served from the same app.
    demo.launch()