Upload regime_detector.py
Browse files- regime_detector.py +80 -0
regime_detector.py
ADDED
|
@@ -0,0 +1,80 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Regime Detection with Hidden Markov Model + Strategy Switching."""
|
| 2 |
+
import numpy as np
|
| 3 |
+
import pandas as pd
|
| 4 |
+
from sklearn.mixture import GaussianMixture
|
| 5 |
+
from typing import Dict, List, Optional
|
| 6 |
+
import warnings
|
| 7 |
+
warnings.filterwarnings('ignore')
|
| 8 |
+
|
| 9 |
+
try:
|
| 10 |
+
from hmmlearn.hmm import GaussianHMM
|
| 11 |
+
HMM_AVAILABLE = True
|
| 12 |
+
except ImportError:
|
| 13 |
+
HMM_AVAILABLE = False
|
| 14 |
+
|
| 15 |
+
|
| 16 |
+
class RegimeDetectorHMM:
|
| 17 |
+
"""Market regime detection with strategy parameters per regime."""
|
| 18 |
+
|
| 19 |
+
def __init__(self, n_regimes: int = 3):
|
| 20 |
+
self.n_regimes = n_regimes
|
| 21 |
+
self.model = None
|
| 22 |
+
self.regime_names = {}
|
| 23 |
+
self.regime_history = []
|
| 24 |
+
|
| 25 |
+
def fit(self, returns: pd.Series, volatility: Optional[pd.Series] = None):
|
| 26 |
+
features = pd.DataFrame({'returns': returns.fillna(0)})
|
| 27 |
+
features['volatility'] = volatility.fillna(0) if volatility is not None else returns.fillna(0).rolling(21).std() * np.sqrt(252)
|
| 28 |
+
features = features.dropna()
|
| 29 |
+
|
| 30 |
+
if HMM_AVAILABLE and len(features) > 63:
|
| 31 |
+
self.model = GaussianHMM(n_components=self.n_regimes, covariance_type='full', n_iter=100, random_state=42)
|
| 32 |
+
self.model.fit(features.values)
|
| 33 |
+
means = self.model.means_[:, 0]
|
| 34 |
+
order = np.argsort(means)[::-1]
|
| 35 |
+
self.regime_names = {order[0]: 'bull', order[1]: 'neutral', order[2]: 'bear'}
|
| 36 |
+
else:
|
| 37 |
+
self.model = None
|
| 38 |
+
|
| 39 |
+
def predict(self, returns: pd.Series, volatility: Optional[pd.Series] = None) -> pd.Series:
|
| 40 |
+
features = pd.DataFrame({'returns': returns.fillna(0)})
|
| 41 |
+
features['volatility'] = volatility.fillna(0) if volatility is not None else returns.fillna(0).rolling(21).std() * np.sqrt(252)
|
| 42 |
+
features = features.dropna()
|
| 43 |
+
|
| 44 |
+
if self.model is not None and len(features) > 10:
|
| 45 |
+
states = self.model.predict(features.values)
|
| 46 |
+
regime_series = pd.Series([self.regime_names.get(s, 'neutral') for s in states], index=features.index)
|
| 47 |
+
else:
|
| 48 |
+
regime_series = pd.Series('neutral', index=features.index)
|
| 49 |
+
for idx in features.index:
|
| 50 |
+
r = features.loc[idx, 'returns']
|
| 51 |
+
v = features.loc[idx, 'volatility']
|
| 52 |
+
v_med = features['volatility'].median()
|
| 53 |
+
if v > v_med * 1.5: regime_series.loc[idx] = 'high_vol'
|
| 54 |
+
elif r > 0.001: regime_series.loc[idx] = 'bull'
|
| 55 |
+
elif r < -0.001: regime_series.loc[idx] = 'bear'
|
| 56 |
+
else: regime_series.loc[idx] = 'neutral'
|
| 57 |
+
|
| 58 |
+
self.regime_history = regime_series
|
| 59 |
+
return regime_series
|
| 60 |
+
|
| 61 |
+
def get_regime_strategy(self, regime: str) -> Dict:
|
| 62 |
+
strategies = {
|
| 63 |
+
'bull': {'risk_aversion': 1.0, 'momentum_weight': 0.7, 'mean_reversion_weight': 0.1, 'max_leverage': 1.5, 'hedge_ratio': 0.0},
|
| 64 |
+
'bear': {'risk_aversion': 3.0, 'momentum_weight': 0.2, 'mean_reversion_weight': 0.5, 'max_leverage': 0.5, 'hedge_ratio': 0.5},
|
| 65 |
+
'high_vol': {'risk_aversion': 4.0, 'momentum_weight': 0.3, 'mean_reversion_weight': 0.3, 'max_leverage': 0.3, 'hedge_ratio': 0.7},
|
| 66 |
+
'neutral': {'risk_aversion': 2.0, 'momentum_weight': 0.4, 'mean_reversion_weight': 0.4, 'max_leverage': 1.0, 'hedge_ratio': 0.2}
|
| 67 |
+
}
|
| 68 |
+
return strategies.get(regime, strategies['neutral'])
|
| 69 |
+
|
| 70 |
+
def get_regime_stats(self, returns: pd.Series) -> pd.DataFrame:
|
| 71 |
+
if len(self.regime_history) == 0:
|
| 72 |
+
return pd.DataFrame()
|
| 73 |
+
stats = []
|
| 74 |
+
for regime in self.regime_history.unique():
|
| 75 |
+
mask = self.regime_history == regime
|
| 76 |
+
r = returns.reindex(self.regime_history.index)[mask].dropna()
|
| 77 |
+
if len(r) > 0:
|
| 78 |
+
stats.append({'regime': regime, 'n_days': len(r), 'mean_return': r.mean() * 252,
|
| 79 |
+
'volatility': r.std() * np.sqrt(252), 'max_drawdown': (r.cumsum() - r.cumsum().cummax()).min()})
|
| 80 |
+
return pd.DataFrame(stats)
|