Spaces:
Paused
Paused
| from fastapi import FastAPI, HTTPException, BackgroundTasks | |
| from pydantic import BaseModel | |
| import os | |
| import requests | |
| import subprocess | |
| import json | |
| import uuid | |
| import shutil | |
| from pathlib import Path | |
| from supabase import create_client, Client | |
| from openai import OpenAI | |
| import time | |
| from typing import Dict, Optional | |
app = FastAPI()

# In-memory registry of background jobs: job id (uuid4 string) -> status dict
# with the same keys as ``JobStatus`` (job_id, status, progress, message,
# result, error).
# NOTE: this lives in the worker process only. Nothing in the code visible
# here ever removes an entry, so it grows with every request and is lost on
# restart; a DB/Redis store would be needed for persistence or for running
# more than one worker process.
jobs: Dict[str, dict] = dict()
class ProcessRequest(BaseModel):
    """Request body for starting an avatar-slicing job.

    All third-party credentials are supplied per request instead of being
    read from the worker's environment.
    """

    # URL of the source video; it is fetched with a plain, unauthenticated GET.
    videoUrl: str
    # Folder prefix for the uploaded clips inside the "projects" bucket.
    projectId: str
    # Supabase project URL and API key used to upload the clips.
    supabaseUrl: str
    supabaseKey: str
    # OpenAI API key used for the Whisper transcription call.
    # NOTE(review): keys arrive as plain ``str``; pydantic ``SecretStr`` would
    # keep them out of reprs/logs if that matters for this deployment.
    openaiKey: str
class JobStatus(BaseModel):
    """Schema of one job's status record, as polled by clients.

    Mirrors the dicts kept in the in-memory ``jobs`` registry. It is not
    referenced by the code visible here; presumably it serves as a response
    model or client-side contract elsewhere — confirm.
    """

    job_id: str
    # Values written in this file: "queued", "processing", "completed", "failed".
    status: str
    # Percentage, 0-100.
    progress: int
    # Human-readable description of the current step.
    message: str
    # Set on success, e.g. {"slices": [{"text", "url", "duration"}, ...]}.
    result: Optional[dict] = None
    # Set when the job fails.
    error: Optional[str] = None
def read_root():
    """Health-check payload: a fixed banner plus the number of tracked jobs.

    ``active_jobs`` is simply ``len(jobs)``. It therefore counts every entry
    still held in the registry (queued, processing, completed and failed
    alike), not only running jobs.
    """
    # NOTE(review): no route decorator (e.g. ``@app.get("/")``) is visible for
    # this handler in this view — confirm it is registered, otherwise FastAPI
    # never exposes it.
    tracked = len(jobs)
    return {"status": "Avatar Worker is Online", "active_jobs": tracked}
async def get_status(job_id: str):
    """Return the stored status dict for ``job_id``.

    Raises:
        HTTPException: 404 when this process has no job with that id.
    """
    # NOTE(review): no route decorator is visible for this handler in this
    # view — confirm it is registered with the app.
    if job_id in jobs:
        return jobs[job_id]
    raise HTTPException(status_code=404, detail="Job not found")
| import traceback | |
def _ffmpeg_stderr(err: subprocess.CalledProcessError) -> str:
    """Return a failed ffmpeg run's captured stderr as text, or ``str(err)`` if empty.

    Undecodable bytes are replaced so a non-UTF-8 message cannot mask the
    real failure behind a ``UnicodeDecodeError``.
    """
    if err.stderr:
        return err.stderr.decode("utf-8", errors="replace")
    return str(err)


def _segment_field(segment, name: str):
    """Read field ``name`` from a Whisper segment given either as an object or a dict."""
    if isinstance(segment, dict):
        return segment[name]
    return getattr(segment, name)


def _padded_window(bounds, i: int) -> tuple:
    """Return the padded ``(start, end)`` cut points, in seconds, for segment ``i``.

    ``bounds`` is the ordered list of raw ``(start, end)`` pairs of all
    segments. Each side borrows half of the silence to the neighbouring
    segment, clamped: the tail gets 0.05-0.3 s (0.5 s after the last
    segment) and the lead-in 0.05-0.1 s (0.1 s before the first). Because
    of the 0.05 s floor, clips overlap their neighbours when the silence
    between segments is under 0.1 s. ``start`` never goes below 0.
    """
    orig_start, orig_end = bounds[i]

    if i + 1 < len(bounds):
        gap_next = bounds[i + 1][0] - orig_end
        end_padding = max(0.05, min(0.3, gap_next / 2))
    else:
        end_padding = 0.5  # tail for the last segment

    if i > 0:
        gap_prev = orig_start - bounds[i - 1][1]
        start_padding = max(0.05, min(0.1, gap_prev / 2))
    else:
        start_padding = 0.1  # lead-in for the first segment

    return max(0, orig_start - start_padding), orig_end + end_padding


def background_process(job_id: str, req: ProcessRequest):
    """Run the whole slicing pipeline for one job, recording progress in ``jobs``.

    Steps:

    1. Download ``req.videoUrl``.
    2. Extract mono 16 kHz MP3 audio.
    3. Transcribe it with Whisper (segment and word timestamps).
    4. Cut one padded, re-encoded MP4 clip per speech segment with ffmpeg.
    5. Upload each clip to the Supabase "projects" bucket under
       ``<projectId>/``.

    A failure on a single clip (slicing or upload) only drops that clip.
    Any other failure, or ending up with no clips at all, marks the job failed.

    On success ``jobs[job_id]`` ends with ``status="completed"`` and
    ``result={"slices": [{"text", "url", "duration"}, ...]}``. On failure it
    ends with ``status="failed"`` and ``error`` set. Pipeline errors are not
    propagated to the caller. The scratch directory is always removed.

    Args:
        job_id: Key of an entry already present in ``jobs``.
        req: Payload with the video URL, project id and API credentials.
    """
    temp_dir = Path(f"/tmp/{uuid.uuid4()}")
    try:
        # Created inside the try so that a failure here still marks the job
        # failed instead of leaving it "queued" forever.
        temp_dir.mkdir(parents=True, exist_ok=True)

        # 1. Download video
        print(f"[{job_id}] Step 1: Downloading video from {req.videoUrl}")
        jobs[job_id].update({"status": "processing", "progress": 5, "message": "Downloading video..."})
        video_path = temp_dir / "input_video.mp4"
        try:
            # NOTE(review): the URL is fetched as given; if callers are not
            # trusted, restrict schemes/hosts to avoid SSRF.
            # ``with`` releases the streamed connection even if writing fails.
            with requests.get(req.videoUrl, stream=True, timeout=300) as resp:
                resp.raise_for_status()
                with open(video_path, "wb") as f:
                    for chunk in resp.iter_content(chunk_size=8192):
                        f.write(chunk)
            print(f"[{job_id}] Download complete. Size: {video_path.stat().st_size} bytes")
        except Exception as e:
            raise Exception(f"Download Error: {e}") from e

        # 2. Extract mono 16 kHz MP3 audio for speech-to-text
        print(f"[{job_id}] Step 2: Extracting audio...")
        jobs[job_id].update({"progress": 15, "message": "Extracting audio for AI analysis..."})
        audio_path = temp_dir / "audio.mp3"
        try:
            subprocess.run([
                "ffmpeg", "-i", str(video_path),
                "-vn", "-acodec", "libmp3lame", "-ar", "16000", "-ac", "1", "-y",
                str(audio_path),
            ], check=True, capture_output=True)
            audio_size = audio_path.stat().st_size
            print(f"[{job_id}] Audio extraction complete. Size: {audio_size} bytes")
            if audio_size > 25 * 1024 * 1024:
                # Only a warning: the transcription request is still attempted.
                print(f"[{job_id}] WARNING: Audio exceeds 25MB (Whisper limit).")
        except subprocess.CalledProcessError as e:
            raise Exception(f"FFmpeg Audio Error: {_ffmpeg_stderr(e)}") from e

        # 3. Initialize clients
        print(f"[{job_id}] Step 3: Initializing API clients...")
        jobs[job_id].update({"progress": 25, "message": "Preparing AI engines..."})
        try:
            supabase: Client = create_client(req.supabaseUrl, req.supabaseKey)
            openai_client = OpenAI(api_key=req.openaiKey)
        except Exception as e:
            raise Exception(f"Client Init Error: {e}") from e

        # 4. Get segment timestamps from OpenAI Whisper
        print(f"[{job_id}] Step 4: Calling OpenAI Whisper...")
        jobs[job_id].update({"progress": 35, "message": "Analyzing speech and timing..."})
        try:
            with open(audio_path, "rb") as audio_file:
                transcript = openai_client.audio.transcriptions.create(
                    file=audio_file,
                    model="whisper-1",
                    response_format="verbose_json",
                    timestamp_granularities=["segment", "word"],
                )
            # Depending on the openai SDK version, ``segments`` may hold typed
            # objects or plain dicts (see _segment_field) and may be None/absent.
            segments = getattr(transcript, "segments", None) or []
            print(f"[{job_id}] Whisper analysis complete. Found {len(segments)} segments.")
        except Exception as e:
            print(f"[{job_id}] OpenAI/JSON Error: {traceback.format_exc()}")
            raise Exception(f"OpenAI Analysis Error: {e}") from e

        if not segments:
            raise Exception("No speech detected in video")

        # 5. Slice video per segment and upload
        print(f"[{job_id}] Step 5: Starting intelligent slice loop...")
        bounds = [(_segment_field(s, "start"), _segment_field(s, "end")) for s in segments]
        processed_slices = []
        total_segments = len(segments)
        for i, segment in enumerate(segments):
            start, end = _padded_window(bounds, i)
            text = _segment_field(segment, "text").strip()
            duration = end - start
            if duration < 0.2:
                continue

            step_progress = 40 + int((i / total_segments) * 50)
            jobs[job_id].update({"progress": step_progress, "message": f"Slicing segment {i+1}/{total_segments}..."})

            output_path = temp_dir / f"slice_{i}.mp4"
            try:
                # -ss before -i seeks the input (fast); -t after -i sets the
                # clip duration. aresample=async=1 corrects only the start of
                # the audio stream so it lines up with the seek point.
                subprocess.run([
                    "ffmpeg", "-ss", str(start), "-i", str(video_path), "-t", str(duration), "-y",
                    "-c:v", "libx264", "-preset", "ultrafast", "-crf", "28",
                    "-c:a", "aac", "-b:a", "128k", "-af", "aresample=async=1",
                    "-map_metadata", "-1", "-avoid_negative_ts", "make_zero",
                    str(output_path),
                ], check=True, capture_output=True)
            except subprocess.CalledProcessError as e:
                # Best effort: skip this segment and keep going.
                print(f"[{job_id}] Slicing Error at index {i}: {_ffmpeg_stderr(e)}")
                continue

            try:
                storage_path = f"{req.projectId}/avatar_{int(time.time())}_{i}.mp4"
                with open(output_path, "rb") as f:
                    supabase.storage.from_("projects").upload(
                        path=storage_path,
                        file=f,
                        file_options={"content-type": "video/mp4", "x-upsert": "true"},
                    )
                public_url = supabase.storage.from_("projects").get_public_url(storage_path)
                processed_slices.append({"text": text, "url": public_url, "duration": duration})
            except Exception as e:
                # Best effort: a failed upload drops only this slice.
                print(f"[{job_id}] Upload Error at index {i}: {e}")

        print(f"[{job_id}] Loop complete. Slices: {len(processed_slices)}")
        if not processed_slices:
            # Report it rather than a "completed" job with nothing in it.
            raise Exception("No slices were produced (all segments were too short or failed to slice/upload)")

        jobs[job_id].update({
            "status": "completed",
            "progress": 100,
            "message": "Processing complete!",
            "result": {"slices": processed_slices},
        })
    except Exception as e:
        print(f"[{job_id}] FATAL JOB ERROR: {traceback.format_exc()}")
        # Also replace the step message so pollers don't see a stale
        # "Slicing segment ..." alongside status="failed".
        jobs[job_id].update({"status": "failed", "message": "Processing failed", "error": str(e)})
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
async def process_video(req: ProcessRequest, background_tasks: BackgroundTasks):
    """Register a new job for ``req`` and schedule the processing pipeline.

    The job is stored as "queued" before the task is scheduled, so a status
    poll made right after this returns finds it. FastAPI runs the scheduled
    ``BackgroundTasks`` after the response has been sent.

    Returns:
        ``{"job_id": <uuid4 string>}`` for polling progress.
    """
    # NOTE(review): no route decorator is visible for this handler in this
    # view — confirm it is registered with the app.
    new_job_id = str(uuid.uuid4())
    jobs[new_job_id] = dict(
        job_id=new_job_id,
        status="queued",
        progress=0,
        message="Job received and queued",
        result=None,
        error=None,
    )
    background_tasks.add_task(background_process, new_job_id, req)
    return {"job_id": new_job_id}
if __name__ == "__main__":
    # Direct ``python <file>`` launch: serve the app on all interfaces at
    # port 7860 (presumably the port the hosting Space expects — confirm).
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)