| """ |
| Personalization Layer |
| ===================== |
| Clusters traders by behavior and adapts strategies per user. |
| |
| Detects problematic patterns: |
| - Overtrading (excessive frequency) |
| - Revenge trading (increased size after losses) |
| - Risk escalation patterns |
| - Emotional trading signals |
| """ |
|
|
| import torch |
| import torch.nn as nn |
| import numpy as np |
| from typing import Dict, List, Tuple |
| from sklearn.cluster import KMeans |
| from sklearn.preprocessing import StandardScaler |
|
|
|
|
| |
# Trader-type id -> display name. The ids (the position of each name in the
# tuple below) are the keys used by the rule-based classifier and by the
# per-type strategy presets in this module.
TRADER_TYPES = dict(enumerate((
    'Conservative',   # 0
    'Moderate',       # 1
    'Aggressive',     # 2
    'Scalper',        # 3
    'Swing Trader',   # 4
)))
|
|
|
|
class TraderProfiler:
    """
    Build trader profiles from historical trade behavior.

    A trade history is reduced to a fixed-length feature vector
    (``extract_behavior_features``). Until ``fit`` has been called the
    trader type comes from simple threshold rules; afterwards it comes from
    a KMeans model fitted on standardized feature vectors.

    Feature vector layout (index: meaning):
        0 win_rate, 1 avg_win, 2 avg_loss, 3 profit_factor,
        4 avg_position_size, 5 max_position_size, 6 position_size_std,
        7 avg_holding_time, 8 trade_frequency (trade count),
        9 max consecutive losses, 10 revenge_ratio, 11 max_drawdown,
        12 avg size after a loss, 13 avg size after a win, 14 trade count
    """

    # Length of the vector returned by ``extract_behavior_features``.
    N_FEATURES = 15

    def __init__(self, n_clusters: int = 5):
        """Create an unfitted profiler that will cluster into ``n_clusters`` groups."""
        self.n_clusters = n_clusters
        self.scaler = StandardScaler()
        self.kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
        self.is_fitted = False

    def extract_behavior_features(self, trades: List[Dict]) -> np.ndarray:
        """
        Extract behavioral features from trade history.

        Each trade dict may carry ``pnl``, ``size`` and ``holding_time``;
        missing keys are read as 0. Trades are assumed to be in
        chronological order (the streak, "size after ..." and drawdown
        features depend on it).

        A trade with ``pnl <= 0`` counts as a loss.

        Returns:
            Array of ``N_FEATURES`` values laid out as described in the
            class docstring; all zeros when ``trades`` is empty.
        """
        if not trades:
            return np.zeros(self.N_FEATURES)

        pnls = [t.get('pnl', 0) for t in trades]
        sizes = [t.get('size', 0) for t in trades]
        holding_times = [t.get('holding_time', 0) for t in trades]

        winners = [p for p in pnls if p > 0]
        losers = [p for p in pnls if p <= 0]

        # Outcome statistics.
        win_rate = len(winners) / len(pnls)
        avg_win = np.mean(winners) if winners else 0
        avg_loss = np.mean(losers) if losers else 0
        # The epsilon keeps the ratio finite when there are no losses.
        profit_factor = abs(sum(winners)) / (abs(sum(losers)) + 1e-8)

        # Position sizing.
        avg_position_size = np.mean(sizes)
        max_position_size = np.max(sizes)
        position_size_std = np.std(sizes) if len(sizes) > 1 else 0

        # Timing / activity.
        avg_holding_time = np.mean(holding_times)
        trade_frequency = len(trades)

        # Reaction to the previous trade's outcome.
        consecutive_losses = self._max_consecutive_losses(pnls)
        size_after_loss = self._avg_size_after_loss(trades)
        size_after_win = self._avg_size_after_win(trades)
        if size_after_win > 0:
            revenge_ratio = size_after_loss / size_after_win
        else:
            # No post-win baseline to compare against: report a neutral
            # ratio instead of dividing by ~0, which would yield a value
            # around 1e8 and dominate any scaling/clustering downstream.
            revenge_ratio = 1.0

        # Peak-to-trough drawdown of cumulative PnL. The curve is anchored
        # at the starting equity (0) so that losses taken before any profit
        # are counted as drawdown too.
        equity = np.concatenate(([0.0], np.cumsum(pnls)))
        max_drawdown = np.max(np.maximum.accumulate(equity) - equity)

        return np.array([
            win_rate,
            avg_win,
            avg_loss,
            profit_factor,
            avg_position_size,
            max_position_size,
            position_size_std,
            avg_holding_time,
            trade_frequency,
            consecutive_losses,
            revenge_ratio,
            max_drawdown,
            size_after_loss,
            size_after_win,
            len(trades),
        ])

    def _max_consecutive_losses(self, pnls: List[float]) -> int:
        """Return the longest run of consecutive trades with ``pnl <= 0``."""
        max_streak = 0
        current_streak = 0
        for p in pnls:
            if p <= 0:
                current_streak += 1
                max_streak = max(max_streak, current_streak)
            else:
                current_streak = 0
        return max_streak

    @staticmethod
    def _avg_next_size(trades: List[Dict], after_loss: bool) -> float:
        """Mean size of trades whose predecessor was a loss (or a win); 0 if none."""
        next_sizes = [
            current.get('size', 0)
            for previous, current in zip(trades, trades[1:])
            if (previous.get('pnl', 0) <= 0) == after_loss
        ]
        return float(np.mean(next_sizes)) if next_sizes else 0.0

    def _avg_size_after_loss(self, trades: List[Dict]) -> float:
        """Average position size after a losing trade (0 when there is none)."""
        return self._avg_next_size(trades, after_loss=True)

    def _avg_size_after_win(self, trades: List[Dict]) -> float:
        """Average position size after a winning trade (0 when there is none)."""
        return self._avg_next_size(trades, after_loss=False)

    def fit(self, all_traders_features: np.ndarray):
        """
        Fit the scaler and the clustering model on many traders' features.

        ``all_traders_features`` is passed straight to the scaler, so it is
        expected to be 2-D with one row per trader.
        """
        scaled = self.scaler.fit_transform(all_traders_features)
        self.kmeans.fit(scaled)
        self.is_fitted = True

    @staticmethod
    def _feature_summary(features: np.ndarray) -> Dict:
        """Pick the headline features reported alongside a classification."""
        return {
            'win_rate': float(features[0]),
            'profit_factor': float(features[3]),
            'avg_holding_time': float(features[7]),
            'revenge_ratio': float(features[10]),
            'max_drawdown': float(features[11]),
        }

    def predict_type(self, features: np.ndarray) -> Dict:
        """
        Predict trader type and provide analysis.

        Falls back to the rule-based classifier while the clustering model
        is unfitted.

        Returns:
            Dict with ``cluster`` (int), ``type_name`` (str) and
            ``features`` (headline feature values).
        """
        if not self.is_fitted:
            return self._rule_based_classification(features)

        scaled = self.scaler.transform(features.reshape(1, -1))
        cluster = int(self.kmeans.predict(scaled)[0])

        # NOTE(review): KMeans label indices carry no inherent meaning, so
        # the name looked up here is only meaningful if clusters are aligned
        # with TRADER_TYPES somewhere else - confirm.
        return {
            'cluster': cluster,
            'type_name': TRADER_TYPES.get(cluster, 'Unknown'),
            'features': self._feature_summary(features),
        }

    def _rule_based_classification(self, features: np.ndarray) -> Dict:
        """
        Rule-based trader classification when clustering isn't fitted.

        Rules are checked in order: very short average holding time ->
        Scalper (3); very long -> Swing Trader (4); otherwise the average
        position size selects Aggressive (2), Conservative (0) or
        Moderate (1).
        """
        avg_holding = features[7]
        position_size = features[4]

        # NOTE(review): thresholds presumably assume holding time in minutes
        # (1440 = one day) and size as a fraction of equity - confirm.
        if avg_holding < 5:
            trader_type = 3
        elif avg_holding > 1440:
            trader_type = 4
        elif position_size > 0.1:
            trader_type = 2
        elif position_size < 0.02:
            trader_type = 0
        else:
            trader_type = 1

        return {
            'cluster': trader_type,
            'type_name': TRADER_TYPES[trader_type],
            'features': self._feature_summary(features),
        }
|
|
|
|
class BehaviorAlertSystem:
    """
    Real-time detection of problematic trading patterns.

    ``analyze`` inspects a window of recent trades and raises alerts for
    overtrading, revenge trading, trailing loss streaks and excessive
    session loss. Each alert also shrinks a ``risk_multiplier`` that callers
    can apply to position sizing.
    """

    # Default alert thresholds. 'max_consecutive_losses' is kept for
    # configuration compatibility; ``analyze`` does not consult it.
    DEFAULT_THRESHOLDS = {
        'overtrading_trades_per_hour': 10,
        'revenge_size_multiplier': 1.5,
        'max_consecutive_losses': 5,
        'max_drawdown_pct': 0.15,
        'tilt_detection_loss_streak': 3,
    }

    def __init__(self, thresholds: "Dict[str, float] | None" = None):
        """
        Args:
            thresholds: Optional overrides merged on top of
                ``DEFAULT_THRESHOLDS``; omitted keys keep their defaults.
        """
        self.thresholds = dict(self.DEFAULT_THRESHOLDS)
        if thresholds:
            self.thresholds.update(thresholds)

    @staticmethod
    def _trailing_loss_streak(pnls: List[float]) -> int:
        """Count trades with ``pnl <= 0`` at the end of ``pnls``."""
        streak = 0
        for pnl in reversed(pnls):
            if pnl > 0:
                break
            streak += 1
        return streak

    def analyze(self, recent_trades: List[Dict],
                portfolio_value: float,
                time_window_hours: float = 1.0) -> Dict:
        """
        Analyze recent trading activity for behavioral issues.

        Args:
            recent_trades: Trade dicts; ``pnl`` and ``size`` are read
                (missing keys count as 0). The last element is treated as
                the most recent trade.
            portfolio_value: Denominator for the session loss percentage.
            time_window_hours: Span covered by ``recent_trades``; values
                below 0.1 are treated as 0.1 when computing the trade rate.

        Returns:
            Dict with ``alerts`` (list of alert dicts), ``risk_multiplier``
            (product of the per-alert reductions), ``status``
            ('normal' / 'warning' / 'critical'), ``consecutive_losses`` and
            ``session_drawdown_pct``. The same keys are present when
            ``recent_trades`` is empty.
        """
        if not recent_trades:
            # Same shape as the regular result so callers can index the
            # diagnostic keys unconditionally.
            return {
                'alerts': [],
                'risk_multiplier': 1.0,
                'status': 'normal',
                'consecutive_losses': 0,
                'session_drawdown_pct': 0.0,
            }

        alerts = []
        risk_multiplier = 1.0
        pnls = [t.get('pnl', 0) for t in recent_trades]
        sizes = [t.get('size', 0) for t in recent_trades]

        # Overtrading: trade rate over the window.
        trade_count = len(recent_trades)
        trades_per_hour = trade_count / max(time_window_hours, 0.1)
        if trades_per_hour > self.thresholds['overtrading_trades_per_hour']:
            alerts.append({
                'type': 'OVERTRADING',
                'severity': 'HIGH',
                'message': f'Trading {trade_count} times in {time_window_hours}h exceeds safe threshold',
                'recommendation': 'Reduce trade frequency. Consider taking a break.',
            })
            risk_multiplier *= 0.5

        # Revenge trading: the latest trade is much larger than the
        # immediately preceding one, and that preceding trade lost money.
        if len(recent_trades) >= 2:
            previous_pnl = pnls[-2]
            previous_size = sizes[-2]
            current_size = sizes[-1]
            size_limit = previous_size * self.thresholds['revenge_size_multiplier']

            if previous_pnl < 0 and current_size > size_limit:
                alerts.append({
                    'type': 'REVENGE_TRADING',
                    'severity': 'CRITICAL',
                    'message': 'Position size increased significantly after a loss',
                    'recommendation': 'Avoid increasing size after losses. Maintain discipline.',
                })
                risk_multiplier *= 0.3

        # Loss streak at the end of the window.
        consecutive_losses = self._trailing_loss_streak(pnls)
        if consecutive_losses >= self.thresholds['tilt_detection_loss_streak']:
            alerts.append({
                'type': 'LOSS_STREAK',
                'severity': 'HIGH',
                'message': f'{consecutive_losses} consecutive losing trades detected',
                'recommendation': 'Consider pausing trading. Review strategy before next trade.',
            })
            risk_multiplier *= 0.5

        # Session loss: net PnL of the window relative to portfolio value
        # (0 when the window is net profitable).
        total_pnl = sum(pnls)
        drawdown_pct = abs(min(total_pnl, 0)) / (portfolio_value + 1e-8)
        drawdown_limit = self.thresholds['max_drawdown_pct']

        if drawdown_pct > drawdown_limit:
            alerts.append({
                'type': 'EXCESSIVE_DRAWDOWN',
                'severity': 'CRITICAL',
                # Both percentages use one decimal so custom thresholds do
                # not leak float noise into the message.
                'message': f'Session drawdown at {drawdown_pct*100:.1f}% exceeds {drawdown_limit*100:.1f}% threshold',
                'recommendation': 'Stop trading for the day. Review risk parameters.',
            })
            risk_multiplier *= 0.1

        if any(alert['severity'] == 'CRITICAL' for alert in alerts):
            status = 'critical'
        elif alerts:
            status = 'warning'
        else:
            status = 'normal'

        return {
            'alerts': alerts,
            'risk_multiplier': risk_multiplier,
            'status': status,
            'consecutive_losses': consecutive_losses,
            'session_drawdown_pct': drawdown_pct,
        }
|
|
|
|
class PersonalizationEngine:
    """
    Ties the trader profiler and the behavior alert system together and
    turns their outputs into per-trader strategy parameters.
    """

    def __init__(self):
        self.profiler = TraderProfiler()
        self.alert_system = BehaviorAlertSystem()

        # Baseline parameters per trader-type id. Each row lists the values
        # in the order given by ``param_names``.
        param_names = (
            'max_position_pct',
            'sl_atr_mult',
            'tp_atr_mult',
            'min_confidence',
            'max_trades_per_day',
        )
        presets = (
            (0, (0.02, 1.5, 2.0, 0.7, 3)),
            (1, (0.05, 2.0, 3.0, 0.6, 5)),
            (2, (0.10, 2.5, 4.0, 0.55, 10)),
            (3, (0.03, 0.5, 1.0, 0.55, 50)),
            (4, (0.08, 3.0, 5.0, 0.65, 2)),
        )
        self.strategy_params = {
            type_id: dict(zip(param_names, values))
            for type_id, values in presets
        }

    def get_personalized_params(self, trader_profile: Dict,
                                behavior_alerts: Dict) -> Dict:
        """
        Derive trading parameters for one trader.

        Starts from a copy of the preset for ``trader_profile['cluster']``
        (preset 1 when the key is missing or the id is unknown), then:
        - scales ``max_position_pct`` by ``behavior_alerts['risk_multiplier']``
          (1.0 when absent);
        - on a REVENGE_TRADING alert, raises ``min_confidence`` by 0.15,
          capped at 0.9;
        - on an OVERTRADING alert, halves ``max_trades_per_day`` (integer
          division, never below 1).

        The stored presets are not modified.
        """
        type_id = trader_profile.get('cluster', 1)
        fallback = self.strategy_params[1]
        tuned = dict(self.strategy_params.get(type_id, fallback))

        scale = behavior_alerts.get('risk_multiplier', 1.0)
        tuned['max_position_pct'] = tuned['max_position_pct'] * scale

        def flagged(alert_type):
            # True when any reported alert carries the given type.
            return any(
                alert['type'] == alert_type
                for alert in behavior_alerts.get('alerts', [])
            )

        if flagged('REVENGE_TRADING'):
            stricter = tuned['min_confidence'] + 0.15
            tuned['min_confidence'] = min(stricter, 0.9)

        if flagged('OVERTRADING'):
            halved = tuned['max_trades_per_day'] // 2
            tuned['max_trades_per_day'] = max(1, halved)

        return tuned
|
|