Fast_api / fluency /compute_fluency.py
mulasagg's picture
API optimizations
aef3b1e
"""
Compute fluency score from audio file using SRS and PAS calculations
"""
import librosa
import numpy as np
from typing import Dict, Any, Union
from .fluency import calc_srs, calculate_pas, calculate_fluency, get_fluency_insight
from filler_count.filler_score import analyze_fillers
from typing import Dict, Any
import numpy as np
import librosa
import pyworld
def compute_fluency_score(file_path: str, whisper_model, filler_count= None) -> Dict[str, Any]:
"""
Compute fluency score and its components from a speech sample.
Args:
file_path (str): Path to the audio file.
whisper_model: Transcription model (e.g., OpenAI Whisper or faster-whisper)
Returns:
dict: A dictionary containing fluency score, SRS, PAS, and component scores.
"""
# Transcribe audio
result = whisper_model.transcribe(file_path, word_timestamps=False, fp16=False)
transcript = result.get("text", "").strip()
segments = result.get("segments", [])
# Validate early
if not transcript or not segments:
raise ValueError("Empty transcript or segments from Whisper.")
if filler_count is None:
# Detect filler words
result = analyze_fillers(file_path,"base", transcript)
filler_score = result.get("filler_score", 0)
filler_count = result.get("total_fillers", 0)
# Load audio
y, sr = librosa.load(file_path, sr=None)
duration = len(y) / sr if sr else 0.0
if duration <= 0:
raise ValueError("Audio duration invalid or zero.")
# Calculate pitch variation (in semitones) using pyworld
_f0, t = pyworld.harvest(y.astype(np.float64), sr, f0_floor=80.0, f0_ceil=400.0, frame_period=1000 * 256 / sr)
f0 = pyworld.stonemask(y.astype(np.float64), _f0, t, sr)
voiced_f0 = f0[f0 > 0]
voiced_f0 = voiced_f0[
(voiced_f0 > np.percentile(voiced_f0, 5)) &
(voiced_f0 < np.percentile(voiced_f0, 95))
]
pitch_variation = 0.0
if voiced_f0.size > 0:
median_f0 = np.median(voiced_f0)
median_f0 = max(median_f0, 1e-6)
semitone_diffs = 12 * np.log2(voiced_f0 / median_f0)
pitch_variation = float(np.std(semitone_diffs))
# Analyze pauses
long_pause_count = 0
if segments:
for i in range(len(segments) - 1):
pause_dur = segments[i + 1]["start"] - segments[i]["end"]
if pause_dur > 1.0:
long_pause_count += 1
# Check beginning and end pauses
if segments[0]["start"] > 1.0:
long_pause_count += 1
if duration - segments[-1]["end"] > 1.0:
long_pause_count += 1
# Calculate WPM
word_count = len(transcript.split())
words_per_min = (word_count / duration) * 60.0 if duration > 0 else 0.0
# Calculate SRS - Speech Rate Stability
srs_score = calc_srs(
wpm=words_per_min,
filler_count=filler_count,
long_pause_count=long_pause_count,
pitch_variation=pitch_variation
)
# Calculate PAS - Pause Appropriateness Score
pas_result = calculate_pas(
transcript=transcript,
segments=segments,
filler_count=filler_count,
duration=duration
)
pas_score = pas_result["PAS"]
# Calculate final fluency score
fluency_result = calculate_fluency(srs=srs_score, pas=pas_score)
fluency_score = fluency_result["score"]
return {
"fluency_score": fluency_score,
"SRS": srs_score,
"PAS": pas_score,
"pitch_variation": pitch_variation,
"filler_count": filler_count,
"long_pause_count": long_pause_count,
"WPM": words_per_min,
"transcript": transcript
}