Spaces:
Running
Running
| """ | |
| analytics/execution_layer.py | |
| Tier 5A — Execution Layer (Alpha Release) | |
| Post-model enrichment pass operating exclusively on already-computed outputs | |
| (model probs + book odds). No simulation logic, no probability calculations, | |
| no model changes. | |
| Entry point: enrich_with_execution_layer(df) → df with execution fields added. | |
| """ | |
| from __future__ import annotations | |
| import statistics | |
| from typing import Any | |
| import pandas as pd | |
| from analytics.no_vig_props import american_to_implied_prob | |
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
# All thresholds are expressed in implied-probability units (0.01 = 1pp).
OUTLIER_THRESHOLD = 0.03  # 3pp deviation from median → outlier
STALE_THRESHOLD = 0.025  # 2.5pp worse than median → stale book
AGGRESSIVE_THRESHOLD = 0.02  # 2pp better than median → aggressive/timing flag

# Row fields probed, in order, when checking whether a book row carries any
# odds timestamp (see _compute_timing_fields).
_TIMESTAMP_KEYS = ("last_update", "timestamp", "odds_timestamp", "updated_at")
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| def _safe_float(val: Any, default: float | None = None) -> float | None: | |
| if val is None: | |
| return default | |
| try: | |
| return float(val) | |
| except (TypeError, ValueError): | |
| return default | |
def _safe_implied(odds: Any) -> float | None:
    """Convert American odds to an implied probability; None when conversion fails."""
    if odds is None:
        return None
    try:
        implied = american_to_implied_prob(odds)
    except Exception:
        # Deliberate best-effort: any malformed odds value is treated as "no price".
        return None
    return implied
| def _make_player_game_key(row: pd.Series) -> str: | |
| explicit_key = str(row.get("player_event_market_key") or "").strip() | |
| if explicit_key and explicit_key not in ("nan", "None", ""): | |
| return explicit_key | |
| event_id = str(row.get("event_id") or "").strip() | |
| player_name = str(row.get("player_name") or "").strip() | |
| market_family = str(row.get("market_family") or row.get("market") or "").strip() | |
| threshold = str(row.get("threshold") or "").strip() | |
| if event_id and event_id not in ("nan", "None", ""): | |
| return f"{event_id}|{player_name}|{market_family}|{threshold}" | |
| away = str(row.get("away_team") or "").strip() | |
| home = str(row.get("home_team") or "").strip() | |
| return f"{away}|{home}|{player_name}|{market_family}|{threshold}" | |
| def _make_game_key(row: pd.Series) -> str: | |
| event_id = str(row.get("event_id") or "").strip() | |
| if event_id and event_id not in ("nan", "None", ""): | |
| return event_id | |
| away = str(row.get("away_team") or "").strip() | |
| home = str(row.get("home_team") or "").strip() | |
| return f"{away}_{home}" | |
# ---------------------------------------------------------------------------
# Task 1 — Market Disagreement
# ---------------------------------------------------------------------------
def _compute_market_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add best_price, median_price, market_width, market_outlier_flag, stale_book_flag.

    Stats are computed per scoped player-game key (_make_player_game_key) over
    the implied probabilities of each book's American odds. Rows whose odds
    cannot be parsed contribute nothing to the group and receive null stats
    and False flags.
    """
    df = df.copy()
    df["_pg_key"] = df.apply(_make_player_game_key, axis=1)
    # Pre-compute implied probs once per row (None when odds are unparseable).
    df["_implied"] = df["odds_american"].apply(_safe_implied)

    # Per-key summary stats: (best, median, width).
    group_stats: dict[str, tuple[float | None, float | None, float | None]] = {}
    for key, grp in df.groupby("_pg_key"):
        vals = [v for v in grp["_implied"] if v is not None]
        if vals:
            best = min(vals)  # lowest implied = best price for the bettor
            worst = max(vals)
            group_stats[key] = (best, statistics.median(vals), worst - best)
        else:
            group_stats[key] = (None, None, None)

    # Vectorized broadcast of the group stats back onto rows (replaces the
    # previous per-row iterrows pass, which repeated the dict lookup 4x/row).
    df["best_price"] = df["_pg_key"].map(lambda k: group_stats[k][0])
    df["median_price"] = df["_pg_key"].map(lambda k: group_stats[k][1])
    df["market_width"] = df["_pg_key"].map(lambda k: group_stats[k][2])

    # Flags are relative to the group median; False when either side is missing.
    outlier_flags: list[bool] = []
    stale_flags: list[bool] = []
    for implied, med in zip(df["_implied"], df["median_price"]):
        if implied is None or med is None:
            outlier_flags.append(False)
            stale_flags.append(False)
        else:
            outlier_flags.append(abs(implied - med) > OUTLIER_THRESHOLD)
            stale_flags.append((implied - med) > STALE_THRESHOLD)
    df["market_outlier_flag"] = outlier_flags
    df["stale_book_flag"] = stale_flags

    df.drop(columns=["_pg_key", "_implied"], inplace=True)
    return df
# ---------------------------------------------------------------------------
# Task 2 — Edge Quality Filters
# ---------------------------------------------------------------------------
def _edge_quality_row(
    row: pd.Series,
) -> tuple[float, float, float, float | None, float | None, str]:
    """Score one row; returns (confidence, volatility, signal, edge_raw, edge_filtered, flags)."""
    source = str(row.get("model_hr_prob_source") or "unavailable")
    context_applied = bool(row.get("pregame_context_applied") or False)
    edge_raw = _safe_float(row.get("edge"))
    market_width = _safe_float(row.get("market_width"), default=0.0)

    # Magnitude of the pregame context adjustments (pitcher + park).
    pitcher_adj = _safe_float(row.get("pregame_pitcher_context_adj"), default=0.0)
    park_adj = _safe_float(row.get("pregame_park_context_adj"), default=0.0)
    context_mag = abs(pitcher_adj or 0.0) + abs(park_adj or 0.0)

    # Confidence: keyed off the probability source, boosted when context applied.
    if source == "internal_model_baseline":
        conf = 1.0 if context_applied else 0.7
    elif source == "shared_pregame_engine":
        conf = 0.95 if context_applied else 0.80
    else:
        conf = 0.3

    # Volatility: weighted blend of market width and context magnitude, in [0, 1].
    width_component = min(1.0, (market_width or 0.0) / 0.10)
    ctx_component = min(1.0, context_mag / 0.02) if context_mag > 0 else 0.0
    vol = 0.7 * width_component + 0.3 * ctx_component

    # Signal strength mirrors the source tiers on its own scale, capped at 1.0.
    if source == "internal_model_baseline":
        sig = 0.7 + (0.3 if context_applied else 0.0)
    elif source == "shared_pregame_engine":
        sig = 0.85 + (0.15 if context_applied else 0.0)
    else:
        sig = 0.1
    sig = min(1.0, sig)

    # Filtered edge: apply penalties in order, recording which ones fired.
    if edge_raw is None:
        return conf, vol, sig, None, None, "clean"

    edge_filt = edge_raw
    applied: list[str] = []
    if conf < 0.5:
        # Low-confidence sources are scaled down proportionally.
        edge_filt = edge_filt * (conf / 0.5)
        applied.append("conf_penalty")
    vol_pen = min(0.02, vol * 0.02)
    if vol_pen > 0:
        # Subtract up to 2pp for volatile markets.
        edge_filt = edge_filt - vol_pen
        applied.append("vol_penalty")
    if sig < 0.3:
        # Weak signals are halved.
        edge_filt = edge_filt * 0.5
        applied.append("weak_signal")
    flags = ",".join(applied) if applied else "clean"
    return conf, vol, sig, edge_raw, edge_filt, flags


def _compute_edge_quality(df: pd.DataFrame) -> pd.DataFrame:
    """Add execution_confidence_score, execution_volatility_score,
    execution_signal_strength_score, edge_raw, edge_filtered, edge_filter_flags."""
    df = df.copy()
    results = [_edge_quality_row(row) for _, row in df.iterrows()]
    if results:
        conf_scores, vol_scores, sig_scores, edge_raws, edge_filts, flag_strs = (
            list(col) for col in zip(*results)
        )
    else:
        conf_scores, vol_scores, sig_scores = [], [], []
        edge_raws, edge_filts, flag_strs = [], [], []
    df["execution_confidence_score"] = conf_scores
    df["execution_volatility_score"] = vol_scores
    df["execution_signal_strength_score"] = sig_scores
    df["edge_raw"] = edge_raws
    df["edge_filtered"] = edge_filts
    df["edge_filter_flags"] = flag_strs
    return df
# ---------------------------------------------------------------------------
# Task 3 — Timing Heuristics
# ---------------------------------------------------------------------------
def _compute_timing_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add timing_flag, timing_reason."""
    df = df.copy()
    flag_col: list[bool] = []
    reason_col: list[str] = []
    for _, row in df.iterrows():
        reasons: list[str] = []

        # Aggressive price: this book prices > 2pp better (lower implied
        # probability) than the group median.
        implied = _safe_implied(row.get("odds_american"))
        med = _safe_float(row.get("median_price"))
        if implied is not None and med is not None:
            if (med - implied) > AGGRESSIVE_THRESHOLD:
                reasons.append("aggressive_price")

        # Any usable timestamp field present on the row.
        for key in _TIMESTAMP_KEYS:
            value = row.get(key)
            if value is not None and str(value).strip() not in ("", "nan", "None"):
                reasons.append("has_timestamp")
                break

        if not reasons:
            reasons.append("none")
        # Flag is set whenever any real reason (not the "none" placeholder) fired.
        flag_col.append(reasons != ["none"])
        reason_col.append(",".join(reasons))
    df["timing_flag"] = flag_col
    df["timing_reason"] = reason_col
    return df
# ---------------------------------------------------------------------------
# Task 4 — Correlation Awareness
# ---------------------------------------------------------------------------
def _compute_correlation_fields(df: pd.DataFrame) -> pd.DataFrame:
    """Add correlation_flag, correlation_direction.

    Props within the same game are treated as positively correlated, so
    correlation_flag is unconditionally True; the direction is upgraded to
    "positive_stacked" when more than two distinct players share a game.
    """
    df = df.copy()
    df["_game_key"] = df.apply(_make_game_key, axis=1)
    # Distinct players per game — vectorized groupby instead of the previous
    # manual loop + per-row iterrows pass.
    player_counts = df.groupby("_game_key")["player_name"].nunique()
    df["correlation_flag"] = True  # always True for HR props
    df["correlation_direction"] = df["_game_key"].map(
        lambda key: "positive_stacked" if player_counts.get(key, 1) > 2 else "positive"
    )
    df.drop(columns=["_game_key"], inplace=True)
    return df
# ---------------------------------------------------------------------------
# Task 5 — Final Execution Score
# ---------------------------------------------------------------------------
def _compute_execution_score(df: pd.DataFrame) -> pd.DataFrame:
    """Add final_recommendation_score."""
    df = df.copy()
    out: list[float | None] = []
    for _, row in df.iterrows():
        edge = _safe_float(row.get("edge_filtered"))
        if edge is None:
            # No filtered edge → no score for this row.
            out.append(None)
            continue
        conf = _safe_float(row.get("execution_confidence_score"), default=0.3) or 0.0
        vol = _safe_float(row.get("execution_volatility_score"), default=0.0) or 0.0
        width = _safe_float(row.get("market_width"), default=0.0) or 0.0
        has_timing = bool(row.get("timing_flag") or False)

        # Edge weighted by confidence, minus volatility, plus tight-market
        # and timing bonuses.
        base = edge * (0.4 + conf * 0.6)
        vol_penalty = min(0.015, vol * 0.015)
        market_bonus = min(0.01, max(0.0, 0.01 - width * 0.5))
        timing_bonus = 0.005 if has_timing else 0.0

        raw = base - vol_penalty + market_bonus + timing_bonus
        out.append(max(-0.30, min(0.30, raw)))  # clamp to ±30pp
    df["final_recommendation_score"] = out
    return df
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def enrich_with_execution_layer(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run all five execution-layer passes on the mapped props DataFrame.

    Passes (in order):
      1. Market Disagreement — best_price, median_price, market_width, flags
      2. Edge Quality — execution confidence, volatility, signal, edge_filtered
      3. Timing Heuristics — timing_flag, timing_reason
      4. Correlation — correlation_flag, correlation_direction
      5. Execution Score — final_recommendation_score

    Returns the enriched DataFrame. Does not modify simulation logic or
    model probabilities.
    """
    if df.empty:
        return df
    passes = (
        _compute_market_fields,
        _compute_edge_quality,
        _compute_timing_fields,
        _compute_correlation_fields,
        _compute_execution_score,
    )
    for enrichment_pass in passes:
        df = enrichment_pass(df)
    return df