# Fast_api / app.py

import datetime
import os
import shutil
import time
import uuid
from contextlib import contextmanager

from fastapi import FastAPI, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

# Local analysis and transcription modules
from fluency.fluency_api import main as analyze_fluency_main
from tone_modulation.tone_api import main as analyze_tone_main
from vcs.vcs_api import main as analyze_vcs_main
from vers.vers_api import main as analyze_vers_main
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main
from vps.vps_api import main as analyze_vps_main
from ves.ves import calc_voice_engagement_score
from transcribe import transcribe_audio
from filler_count.filler_score import analyze_fillers
from emotion.emo_predict import predict_emotion


app = FastAPI()

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Replace with specific domains in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

ALLOWED_EXTENSIONS = {".wav", ".mp3", ".m4a", ".mp4", ".flac"}


@contextmanager
def temp_file_handler(upload_file: UploadFile):
    """Context manager to handle temporary file creation and cleanup."""
    temp_dir = "temp_uploads"
    os.makedirs(temp_dir, exist_ok=True)
    temp_filename = f"temp_{uuid.uuid4()}{os.path.splitext(upload_file.filename)[1]}"
    temp_filepath = os.path.join(temp_dir, temp_filename)
    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
        yield temp_filepath
    finally:
        if os.path.exists(temp_filepath):
            os.remove(temp_filepath)


def validate_file_extension(filename: str):
    """Validate if the file extension is allowed."""
    if os.path.splitext(filename)[1].lower() not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4, and .flac files are supported."
        )


async def process_audio_file(upload_file: UploadFile, analysis_func, **kwargs):
    """Generic function to process an audio file with a given analysis function."""
    validate_file_extension(upload_file.filename)
    with temp_file_handler(upload_file) as temp_filepath:
        try:
            result = analysis_func(temp_filepath, **kwargs)
            return JSONResponse(content=result)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
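

# A new metric endpoint only needs a thin wrapper around process_audio_file, e.g.
# (analyze_pitch_main is a hypothetical analysis function, shown purely for illustration):
#
#   @app.post("/analyze_pitch/")
#   async def analyze_pitch(file: UploadFile):
#       return await process_audio_file(file, analyze_pitch_main)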
@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
return await process_audio_file(file, analyze_fluency_main, model_size="base")
@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
return await process_audio_file(file, analyze_tone_main)
@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
return await process_audio_file(file, analyze_vcs_main)
@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
return await process_audio_file(file, analyze_vers_main)
@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
return await process_audio_file(file, analyze_voice_confidence_main)
@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
return await process_audio_file(file, analyze_vps_main)
@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
return await process_audio_file(file, calc_voice_engagement_score)
@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
return await process_audio_file(file, analyze_fillers)
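

# Example client call (a sketch; the host and port assume a default local uvicorn run):
#
#   curl -X POST "http://localhost:8000/analyze_fluency/" -F "file=@sample.wav"
#
# Every endpoint above expects a multipart form field named "file".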


@app.post("/transcribe/")
async def transcribe(file: UploadFile):
    validate_file_extension(file.filename)
    start_time = time.time()
    with temp_file_handler(file) as temp_filepath:
        try:
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")
            end_time = time.time()
            response = {
                "transcription": transcript,
                "transcription_time": end_time - start_time,
                "language": language
            }
            return JSONResponse(content=response)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Transcription failed: {str(e)}")


@app.post("/analyze_all/")
async def analyze_all(file: UploadFile):
    """Run every analysis on one uploaded audio file, transcribing the audio only once."""
    print(f"Received request at {datetime.datetime.now()} for file: {file.filename}")
    validate_file_extension(file.filename)
    with temp_file_handler(file) as temp_filepath:
        try:
            # Transcribe once; the transcript and detected language are returned in the combined result
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")

            analyze_all_start = time.time()  # analysis_time below excludes the transcription step

            # Compute filler count
            filler_start = time.time()
            filler_count = analyze_fillers(temp_filepath)
            filler_count_number = filler_count.get("total_fillers", 0)
            filler_end = time.time()
            print(f"Filler analysis time: {filler_end - filler_start} seconds")

            fluency_start = time.time()
            fluency_result = analyze_fluency_main(temp_filepath, model_size="base", filler_count=filler_count_number)
            fluency_score = fluency_result["fluency_score"]
            fluency_end = time.time()
            print(f"Fluency analysis time: {fluency_end - fluency_start} seconds")

            tone_start = time.time()
            tone_result = analyze_tone_main(temp_filepath)
            tone_end = time.time()
            print(f"Tone analysis time: {tone_end - tone_start} seconds")

            vcs_start = time.time()
            vcs_result = analyze_vcs_main(temp_filepath)
            vcs_end = time.time()
            print(f"VCS analysis time: {vcs_end - vcs_start} seconds")

            vers_start = time.time()
            vers_result = analyze_vers_main(temp_filepath, model_size="base", filler_count=filler_count_number)
            vers_end = time.time()
            print(f"VERS analysis time: {vers_end - vers_start} seconds")

            voice_confidence_start = time.time()
            voice_confidence_result = analyze_voice_confidence_main(temp_filepath, model_size="base", filler_count=filler_count_number, fluency_score=fluency_score)
            print("voice_confidence_result:", voice_confidence_result)
            voice_confidence_end = time.time()
            print(f"Voice confidence analysis time: {voice_confidence_end - voice_confidence_start} seconds")

            vps_start = time.time()
            vps_result = analyze_vps_main(temp_filepath)
            vps_end = time.time()
            print(f"VPS analysis time: {vps_end - vps_start} seconds")

            ves_start = time.time()
            ves_result = calc_voice_engagement_score(temp_filepath)
            ves_end = time.time()
            print(f"VES analysis time: {ves_end - ves_start} seconds")

            emotion_start = time.time()
            emotion = predict_emotion(temp_filepath)
            emotion_end = time.time()
            print(f"Emotion analysis time: {emotion_end - emotion_start} seconds")

            # Average of the seven component scores
            avg_score = (
                fluency_result["fluency_score"] +
                tone_result["speech_dynamism_score"] +
                vcs_result["Voice Clarity Sore"] +  # key spelling as produced by the VCS module
                vers_result["VERS Score"] +
                voice_confidence_result["voice_confidence_score"] +
                vps_result["VPS"] +
                ves_result["ves"]
            ) / 7

            analyze_all_end = time.time()

            # Combine results
            combined_result = {
                "fluency": fluency_result,
                "tone": tone_result,
                "vcs": vcs_result,
                "vers": vers_result,
                "voice_confidence": voice_confidence_result,
                "vps": vps_result,
                "ves": ves_result,
                "filler_words": filler_count,
                "transcript": transcript,
                "Detected Language": language,
                "emotion": emotion,
                "sank_score": avg_score,
                "analysis_time": analyze_all_end - analyze_all_start,
            }
            return JSONResponse(content=combined_result)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
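

# Local entry point (a minimal sketch; host and port are assumptions, not dictated by the app above).
# Hugging Face Spaces conventionally serves on port 7860; adjust for other deployments.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)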