from fastapi import FastAPI, UploadFile, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import os
import shutil
import uuid
import datetime
import time
from contextlib import contextmanager

# Import the analysis entry points from the individual scoring modules
from fluency.fluency_api import main as analyze_fluency_main
from tone_modulation.tone_api import main as analyze_tone_main
from vcs.vcs_api import main as analyze_vcs_main
from vers.vers_api import main as analyze_vers_main
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main
from vps.vps_api import main as analyze_vps_main
from ves.ves import calc_voice_engagement_score
from transcribe import transcribe_audio
from filler_count.filler_score import analyze_fillers
from emotion.emo_predict import predict_emotion

app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace with specific domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

ALLOWED_EXTENSIONS = {'.wav', '.mp3', '.m4a', '.mp4', '.flac'}


@contextmanager
def temp_file_handler(upload_file: UploadFile):
    """Context manager to handle temporary file creation and cleanup."""
    temp_dir = "temp_uploads"
    os.makedirs(temp_dir, exist_ok=True)
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(upload_file.filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
        yield temp_filepath
    finally:
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)


def validate_file_extension(filename: str):
    """Raise a 400 error if the file extension is not in ALLOWED_EXTENSIONS."""
    if os.path.splitext(filename)[1].lower() not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4, and .flac files are supported.",
        )


async def process_audio_file(upload_file: UploadFile, analysis_func, **kwargs):
    """Generic helper: save the upload to a temp file and run an analysis function on it."""
    validate_file_extension(upload_file.filename)
    with temp_file_handler(upload_file) as temp_filepath:
        try:
            result = analysis_func(temp_filepath, **kwargs)
            return JSONResponse(content=result)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")


@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    return await process_audio_file(file, analyze_fluency_main, model_size="base")


@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
    return await process_audio_file(file, analyze_tone_main)


@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
    return await process_audio_file(file, analyze_vcs_main)


@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
    return await process_audio_file(file, analyze_vers_main)


@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
    return await process_audio_file(file, analyze_voice_confidence_main)


@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
    return await process_audio_file(file, analyze_vps_main)


@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
    return await process_audio_file(file, calc_voice_engagement_score)


@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
    return await process_audio_file(file, analyze_fillers)


@app.post('/transcribe/')
async def transcribe(file: UploadFile):
    validate_file_extension(file.filename)
    start_time = time.time()
    with temp_file_handler(file) as temp_filepath:
        try:
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")
            end_time = time.time()
            response = {
                "transcription": transcript,
                "transcription_time": end_time - start_time,
                "language": language
            }
            return JSONResponse(content=response)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")


@app.post('/analyze_all/')
async def analyze_all(file: UploadFile):
    """Endpoint to run every analysis on an uploaded audio file with a single transcription pass."""
    print(f"Received request at {datetime.datetime.now()} for file: {file.filename}")
    validate_file_extension(file.filename)
    with temp_file_handler(file) as temp_filepath:
        try:
            # Generate the transcript once; it is returned in the combined response below
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")

            analyze_all_start = time.time()

            # Filler-word count (its total is reused by the fluency, VERS, and confidence scorers)
            filler_start = time.time()
            filler_count = analyze_fillers(temp_filepath)
            filler_count_number = filler_count.get("total_fillers", 0)
            filler_end = time.time()
            print(f"Filler analysis time: {filler_end - filler_start} seconds")

            fluency_start = time.time()
            fluency_result = analyze_fluency_main(temp_filepath, model_size="base", filler_count=filler_count_number)
            fluency_score = fluency_result['fluency_score']
            fluency_end = time.time()
            print(f"Fluency analysis time: {fluency_end - fluency_start} seconds")

            tone_start = time.time()
            tone_result = analyze_tone_main(temp_filepath)
            tone_end = time.time()
            print(f"Tone analysis time: {tone_end - tone_start} seconds")

            vcs_start = time.time()
            vcs_result = analyze_vcs_main(temp_filepath)
            vcs_end = time.time()
            print(f"VCS analysis time: {vcs_end - vcs_start} seconds")

            vers_start = time.time()
            vers_result = analyze_vers_main(temp_filepath, model_size="base", filler_count=filler_count_number)
            vers_end = time.time()
            print(f"VERS analysis time: {vers_end - vers_start} seconds")

            voice_confidence_start = time.time()
            voice_confidence_result = analyze_voice_confidence_main(
                temp_filepath,
                model_size="base",
                filler_count=filler_count_number,
                fluency_score=fluency_score,
            )
            print("voice_confidence_result:", voice_confidence_result)
            voice_confidence_end = time.time()
            print(f"Voice confidence analysis time: {voice_confidence_end - voice_confidence_start} seconds")

            vps_start = time.time()
            vps_result = analyze_vps_main(temp_filepath)
            vps_end = time.time()
            print(f"VPS analysis time: {vps_end - vps_start} seconds")

            ves_start = time.time()
            ves_result = calc_voice_engagement_score(temp_filepath)
            ves_end = time.time()
            print(f"VES analysis time: {ves_end - ves_start} seconds")

            emotion_start = time.time()
            emotion = predict_emotion(temp_filepath)
            emotion_end = time.time()
            print(f"Emotion analysis time: {emotion_end - emotion_start} seconds")

            # Average of the seven individual scores (key names match what each module returns)
            avg_score = (
                fluency_result['fluency_score'] +
                tone_result['speech_dynamism_score'] +
                vcs_result['Voice Clarity Sore'] +
                vers_result['VERS Score'] +
                voice_confidence_result['voice_confidence_score'] +
                vps_result['VPS'] +
                ves_result['ves']
            ) / 7

            analyze_all_end = time.time()

            # Combine results
            combined_result = {
                "fluency": fluency_result,
                "tone": tone_result,
                "vcs": vcs_result,
                "vers": vers_result,
                "voice_confidence": voice_confidence_result,
                "vps": vps_result,
                "ves": ves_result,
                "filler_words": filler_count,
                "transcript": transcript,
                "Detected Language": language,
                "emotion": emotion,
                "sank_score": avg_score,
                "analysis_time": analyze_all_end - analyze_all_start,
            }

            return JSONResponse(content=combined_result)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
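

# Optional local entry point: a minimal sketch that assumes uvicorn is installed and that
# host 0.0.0.0 / port 8000 are acceptable defaults (both are assumptions, not requirements).
# With the server running, any endpoint can be exercised with a multipart upload, e.g.:
#   curl -X POST -F "file=@sample.wav" http://localhost:8000/analyze_all/
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)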