| from __future__ import annotations |
|
|
| from typing import Any, Dict, Optional |
|
|
| from src.coherence.thresholds import AdaptiveThresholds |
|
|
|
|
| def detect_drift( |
| msci: Optional[float], |
| st_i: Optional[float], |
| st_a: Optional[float], |
| si_a: Optional[float] = None, |
| ) -> Dict[str, Any]: |
| """ |
| Drift logic (Phase-3C): |
| - global_drift is True if msci is FAIL OR >=2 metrics are FAIL. |
| - includes drift_score (fail_count / metrics_count) for debugging. |
| """ |
| drift: Dict[str, Any] = { |
| "visual_drift": False, |
| "audio_drift": False, |
| "global_drift": False, |
| } |
|
|
| if st_i is not None and st_a is not None and st_i + 0.15 < st_a: |
| drift["visual_drift"] = True |
| if st_i is not None and st_a is not None and st_a + 0.15 < st_i: |
| drift["audio_drift"] = True |
|
|
| metrics: Dict[str, float] = {} |
| if msci is not None: |
| metrics["msci"] = msci |
| if st_i is not None: |
| metrics["st_i"] = st_i |
| if st_a is not None: |
| metrics["st_a"] = st_a |
| if si_a is not None: |
| metrics["si_a"] = si_a |
|
|
| try: |
| if metrics: |
| thresholds = AdaptiveThresholds() |
| statuses = {k: thresholds.classify_value(k, v) for k, v in metrics.items()} |
| fail_count = sum(1 for s in statuses.values() if s == "FAIL") |
| msci_fail = statuses.get("msci") == "FAIL" |
| drift["global_drift"] = bool(msci_fail or fail_count >= 2) |
| drift["global_drift_score"] = float(fail_count / max(len(statuses), 1)) |
| drift["fail_count"] = fail_count |
| drift["msci_fail"] = msci_fail |
| drift["status"] = statuses |
| else: |
| drift["global_drift"] = False |
| drift["global_drift_score"] = 0.0 |
| drift["fail_count"] = 0 |
| drift["msci_fail"] = False |
| drift["status"] = {} |
| except Exception: |
| if msci is not None: |
| drift["global_drift"] = msci < 0.35 |
| drift["global_drift_score"] = 1.0 if drift["global_drift"] else 0.0 |
| else: |
| drift["global_drift"] = False |
| drift["global_drift_score"] = 0.0 |
|
|
| return drift |
|
|