"""HTTP service that downloads an interview recording, runs the analysis
pipeline on it, and publishes the resulting JSON and PDF reports."""

import json
import logging
import os
import re
import shutil
import uuid
from urllib.parse import urlparse

import requests
from fastapi import FastAPI, HTTPException, Body
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, HttpUrl

from process_interview import process_interview

app = FastAPI()

# Configuration
TEMP_DIR = "./temp_files"
OUTPUT_DIR = "./static/outputs"
JSON_DIR = os.path.join(OUTPUT_DIR, "json")
PDF_DIR = os.path.join(OUTPUT_DIR, "pdf")
PROCESSED_DIR = "./processed_audio"  # Matches process_interview.py output

os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(JSON_DIR, exist_ok=True)
os.makedirs(PDF_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)

app.mount("/static/outputs", StaticFiles(directory=OUTPUT_DIR), name="static_outputs")

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

VALID_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.flac', '.webm', '.ogg', '.aac')
MAX_FILE_SIZE_MB = 300
BASE_URL = os.getenv("BASE_URL", "https://norhan12-evalbot-interview-analysis.hf.space")


class ProcessResponse(BaseModel):
    """Response body of ``/process-audio``: a text summary plus report URLs."""

    summary: str
    json_url: str
    pdf_url: str


class ProcessAudioRequest(BaseModel):
    """Request body of ``/process-audio``: where to fetch the audio and for whom."""

    file_url: HttpUrl
    user_id: str


def _download_file(url: str, dest_path: str) -> None:
    """Stream *url* into *dest_path*, enforcing ``MAX_FILE_SIZE_MB``.

    Raises:
        HTTPException: 400 when the server does not answer 200 or the body
            exceeds the size limit.
        requests.RequestException: on network-level failures.
    """
    max_bytes = MAX_FILE_SIZE_MB * 1024 * 1024
    # The context manager releases the connection even when we bail out early.
    with requests.get(url, stream=True, timeout=30) as resp:
        if resp.status_code != 200:
            raise HTTPException(
                status_code=400,
                detail=f"Failed to download file from {url}: Status {resp.status_code}",
            )
        written = 0
        with open(dest_path, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                written += len(chunk)
                # Stop as soon as the limit is crossed instead of filling the
                # disk with the whole body first.
                if written > max_bytes:
                    raise HTTPException(
                        status_code=400,
                        detail=f"File too large: more than {MAX_FILE_SIZE_MB} MB",
                    )
                f.write(chunk)


def _build_summary(user_id: str, analysis_data: dict) -> str:
    """Render the short plain-text summary from the analysis JSON."""
    # ``or`` guards against keys that are present but explicitly null.
    voice = analysis_data.get('voice_analysis') or {}
    interpretation = voice.get('interpretation') or {}
    speakers = analysis_data.get('speakers') or []
    text_analysis = analysis_data.get('text_analysis') or {}
    total_duration = text_analysis.get('total_duration') or 0.0
    return (
        f"User ID: {user_id}\n"
        f"Speakers: {', '.join(str(s) for s in speakers)}\n"
        f"Duration: {total_duration:.2f} sec\n"
        f"Confidence: {interpretation.get('confidence_level', 'N/A')}\n"
        f"Anxiety: {interpretation.get('anxiety_level', 'N/A')}"
    )


def _remove_quietly(path) -> None:
    """Delete *path* if it exists; log instead of raising on failure."""
    if not path:
        return
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError:
        logger.warning("Could not remove temporary file %s", path, exc_info=True)


def _resolve_output_file(directory: str, filename: str, not_found_detail: str) -> str:
    """Return the path of *filename* inside *directory* or raise a 404.

    Only plain file names that resolve to a regular file are accepted, so
    values such as ``..`` (a directory) never reach ``FileResponse``.
    """
    candidate = os.path.join(directory, filename)
    if os.path.basename(filename) != filename or not os.path.isfile(candidate):
        raise HTTPException(status_code=404, detail=not_found_detail)
    return candidate


@app.post("/process-audio", response_model=ProcessResponse)
async def process_audio(request: ProcessAudioRequest = Body(...)):
    """Download the audio at ``file_url``, analyse it and publish the reports.

    Returns:
        ProcessResponse with a text summary and public URLs of the JSON and
        PDF reports copied into the static output directories.

    Raises:
        HTTPException: 400 for an unsupported extension, a failed download or
            an oversized file; 500 when processing fails.
    """
    # NOTE(review): requests.get and process_interview are synchronous calls
    # made from an async handler, so they run on the event loop thread.
    file_url = str(request.file_url)
    user_id = request.user_id
    local_path = None  # bound up-front so cleanup is safe on early failures

    try:
        # Look only at the URL path so "?query" / "#fragment" parts do not
        # end up inside the extension.
        file_ext = os.path.splitext(urlparse(file_url).path)[1].lower()
        if file_ext not in VALID_EXTENSIONS:
            raise HTTPException(status_code=400, detail=f"Invalid file extension: {file_ext}")

        # user_id is caller-supplied: strip anything that could act as a
        # path separator before using it in a file name.
        safe_user_id = re.sub(r"[^A-Za-z0-9_-]", "_", user_id)
        local_filename = f"{safe_user_id}_{uuid.uuid4().hex}{file_ext}"
        local_path = os.path.join(TEMP_DIR, local_filename)

        logger.info(f"Downloading file from {file_url} to {local_path}")
        _download_file(file_url, local_path)

        logger.info(f"Processing audio file: {local_path}")
        result = process_interview(local_path, user_id=user_id)
        if not result or 'json_path' not in result or 'pdf_path' not in result:
            raise HTTPException(status_code=500, detail="Processing failed")

        json_dest = os.path.basename(result['json_path'])
        pdf_dest = os.path.basename(result['pdf_path'])
        shutil.copyfile(result['json_path'], os.path.join(JSON_DIR, json_dest))
        shutil.copyfile(result['pdf_path'], os.path.join(PDF_DIR, pdf_dest))

        with open(result['json_path'], "r", encoding="utf-8") as jf:
            analysis_data = json.load(jf)
        summary = _build_summary(user_id, analysis_data)

        json_url = f"{BASE_URL}/static/outputs/json/{json_dest}"
        pdf_url = f"{BASE_URL}/static/outputs/pdf/{pdf_dest}"

        # Clean up the original processed files; the published copies stay.
        os.remove(result['json_path'])
        os.remove(result['pdf_path'])

        return ProcessResponse(summary=summary, json_url=json_url, pdf_url=pdf_url)
    except HTTPException:
        # Deliberate HTTP errors must keep their status code instead of being
        # rewrapped as a 500 by the generic handler below.
        raise
    except requests.RequestException as e:
        raise HTTPException(status_code=400, detail=f"Download error: {str(e)}") from e
    except Exception as e:
        logger.exception("Processing failed for %s", file_url)
        raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}") from e
    finally:
        # The downloaded temp file is never needed after this request,
        # whether it succeeded or failed.
        _remove_quietly(local_path)


# NOTE(review): the StaticFiles mount registered above covers the same URL
# prefix as the two routes below; routes are matched in registration order,
# so these handlers may never be reached — confirm.
@app.get("/static/outputs/json/{filename}")
async def get_json_file(filename: str):
    """Serve a published JSON report by file name, or respond with 404."""
    file_path = _resolve_output_file(JSON_DIR, filename, "JSON file not found")
    return FileResponse(file_path, media_type="application/json", filename=filename)


@app.get("/static/outputs/pdf/{filename}")
async def get_pdf_file(filename: str):
    """Serve a published PDF report by file name, or respond with 404."""
    file_path = _resolve_output_file(PDF_DIR, filename, "PDF file not found")
    return FileResponse(file_path, media_type="application/pdf", filename=filename)