Spaces:
Sleeping
Sleeping
| """ | |
| Text Feature Extractor - LOW LATENCY VERSION | |
| Extracts 9 text features from conversation transcripts to detect busy/distracted states. | |
| PERFORMANCE IMPROVEMENTS vs original: | |
| 1. Replaces BART-MNLI (~1.6 GB, ~300ms/call) with a tiny DistilBERT NLI (~67 MB, ~8ms/call) | |
| 2. Replaces RoBERTa sentiment with a fast distilled model (~67 MB, ~5ms/call) | |
| 3. Replaces CrossEncoder coherence with batched cosine similarity on MiniLM (~22 MB, ~3ms/call) | |
| 4. All models loaded lazily — only instantiated on first use | |
| 5. Regex patterns compiled once; hot-path pattern matching runs before any model call | |
| 6. NLI model call skipped entirely when patterns are high-confidence (saves ~8ms per call) | |
| 7. Batched sentiment + coherence in a single forward pass when processing lists | |
| 8. Thread-safe lazy init via threading.Lock | |
| Typical latency (CPU, warm): | |
| extract_explicit_busy / free : ~1–10 ms (pattern fast-path: <0.1 ms) | |
| extract_sentiment : ~5 ms | |
| extract_coherence (5 turns) : ~3 ms | |
| extract_all (full pipeline) : ~15–25 ms | |
| """ | |
from __future__ import annotations

import logging
import re
import threading
from functools import lru_cache
from typing import Dict, List, Tuple

import numpy as np
| # --------------------------------------------------------------------------- | |
| # Lazy model holders | |
| # --------------------------------------------------------------------------- | |
| class _LazyModel: | |
| """Thread-safe lazy loader for a single model.""" | |
| def __init__(self, factory): | |
| self._factory = factory | |
| self._model = None | |
| self._lock = threading.Lock() | |
| def get(self): | |
| if self._model is None: | |
| with self._lock: | |
| if self._model is None: | |
| self._model = self._factory() | |
| return self._model | |
def _load_sentiment():
    """Build the ``transformers`` sentiment-analysis pipeline.

    The import is done inside the function so that ``transformers`` is only
    imported when this loader actually runs.
    """
    import transformers

    options = {
        "model": "distilbert-base-uncased-finetuned-sst-2-english",
        "device": -1,
        "truncation": True,
        "max_length": 128,
        "batch_size": 16,
    }
    return transformers.pipeline("sentiment-analysis", **options)
def _load_nli():
    """Build the zero-shot-classification pipeline used as the intent fallback.

    The import is done inside the function so that ``transformers`` is only
    imported when this loader actually runs.
    """
    import transformers

    # Model: cross-encoder/nli-MiniLM2-L6-H768 (an NLI cross-encoder).
    options = {
        "model": "cross-encoder/nli-MiniLM2-L6-H768",
        "device": -1,
    }
    return transformers.pipeline("zero-shot-classification", **options)
def _load_embedder():
    """Build the ``all-MiniLM-L6-v2`` sentence-embedding model.

    The import is done inside the function so that ``sentence_transformers``
    is only imported when this loader actually runs.
    """
    import sentence_transformers

    model_name = "all-MiniLM-L6-v2"
    return sentence_transformers.SentenceTransformer(model_name)
# Module-wide lazy holders: each model is built by its loader on the first
# ``get()`` call and reused afterwards.
_SENTIMENT_MODEL, _NLI_MODEL, _EMBEDDER = (
    _LazyModel(loader)
    for loader in (_load_sentiment, _load_nli, _load_embedder)
)
| # --------------------------------------------------------------------------- | |
| # Compiled patterns (module-level, compiled once) | |
| # --------------------------------------------------------------------------- | |
| _NEG = re.compile( | |
| r"\b(not|no|never|n[\'']t|dont|don[\'']t|cannot|can[\'']t|wont|won[\'']t)" | |
| r"\s+\w*\s*(busy|free|available|talk|rush)", | |
| re.I, | |
| ) | |
| _BUSY_RE: List[re.Pattern] = [re.compile(p, re.I) for p in [ | |
| r"\b(i[\'']m|i am|im)\s+(busy|driving|working|cooking|rushing)\b", | |
| r"\bin a (meeting|call|hurry)\b", | |
| r"\bcan[\'']t talk\b", | |
| r"\bcall (you|me) back\b", | |
| r"\b(not a good|bad) time\b", | |
| ]] | |
| _FREE_RE: List[re.Pattern] = [re.compile(p, re.I) for p in [ | |
| r"\b(i[\'']m|i am|im)\s+(free|available)\b", | |
| r"\bcan talk\b", | |
| r"\bhave time\b", | |
| r"\bnot busy\b", | |
| r"\bgood time\b", | |
| r"\bnow works\b", | |
| r"\btell me (what you want|what you need|more)\b", | |
| r"\b(go ahead|fire away)\b", | |
| r"\b(yeah|yes),?\s*sure\b", | |
| r"\bsure,?\s*(what|go ahead|tell me)\b", | |
| r"\bi[\'']?m (listening|here)\b", | |
| r"\bwhat[\'']?s (on your mind|up)\b", | |
| ]] | |
| # Keyword sets for marker counts | |
| _KW_COGNITIVE = frozenset(["um", "uh", "like", "you know", "i mean", | |
| "kind of", "sort of", "basically", "actually"]) | |
| _KW_TIME = frozenset(["quickly", "hurry", "fast", "urgent", "asap", | |
| "right now", "immediately", "short", "brief"]) | |
| _KW_DEFLECT = frozenset(["later", "another time", "not now", "maybe", | |
| "i don't know", "whatever", "sure sure", "yeah yeah"]) | |
| # --------------------------------------------------------------------------- | |
| # Core helpers | |
| # --------------------------------------------------------------------------- | |
@lru_cache(maxsize=1024)
def _pattern_busy_free(text: str) -> Tuple[float, float]:
    """
    Fast regex-only decision. Returns (busy_score, free_score).

    Results are memoised in a bounded LRU cache keyed on the exact input
    string, so a repeated transcript skips the regex scan.

    Returns:
        (1.0, 0.0) for busy, (0.0, 1.0) for free, or (-1.0, -1.0) when no
        pattern matched and the caller should escalate to a model.
    """
    t = text.lower()

    # Negated keywords are resolved first so that "not busy" is not read as
    # busy and "can't talk" is not read as free.
    neg = _NEG.search(t)
    if neg:
        m = neg.group(0)
        if any(w in m for w in ("busy", "rush")):
            return 0.0, 1.0  # e.g. "not busy"
        if any(w in m for w in ("free", "available", "talk")):
            return 1.0, 0.0  # e.g. "can't talk"

    if any(p.search(t) for p in _FREE_RE):
        return 0.0, 1.0
    if any(p.search(t) for p in _BUSY_RE):
        return 1.0, 0.0
    return -1.0, -1.0  # -1 = no pattern matched; caller should escalate
def _nli_busy_free(text: str) -> Tuple[float, float]:
    """Classify *text* as busy/free with the zero-shot NLI pipeline.

    Returns (busy_score, free_score): (1.0, 0.0) or (0.0, 1.0) when the
    top-ranked label is a busy/free label scoring above 0.55, otherwise
    (0.0, 0.0).
    """
    classifier = _NLI_MODEL.get()
    labels = [
        "person is busy or occupied",
        "person is free and available",
        "unclear or neutral",
    ]
    # Only the first 256 characters are sent to the model.
    outcome = classifier(
        text[:256],
        candidate_labels=labels,
        hypothesis_template="This {}.",
        multi_label=False,
    )
    best_label = outcome["labels"][0]
    best_score = outcome["scores"][0]

    if not (best_score > 0.55):
        # Not confident enough in any label.
        return 0.0, 0.0
    if "busy" in best_label:
        return 1.0, 0.0
    if "free" in best_label:
        return 0.0, 1.0
    return 0.0, 0.0
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
class TextFeatureExtractor:
    """
    Extract text features T0–T9 for busy/distracted state detection.

    Models are reached through the module-level lazy holders, so constructing
    an extractor loads nothing unless ``preload=True`` is passed
    (recommended for server deployments to avoid a first-call latency spike).
    """

    # Model failures are reported here instead of being dropped silently.
    _log = logging.getLogger(__name__)

    def __init__(
        self,
        use_intent_model: bool = True,
        marker_alpha: float = 1.0,
        marker_beta: float = 1.0,
        preload: bool = False,
        # coherence_model_name kept for API compat but ignored (always MiniLM)
        coherence_model_name: str = "all-MiniLM-L6-v2",
    ):
        """
        Args:
            use_intent_model: Fall back to the NLI model when no regex
                pattern decides busy/free.
            marker_alpha: Additive smoothing on the T4 numerator.
            marker_beta: Additive smoothing on the T4 denominator.
            preload: Build the models now instead of on first use.
            coherence_model_name: Accepted for compatibility; not used.
        """
        self.use_intent_model = use_intent_model
        self.marker_alpha = float(marker_alpha)
        self.marker_beta = float(marker_beta)
        if preload:
            _SENTIMENT_MODEL.get()
            _EMBEDDER.get()
            if use_intent_model:
                _NLI_MODEL.get()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _busy_free_scores(self, text: str) -> Tuple[float, float]:
        """Return (busy, free) for non-empty *text*: regex first, NLI fallback."""
        busy, free = _pattern_busy_free(text)
        if busy >= 0:  # pattern gave a definitive answer
            return busy, free
        if self.use_intent_model:
            return _nli_busy_free(text)
        return 0.0, 0.0

    @staticmethod
    def _polarity(result: dict) -> float:
        """Map one pipeline result ({'label', 'score'}) to a signed score."""
        label = result["label"].lower()
        score = float(result["score"])
        if "positive" in label:
            return score
        if "negative" in label:
            return -score
        return 0.0

    @staticmethod
    def _count_markers(text: str, keywords) -> int:
        """Count the distinct *keywords* present in *text* as whole words/phrases.

        Word-boundary look-arounds prevent hits inside longer words
        (for example "um" inside "number" or "fast" inside "breakfast").
        """
        return sum(
            1
            for kw in keywords
            if re.search(r"(?<!\w)" + re.escape(kw) + r"(?!\w)", text)
        )

    # ------------------------------------------------------------------
    # T0 / T1 — Explicit free / busy
    # ------------------------------------------------------------------
    def extract_explicit_busy(self, transcript: str) -> float:
        """T1: 1.0 if transcript signals busyness, else 0.0."""
        if not transcript or len(transcript.strip()) < 3:
            return 0.0
        busy, _free = self._busy_free_scores(transcript.strip())
        return busy

    def extract_explicit_free(self, transcript: str) -> float:
        """T0: 1.0 if transcript signals availability, else 0.0."""
        if not transcript or len(transcript.strip()) < 3:
            return 0.0
        _busy, free = self._busy_free_scores(transcript.strip())
        return free

    # ------------------------------------------------------------------
    # T2 / T3 — Response patterns
    # ------------------------------------------------------------------
    def extract_response_patterns(self, transcript_list: List[str]) -> Tuple[float, float]:
        """T2: avg word count per turn. T3: fraction of turns with ≤3 words."""
        if not transcript_list:
            return 0.0, 0.0
        word_counts = [len(turn.split()) for turn in transcript_list]
        short_turns = sum(1 for count in word_counts if count <= 3)
        return float(np.mean(word_counts)), float(short_turns / len(word_counts))

    # ------------------------------------------------------------------
    # T4 / T5 / T6 — Marker counts
    # ------------------------------------------------------------------
    def extract_marker_counts(self, transcript: str) -> Tuple[float, float, float]:
        """
        T4: cognitive load. T5: time pressure. T6: deflection.

        Each value is the number of distinct marker keywords present,
        divided by the transcript's word count.  Only T4 is smoothed with
        ``marker_alpha`` / ``marker_beta``.
        """
        if not transcript:
            return 0.0, 0.0, 0.0
        n = len(transcript.split())
        if n == 0:
            return 0.0, 0.0, 0.0
        lowered = transcript.lower()
        cognitive = self._count_markers(lowered, _KW_COGNITIVE)
        pressure = self._count_markers(lowered, _KW_TIME)
        deflection = self._count_markers(lowered, _KW_DEFLECT)
        return (
            (cognitive + self.marker_alpha) / (n + self.marker_beta),
            pressure / n,
            deflection / n,
        )

    # ------------------------------------------------------------------
    # T7 — Sentiment
    # ------------------------------------------------------------------
    def extract_sentiment(self, transcript: str) -> float:
        """T7: signed sentiment score; 0.0 for blank input or model failure."""
        if not transcript or not transcript.strip():
            return 0.0
        try:
            result = _SENTIMENT_MODEL.get()(transcript[:256])[0]
            return self._polarity(result)
        except Exception:
            # Best-effort feature: report the failure, fall back to neutral.
            self._log.warning("sentiment extraction failed", exc_info=True)
            return 0.0

    def extract_sentiment_batch(self, texts: List[str]) -> List[float]:
        """
        Batch variant of ``extract_sentiment``.

        The result always has one entry per input, in input order; blank or
        ``None`` entries score 0.0 and are not sent to the model.
        """
        if not texts:
            return []
        scores = [0.0] * len(texts)
        # Remember each non-blank text's position so results stay aligned.
        live = [(i, t[:256]) for i, t in enumerate(texts) if t and t.strip()]
        if not live:
            return scores
        try:
            results = _SENTIMENT_MODEL.get()([t for _, t in live])
            for (i, _), result in zip(live, results):
                scores[i] = self._polarity(result)
        except Exception:
            self._log.warning("batch sentiment extraction failed", exc_info=True)
            return [0.0] * len(texts)
        return scores

    # ------------------------------------------------------------------
    # T8 — Coherence (batched cosine similarity — no cross-encoder needed)
    # ------------------------------------------------------------------
    def extract_coherence(self, question: str, responses: List[str]) -> float:
        """
        T8: mean cosine similarity between the question and each response,
        clipped to [0, 1].  Returns 0.5 when either input is empty or the
        embedder fails.
        """
        if not question or not responses:
            return 0.5
        try:
            embedder = _EMBEDDER.get()
            # Encode question + all responses in one batched call
            embeddings = embedder.encode(
                [question, *responses],
                convert_to_numpy=True,
                normalize_embeddings=True,  # unit vectors -> dot = cosine
                batch_size=32,
                show_progress_bar=False,
            )
            q_emb = embeddings[0]
            r_emb = embeddings[1:]
            sims = r_emb @ q_emb  # batched dot product (already normalised)
            return float(np.clip(np.mean(sims), 0.0, 1.0))
        except Exception:
            self._log.warning("coherence extraction failed", exc_info=True)
            return 0.5

    # ------------------------------------------------------------------
    # T9 — Latency (always 0 for single-side audio)
    # ------------------------------------------------------------------
    @staticmethod
    def extract_latency(events=None) -> float:  # noqa: ARG004
        """T9: always 0.0 (single-side audio — no agent timestamps).

        Declared static so it can be called on the class or on an instance,
        with or without ``events``.
        """
        return 0.0

    # ------------------------------------------------------------------
    # Combined extractor
    # ------------------------------------------------------------------
    def extract_all(
        self,
        transcript_list: List[str],
        full_transcript: str = "",
        question: str = "",
        events=None,
    ) -> Dict[str, float]:
        """
        Extract every feature in a single call.

        Args:
            transcript_list : Individual response turns (strings).
            full_transcript : Full concatenated text (auto-built if omitted).
            question        : Agent's question, used for T8 coherence.
            events          : Unused (kept for API compatibility).

        Returns:
            Dict[str, float] with keys t0_explicit_free … t9_latency.
        """
        if not full_transcript:
            full_transcript = " ".join(transcript_list)
        t = full_transcript.strip()

        # T0 / T1 — one shared pattern/NLI decision for both scores
        t1, t0 = self._busy_free_scores(t) if t else (0.0, 0.0)
        # T2 / T3
        t2, t3 = self.extract_response_patterns(transcript_list)
        # T4 / T5 / T6
        t4, t5, t6 = self.extract_marker_counts(t)
        # T7 — use full transcript for sentiment
        t7 = self.extract_sentiment(t)
        # T8 — coherence (returns 0.5 on its own when question is empty)
        t8 = self.extract_coherence(question, transcript_list)
        # T9 — latency
        t9 = self.extract_latency(events)

        return {
            "t0_explicit_free": float(t0),
            "t1_explicit_busy": float(t1),
            "t2_avg_resp_len": t2,
            "t3_short_ratio": t3,
            "t4_cognitive_load": t4,
            "t5_time_pressure": t5,
            "t6_deflection": t6,
            "t7_sentiment": t7,
            "t8_coherence": t8,
            "t9_latency": t9,
        }
| # --------------------------------------------------------------------------- | |
| # Quick smoke-test | |
| # --------------------------------------------------------------------------- | |
def _smoke_test() -> None:
    """Run a quick manual check: time intent classification, then one full extraction."""
    from time import perf_counter

    print("Initialising (lazy — no models loaded yet)...")
    extractor = TextFeatureExtractor(use_intent_model=True)

    samples = [
        "I'm driving right now",
        "I'm not busy at all",
        "Can't talk, in a meeting",
        "I can talk now",
        "Not a good time",
        "I have time to chat",
        "Sure, go ahead",
        "Tell me what you need",
    ]

    print("\n--- Intent classification ---")
    for text in samples:
        started = perf_counter()
        busy = extractor.extract_explicit_busy(text)
        free = extractor.extract_explicit_free(text)
        ms = (perf_counter() - started) * 1000
        print(f" [{ms:5.1f}ms] '{text}' busy={busy:.0f} free={free:.0f}")

    print("\n--- Full feature extraction ---")
    started = perf_counter()
    features = extractor.extract_all(
        transcript_list=["I'm not busy", "I can talk now"],
        full_transcript="I'm not busy. I can talk now.",
        question="How are you doing today?",
    )
    ms = (perf_counter() - started) * 1000
    print(f" Total: {ms:.1f} ms")
    for key, value in features.items():
        print(f" {key}: {value:.3f}")


if __name__ == "__main__":
    _smoke_test()