# EurekaPotato's picture
# Upload folder using huggingface_hub
# 077be11 verified
"""
XGBoost Busy Detector - Hugging Face Inference Endpoint Handler
Custom handler for HF Inference Endpoints.
Loads XGBoost model, applies normalization, runs evidence accumulation scoring,
and returns busy_score + confidence + recommendation.
Derived from: src/normalization.py, src/scoring_engine.py, src/model.py
"""
from typing import Dict, Any, Tuple
import json
import math
import numpy as np
import pickle
from pathlib import Path
class EndpointHandler:
"""HF Inference Endpoint handler for XGBoost busy detection."""
def __init__(self, path: str = "."):
model_dir = Path(path)
# --- Load XGBoost model ---
model_path = None
for candidate in [
model_dir / "model.pkl",
model_dir / "busy_detector_v1.pkl",
model_dir / "busy_detector_5k.pkl",
]:
if candidate.exists():
model_path = candidate
break
if model_path is None:
raise FileNotFoundError(
f"No model file found in {model_dir}. "
"Expected model.pkl, busy_detector_v1.pkl, or busy_detector_5k.pkl"
)
with open(model_path, "rb") as f:
saved = pickle.load(f)
# Handle both raw model and dict-wrapped model
if isinstance(saved, dict):
self.model = saved.get("model") or saved.get("booster")
self.feature_names = saved.get("feature_names")
else:
self.model = saved
self.feature_names = None
print(f"✓ XGBoost model loaded from {model_path}")
# --- Load feature ranges ---
ranges_path = model_dir / "feature_ranges.json"
with open(ranges_path) as f:
ranges_data = json.load(f)
self.voice_ranges = ranges_data["voice_ranges"]
self.text_ranges = ranges_data["text_ranges"]
self.voice_order = ranges_data["voice_feature_order"]
self.text_order = ranges_data["text_feature_order"]
# --- Load scoring rules ---
rules_path = model_dir / "scoring_rules.json"
with open(rules_path) as f:
self.scoring = json.load(f)
self.weights = self.scoring["weights"]
self.thresholds = self.scoring["thresholds"]
print("✓ Feature ranges and scoring rules loaded")
# --------------------------------------------------------------------- #
# Public interface
# --------------------------------------------------------------------- #
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""
Entrypoint for HF Inference Endpoints.
Expected input (JSON):
{
"inputs": {
"audio_features": { "v1_snr": 15.0, ... },
"text_features": { "t1_explicit_busy": 0.8, ... }
}
}
Returns:
{
"busy_score": 0.72,
"confidence": 0.85,
"recommendation": "EXIT",
"ml_probability": 0.65,
"evidence_details": [...]
}
"""
# HF wraps payload in "inputs"
inputs = data.get("inputs", data)
audio_features = inputs.get("audio_features", {})
text_features = inputs.get("text_features", {})
# 1. Normalize
normalized = self._normalize_features(audio_features, text_features)
# 2. XGBoost inference
import xgboost as xgb
dmatrix = xgb.DMatrix(normalized.reshape(1, -1))
ml_prob = float(self.model.predict(dmatrix)[0])
# 3. Evidence accumulation scoring
final_score, confidence, details = self._score_with_evidence(
ml_prob, audio_features, text_features
)
# 4. Recommendation
recommendation = self._get_recommendation(final_score)
return {
"busy_score": round(final_score, 4),
"confidence": round(confidence, 4),
"recommendation": recommendation,
"ml_probability": round(ml_prob, 4),
"evidence_details": details,
}
# --------------------------------------------------------------------- #
# Normalization (mirrors src/normalization.py FeatureNormalizer)
# --------------------------------------------------------------------- #
def _normalize_value(self, value: float, min_val: float, max_val: float) -> float:
if max_val == min_val:
return 0.0
value = max(min_val, min(max_val, value))
return (value - min_val) / (max_val - min_val)
def _normalize_features(
self,
audio_features: Dict[str, float],
text_features: Dict[str, float],
) -> np.ndarray:
"""Min-max normalize all 26 features and concatenate."""
voice_norm = []
for feat in self.voice_order:
val = audio_features.get(feat, 0.0)
lo, hi = self.voice_ranges[feat]
voice_norm.append(self._normalize_value(val, lo, hi))
text_norm = []
for feat in self.text_order:
val = text_features.get(feat, 0.0)
lo, hi = self.text_ranges[feat]
text_norm.append(self._normalize_value(val, lo, hi))
return np.array(voice_norm + text_norm, dtype=np.float32)
# --------------------------------------------------------------------- #
# Evidence scoring (mirrors src/scoring_engine.py ScoringEngine)
# --------------------------------------------------------------------- #
@staticmethod
def _sigmoid(x: float) -> float:
return 1.0 / (1.0 + math.exp(-x))
@staticmethod
def _logit(p: float) -> float:
p = max(0.01, min(0.99, p))
return math.log(p / (1.0 - p))
def _score_with_evidence(
self,
ml_prob: float,
audio_features: Dict[str, float],
text_features: Dict[str, float],
) -> Tuple[float, float, list]:
"""Evidence accumulation scoring exactly matching ScoringEngine.calculate_score."""
evidence = 0.0
details = []
# --- Text evidence ---
explicit = text_features.get("t1_explicit_busy", 0.0)
if explicit > 0.5:
pts = self.weights["explicit_busy"] * explicit
evidence += pts
details.append(f"Explicit Intent (+{pts:.1f})")
explicit_free = text_features.get("t0_explicit_free", 0.0)
if explicit_free > 0.5:
pts = self.weights["explicit_free"] * explicit_free
evidence += pts
details.append(f"Explicit Free ({pts:.1f})")
short_ratio = text_features.get("t3_short_ratio", 0.0)
if short_ratio > 0.3:
pts = self.weights["short_answers"] * short_ratio
evidence += pts
details.append(f"Brief Responses (+{pts:.1f})")
deflection = text_features.get("t6_deflection", 0.0)
if deflection > 0.1:
pts = self.weights["deflection"] * deflection
evidence += pts
details.append(f"Deflection (+{pts:.1f})")
# --- Audio evidence ---
traffic = audio_features.get("v2_noise_traffic", 0.0)
if traffic > 0.5:
pts = self.weights["traffic_noise"] * traffic
evidence += pts
details.append(f"Traffic Context (+{pts:.1f})")
rate = audio_features.get("v3_speech_rate", 0.0)
if rate > 3.5:
pts = self.weights["rushed_speech"]
evidence += pts
details.append(f"Rushed Speech (+{pts:.1f})")
pitch_std = audio_features.get("v5_pitch_std", 0.0)
if pitch_std > 80.0:
evidence += 0.5
details.append("Voice Stress (+0.5)")
emotion_stress = audio_features.get("v11_emotion_stress", 0.0)
if emotion_stress > 0.6:
pts = self.weights["emotion_stress"] * emotion_stress
evidence += pts
details.append(f"Emotional Stress (+{pts:.1f})")
emotion_energy = audio_features.get("v12_emotion_energy", 0.0)
if emotion_energy > 0.7:
pts = self.weights["emotion_energy"] * emotion_energy
evidence += pts
details.append(f"High Energy (+{pts:.1f})")
# --- ML baseline ---
ml_evidence = self._logit(ml_prob) * self.weights["ml_model_factor"]
evidence += ml_evidence
details.append(f"ML Baseline ({ml_evidence:+.1f})")
# --- Final ---
final_score = self._sigmoid(evidence)
confidence = float(math.tanh(abs(evidence) / 2.0))
return final_score, confidence, details
def _get_recommendation(self, score: float) -> str:
if score < self.thresholds["continue"]:
return "CONTINUE"
elif score < self.thresholds["check_in"]:
return "CHECK_IN"
else:
return "EXIT"