# Goshawk_Hedge_Pro / regime.py
# GoshawkVortexAI — update regime.py (commit 1cdd6ba, verified)
"""
regime.py — Market regime detection with ADX, volatility compression,
distance-from-mean filter, and regime confidence scoring.
Key fixes vs prior version:
- STRUCTURE_LOOKBACK halved (10) to reduce entry lag
- True ATR (not EWM-only) with percentile-based compression detection
- ADX for objective trend strength (replaces pure HH/HL heuristic)
- Regime confidence: composite of trend + structure + vol alignment
- Distance-from-mean filter to avoid entering extended moves
"""
from typing import Dict, Any
import numpy as np
import pandas as pd
from config import (
ATR_PERIOD,
STRUCTURE_LOOKBACK,
STRUCTURE_CONFIRM_BARS,
VOLATILITY_EXPANSION_MULT,
VOLATILITY_CONTRACTION_MULT,
VOL_COMPRESSION_LOOKBACK,
VOL_COMPRESSION_PERCENTILE,
VOL_EXPANSION_CONFIRM_MULT,
ADX_PERIOD,
ADX_TREND_THRESHOLD,
ADX_STRONG_THRESHOLD,
DIST_FROM_MEAN_MA,
DIST_FROM_MEAN_ATR_MAX,
REGIME_CONFIDENCE_MIN,
)
def compute_atr(df: pd.DataFrame, period: int = ATR_PERIOD) -> pd.Series:
    """Return the Average True Range of *df* using Wilder's smoothing (RMA).

    True range is the largest of: the bar's high-low range, the gap up from
    the prior close, and the gap down from the prior close. The RMA
    (``ewm(alpha=1/period, adjust=False)``) matches the TradingView /
    classic Wilder definition.
    """
    prev_close = df["close"].shift(1)
    true_range = pd.DataFrame(
        {
            "hl": df["high"] - df["low"],
            "hc": (df["high"] - prev_close).abs(),
            "lc": (df["low"] - prev_close).abs(),
        }
    ).max(axis=1)
    return true_range.ewm(alpha=1.0 / period, adjust=False).mean()
def compute_adx(df: pd.DataFrame, period: int = ADX_PERIOD) -> pd.DataFrame:
    """
    Compute Wilder's ADX and the directional indices.

    Returns a DataFrame with columns ``adx``, ``di_plus`` and ``di_minus``.
    Wilder smoothing (``ewm(alpha=1/period, adjust=False)``) is applied to
    the true range, both directional movements, and the DX -> ADX step,
    matching the standard ADX definition.
    """
    high = df["high"]
    low = df["low"]
    prev_close = df["close"].shift(1)

    up_move = (high - high.shift(1)).clip(lower=0)
    down_move = (low.shift(1) - low).clip(lower=0)
    # Only the dominant direction counts on any given bar.
    plus_wins = up_move >= down_move
    dm_plus = up_move.where(plus_wins, 0.0)
    dm_minus = down_move.where(~plus_wins, 0.0)

    true_range = pd.DataFrame(
        {
            "hl": high - low,
            "hc": (high - prev_close).abs(),
            "lc": (low - prev_close).abs(),
        }
    ).max(axis=1)

    alpha = 1.0 / period

    def _rma(series: pd.Series) -> pd.Series:
        # Wilder smoothing.
        return series.ewm(alpha=alpha, adjust=False).mean()

    atr_w = _rma(true_range)
    safe_atr = atr_w.replace(0, np.nan)
    di_plus = 100 * _rma(dm_plus) / safe_atr
    di_minus = 100 * _rma(dm_minus) / safe_atr
    di_sum = (di_plus + di_minus).replace(0, np.nan)
    dx = 100 * (di_plus - di_minus).abs() / di_sum
    return pd.DataFrame({"adx": _rma(dx), "di_plus": di_plus, "di_minus": di_minus})
def compute_structure(df: pd.DataFrame, lookback: int = STRUCTURE_LOOKBACK) -> pd.Series:
    """
    Classify price structure per bar: +1 for higher highs AND higher lows,
    -1 for lower highs AND lower lows, 0 otherwise.

    Compares the rolling extremes of the last *lookback* bars against the
    same extremes half a window earlier. During warm-up the NaN comparisons
    evaluate to False, so the structure defaults to 0.
    """
    window_high = df["high"].rolling(lookback).max()
    window_low = df["low"].rolling(lookback).min()
    shift_by = max(1, lookback // 2)
    earlier_high = window_high.shift(shift_by)
    earlier_low = window_low.shift(shift_by)

    bullish = (window_high > earlier_high) & (window_low > earlier_low)
    bearish = (window_high < earlier_high) & (window_low < earlier_low)

    # bullish and bearish are mutually exclusive, so mask order is irrelevant.
    return pd.Series(0, index=df.index).mask(bullish, 1).mask(bearish, -1)
def compute_volatility_compression(
    atr_series: pd.Series,
    lookback: int = VOL_COMPRESSION_LOOKBACK,
    percentile: float = VOL_COMPRESSION_PERCENTILE,
) -> pd.Series:
    """
    Boolean series: True where the current ATR sits below the *percentile*-th
    percentile of its own trailing *lookback* window — i.e. volatility is
    compressed ("coiled") relative to recent history.
    """
    threshold = atr_series.rolling(lookback).quantile(percentile / 100.0)
    compressed = atr_series < threshold
    return compressed
def compute_volatility_expanding_from_compression(
    atr_series: pd.Series,
    compressed_series: pd.Series,
    mult: float = VOL_EXPANSION_CONFIRM_MULT,
    lookback: int = 5,
) -> pd.Series:
    """
    Detect a volatility breakout out of a base.

    Returns True where ATR is now expanding (current > rolling min of the
    prior *lookback* bars * *mult*) AND the series was in a compressed state
    at least once within the last *lookback* bars. Catches the precise
    moment of a volatility breakout from a base.

    Parameters
    ----------
    atr_series : ATR values (see ``compute_atr``).
    compressed_series : boolean output of ``compute_volatility_compression``.
    mult : expansion confirmation multiple applied to the recent ATR minimum.
    lookback : bars used both for the recent minimum and the
        "was recently compressed" check.
    """
    recent_min_atr = atr_series.rolling(lookback).min().shift(1)
    expanding = atr_series > recent_min_atr * mult
    # Fix: cast bool -> float BEFORE shift/rolling. Shifting a boolean
    # Series introduces NaN and flips the dtype to object, and pandas
    # rolling aggregations raise on object dtype. The float form preserves
    # the original True/False semantics exactly.
    was_compressed = (
        compressed_series.astype(float).shift(1).rolling(lookback).max().fillna(0.0) > 0
    )
    return expanding & was_compressed
def compute_distance_from_mean(
    df: pd.DataFrame,
    atr_series: pd.Series,
    ma_period: int = DIST_FROM_MEAN_MA,
    atr_max: float = DIST_FROM_MEAN_ATR_MAX,
) -> pd.Series:
    """
    ATR-normalised distance of close from its *ma_period* SMA.

    Positive values mean price is above the mean. Callers treat readings
    beyond *atr_max* as too extended for a fresh entry; the threshold itself
    is not applied here (the parameter is kept for interface compatibility).
    Zero ATR bars produce NaN rather than a division error.
    """
    mean_price = df["close"].rolling(ma_period).mean()
    safe_atr = atr_series.replace(0, np.nan)
    return (df["close"] - mean_price) / safe_atr
def classify_trend(
    structure_series: pd.Series,
    adx_df: pd.DataFrame,
    lookback: int = STRUCTURE_CONFIRM_BARS,
) -> str:
    """
    Label the current market as "bullish", "bearish" or "ranging".

    A trend is declared only when the latest ADX clears ADX_TREND_THRESHOLD,
    the dominant DI agrees with the direction, and at least half of the last
    *lookback* structure readings confirm it.
    """
    def _last(col: str) -> float:
        # Latest value of an ADX column, with NaN warm-up treated as 0.
        value = adx_df[col].iloc[-1]
        return 0.0 if np.isnan(value) else float(value)

    adx_val = _last("adx")
    if adx_val < ADX_TREND_THRESHOLD:
        return "ranging"

    di_plus = _last("di_plus")
    di_minus = _last("di_minus")
    window = structure_series.iloc[-lookback:]
    needed = max(1, lookback // 2)
    if di_plus > di_minus and int((window == 1).sum()) >= needed:
        return "bullish"
    if di_minus > di_plus and int((window == -1).sum()) >= needed:
        return "bearish"
    return "ranging"
def compute_regime_confidence(
    trend: str,
    adx_val: float,
    structure: int,
    vol_expanding_from_base: bool,
    vol_ratio: float,
    dist_atr: float,
) -> float:
    """
    Composite regime confidence in [0, 1].

    Sums four independent components and clips the total:
      * ADX strength          -> up to 0.35
      * structure alignment   -> up to 0.25
      * volatility breakout   -> up to 0.25
      * price not extended    -> up to 0.15
    A low value means the pieces disagree — the system should hold off even
    if other scores look good.
    """
    # ADX strength component.
    if adx_val >= ADX_STRONG_THRESHOLD:
        adx_part = 0.35
    elif adx_val >= ADX_TREND_THRESHOLD:
        adx_part = 0.20
    else:
        adx_part = 0.05

    # Structure must agree with the declared trend for full credit;
    # a neutral structure earns partial credit, disagreement earns none.
    aligned = (trend == "bullish" and structure == 1) or (
        trend == "bearish" and structure == -1
    )
    if aligned:
        struct_part = 0.25
    elif structure == 0:
        struct_part = 0.10
    else:
        struct_part = 0.0

    # Volatility: best case is a fresh expansion out of a compressed base;
    # mild expansion short of the full multiple earns partial credit.
    if vol_expanding_from_base:
        vol_part = 0.25
    elif 1.0 < vol_ratio < VOLATILITY_EXPANSION_MULT:
        vol_part = 0.10
    else:
        vol_part = 0.0

    # Penalise entries far from the mean (NaN distance treated as 0).
    abs_dist = 0.0 if np.isnan(dist_atr) else abs(dist_atr)
    if abs_dist < 1.0:
        dist_part = 0.15
    elif abs_dist < DIST_FROM_MEAN_ATR_MAX:
        dist_part = 0.07
    else:
        dist_part = 0.0

    total = adx_part + struct_part + vol_part + dist_part
    return float(np.clip(total, 0.0, 1.0))
def detect_regime(df: pd.DataFrame) -> Dict[str, Any]:
    """
    Compute the full regime snapshot for the latest bar of *df*.

    Expects an OHLC DataFrame with "high", "low" and "close" columns.
    Returns a dict of latest-bar scalars (trend label, ADX/DI readings,
    volatility state, extension flags, composite confidence and score)
    plus the underlying indicator series for callers that need history.
    """
    # Base indicator series computed over the whole frame.
    atr_series = compute_atr(df, ATR_PERIOD)
    adx_df = compute_adx(df, ADX_PERIOD)
    structure_series = compute_structure(df, STRUCTURE_LOOKBACK)
    compressed_series = compute_volatility_compression(atr_series)
    expanding_from_base = compute_volatility_expanding_from_compression(
        atr_series, compressed_series
    )
    dist_atr_series = compute_distance_from_mean(df, atr_series)
    # Latest-bar scalars; NaN warm-up values fall back to neutral defaults.
    last_atr = float(atr_series.iloc[-1])
    last_close = float(df["close"].iloc[-1])
    last_structure = int(structure_series.iloc[-1])
    last_adx = float(adx_df["adx"].iloc[-1]) if not np.isnan(adx_df["adx"].iloc[-1]) else 0.0
    last_di_plus = float(adx_df["di_plus"].iloc[-1]) if not np.isnan(adx_df["di_plus"].iloc[-1]) else 0.0
    last_di_minus = float(adx_df["di_minus"].iloc[-1]) if not np.isnan(adx_df["di_minus"].iloc[-1]) else 0.0
    last_compressed = bool(compressed_series.iloc[-1])
    last_expanding_from_base = bool(expanding_from_base.iloc[-1])
    last_dist_atr = float(dist_atr_series.iloc[-1]) if not np.isnan(dist_atr_series.iloc[-1]) else 0.0
    # Volatility state: current ATR relative to its double-period average.
    atr_ma = atr_series.rolling(ATR_PERIOD * 2).mean()
    last_atr_ma = float(atr_ma.iloc[-1]) if not np.isnan(atr_ma.iloc[-1]) else last_atr
    vol_ratio = last_atr / last_atr_ma if last_atr_ma > 0 else 1.0
    vol_expanding = vol_ratio > VOLATILITY_EXPANSION_MULT
    vol_contracting = vol_ratio < VOLATILITY_CONTRACTION_MULT
    atr_pct = last_atr / last_close if last_close > 0 else 0.0
    trend = classify_trend(structure_series, adx_df, STRUCTURE_CONFIRM_BARS)
    # Extension filter: ATR-normalised distance beyond the cap blocks fresh
    # entries in that direction.
    price_too_extended_long = last_dist_atr > DIST_FROM_MEAN_ATR_MAX
    price_too_extended_short = last_dist_atr < -DIST_FROM_MEAN_ATR_MAX
    regime_confidence = compute_regime_confidence(
        trend=trend,
        adx_val=last_adx,
        structure=last_structure,
        vol_expanding_from_base=last_expanding_from_base,
        vol_ratio=vol_ratio,
        dist_atr=last_dist_atr,
    )
    # Regime score: raw directional quality
    if trend == "bullish" and not vol_expanding:
        regime_score = 1.0
    elif trend == "bullish" and vol_expanding:
        regime_score = 0.55
    elif trend == "ranging":
        regime_score = 0.25
    elif trend == "bearish" and not vol_expanding:
        regime_score = 0.15
    else:
        regime_score = 0.05
    # ADX and structure nudge the score while keeping it inside [0, 1].
    if last_adx >= ADX_STRONG_THRESHOLD:
        regime_score = min(1.0, regime_score + 0.1)
    elif last_adx < ADX_TREND_THRESHOLD:
        regime_score = max(0.0, regime_score - 0.15)
    if last_structure == 1:
        regime_score = min(1.0, regime_score + 0.1)
    elif last_structure == -1:
        regime_score = max(0.0, regime_score - 0.1)
    # ATR slope over 20 vs 50 bars; short frames fall back to the 20-bar MA.
    atr_ma_20 = atr_series.rolling(20).mean().iloc[-1]
    atr_ma_50 = atr_series.rolling(50).mean().iloc[-1] if len(df) >= 50 else atr_ma_20
    atr_trend_dir = "rising" if atr_ma_20 > atr_ma_50 else "falling"
    return {
        "atr": last_atr,
        "atr_pct": atr_pct,
        "atr_pct_pct": round(atr_pct * 100, 3),
        "structure": last_structure,
        "trend": trend,
        "vol_ratio": round(vol_ratio, 3),
        "vol_expanding": vol_expanding,
        "vol_contracting": vol_contracting,
        "vol_compressed": last_compressed,
        "vol_expanding_from_base": last_expanding_from_base,
        "adx": round(last_adx, 2),
        "di_plus": round(last_di_plus, 2),
        "di_minus": round(last_di_minus, 2),
        "dist_atr": round(last_dist_atr, 3),
        "price_extended_long": price_too_extended_long,
        "price_extended_short": price_too_extended_short,
        "regime_confidence": round(regime_confidence, 4),
        "regime_score": round(float(np.clip(regime_score, 0.0, 1.0)), 4),
        "atr_trend": atr_trend_dir,
        "atr_series": atr_series,
        "structure_series": structure_series,
        "adx_series": adx_df,
        "compressed_series": compressed_series,
        "dist_atr_series": dist_atr_series,
    }