|
from fastapi import FastAPI, UploadFile, File, HTTPException |
|
from fastapi.responses import JSONResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
import os |
|
import shutil |
|
import uuid |
|
import tempfile |
|
import datetime |
|
import time |
|
from contextlib import contextmanager |
|
|
|
|
|
from fluency.fluency_api import main as analyze_fluency_main |
|
from tone_modulation.tone_api import main as analyze_tone_main |
|
from vcs.vcs_api import main as analyze_vcs_main |
|
from vers.vers_api import main as analyze_vers_main |
|
from voice_confidence_score.voice_confidence_api import main as analyze_voice_confidence_main |
|
from vps.vps_api import main as analyze_vps_main |
|
from ves.ves import calc_voice_engagement_score |
|
from transcribe import transcribe_audio |
|
from filler_count.filler_score import analyze_fillers |
|
from emotion.emo_predict import predict_emotion |
|
|
|
# The ASGI application object that the route decorators in this module
# register their endpoints on.
app = FastAPI()

# Cross-origin policy: every origin, HTTP method and request header is
# accepted, and credentialed (cookie-carrying) requests are allowed.
# NOTE(review): FastAPI's CORS documentation says wildcard origins/methods/
# headers should not be combined with allow_credentials=True -- confirm
# whether credentials are really needed, or list the trusted origins.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)
|
|
|
# Lower-case file suffixes (leading dot included) that uploads may carry.
ALLOWED_EXTENSIONS = {
    ".flac",
    ".m4a",
    ".mp3",
    ".mp4",
    ".wav",
}
|
|
|
@contextmanager
def temp_file_handler(upload_file: UploadFile):
    """Copy an upload to a temporary file on disk and delete it afterwards.

    The upload's binary stream (``upload_file.file``) is written to a
    uniquely named file inside the ``temp_uploads`` directory, which is
    created on demand relative to the current working directory. The
    temporary file keeps the extension of the client-supplied filename.

    Args:
        upload_file: Object exposing ``filename`` (may be ``None``) and a
            readable binary ``file`` attribute.

    Yields:
        The path of the temporary copy. The file is removed when the
        ``with`` block exits, whether or not the block raised.
    """
    temp_dir = "temp_uploads"
    os.makedirs(temp_dir, exist_ok=True)

    # A client may omit the filename; treat that as "no extension" instead of
    # letting os.path.splitext raise TypeError on None.
    extension = os.path.splitext(upload_file.filename or "")[1]
    temp_filepath = os.path.join(temp_dir, f"temp_{uuid.uuid4()}{extension}")

    try:
        with open(temp_filepath, "wb") as buffer:
            shutil.copyfileobj(upload_file.file, buffer)
        yield temp_filepath
    finally:
        # Remove directly rather than exists()+remove(): the check-then-act
        # pair can race with anything else deleting the file in between.
        try:
            os.remove(temp_filepath)
        except FileNotFoundError:
            pass
|
|
|
def validate_file_extension(filename: str):
    """Reject filenames whose extension is not in ``ALLOWED_EXTENSIONS``.

    The extension is compared case-insensitively. A missing (``None``) or
    empty filename has no extension and is rejected as well, instead of
    surfacing as an unhandled ``TypeError``.

    Args:
        filename: Client-supplied name of the uploaded file.

    Raises:
        HTTPException: With status 400 when the extension is not allowed.
    """
    extension = os.path.splitext(filename or "")[1].lower()
    if extension not in ALLOWED_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail="Invalid file type. Only .wav, .mp3, .m4a, .mp4, and .flac files are supported.",
        )
|
|
|
async def process_audio_file(upload_file: UploadFile, analysis_func, **kwargs):
    """Run one analysis function on an uploaded file and return its result.

    The upload's extension is validated, the upload is copied to a temporary
    file, and ``analysis_func`` is called with that file's path plus any
    extra keyword arguments. Its return value is wrapped in a
    ``JSONResponse``.

    Args:
        upload_file: The uploaded audio file.
        analysis_func: Callable taking the temporary file path as its first
            positional argument.
        **kwargs: Forwarded unchanged to ``analysis_func``.

    Returns:
        ``JSONResponse`` whose content is whatever ``analysis_func`` returned.

    Raises:
        HTTPException: 400 from extension validation; 500 with an
            ``Analysis failed: ...`` detail for any other failure. An
            ``HTTPException`` raised during analysis is passed through
            unchanged.
    """
    validate_file_extension(upload_file.filename)

    with temp_file_handler(upload_file) as temp_filepath:
        try:
            # NOTE(review): this call is synchronous, so the event loop is
            # blocked while the analysis runs. Offloading to a worker thread
            # would need confirmation that the analysis code is thread-safe.
            result = analysis_func(temp_filepath, **kwargs)
            return JSONResponse(content=result)
        except HTTPException:
            # Keep a deliberate HTTP error (status and detail) intact rather
            # than rewrapping it as a generic 500.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {e}") from e
|
|
|
@app.post("/analyze_fluency/")
async def analyze_fluency(file: UploadFile):
    """POST endpoint: run ``analyze_fluency_main`` on the uploaded file.

    Delegates to ``process_audio_file``, passing ``model_size="base"`` on to
    the analysis function, and returns the response it produces.
    """
    response = await process_audio_file(
        file,
        analyze_fluency_main,
        model_size="base",
    )
    return response
|
|
|
@app.post('/analyze_tone/')
async def analyze_tone(file: UploadFile):
    """POST endpoint: run ``analyze_tone_main`` on the uploaded file.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, analyze_tone_main)
    return response
|
|
|
@app.post('/analyze_vcs/')
async def analyze_vcs(file: UploadFile):
    """POST endpoint: run ``analyze_vcs_main`` on the uploaded file.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, analyze_vcs_main)
    return response
|
|
|
@app.post('/analyze_vers/')
async def analyze_vers(file: UploadFile):
    """POST endpoint: run ``analyze_vers_main`` on the uploaded file.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, analyze_vers_main)
    return response
|
|
|
@app.post('/voice_confidence/')
async def analyze_voice_confidence(file: UploadFile):
    """POST endpoint: run ``analyze_voice_confidence_main`` on the upload.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, analyze_voice_confidence_main)
    return response
|
|
|
@app.post('/analyze_vps/')
async def analyze_vps(file: UploadFile):
    """POST endpoint: run ``analyze_vps_main`` on the uploaded file.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, analyze_vps_main)
    return response
|
|
|
@app.post('/voice_engagement_score/')
async def analyze_voice_engagement_score(file: UploadFile):
    """POST endpoint: run ``calc_voice_engagement_score`` on the upload.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, calc_voice_engagement_score)
    return response
|
|
|
@app.post('/analyze_fillers/')
async def analyze_fillers_count(file: UploadFile):
    """POST endpoint: run ``analyze_fillers`` on the uploaded file.

    Delegates to ``process_audio_file`` and returns the response it produces.
    """
    response = await process_audio_file(file, analyze_fillers)
    return response
|
|
|
@app.post('/transcribe/')
async def transcribe(file: UploadFile):
    """POST endpoint: transcribe an uploaded audio file.

    The upload's extension is validated, the upload is copied to a temporary
    file, and ``transcribe_audio`` is called on it with
    ``model_size="base"``. The third element of its return value is ignored.

    Returns:
        ``JSONResponse`` with keys ``transcription``, ``transcription_time``
        (elapsed seconds, which includes copying the upload to disk) and
        ``language``.

    Raises:
        HTTPException: 400 from extension validation; 500 with a
            ``Transcription failed: ...`` detail for any other failure. An
            ``HTTPException`` raised during transcription is passed through
            unchanged.
    """
    validate_file_extension(file.filename)

    # perf_counter is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system wall clock is adjusted mid-request.
    started = time.perf_counter()
    with temp_file_handler(file) as temp_filepath:
        try:
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")
            elapsed = time.perf_counter() - started
            response = {
                "transcription": transcript,
                "transcription_time": elapsed,
                "language": language,
            }
            return JSONResponse(content=response)
        except HTTPException:
            # Keep a deliberate HTTP error intact rather than rewrapping it.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Transcription failed: {e}") from e
|
|
|
@contextmanager
def _log_duration(label):
    """Print how long the enclosed ``with`` body took, prefixed by *label*.

    Nothing is printed when the body raises: the exception propagates out of
    the ``yield`` before the print is reached.
    """
    # perf_counter is monotonic, unlike the wall clock, so durations cannot be
    # distorted by system clock adjustments.
    started = time.perf_counter()
    yield
    print(f"{label} analysis time: {time.perf_counter() - started} seconds")


@app.post('/analyze_all/')
async def analyze_all(file: UploadFile):
    """Endpoint to analyze all aspects of an uploaded audio file with single transcription.

    The upload is validated and copied to a temporary file, transcribed once,
    and then passed through each analysis in turn. The filler count feeds the
    fluency, VERS and voice-confidence analyses, and the fluency score feeds
    the voice-confidence analysis. Each step's duration is printed to stdout.

    Returns:
        ``JSONResponse`` holding every individual result, the transcript and
        detected language, the predicted emotion, ``sank_score`` (the
        arithmetic mean of the seven numeric scores) and ``analysis_time``
        (seconds spent after transcription finished).

    Raises:
        HTTPException: 400 from extension validation; 500 with an
            ``Analysis failed: ...`` detail for any other failure, including
            a missing key in an analysis result. An ``HTTPException`` raised
            by a step is passed through unchanged.
    """
    print(f"Received request at {datetime.datetime.now()} for file: {file.filename}")
    validate_file_extension(file.filename)

    with temp_file_handler(file) as temp_filepath:
        try:
            transcript, language, _ = transcribe_audio(temp_filepath, model_size="base")

            # The overall timer deliberately starts after transcription, so
            # "analysis_time" covers only the analysis steps below.
            analyze_all_start = time.perf_counter()

            with _log_duration("Filler"):
                filler_count = analyze_fillers(temp_filepath)
                filler_count_number = filler_count.get("total_fillers", 0)

            with _log_duration("Fluency"):
                fluency_result = analyze_fluency_main(
                    temp_filepath,
                    model_size="base",
                    filler_count=filler_count_number,
                )
                fluency_score = fluency_result['fluency_score']

            with _log_duration("Tone"):
                tone_result = analyze_tone_main(temp_filepath)

            with _log_duration("VCS"):
                vcs_result = analyze_vcs_main(temp_filepath)

            with _log_duration("VERS"):
                vers_result = analyze_vers_main(
                    temp_filepath,
                    model_size="base",
                    filler_count=filler_count_number,
                )

            with _log_duration("Voice confidence"):
                voice_confidence_result = analyze_voice_confidence_main(
                    temp_filepath,
                    model_size="base",
                    filler_count=filler_count_number,
                    fluency_score=fluency_score,
                )
                print("voice_confidence_result:", voice_confidence_result)

            with _log_duration("VPS"):
                vps_result = analyze_vps_main(temp_filepath)

            with _log_duration("VES"):
                ves_result = calc_voice_engagement_score(temp_filepath)

            with _log_duration("Emotion"):
                emotion = predict_emotion(temp_filepath)

            # NOTE(review): 'Voice Clarity Sore' looks like a typo for
            # 'Score', but it has to match the key that analyze_vcs_main
            # actually returns -- confirm there before changing it.
            avg_score = (
                fluency_score
                + tone_result['speech_dynamism_score']
                + vcs_result['Voice Clarity Sore']
                + vers_result['VERS Score']
                + voice_confidence_result['voice_confidence_score']
                + vps_result['VPS']
                + ves_result['ves']
            ) / 7

            analyze_all_end = time.perf_counter()

            combined_result = {
                "fluency": fluency_result,
                "tone": tone_result,
                "vcs": vcs_result,
                "vers": vers_result,
                "voice_confidence": voice_confidence_result,
                "vps": vps_result,
                "ves": ves_result,
                "filler_words": filler_count,
                "transcript": transcript,
                "Detected Language": language,
                "emotion": emotion,
                "sank_score": avg_score,
                "analysis_time": analyze_all_end - analyze_all_start,
            }

            return JSONResponse(content=combined_result)

        except HTTPException:
            # Keep a deliberate HTTP error intact rather than rewrapping it.
            raise
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Analysis failed: {e}") from e
|
|