Spaces:
Running
Running
| """ | |
| ClearWave AI — API Space (FastAPI) | |
| =================================== | |
| Pipeline: | |
| 1. Download audio from URL | |
| 2. Denoise → Cleanvoice SDK (tested & working) | |
| 3. Transcribe → Groq Whisper large-v3 / faster-whisper fallback | |
| 4. Translate → Helsinki-NLP / NLLB / Google fallback | |
| 5. Summarize → Extractive (position-scored) | |
| 6. Upload to Cloudinary | |
| Environment vars: CLEANVOICE_API_KEY, CLOUD_NAME, API_KEY, API_SECRET, GROQ_API_KEY | |
| """ | |
| import os | |
| import json | |
| import time | |
| import tempfile | |
| import logging | |
| import shutil | |
| import requests | |
| import cloudinary | |
| import cloudinary.uploader | |
| from fastapi import FastAPI, Request | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from denoiser import Denoiser # β Your working denoiser | |
| from transcriber import Transcriber # Your transcriber | |
| from translator import Translator # Your translator | |
# -- Cloudinary config: credentials come from the process environment --
cloudinary.config(
    cloud_name=os.getenv("CLOUD_NAME"),
    api_key=os.getenv("API_KEY"),
    api_secret=os.getenv("API_SECRET"),
)

# Logging is configured before the pipeline singletons are constructed.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# -- Singletons: one shared instance per pipeline stage, built at import --
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()

# -- FastAPI application with wildcard CORS (any origin, method, header) --
app = FastAPI(title="ClearWave AI API")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
def _safe_cleanup(out_dir):
    """Best-effort removal of a pipeline working directory.

    Delegates to ``denoiser.cleanup_temp_files`` when the denoiser exposes
    it; otherwise removes ``out_dir`` directly if it is set and exists.
    Any exception raised along the way is logged as a warning instead of
    being propagated to the caller.
    """
    try:
        if hasattr(denoiser, 'cleanup_temp_files'):
            denoiser.cleanup_temp_files(out_dir)
            return
        # Nothing to delete when no directory was given or it is already gone.
        if not out_dir or not os.path.exists(out_dir):
            return
        shutil.rmtree(out_dir, ignore_errors=True)
    except Exception as exc:
        logger.warning(f"Cleanup warning (non-fatal): {exc}")
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
                 opt_fillers=True, opt_stutters=True, opt_silences=True):
    """Run the enhancement pipeline on a local audio file, yielding progress.

    This is a generator. It yields one dict per stage with
    ``"status": "processing"``, then a final dict with ``"status": "done"``
    carrying the transcript, translation, summary, uploaded audio URL and
    stats. If any stage raises, a single ``"status": "error"`` dict is
    yielded instead.

    Args:
        audio_path: Path of the input audio file, passed to the denoiser.
        src_lang: Source-language hint passed to the transcriber.
        tgt_lang: Target language; translation is skipped when this is
            ``"auto"`` or equals the detected language.
        opt_fillers: Forwarded to the denoiser as ``fillers``; also enables
            filler removal on the transcript text.
        opt_stutters: Forwarded to the denoiser as ``stutters``.
        opt_silences: Forwarded to the denoiser as ``long_silences``.

    Yields:
        dict: Progress, result, or error payloads as described above.

    The temporary working directory is removed in ``finally`` via
    ``_safe_cleanup``, so cleanup happens after the last yield.
    """
    # NOTE(review): the "β" inside the user-facing messages below looks like
    # a mis-encoded dash/check mark. The strings are kept byte-for-byte here;
    # confirm against the file's real encoding before changing them.
    out_dir = tempfile.mkdtemp()
    try:
        # -- Step 1: audio enhancement --
        yield {"status": "processing", "step": 1,
               "message": "Step 1/4 β Enhancing audio with Cleanvoice..."}
        result = denoiser.process(
            audio_path, out_dir,
            fillers=opt_fillers,
            stutters=opt_stutters,
            long_silences=opt_silences,
        )
        clean_audio = result["audio_path"]
        stats = {
            "noise_method": "Cleanvoice API",
            "fillers": opt_fillers,
            "stutters": opt_stutters,
            "silences": opt_silences,
        }
        logger.info("Cleanvoice enhancement complete")

        # -- Step 2: transcription --
        yield {"status": "processing", "step": 2,
               "message": "Step 2/4 β Transcribing..."}
        transcript, detected_lang, t_method = transcriber.transcribe(clean_audio, src_lang)
        # The segment list is only used for a count. A missing or None
        # private attribute must not fail a job whose expensive steps have
        # already succeeded, so fall back to an empty list.
        word_segs = getattr(transcriber, "_last_segments", None) or []
        if opt_fillers:
            # Strip filler words from the text as well as from the audio.
            transcript = denoiser.clean_transcript_fillers(transcript)
        logger.info(f"Transcription: {len(transcript.split())} words, lang={detected_lang}")

        # -- Step 3: translation (skipped when no distinct target language) --
        translation = transcript
        tl_method = "same language"
        if tgt_lang != "auto" and detected_lang != tgt_lang:
            yield {"status": "processing", "step": 3,
                   "message": "Step 3/4 β Translating..."}
            translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
            logger.info(f"Translation done via {tl_method}")
        else:
            yield {"status": "processing", "step": 3,
                   "message": "Step 3/4 β Skipping translation (same language)"}

        # -- Step 4: summary and upload --
        yield {"status": "processing", "step": 4,
               "message": "Step 4/4 β Summarizing & uploading..."}
        summary = translator.summarize(transcript)

        # The upload is best-effort: on failure the job still completes and
        # "enhancedAudio" is reported as None.
        enhanced_url = None
        try:
            upload_result = cloudinary.uploader.upload(
                clean_audio,
                resource_type="video",
                folder="clearwave_enhanced",
            )
            enhanced_url = upload_result["secure_url"]
            logger.info(f"Cloudinary upload: {enhanced_url}")
        except Exception as e:
            logger.error(f"Cloudinary failed: {e}")

        # Yield the final payload inside the try so that the finally-cleanup
        # only runs after the consumer has received it.
        yield {
            "status": "done",
            "step": 4,
            "message": "β Complete!",
            "transcript": transcript,
            "translation": translation,
            "summary": summary,
            "enhancedAudio": enhanced_url,
            "stats": {
                # Guard against a transcriber that reports no language.
                "language": (detected_lang or "").upper(),
                "denoiser": stats,
                "transcription_method": t_method,
                "translation_method": tl_method,
                "word_segments": len(word_segs),
                "transcript_words": len(transcript.split()),
            },
        }
    except Exception as e:
        logger.error(f"Pipeline error: {e}", exc_info=True)
        yield {"status": "error", "message": f"Pipeline failed: {str(e)}"}
    finally:
        _safe_cleanup(out_dir)
# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm how it is registered on ``app``.
async def health():
    """Return a small JSON payload indicating that the service is up."""
    payload = {"status": "ok", "service": "ClearWave AI API"}
    return JSONResponse(payload)
# NOTE(review): no route decorator is visible on this handler in this view of
# the file -- confirm how it is registered on ``app``.
async def process_url(request: Request):
    """Download a remote audio file and stream pipeline progress as SSE.

    The request body must be a JSON object with:
        audioUrl (required): URL of the audio to fetch.
        audioId: Echoed back on every event (default ``""``).
        srcLang / tgtLang: Language options (defaults ``"auto"`` / ``"te"``).
        optFillers / optStutters / optSilences: Denoiser toggles
            (default ``True``).

    Returns:
        A 400 ``JSONResponse`` when the body is not a JSON object or
        ``audioUrl`` is missing; otherwise a ``text/event-stream``
        ``StreamingResponse`` whose events are ``data: <json>`` frames.
    """
    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both derive from
        # ValueError; a malformed body is a client error, not a 500.
        return JSONResponse({"error": "Invalid JSON body"}, status_code=400)
    if not isinstance(data, dict):
        return JSONResponse({"error": "JSON object body required"}, status_code=400)

    audio_url = data.get("audioUrl")
    audio_id = data.get("audioId", "")
    src_lang = data.get("srcLang", "auto")
    tgt_lang = data.get("tgtLang", "te")
    opt_fillers = data.get("optFillers", True)
    opt_stutters = data.get("optStutters", True)
    opt_silences = data.get("optSilences", True)

    if not audio_url:
        return JSONResponse({"error": "audioUrl required"}, status_code=400)

    # NOTE(review): audioUrl is caller-supplied and fetched server-side;
    # consider restricting allowed hosts/schemes if this API is public.

    def sse(obj):
        """Serialize one event as a Server-Sent Events ``data:`` frame."""
        return "data: " + json.dumps(obj) + "\n\n"

    def generate():
        # Deliberately a plain (sync) generator: the download and
        # run_pipeline are blocking, and StreamingResponse iterates sync
        # iterables in a worker thread instead of on the event loop.
        yield sse({"status": "processing", "step": 0, "message": "Downloading audio..."})

        tmp_path = None
        try:
            try:
                # Context managers close the HTTP connection and the file
                # handle even if the download fails part-way.
                with requests.get(audio_url, timeout=60, stream=True) as resp:
                    resp.raise_for_status()
                    # Pick the temp-file suffix from the URL (query string
                    # ignored); the first match wins, defaulting to .mp3.
                    lower_url = audio_url.lower().split("?")[0]
                    known = (".opus", ".ogg", ".aac", ".m4a", ".wav")
                    suffix = next((ext for ext in known if ext in lower_url), ".mp3")
                    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
                        tmp_path = tmp.name
                        for chunk in resp.iter_content(chunk_size=65536):
                            if chunk:
                                tmp.write(chunk)
            except Exception as e:
                yield sse({"status": "error", "message": f"Download failed: {e}"})
                return

            # Run the full pipeline, tagging each event with the caller's id.
            for result in run_pipeline(
                tmp_path, src_lang, tgt_lang,
                opt_fillers, opt_stutters, opt_silences
            ):
                result["audioId"] = audio_id
                yield sse(result)
        finally:
            # Always remove the downloaded file, including when the download
            # failed mid-way or the stream was closed early.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass  # best-effort: the file may already be gone

    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )