clearwave-api / main.py
Clearwave48's picture
Update main.py
6ec3b40 verified
"""
ClearWave AI β€” API Space (FastAPI)
===================================
Pipeline:
1. Download audio from URL
2. Denoise β†’ Cleanvoice SDK (tested & working)
3. Transcribe β†’ Groq Whisper large-v3 / faster-whisper fallback
4. Translate β†’ Helsinki-NLP / NLLB / Google fallback
5. Summarize β†’ Extractive (position-scored)
6. Upload to Cloudinary
Environment vars: CLEANVOICE_API_KEY, CLOUD_NAME, API_KEY, API_SECRET, GROQ_API_KEY
"""
import os
import json
import time
import tempfile
import logging
import shutil
import requests
import cloudinary
import cloudinary.uploader
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from denoiser import Denoiser # βœ… Your working denoiser
from transcriber import Transcriber # Your transcriber
from translator import Translator # Your translator
# ── Cloudinary config ─────────────────────────────────────────────
cloudinary.config(
cloud_name=os.environ.get("CLOUD_NAME"),
api_key=os.environ.get("API_KEY"),
api_secret=os.environ.get("API_SECRET"),
)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ── Singletons (loaded once at startup) ──────────────────────────
denoiser = Denoiser()
transcriber = Transcriber()
translator = Translator()
app = FastAPI(title="ClearWave AI API")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def _safe_cleanup(out_dir):
"""Remove temp directory safely β€” never raises."""
try:
if hasattr(denoiser, 'cleanup_temp_files'):
denoiser.cleanup_temp_files(out_dir)
elif out_dir and os.path.exists(out_dir):
shutil.rmtree(out_dir, ignore_errors=True)
except Exception as e:
logger.warning(f"Cleanup warning (non-fatal): {e}")
def run_pipeline(audio_path, src_lang="auto", tgt_lang="te",
opt_fillers=True, opt_stutters=True, opt_silences=True):
"""
Generator β€” yields SSE-style dicts at each step.
"""
out_dir = tempfile.mkdtemp()
word_segs = []
stats = {}
try:
# ── Step 1: Cleanvoice API ────────────────────────────────
yield {"status": "processing", "step": 1,
"message": "Step 1/4 β€” Enhancing audio with Cleanvoice..."}
result = denoiser.process(
audio_path, out_dir,
fillers=opt_fillers,
stutters=opt_stutters,
long_silences=opt_silences,
)
clean_audio = result["audio_path"]
stats = {
"noise_method": "Cleanvoice API",
"fillers": opt_fillers,
"stutters": opt_stutters,
"silences": opt_silences,
}
logger.info("Cleanvoice enhancement complete")
# ── Step 2: Transcribe ─────────────────────────────────────
yield {"status": "processing", "step": 2,
"message": "Step 2/4 β€” Transcribing..."}
transcript, detected_lang, t_method = transcriber.transcribe(clean_audio, src_lang)
word_segs = transcriber._last_segments
# Clean transcript fillers too
if opt_fillers:
transcript = denoiser.clean_transcript_fillers(transcript)
logger.info(f"Transcription: {len(transcript.split())} words, lang={detected_lang}")
# ── Step 3: Translate ──────────────────────────────────────
translation = transcript
tl_method = "same language"
if tgt_lang != "auto" and detected_lang != tgt_lang:
yield {"status": "processing", "step": 3,
"message": "Step 3/4 β€” Translating..."}
translation, tl_method = translator.translate(transcript, detected_lang, tgt_lang)
logger.info(f"Translation done via {tl_method}")
else:
yield {"status": "processing", "step": 3,
"message": "Step 3/4 β€” Skipping translation (same language)"}
# ── Step 4: Summarize & Cloudinary ────────────────────────
yield {"status": "processing", "step": 4,
"message": "Step 4/4 β€” Summarizing & uploading..."}
summary = translator.summarize(transcript)
enhanced_url = None
try:
upload_result = cloudinary.uploader.upload(
clean_audio,
resource_type="video",
folder="clearwave_enhanced",
)
enhanced_url = upload_result["secure_url"]
logger.info(f"Cloudinary upload: {enhanced_url}")
except Exception as e:
logger.error(f"Cloudinary failed: {e}")
# βœ… yield done INSIDE try so cleanup never interrupts it
yield {
"status": "done",
"step": 4,
"message": "βœ… Complete!",
"transcript": transcript,
"translation": translation,
"summary": summary,
"enhancedAudio": enhanced_url,
"stats": {
"language": detected_lang.upper(),
"denoiser": stats,
"transcription_method": t_method,
"translation_method": tl_method,
"word_segments": len(word_segs),
"transcript_words": len(transcript.split()),
},
}
except Exception as e:
logger.error(f"Pipeline error: {e}", exc_info=True)
yield {"status": "error", "message": f"Pipeline failed: {str(e)}"}
finally:
# βœ… Cleanup is now in finally β€” runs after done is yielded, never crashes pipeline
_safe_cleanup(out_dir)
@app.get("/api/health")
async def health():
return JSONResponse({"status": "ok", "service": "ClearWave AI API"})
@app.post("/api/process-url")
async def process_url(request: Request):
data = await request.json()
audio_url = data.get("audioUrl")
audio_id = data.get("audioId", "")
src_lang = data.get("srcLang", "auto")
tgt_lang = data.get("tgtLang", "te")
opt_fillers = data.get("optFillers", True)
opt_stutters = data.get("optStutters", True)
opt_silences = data.get("optSilences", True)
if not audio_url:
return JSONResponse({"error": "audioUrl required"}, status_code=400)
async def generate():
def sse(obj):
return "data: " + json.dumps(obj) + "\n\n"
yield sse({"status": "processing", "step": 0, "message": "Downloading audio..."})
# Download audio from URL
try:
resp = requests.get(audio_url, timeout=60, stream=True)
resp.raise_for_status()
lower_url = audio_url.lower().split("?")[0]
suffix = ".mp3"
if ".opus" in lower_url: suffix = ".opus"
elif ".ogg" in lower_url: suffix = ".ogg"
elif ".aac" in lower_url: suffix = ".aac"
elif ".m4a" in lower_url: suffix = ".m4a"
elif ".wav" in lower_url: suffix = ".wav"
tmp = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
for chunk in resp.iter_content(chunk_size=65536):
if chunk:
tmp.write(chunk)
tmp.close()
except Exception as e:
yield sse({"status": "error", "message": f"Download failed: {e}"})
return
# Run full pipeline
for result in run_pipeline(
tmp.name, src_lang, tgt_lang,
opt_fillers, opt_stutters, opt_silences
):
result["audioId"] = audio_id
yield sse(result)
try:
os.unlink(tmp.name)
except Exception:
pass
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)