"""Portfolio Optimizer - Risk-aware allocation engine."""
import warnings
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
from scipy.optimize import minimize

# NOTE(review): this silences *every* warning for the whole process, not just
# numerical noise from the solver. It is kept because importing this module has
# always had that side effect; consider narrowing it to specific categories.
warnings.filterwarnings('ignore')


class PortfolioOptimizer:
    """Constrained portfolio optimizer built on SciPy's SLSQP solver.

    Every optimizer enforces a fully-invested budget (weights sum to one) and
    per-asset box bounds derived from ``min_weight`` / ``max_weight``. Each
    one returns a dict with the keys ``weights``, ``expected_return``,
    ``volatility``, ``sharpe_ratio`` and ``success``.

    Note:
        The budget cannot be met when ``n_assets * max_weight < 1`` (for
        example fewer than five assets with the default cap of 0.20).
        Because returned weights are always rescaled to sum to one, they can
        exceed ``max_weight`` in that situation; check ``success``.
    """

    def __init__(self,
                 max_weight: float = 0.20,
                 min_weight: float = 0.0,
                 target_return: Optional[float] = None,
                 risk_free_rate: float = 0.04,
                 transaction_cost: float = 0.0003,
                 turnover_penalty: float = 0.001,
                 risk_aversion: float = 1.0):
        """Store the optimizer configuration.

        Args:
            max_weight: Upper bound for every asset weight.
            min_weight: Lower bound for every asset weight (long-only runs).
            target_return: Default required portfolio return for
                ``optimize_mean_variance``; ``None`` disables the constraint.
            risk_free_rate: Rate subtracted from returns in Sharpe ratios.
            transaction_cost: Stored on the instance; none of the methods in
                this class read it.
            turnover_penalty: Penalty per unit of L1 turnover versus
                ``current_weights``.
            risk_aversion: Multiplier on portfolio variance in the
                mean-variance utility.
        """
        self.max_weight = max_weight
        self.min_weight = min_weight
        self.target_return = target_return
        self.risk_free_rate = risk_free_rate
        self.transaction_cost = transaction_cost
        self.turnover_penalty = turnover_penalty
        self.risk_aversion = risk_aversion

    # ------------------------------------------------------------------
    # Private helpers shared by the optimizers
    # ------------------------------------------------------------------
    @staticmethod
    def _prepare_inputs(mu, Sigma) -> Tuple[np.ndarray, np.ndarray]:
        """Coerce ``mu`` / ``Sigma`` to float arrays and check their shapes."""
        mu_arr = np.asarray(mu, dtype=float).ravel()
        sigma_arr = np.asarray(Sigma, dtype=float)
        if mu_arr.size == 0:
            raise ValueError("mu must contain at least one asset")
        if sigma_arr.shape != (mu_arr.size, mu_arr.size):
            raise ValueError(
                f"Sigma must have shape ({mu_arr.size}, {mu_arr.size}), "
                f"got {sigma_arr.shape}"
            )
        return mu_arr, sigma_arr

    @staticmethod
    def _budget_constraint() -> Dict:
        """Return the SLSQP equality constraint ``sum(w) == 1``."""
        return {'type': 'eq', 'fun': lambda w: np.sum(w) - 1.0}

    @staticmethod
    def _normalize_weights(raw_weights: np.ndarray,
                           clip_negative: bool = True) -> np.ndarray:
        """Optionally floor weights at zero, then rescale them to sum to one."""
        cleaned = np.array(raw_weights, dtype=float)
        if clip_negative:
            cleaned = np.maximum(cleaned, 0.0)
        total = cleaned.sum()
        # Skip the rescale when the sum is unusable rather than emit NaN/inf.
        if np.isfinite(total) and total > 1e-12:
            cleaned = cleaned / total
        return cleaned

    def _build_result(self,
                      weights: np.ndarray,
                      mu: np.ndarray,
                      Sigma: np.ndarray,
                      success: bool) -> Dict:
        """Assemble the result dict (return, volatility, Sharpe) for weights."""
        port_return = np.dot(weights, mu)
        # Floor at zero so a slightly non-PSD Sigma cannot yield sqrt(negative).
        port_variance = max(np.dot(weights, np.dot(Sigma, weights)), 0.0)
        port_vol = np.sqrt(port_variance)
        if port_vol > 0:
            sharpe = (port_return - self.risk_free_rate) / port_vol
        else:
            sharpe = 0.0
        return {
            'weights': weights,
            'expected_return': port_return,
            'volatility': port_vol,
            'sharpe_ratio': sharpe,
            'success': success
        }

    def _max_feasible_return(self, mu: np.ndarray) -> float:
        """Return the highest expected return reachable under the box bounds.

        Starts every asset at ``min_weight`` and hands the remaining budget
        to the highest-return assets first, up to ``max_weight`` each.
        """
        weights = np.full(mu.size, float(self.min_weight))
        remaining = 1.0 - weights.sum()
        headroom = max(self.max_weight - self.min_weight, 0.0)
        for idx in np.argsort(mu)[::-1]:
            if remaining <= 0:
                break
            step = min(headroom, remaining)
            weights[idx] += step
            remaining -= step
        weights = self._normalize_weights(weights, clip_negative=False)
        return float(np.dot(weights, mu))

    # ------------------------------------------------------------------
    # Public optimizers
    # ------------------------------------------------------------------
    def optimize_mean_variance(self,
                               mu: np.ndarray,
                               Sigma: np.ndarray,
                               current_weights: Optional[np.ndarray] = None,
                               long_only: bool = True,
                               target_return: Optional[float] = None) -> Dict:
        """
        Mean-variance optimization with a turnover penalty

        Maximizes ``w.mu - risk_aversion * w'Sigma w - turnover_penalty * |w - w_cur|_1``
        subject to ``sum(w) == 1`` and the weight bounds.

        Args:
            mu: Expected returns vector (n_assets,)
            Sigma: Covariance matrix (n_assets, n_assets)
            current_weights: Current portfolio weights (n_assets,); enables
                the turnover penalty when given
            long_only: If True, bounds are (min_weight, max_weight) and
                negative weights are floored at zero; if False, bounds are
                (-max_weight, max_weight) and short positions are kept
            target_return: Required portfolio return for this call; when
                None the instance-level ``target_return`` is used

        Returns:
            Dict with weights, expected_return, volatility, sharpe_ratio, success

        Raises:
            ValueError: If ``mu`` is empty or ``Sigma`` is not (n, n).
        """
        mu, Sigma = self._prepare_inputs(mu, Sigma)
        n_assets = mu.size
        if current_weights is not None:
            current_weights = np.asarray(current_weights, dtype=float)

        # A per-call target overrides the one configured on the instance.
        required_return = (
            self.target_return if target_return is None else target_return
        )

        def objective(w):
            utility = (np.dot(w, mu)
                       - self.risk_aversion * np.dot(w, np.dot(Sigma, w)))
            if current_weights is not None:
                turnover = np.sum(np.abs(w - current_weights))
                utility -= self.turnover_penalty * turnover
            return -utility

        constraints = [self._budget_constraint()]
        if required_return is not None:
            constraints.append(
                {'type': 'eq',
                 'fun': lambda w: np.dot(w, mu) - required_return}
            )

        lower_bound = self.min_weight if long_only else -self.max_weight
        bounds = [(lower_bound, self.max_weight)] * n_assets

        result = minimize(
            objective,
            np.full(n_assets, 1.0 / n_assets),
            method='SLSQP',
            bounds=bounds,
            constraints=constraints,
            options={'maxiter': 1000, 'ftol': 1e-9}
        )

        if not result.success:
            print(f"Optimization warning: {result.message}")

        # Only floor at zero for long-only runs; flooring a long/short
        # solution would silently delete every short position.
        weights = self._normalize_weights(result.x, clip_negative=long_only)
        return self._build_result(weights, mu, Sigma, result.success)

    def optimize_max_sharpe(self,
                            mu: np.ndarray,
                            Sigma: np.ndarray,
                            current_weights: Optional[np.ndarray] = None) -> Dict:
        """Optimize for maximum Sharpe ratio.

        Args:
            mu: Expected returns vector (n_assets,)
            Sigma: Covariance matrix (n_assets, n_assets)
            current_weights: Current weights; when given, the turnover
                penalty is subtracted from the return inside the objective

        Returns:
            Dict with weights, expected_return, volatility, sharpe_ratio
            (0 when volatility is zero) and success

        Raises:
            ValueError: If ``mu`` is empty or ``Sigma`` is not (n, n).
        """
        mu, Sigma = self._prepare_inputs(mu, Sigma)
        n_assets = mu.size
        if current_weights is not None:
            current_weights = np.asarray(current_weights, dtype=float)

        def neg_sharpe(w):
            port_return = np.dot(w, mu)
            port_vol = np.sqrt(np.dot(w, np.dot(Sigma, w)))

            if current_weights is not None:
                turnover = np.sum(np.abs(w - current_weights))
                port_return -= self.turnover_penalty * turnover

            # The epsilon keeps the ratio finite for zero-volatility trials.
            return -(port_return - self.risk_free_rate) / (port_vol + 1e-8)

        result = minimize(
            neg_sharpe,
            np.full(n_assets, 1.0 / n_assets),
            method='SLSQP',
            bounds=[(self.min_weight, self.max_weight)] * n_assets,
            constraints=[self._budget_constraint()],
            options={'maxiter': 1000}
        )

        weights = self._normalize_weights(result.x)
        return self._build_result(weights, mu, Sigma, result.success)

    def optimize_min_volatility(self,
                                mu: np.ndarray,
                                Sigma: np.ndarray,
                                min_return: Optional[float] = None) -> Dict:
        """Optimize for minimum volatility with optional return constraint.

        Args:
            mu: Expected returns vector (n_assets,)
            Sigma: Covariance matrix (n_assets, n_assets)
            min_return: If given, require ``w.mu >= min_return``

        Returns:
            Dict with weights, expected_return, volatility, sharpe_ratio, success

        Raises:
            ValueError: If ``mu`` is empty or ``Sigma`` is not (n, n).
        """
        mu, Sigma = self._prepare_inputs(mu, Sigma)
        n_assets = mu.size

        def variance(w):
            return np.dot(w, np.dot(Sigma, w))

        constraints = [self._budget_constraint()]
        if min_return is not None:
            constraints.append(
                {'type': 'ineq', 'fun': lambda w: np.dot(w, mu) - min_return}
            )

        result = minimize(
            variance,
            np.full(n_assets, 1.0 / n_assets),
            method='SLSQP',
            bounds=[(self.min_weight, self.max_weight)] * n_assets,
            constraints=constraints,
            options={'maxiter': 1000}
        )

        weights = self._normalize_weights(result.x)
        return self._build_result(weights, mu, Sigma, result.success)

    def robust_optimization(self,
                            mu: np.ndarray,
                            Sigma: np.ndarray,
                            mu_uncertainty: Optional[np.ndarray] = None,
                            Sigma_uncertainty: Optional[float] = None) -> Dict:
        """
        Robust optimization against pessimistic inputs

        Uses a worst-case approach: each expected return is lowered by its
        own uncertainty (a per-asset box, not an ellipsoid) and the
        covariance diagonal is optionally inflated, then the result is passed
        to ``optimize_mean_variance``.

        Args:
            mu: Expected returns vector (n_assets,)
            Sigma: Covariance matrix (n_assets, n_assets)
            mu_uncertainty: Amount subtracted from each expected return;
                defaults to 20% of ``|mu|``
            Sigma_uncertainty: If given, added to every diagonal entry of
                ``Sigma``

        Returns:
            Dict as returned by ``optimize_mean_variance``; its
            expected_return and volatility are measured on the pessimistic
            inputs
        """
        mu, Sigma = self._prepare_inputs(mu, Sigma)
        n_assets = mu.size

        if mu_uncertainty is None:
            mu_uncertainty = np.abs(mu) * 0.2

        mu_worst = mu - np.asarray(mu_uncertainty, dtype=float)

        if Sigma_uncertainty is not None:
            Sigma_robust = Sigma + np.eye(n_assets) * Sigma_uncertainty
        else:
            Sigma_robust = Sigma

        return self.optimize_mean_variance(mu_worst, Sigma_robust)

    def black_litterman(self,
                        market_caps: np.ndarray,
                        Sigma: np.ndarray,
                        risk_aversion: float = 2.5,
                        views: Optional[List[Dict]] = None,
                        view_confidence: float = 0.5,
                        tau: float = 0.05) -> Dict:
        """
        Black-Litterman model for incorporating investor views

        Args:
            market_caps: Market capitalization weights
            Sigma: Covariance matrix
            risk_aversion: Risk aversion used to imply equilibrium returns
                (``Pi = risk_aversion * Sigma @ market_caps``)
            views: List of view dicts with 'assets' (list of asset indices,
                equally weighted), 'direction' ('overweight' gives a positive
                view, anything else a negative one) and 'magnitude'
            view_confidence: Confidence in views (0-1). Higher values pull
                the posterior returns closer to the views; for a single view
                the posterior moves this fraction of the way from the
                equilibrium return to the view.
            tau: Scale of the uncertainty in the equilibrium returns

        Returns:
            Dict as returned by ``optimize_mean_variance`` run on the
            posterior returns and covariance (or on the equilibrium returns
            when no views are given)
        """
        market_caps = np.asarray(market_caps, dtype=float).ravel()
        Sigma = np.asarray(Sigma, dtype=float)
        n_assets = market_caps.size

        # Equilibrium returns implied by the market portfolio.
        Pi = risk_aversion * np.dot(Sigma, market_caps)

        if not views:
            return self.optimize_mean_variance(Pi, Sigma)

        # Pick matrix P (one row per view) and view returns Q.
        P = np.zeros((len(views), n_assets))
        Q = np.zeros(len(views))
        for row, view in enumerate(views):
            assets = view['assets']
            for asset in assets:
                P[row, asset] = 1.0 / len(assets)
            sign = 1.0 if view['direction'] == 'overweight' else -1.0
            Q[row] = sign * view['magnitude']

        tau_Sigma = tau * Sigma
        view_prior_cov = np.dot(np.dot(P, tau_Sigma), P.T)

        # Omega is the *uncertainty* of each view, so it must shrink as
        # confidence grows: Omega_k = (1 - c) / c * p_k (tau Sigma) p_k'.
        # Clipping keeps the ratio finite at c == 0 and c == 1.
        confidence = float(np.clip(view_confidence, 1e-6, 1.0 - 1e-6))
        Omega = np.diag(np.diag(view_prior_cov)) * (1.0 - confidence) / confidence

        # pinv tolerates a degenerate view (e.g. one naming no assets) that
        # would make the matrix singular.
        middle = np.linalg.pinv(view_prior_cov + Omega)
        gain = np.dot(np.dot(tau_Sigma, P.T), middle)

        BL_mu = Pi + np.dot(gain, Q - np.dot(P, Pi))
        BL_Sigma = Sigma + tau_Sigma - np.dot(gain, np.dot(P, tau_Sigma))

        return self.optimize_mean_variance(BL_mu, BL_Sigma)

    def compute_efficient_frontier(self,
                                   mu: np.ndarray,
                                   Sigma: np.ndarray,
                                   n_points: int = 50) -> pd.DataFrame:
        """Compute efficient frontier points.

        Sweeps ``n_points`` evenly spaced target returns from the return of
        the minimum-volatility portfolio up to the highest return reachable
        under the weight bounds, solving a return-constrained problem at each.

        Args:
            mu: Expected returns vector (n_assets,)
            Sigma: Covariance matrix (n_assets, n_assets)
            n_points: Number of frontier points

        Returns:
            DataFrame with columns target_return, volatility, sharpe
        """
        mu, Sigma = self._prepare_inputs(mu, Sigma)

        min_ret = self.optimize_min_volatility(mu, Sigma)['expected_return']
        # max(mu) is unreachable whenever max_weight < 1, so use the best
        # return the bounds actually allow.
        max_ret = self._max_feasible_return(mu)

        frontier = []
        for target in np.linspace(min_ret, max_ret, n_points):
            result = self.optimize_mean_variance(mu, Sigma, target_return=target)
            frontier.append({
                'target_return': target,
                'volatility': result['volatility'],
                'sharpe': result['sharpe_ratio']
            })

        return pd.DataFrame(frontier)