"""
regime.py — Market regime detection with ADX, volatility compression,
distance-from-mean filter, and regime confidence scoring.

Key fixes vs prior version:
- STRUCTURE_LOOKBACK halved (10) to reduce entry lag
- True ATR (not EWM-only) with percentile-based compression detection
- ADX for objective trend strength (replaces pure HH/HL heuristic)
- Regime confidence: composite of trend + structure + vol alignment
- Distance-from-mean filter to avoid entering extended moves
"""
| |
|
| | from typing import Dict, Any |
| |
|
| | import numpy as np |
| | import pandas as pd |
| |
|
| | from config import ( |
| | ATR_PERIOD, |
| | STRUCTURE_LOOKBACK, |
| | STRUCTURE_CONFIRM_BARS, |
| | VOLATILITY_EXPANSION_MULT, |
| | VOLATILITY_CONTRACTION_MULT, |
| | VOL_COMPRESSION_LOOKBACK, |
| | VOL_COMPRESSION_PERCENTILE, |
| | VOL_EXPANSION_CONFIRM_MULT, |
| | ADX_PERIOD, |
| | ADX_TREND_THRESHOLD, |
| | ADX_STRONG_THRESHOLD, |
| | DIST_FROM_MEAN_MA, |
| | DIST_FROM_MEAN_ATR_MAX, |
| | REGIME_CONFIDENCE_MIN, |
| | ) |
| |
|
| |
|
def compute_atr(df: pd.DataFrame, period: int = ATR_PERIOD) -> pd.Series:
    """
    Return the Average True Range of ``df``.

    Each bar's true range is the largest of its high-low range,
    |high - previous close| and |low - previous close|.  The true range is
    then smoothed with an exponential mean using ``alpha = 1 / period`` and
    ``adjust=False`` (Wilder-style smoothing).

    Args:
        df: Frame providing "high", "low" and "close" columns.
        period: Smoothing period; defaults to ``ATR_PERIOD``.

    Returns:
        Series of ATR values on ``df``'s index.
    """
    prior_close = df["close"].shift(1)
    bar_range = df["high"] - df["low"]
    gap_from_high = (df["high"] - prior_close).abs()
    gap_from_low = (df["low"] - prior_close).abs()

    # The row-wise max skips NaN, so the first bar (which has no previous
    # close) falls back to its plain high-low range.
    true_range = pd.concat(
        [bar_range, gap_from_high, gap_from_low],
        axis=1,
    ).max(axis=1)

    smoothing = 1.0 / period
    return true_range.ewm(alpha=smoothing, adjust=False).mean()
| |
|
| |
|
def compute_adx(df: pd.DataFrame, period: int = ADX_PERIOD) -> pd.DataFrame:
    """
    Compute ADX together with the +DI / -DI directional indicators.

    Directional movement follows Wilder's definition: the up-move counts as
    +DM only when it is positive and strictly larger than the down-move, the
    down-move counts as -DM only when it is positive and strictly larger than
    the up-move; otherwise the respective value is 0 (so ties yield 0 for
    both).  True range, +DM, -DM and DX are all smoothed with an exponential
    mean using ``alpha = 1 / period`` and ``adjust=False``.

    Args:
        df: Frame providing "high", "low" and "close" columns.
        period: Smoothing period; defaults to ``ADX_PERIOD``.

    Returns:
        DataFrame with columns ``adx``, ``di_plus`` and ``di_minus`` on
        ``df``'s index.  Values are NaN where the denominator (smoothed true
        range, or ``di_plus + di_minus``) is zero.
    """
    high, low, close = df["high"], df["low"], df["close"]
    prev_close = close.shift(1)

    up_move = high - high.shift(1)
    down_move = low.shift(1) - low

    # Comparisons against NaN are False, so the first bar (no predecessor)
    # gets 0.0 for BOTH series.  Seeding both with 0.0 keeps +DI and -DI
    # symmetric; a NaN left in only one of them would make its smoothed
    # series start later, from an unsmoothed raw value, and inflate that DI.
    dm_plus = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
    dm_minus = down_move.where((down_move > up_move) & (down_move > 0), 0.0)

    # Row-wise max skips NaN, so bar 0 uses its plain high-low range.
    tr = pd.concat(
        [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
        axis=1,
    ).max(axis=1)

    alpha = 1.0 / period
    atr_w = tr.ewm(alpha=alpha, adjust=False).mean()
    sdm_plus = dm_plus.ewm(alpha=alpha, adjust=False).mean()
    sdm_minus = dm_minus.ewm(alpha=alpha, adjust=False).mean()

    # Zero denominators become NaN instead of producing inf / division noise.
    safe_atr = atr_w.replace(0, np.nan)
    di_plus = 100 * sdm_plus / safe_atr
    di_minus = 100 * sdm_minus / safe_atr
    dx = 100 * (di_plus - di_minus).abs() / (di_plus + di_minus).replace(0, np.nan)
    adx = dx.ewm(alpha=alpha, adjust=False).mean()

    return pd.DataFrame({"adx": adx, "di_plus": di_plus, "di_minus": di_minus})
| |
|
| |
|
def compute_structure(df: pd.DataFrame, lookback: int = STRUCTURE_LOOKBACK) -> pd.Series:
    """
    Label each bar's price structure as 1, -1 or 0.

    The rolling ``lookback``-bar high and low are compared with their own
    values ``max(1, lookback // 2)`` bars earlier:

    *  1 — both the rolling high and the rolling low are higher (HH + HL)
    * -1 — both the rolling high and the rolling low are lower (LH + LL)
    *  0 — anything else, including bars where a rolling value is still NaN

    Args:
        df: Frame providing "high" and "low" columns.
        lookback: Rolling window length; defaults to ``STRUCTURE_LOOKBACK``.

    Returns:
        Integer Series of labels on ``df``'s index.
    """
    offset = max(1, lookback // 2)

    window_high = df["high"].rolling(lookback).max()
    window_low = df["low"].rolling(lookback).min()
    earlier_high = window_high.shift(offset)
    earlier_low = window_low.shift(offset)

    rising = (window_high > earlier_high) & (window_low > earlier_low)
    falling = (window_high < earlier_high) & (window_low < earlier_low)

    # The two conditions can never hold on the same bar (the rolling high
    # cannot be both above and below its earlier value), so the order of
    # the choices does not matter.
    labels = np.select(
        [rising.to_numpy(), falling.to_numpy()],
        [1, -1],
        default=0,
    )
    return pd.Series(labels, index=df.index, dtype="int64")
| |
|
| |
|
def compute_volatility_compression(
    atr_series: pd.Series,
    lookback: int = VOL_COMPRESSION_LOOKBACK,
    percentile: float = VOL_COMPRESSION_PERCENTILE,
) -> pd.Series:
    """
    Flag bars whose ATR sits below a rolling percentile of itself.

    Args:
        atr_series: ATR values.
        lookback: Rolling window length used for the percentile (the window
            includes the current bar).
        percentile: Percentile on a 0-100 scale; it is divided by 100 before
            being passed to the rolling quantile.

    Returns:
        Boolean Series, True where ATR is strictly below the rolling
        percentile, i.e. volatility is compressed (coiled).  Bars whose
        rolling percentile is still NaN compare as False.
    """
    quantile_level = percentile / 100.0
    threshold = atr_series.rolling(lookback).quantile(quantile_level)
    is_compressed = atr_series < threshold
    return is_compressed
| |
|
| |
|
def compute_volatility_expanding_from_compression(
    atr_series: pd.Series,
    compressed_series: pd.Series,
    mult: float = VOL_EXPANSION_CONFIRM_MULT,
    lookback: int = 5,
) -> pd.Series:
    """
    Flag bars where volatility breaks out of a recent compression.

    A bar is flagged when both hold:

    * its ATR exceeds ``mult`` times the lowest ATR of the ``lookback`` bars
      that precede it, and
    * at least one of those preceding ``lookback`` bars was marked compressed.

    Args:
        atr_series: ATR values.
        compressed_series: Boolean compression flags for the same bars.
        mult: Required expansion factor over the recent ATR minimum.
        lookback: Number of preceding bars inspected.

    Returns:
        Boolean Series, True at breakout-from-base bars.
    """
    # Both windows are shifted by one bar so the current bar is compared
    # against history only, never against itself.
    recent_floor = atr_series.rolling(lookback).min().shift(1)
    breaking_out = atr_series > recent_floor * mult

    prior_flags = compressed_series.shift(1)
    # Rolling max over the flags is > 0 when any bar in the window was
    # compressed; incomplete windows (NaN) count as "not compressed".
    recently_coiled = prior_flags.rolling(lookback).max().fillna(0) > 0

    return breaking_out & recently_coiled
| |
|
| |
|
def compute_distance_from_mean(
    df: pd.DataFrame,
    atr_series: pd.Series,
    ma_period: int = DIST_FROM_MEAN_MA,
    atr_max: float = DIST_FROM_MEAN_ATR_MAX,
) -> pd.Series:
    """
    Return how far the close sits from its SMA, measured in ATRs.

    Args:
        df: Frame providing a "close" column.
        atr_series: ATR values used to normalise the distance.
        ma_period: Window of the simple moving average.
        atr_max: Not used by the calculation itself; callers compare the
            returned values against this limit (values above it mean price
            is too extended for a fresh long entry).

    Returns:
        Series of ``(close - SMA) / ATR``.  Bars with a zero ATR, or with an
        SMA that is not yet available, are NaN.
    """
    closes = df["close"]
    mean_price = closes.rolling(ma_period).mean()
    # A zero ATR would divide by zero; turn it into NaN instead.
    usable_atr = atr_series.replace(0, np.nan)
    return (closes - mean_price) / usable_atr
| |
|
| |
|
def classify_trend(
    structure_series: pd.Series,
    adx_df: pd.DataFrame,
    lookback: int = STRUCTURE_CONFIRM_BARS,
) -> str:
    """
    Classify the current trend as "bullish", "bearish" or "ranging".

    A directional label requires all of:

    * the latest ADX is at least ``ADX_TREND_THRESHOLD``,
    * the matching DI dominates (+DI > -DI for bullish, -DI > +DI for
      bearish), and
    * at least ``max(1, lookback // 2)`` of the last ``lookback`` structure
      labels point the same way (1 for bullish, -1 for bearish).

    Anything else is "ranging".  NaN in the latest ADX / DI values is
    treated as 0.0.

    Args:
        structure_series: Structure labels (1 / 0 / -1) per bar.
        adx_df: Frame with "adx", "di_plus" and "di_minus" columns.
        lookback: Number of trailing structure labels to inspect.

    Returns:
        One of "bullish", "bearish", "ranging".
    """

    def _latest(column: str) -> float:
        # Most recent value of an ADX column, NaN mapped to 0.0.
        value = adx_df[column].iloc[-1]
        return 0.0 if np.isnan(value) else float(value)

    window = structure_series.iloc[-lookback:]
    up_bars = (window == 1).sum()
    down_bars = (window == -1).sum()

    adx_now = _latest("adx")
    plus_now = _latest("di_plus")
    minus_now = _latest("di_minus")

    # Without sufficient trend strength no directional label is possible.
    if not adx_now >= ADX_TREND_THRESHOLD:
        return "ranging"

    needed = max(1, lookback // 2)
    if plus_now > minus_now and up_bars >= needed:
        return "bullish"
    if minus_now > plus_now and down_bars >= needed:
        return "bearish"
    return "ranging"
| |
|
| |
|
def compute_regime_confidence(
    trend: str,
    adx_val: float,
    structure: int,
    vol_expanding_from_base: bool,
    vol_ratio: float,
    dist_atr: float,
) -> float:
    """
    Score regime confidence in [0, 1] from four independent components.

    Points awarded:

    * ADX strength: 0.35 at or above ``ADX_STRONG_THRESHOLD``, 0.20 at or
      above ``ADX_TREND_THRESHOLD``, otherwise 0.05.
    * Structure alignment: 0.25 when trend and structure agree (bullish
      with 1, bearish with -1), 0.10 when structure is 0, otherwise 0.
    * Volatility: 0.25 when expanding from a compressed base, 0.10 when
      ``1.0 < vol_ratio < VOLATILITY_EXPANSION_MULT``, otherwise 0.
    * Extension: 0.15 when |dist_atr| < 1.0, 0.07 when it is below
      ``DIST_FROM_MEAN_ATR_MAX``, otherwise 0.  A NaN distance counts as 0.

    Low confidence = system holds off even if other scores look good.

    Returns:
        The summed points, clipped to [0, 1].
    """
    # --- ADX trend strength ---
    if adx_val >= ADX_STRONG_THRESHOLD:
        strength_pts = 0.35
    elif adx_val >= ADX_TREND_THRESHOLD:
        strength_pts = 0.20
    else:
        strength_pts = 0.05

    # --- Price structure agreeing with the trend label ---
    aligned = (trend == "bullish" and structure == 1) or (
        trend == "bearish" and structure == -1
    )
    if aligned:
        structure_pts = 0.25
    elif structure == 0:
        structure_pts = 0.10
    else:
        structure_pts = 0.0

    # --- Volatility expanding from compression ---
    if vol_expanding_from_base:
        vol_pts = 0.25
    elif 1.0 < vol_ratio < VOLATILITY_EXPANSION_MULT:
        vol_pts = 0.10
    else:
        vol_pts = 0.0

    # --- Price not extended ---
    magnitude = 0.0 if np.isnan(dist_atr) else abs(dist_atr)
    if magnitude < 1.0:
        extension_pts = 0.15
    elif magnitude < DIST_FROM_MEAN_ATR_MAX:
        extension_pts = 0.07
    else:
        extension_pts = 0.0

    # Added left to right in a fixed order so the floating-point result is
    # reproducible.
    total = strength_pts + structure_pts + vol_pts + extension_pts
    return float(np.clip(total, 0.0, 1.0))
| |
|
| |
|
def detect_regime(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Run the full regime analysis on ``df`` and summarise the latest bar.

    Computes ATR, ADX/DI, price structure, volatility compression /
    breakout flags and ATR-normalised distance from the mean, then derives
    the trend label, a regime confidence and a regime score from the most
    recent values.

    Args:
        df: Frame providing "high", "low" and "close" columns.

    Returns:
        Dict with the latest scalar readings ("atr", "atr_pct",
        "atr_pct_pct", "structure", "trend", "vol_ratio", "vol_expanding",
        "vol_contracting", "vol_compressed", "vol_expanding_from_base",
        "adx", "di_plus", "di_minus", "dist_atr", "price_extended_long",
        "price_extended_short", "regime_confidence", "regime_score",
        "atr_trend") plus the full series ("atr_series", "structure_series",
        "adx_series", "compressed_series", "dist_atr_series").
    """

    def _last_or(series: pd.Series, fallback: float) -> float:
        # Latest value of a series, or `fallback` when that value is NaN.
        value = series.iloc[-1]
        return fallback if np.isnan(value) else float(value)

    # --- Indicator series -------------------------------------------------
    atr_series = compute_atr(df, ATR_PERIOD)
    adx_df = compute_adx(df, ADX_PERIOD)
    structure_series = compute_structure(df, STRUCTURE_LOOKBACK)
    compressed_series = compute_volatility_compression(atr_series)
    expanding_from_base = compute_volatility_expanding_from_compression(
        atr_series, compressed_series
    )
    dist_atr_series = compute_distance_from_mean(df, atr_series)

    # --- Latest readings --------------------------------------------------
    last_atr = float(atr_series.iloc[-1])
    last_close = float(df["close"].iloc[-1])
    last_structure = int(structure_series.iloc[-1])
    last_adx = _last_or(adx_df["adx"], 0.0)
    last_di_plus = _last_or(adx_df["di_plus"], 0.0)
    last_di_minus = _last_or(adx_df["di_minus"], 0.0)
    last_compressed = bool(compressed_series.iloc[-1])
    last_expanding_from_base = bool(expanding_from_base.iloc[-1])
    last_dist_atr = _last_or(dist_atr_series, 0.0)

    # --- Volatility relative to its own average ---------------------------
    atr_ma = atr_series.rolling(ATR_PERIOD * 2).mean()
    # Until the average is available, compare the ATR against itself.
    last_atr_ma = _last_or(atr_ma, last_atr)
    vol_ratio = last_atr / last_atr_ma if last_atr_ma > 0 else 1.0
    vol_expanding = vol_ratio > VOLATILITY_EXPANSION_MULT
    vol_contracting = vol_ratio < VOLATILITY_CONTRACTION_MULT
    atr_pct = last_atr / last_close if last_close > 0 else 0.0

    trend = classify_trend(structure_series, adx_df, STRUCTURE_CONFIRM_BARS)

    price_too_extended_long = last_dist_atr > DIST_FROM_MEAN_ATR_MAX
    price_too_extended_short = last_dist_atr < -DIST_FROM_MEAN_ATR_MAX

    regime_confidence = compute_regime_confidence(
        trend=trend,
        adx_val=last_adx,
        structure=last_structure,
        vol_expanding_from_base=last_expanding_from_base,
        vol_ratio=vol_ratio,
        dist_atr=last_dist_atr,
    )

    # --- Regime score: base value from trend and volatility ---------------
    if trend == "bullish":
        regime_score = 0.55 if vol_expanding else 1.0
    elif trend == "ranging":
        regime_score = 0.25
    elif trend == "bearish" and not vol_expanding:
        regime_score = 0.15
    else:
        regime_score = 0.05

    # Adjust for trend strength.
    if last_adx >= ADX_STRONG_THRESHOLD:
        regime_score = min(1.0, regime_score + 0.1)
    elif last_adx < ADX_TREND_THRESHOLD:
        regime_score = max(0.0, regime_score - 0.15)

    # Adjust for the latest structure label.
    if last_structure == 1:
        regime_score = min(1.0, regime_score + 0.1)
    elif last_structure == -1:
        regime_score = max(0.0, regime_score - 0.1)

    # --- Direction of the ATR itself (20-bar vs 50-bar average) -----------
    atr_ma_20 = atr_series.rolling(20).mean().iloc[-1]
    if len(df) >= 50:
        atr_ma_50 = atr_series.rolling(50).mean().iloc[-1]
    else:
        atr_ma_50 = atr_ma_20
    # Equal or unavailable averages compare as not-greater, i.e. "falling".
    atr_trend_dir = "rising" if atr_ma_20 > atr_ma_50 else "falling"

    return {
        "atr": last_atr,
        "atr_pct": atr_pct,
        "atr_pct_pct": round(atr_pct * 100, 3),
        "structure": last_structure,
        "trend": trend,
        "vol_ratio": round(vol_ratio, 3),
        "vol_expanding": vol_expanding,
        "vol_contracting": vol_contracting,
        "vol_compressed": last_compressed,
        "vol_expanding_from_base": last_expanding_from_base,
        "adx": round(last_adx, 2),
        "di_plus": round(last_di_plus, 2),
        "di_minus": round(last_di_minus, 2),
        "dist_atr": round(last_dist_atr, 3),
        "price_extended_long": price_too_extended_long,
        "price_extended_short": price_too_extended_short,
        "regime_confidence": round(regime_confidence, 4),
        "regime_score": round(float(np.clip(regime_score, 0.0, 1.0)), 4),
        "atr_trend": atr_trend_dir,
        "atr_series": atr_series,
        "structure_series": structure_series,
        "adx_series": adx_df,
        "compressed_series": compressed_series,
        "dist_atr_series": dist_atr_series,
    }
| |
|