norhan12's picture
Update app.py
836f35d verified
from fastapi import FastAPI, HTTPException, Body
from pydantic import BaseModel, HttpUrl
import os
import uuid
import shutil
import json
import requests
import logging
from process_interview import process_interview
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
app = FastAPI()
# Configuration
TEMP_DIR = "./temp_files"
OUTPUT_DIR = "./static/outputs"
JSON_DIR = os.path.join(OUTPUT_DIR, "json")
PDF_DIR = os.path.join(OUTPUT_DIR, "pdf")
PROCESSED_DIR = "./processed_audio" # Matches process_interview.py output
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(JSON_DIR, exist_ok=True)
os.makedirs(PDF_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
app.mount("/static/outputs", StaticFiles(directory=OUTPUT_DIR), name="static_outputs")
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
VALID_EXTENSIONS = ('.wav', '.mp3', '.m4a', '.flac', '.webm', '.ogg', '.aac')
MAX_FILE_SIZE_MB = 300
BASE_URL = os.getenv("BASE_URL", "https://norhan12-evalbot-interview-analysis.hf.space")
class ProcessResponse(BaseModel):
summary: str
json_url: str
pdf_url: str
class ProcessAudioRequest(BaseModel):
file_url: HttpUrl
user_id: str
@app.post("/process-audio", response_model=ProcessResponse)
async def process_audio(request: ProcessAudioRequest = Body(...)):
file_url = request.file_url
user_id = request.user_id
try:
file_ext = os.path.splitext(str(file_url))[1].lower()
if file_ext not in VALID_EXTENSIONS:
raise HTTPException(status_code=400, detail=f"Invalid file extension: {file_ext}")
local_filename = f"{user_id}_{uuid.uuid4().hex}{file_ext}"
local_path = os.path.join(TEMP_DIR, local_filename)
logger.info(f"Downloading file from {file_url} to {local_path}")
resp = requests.get(str(file_url), stream=True, timeout=30)
if resp.status_code != 200:
raise HTTPException(status_code=400, detail=f"Failed to download file from {file_url}: Status {resp.status_code}")
with open(local_path, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
file_size_mb = os.path.getsize(local_path) / (1024 * 1024)
if file_size_mb > MAX_FILE_SIZE_MB:
os.remove(local_path)
raise HTTPException(status_code=400, detail=f"File too large: {file_size_mb:.2f} MB")
logger.info(f"Processing audio file: {local_path}")
result = process_interview(local_path, user_id=user_id)
if not result or 'json_path' not in result or 'pdf_path' not in result:
os.remove(local_path)
raise HTTPException(status_code=500, detail="Processing failed")
json_dest = os.path.basename(result['json_path'])
pdf_dest = os.path.basename(result['pdf_path'])
shutil.copyfile(result['json_path'], os.path.join(JSON_DIR, json_dest))
shutil.copyfile(result['pdf_path'], os.path.join(PDF_DIR, pdf_dest))
with open(result['json_path'], "r") as jf:
analysis_data = json.load(jf)
voice = analysis_data.get('voice_analysis', {})
interpretation = voice.get('interpretation', {})
speakers = analysis_data.get('speakers', [])
total_duration = analysis_data.get('text_analysis', {}).get('total_duration', 0.0)
summary = (
f"User ID: {user_id}\n"
f"Speakers: {', '.join(speakers)}\n"
f"Duration: {total_duration:.2f} sec\n"
f"Confidence: {interpretation.get('confidence_level', 'N/A')}\n"
f"Anxiety: {interpretation.get('anxiety_level', 'N/A')}"
)
json_url = f"{BASE_URL}/static/outputs/json/{json_dest}"
pdf_url = f"{BASE_URL}/static/outputs/pdf/{pdf_dest}"
# Clean up temporary and original processed files
os.remove(local_path)
os.remove(result['json_path'])
os.remove(result['pdf_path'])
return ProcessResponse(summary=summary, json_url=json_url, pdf_url=pdf_url)
except requests.RequestException as e:
raise HTTPException(status_code=400, detail=f"Download error: {str(e)}")
except Exception as e:
if os.path.exists(local_path):
os.remove(local_path)
raise HTTPException(status_code=500, detail=f"Processing error: {str(e)}")
@app.get("/static/outputs/json/{filename}")
async def get_json_file(filename: str):
file_path = os.path.join(JSON_DIR, filename)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="JSON file not found")
return FileResponse(file_path, media_type="application/json", filename=filename)
@app.get("/static/outputs/pdf/{filename}")
async def get_pdf_file(filename: str):
file_path = os.path.join(PDF_DIR, filename)
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="PDF file not found")
return FileResponse(file_path, media_type="application/pdf", filename=filename)