| """Meta-Model: Learns which model/signal to trust dynamically. |
| |
| This mimics how Renaissance Technologies combines signals — a meta-learner |
| weights LSTM, Transformer, XGBoost, and sentiment based on recent performance, |
| regime, and volatility state. |
| """ |
| import numpy as np |
| import pandas as pd |
| import torch |
| import torch.nn as nn |
| from sklearn.ensemble import GradientBoostingRegressor |
| from typing import Dict, List, Optional, Tuple |
| import warnings |
| warnings.filterwarnings('ignore') |
|
|
|
|
class MetaModel:
    """Meta-learner that dynamically weights base model predictions.

    A rolling history of each base model's information coefficient (IC) is
    maintained; the meta-learner ('xgb' gradient boosting, a small 'nn' torch
    network, or a 'bayesian' median-quantile regressor) maps raw base
    predictions plus performance/regime/volatility features to a combined
    forecast.
    """

    # Fallback volatility value used when no estimate is supplied.
    # NOTE(review): presumably an annualized vol of 20% — confirm with callers.
    DEFAULT_VOLATILITY = 0.2

    def __init__(self,
                 base_models: Optional[List[str]] = None,
                 meta_learner_type: str = 'xgb',
                 lookback_window: int = 63,
                 device: str = 'cpu'):
        """
        Args:
            base_models: Names of base models (e.g., ['lstm', 'transformer',
                'xgboost', 'sentiment']). Defaults to those four.
            meta_learner_type: 'xgb', 'nn', or 'bayesian'.
            lookback_window: How many days of past performance to use as features.
            device: Torch device string used by the 'nn' meta-learner.
        """
        self.base_models = base_models or ['lstm', 'transformer', 'xgboost', 'sentiment']
        self.meta_learner_type = meta_learner_type
        self.lookback_window = lookback_window
        self.device = torch.device(device)

        self.meta_model = None
        # Rolling IC history per model (most recent last), capped at lookback_window.
        self.performance_history: Dict[str, List[float]] = {m: [] for m in self.base_models}
        self.weight_history: List[Dict[str, float]] = []
        self.is_fitted = False

    def _build_meta_features(self,
                             predictions: Dict[str, np.ndarray],
                             regime: Optional[str] = None,
                             volatility: Optional[float] = None,
                             recent_returns: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Build the feature matrix for the meta-learner.

        Feature columns, in order:
            - Raw predictions from each base model (zeros if missing)
            - Mean / std / last value of each model's recent IC history
            - Regime indicator
            - Volatility level
            - Mean of the last 5 market returns

        Returns:
            Array of shape (n_samples, n_features).
        """
        n_samples = len(next(iter(predictions.values())))
        features = []

        # Raw predictions; a model absent from `predictions` contributes zeros
        # so the column layout stays fixed across calls.
        for model in self.base_models:
            if model in predictions:
                features.append(np.asarray(predictions[model]))
            else:
                features.append(np.zeros(n_samples))

        # Recent-performance summary stats per model, left-padded with zeros
        # until lookback_window observations have accumulated.
        for model in self.base_models:
            perf = self.performance_history.get(model) or [0.0] * self.lookback_window
            perf = perf[-self.lookback_window:]
            if len(perf) < self.lookback_window:
                perf = [0.0] * (self.lookback_window - len(perf)) + perf

            features.append(np.full(n_samples, np.mean(perf)))
            features.append(np.full(n_samples, np.std(perf) if len(perf) > 1 else 0.0))
            features.append(np.full(n_samples, perf[-1] if perf else 0.0))

        # Regime indicator; unknown labels map to 0.5, absent regime to 0.0.
        if regime:
            regime_map = {'bull': 1.0, 'bear': -1.0, 'high_vol': 0.0, 'neutral': 0.5}
            features.append(np.full(n_samples, regime_map.get(regime, 0.5)))
        else:
            features.append(np.zeros(n_samples))

        # Volatility level. Explicit None-check: the previous `volatility or 0.2`
        # silently replaced a legitimate volatility of exactly 0.0 with the default.
        vol = self.DEFAULT_VOLATILITY if volatility is None else volatility
        features.append(np.full(n_samples, vol))

        # Short-horizon market return (trailing 5 observations).
        if recent_returns is not None and len(recent_returns) > 0:
            features.append(np.full(n_samples, np.mean(recent_returns[-5:])))
        else:
            features.append(np.zeros(n_samples))

        return np.column_stack(features)

    def fit(self,
            predictions_train: Dict[str, np.ndarray],
            actual_train: np.ndarray,
            regime_train: Optional[List[str]] = None,
            volatility_train: Optional[np.ndarray] = None) -> Dict:
        """
        Train the meta-learner to predict actual returns from base model
        predictions.

        Note: only the first regime/volatility entry is used, i.e. a single
        market state is assumed for the whole training window.

        Returns:
            Dict with in-sample 'meta_ic', 'meta_mse', and 'n_samples'.

        Raises:
            ValueError: If `meta_learner_type` is not 'xgb', 'nn', or 'bayesian'.
        """
        n_samples = len(actual_train)

        X_meta = self._build_meta_features(
            predictions_train,
            regime=regime_train[0] if regime_train else None,
            volatility=volatility_train[0] if volatility_train is not None else None
        )

        if self.meta_learner_type == 'xgb':
            self.meta_model = GradientBoostingRegressor(
                n_estimators=100,
                max_depth=4,
                learning_rate=0.05,
                subsample=0.8,
                random_state=42
            )
            self.meta_model.fit(X_meta, actual_train)

        elif self.meta_learner_type == 'nn':
            self.meta_model = self._build_nn_meta_model(X_meta.shape[1])
            self._train_nn_meta(X_meta, actual_train)

        elif self.meta_learner_type == 'bayesian':
            # Median (0.5-quantile) regression as a robust stand-in for a
            # fully Bayesian meta-model.
            self.meta_model = GradientBoostingRegressor(
                n_estimators=100,
                max_depth=4,
                learning_rate=0.05,
                loss='quantile', alpha=0.5,
                random_state=42
            )
            self.meta_model.fit(X_meta, actual_train)

        else:
            # Previously an unknown type left meta_model=None but still set
            # is_fitted=True, deferring the failure to predict time.
            raise ValueError(f"Unknown meta_learner_type: {self.meta_learner_type!r}")

        self.is_fitted = True

        # In-sample diagnostics.
        pred = self.predict_meta(predictions_train, regime_train, volatility_train)
        from scipy.stats import spearmanr
        ic, _ = spearmanr(pred, actual_train)
        if np.isnan(ic):
            # spearmanr yields NaN when either input is constant.
            ic = 0.0
        mse = np.mean((pred - actual_train) ** 2)

        return {
            'meta_ic': ic,
            'meta_mse': mse,
            'n_samples': n_samples
        }

    def _build_nn_meta_model(self, input_size: int) -> nn.Module:
        """Build a small feed-forward meta-learner (input -> 64 -> 32 -> 1)."""
        class MetaNN(nn.Module):
            def __init__(self, input_size):
                super().__init__()
                self.net = nn.Sequential(
                    nn.Linear(input_size, 64),
                    nn.ReLU(),
                    nn.Dropout(0.2),
                    nn.Linear(64, 32),
                    nn.ReLU(),
                    nn.Linear(32, 1)
                )

            def forward(self, x):
                return self.net(x)

        return MetaNN(input_size).to(self.device)

    def _train_nn_meta(self, X: np.ndarray, y: np.ndarray, epochs: int = 50) -> None:
        """Full-batch Adam/MSE training of the NN meta-learner."""
        X_t = torch.FloatTensor(X).to(self.device)
        y_t = torch.FloatTensor(y).unsqueeze(1).to(self.device)

        optimizer = torch.optim.Adam(self.meta_model.parameters(), lr=1e-3)
        criterion = nn.MSELoss()

        self.meta_model.train()
        for _ in range(epochs):
            optimizer.zero_grad()
            loss = criterion(self.meta_model(X_t), y_t)
            loss.backward()
            optimizer.step()

    def predict_meta(self,
                     predictions: Dict[str, np.ndarray],
                     regimes: Optional[List[str]] = None,
                     volatilities: Optional[np.ndarray] = None) -> np.ndarray:
        """
        Generate meta-model predictions.

        Before `fit` has been called this falls back to an equal-weight
        average of the base predictions (missing models contribute zeros).

        Raises:
            ValueError: If `predictions` is empty.
        """
        if not predictions:
            raise ValueError("`predictions` must contain at least one model")

        if not self.is_fitted:
            n_samples = len(next(iter(predictions.values())))
            preds = [predictions.get(m, np.zeros(n_samples)) for m in self.base_models]
            return np.mean(preds, axis=0)

        X_meta = self._build_meta_features(
            predictions,
            regime=regimes[0] if regimes else None,
            volatility=volatilities[0] if volatilities is not None else None
        )

        if self.meta_learner_type == 'nn':
            self.meta_model.eval()  # disable dropout for inference
            with torch.no_grad():
                X_t = torch.FloatTensor(X_meta).to(self.device)
                pred = self.meta_model(X_t).cpu().numpy().flatten()
        else:
            pred = self.meta_model.predict(X_meta)

        return pred

    def update_performance(self, model_name: str, prediction: np.ndarray, actual: np.ndarray) -> None:
        """Append the Spearman IC of (prediction, actual) to the model's rolling
        history, keeping at most `lookback_window` entries.

        Models not listed in `base_models` are now tracked via setdefault
        instead of raising KeyError.
        """
        from scipy.stats import spearmanr
        ic, _ = spearmanr(prediction, actual)
        if np.isnan(ic):
            # Constant inputs produce NaN correlation; treat as no signal.
            ic = 0.0
        history = self.performance_history.setdefault(model_name, [])
        history.append(ic)
        # Trim in place to the trailing window.
        del history[:-self.lookback_window]

    def get_model_weights(self) -> Dict[str, float]:
        """Implied weights from performance history: mean IC clipped at zero and
        normalized to sum to 1; equal weights when no model has positive IC."""
        weights = {}
        total = 0.0
        for model in self.base_models:
            perf = self.performance_history.get(model, [0.0])
            avg_ic = np.mean(perf) if perf else 0.0
            # Clip at zero: never assign a negative (short) weight to a model.
            w = max(avg_ic, 0.0)
            weights[model] = w
            total += w

        if total > 0:
            return {k: v / total for k, v in weights.items()}
        # No model shows positive IC: fall back to equal weighting.
        return {k: 1.0 / len(self.base_models) for k in self.base_models}

    def adaptive_predict(self,
                         predictions: Dict[str, np.ndarray],
                         actual_prev: Optional[np.ndarray] = None,
                         regime: Optional[str] = None) -> Tuple[np.ndarray, Dict[str, float]]:
        """
        Weighted combination of base predictions, with weights refreshed from
        the most recently realized returns.

        Args:
            predictions: Model name -> prediction array (equal lengths).
            actual_prev: Realized returns aligned with each model's previous
                predictions; used to update per-model IC histories.
            regime: Currently unused; kept for interface stability.

        Returns:
            (final_predictions, current_weights)
        """
        if actual_prev is not None:
            for model, pred in predictions.items():
                # Skip misaligned arrays rather than computing a bogus IC.
                if len(pred) == len(actual_prev):
                    self.update_performance(model, pred, actual_prev)

        weights = self.get_model_weights()
        self.weight_history.append(weights)

        # Weighted sum over the models we actually have predictions for.
        final_pred = np.zeros(len(next(iter(predictions.values()))))
        for model, weight in weights.items():
            if model in predictions:
                final_pred += weight * np.asarray(predictions[model])

        return final_pred, weights
|
|