| """
|
| CHG (Covariance-based Hilbert Geometry) Algorithm Implementation
|
|
|
| This module contains the core CHG algorithm implementation with multi-head attention
|
| mechanism for Gaussian Process regression with enhanced covariance computation.
|
|
|
| Author: CHG Algorithm Team
|
| Version: 1.0.0
|
| """
|
|
|
| import numpy as np
|
| from typing import Tuple, Optional
|
|
|
|
|
class CHG:
    """
    CHG (Covariance-based Hilbert Geometry) Model

    A Gaussian Process model with multi-head attention mechanism for enhanced
    covariance computation, supporting uncertainty quantification and optimization.

    Parameters:
    -----------
    input_dim : int
        Dimensionality of input features
    hidden_dim : int
        Hidden dimension for feature transformation; must be a positive
        multiple of ``num_heads`` because it is split evenly across heads
    num_heads : int
        Number of attention heads (must be positive)

    Raises:
    -------
    ValueError
        If ``num_heads`` is not positive or ``hidden_dim`` is not a positive
        multiple of ``num_heads``
    """

    def __init__(self, input_dim: int, hidden_dim: int, num_heads: int):
        # Fail early: an uneven split would only surface later as an opaque
        # reshape error inside _compute_covariance.
        if num_heads <= 0:
            raise ValueError(f"num_heads must be positive, got {num_heads}")
        if hidden_dim <= 0 or hidden_dim % num_heads != 0:
            raise ValueError(
                f"hidden_dim ({hidden_dim}) must be a positive multiple of "
                f"num_heads ({num_heads})"
            )

        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.head_dim = hidden_dim // num_heads

        self._init_parameters()

    def _init_parameters(self):
        """Initialize model parameters from the global NumPy RNG.

        Weight matrices are drawn from N(0, 0.02^2), biases start at zero,
        the layer-norm gain/shift start at one/zero and the output scale is
        drawn from N(1, 0.1^2). The draw order is fixed so that seeding
        ``np.random`` beforehand gives reproducible parameters.
        """
        # Query / key / value projections (input_dim -> hidden_dim).
        # W_v is kept as a parameter although the covariance below does not
        # read it.
        self.W_q = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
        self.W_k = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))
        self.W_v = np.random.normal(0, 0.02, (self.input_dim, self.hidden_dim))

        # Two-layer feed-forward network applied to pairwise features.
        self.W_ff1 = np.random.normal(0, 0.02, (self.hidden_dim, 2 * self.hidden_dim))
        self.b_ff1 = np.zeros((2 * self.hidden_dim,))
        self.W_ff2 = np.random.normal(0, 0.02, (2 * self.hidden_dim, 1))
        self.b_ff2 = np.zeros((1,))

        # Layer-normalization gain and shift.
        self.gamma = np.ones((self.hidden_dim,))
        self.beta = np.zeros((self.hidden_dim,))

        # Per-head mixing weights and global output scale.
        self.W_heads = np.random.normal(0, 0.02, (self.num_heads, 1))
        self.scale = np.random.normal(1.0, 0.1, (1,))

    def _layer_norm(self, x: np.ndarray, gamma: np.ndarray, beta: np.ndarray,
                    eps: float = 1e-6) -> np.ndarray:
        """Apply layer normalization over the last axis of ``x``."""
        mean = np.mean(x, axis=-1, keepdims=True)
        var = np.var(x, axis=-1, keepdims=True)
        return gamma * (x - mean) / np.sqrt(var + eps) + beta

    def _gelu(self, x: np.ndarray) -> np.ndarray:
        """GELU activation function (tanh approximation)"""
        return 0.5 * x * (1 + np.tanh(np.sqrt(2 / np.pi) * (x + 0.044715 * x**3)))

    def _compute_covariance(self, X1: np.ndarray, X2: np.ndarray) -> np.ndarray:
        """
        Compute enhanced covariance matrix using multi-head attention mechanism

        For every head the entry (i, j) is the scaled dot product of the
        projected query ``i`` and key ``j`` plus a scalar produced by a small
        feed-forward network applied to the layer-normalized element-wise
        product of the two projections. Heads are combined with ``W_heads``
        and multiplied by ``scale``.

        Parameters:
        -----------
        X1 : np.ndarray
            First set of input points, shape (n1, input_dim)
        X2 : np.ndarray
            Second set of input points, shape (n2, input_dim)

        Returns:
        --------
        np.ndarray
            Covariance matrix between X1 and X2, shape (n1, n2). When X1 and
            X2 hold the same points the result is symmetrized and a 1e-6
            jitter is added to the diagonal.
        """
        X1 = np.asarray(X1, dtype=float)
        X2 = np.asarray(X2, dtype=float)
        n1, n2 = X1.shape[0], X2.shape[0]
        d = self.head_dim

        # Project once and split the hidden dimension into heads.
        Q = (X1 @ self.W_q).reshape(n1, self.num_heads, d)
        K = (X2 @ self.W_k).reshape(n2, self.num_heads, d)

        # Only the leading head_dim-sized slices of the shared feed-forward
        # and layer-norm parameters are used; every head shares them.
        gamma, beta = self.gamma[:d], self.beta[:d]
        W1, b1 = self.W_ff1[:d, :d], self.b_ff1[:d]
        W2, b2 = self.W_ff2[:d, :], self.b_ff2

        final_cov = np.zeros((n1, n2))
        for h in range(self.num_heads):
            Q_h = Q[:, h, :]
            K_h = K[:, h, :]

            attention_scores = Q_h @ K_h.T / np.sqrt(d)

            # Pairwise element-wise interactions for all (i, j) at once.
            # This holds an (n1, n2, head_dim) array in memory instead of
            # looping over pairs in Python.
            feature_int = Q_h[:, None, :] * K_h[None, :, :]
            norm_features = self._layer_norm(feature_int, gamma, beta)
            ff_hidden = self._gelu(norm_features @ W1 + b1)
            ff_out = ff_hidden @ W2 + b2  # shape (n1, n2, 1)

            final_cov += self.W_heads[h, 0] * (attention_scores + ff_out[..., 0])

        final_cov = self.scale[0] * final_cov

        # Self-covariance: enforce symmetry (W_q and W_k differ, so the raw
        # matrix is not symmetric) and add jitter for numerical stability.
        if n1 == n2 and np.allclose(X1, X2):
            final_cov = 0.5 * (final_cov + final_cov.T)
            final_cov += 1e-6 * np.eye(n1)

        return final_cov

    def fit_predict(self, X_train: np.ndarray, y_train: np.ndarray,
                    X_test: np.ndarray, noise_var: float = 1e-6) -> Tuple[np.ndarray, np.ndarray]:
        """
        Fit the model and make predictions

        Parameters:
        -----------
        X_train : np.ndarray
            Training input data
        y_train : np.ndarray
            Training target values
        X_test : np.ndarray
            Test input data
        noise_var : float
            Observation noise variance

        Returns:
        --------
        Tuple[np.ndarray, np.ndarray]
            Predictive mean and variance; the variance is clipped from below
            at 1e-8
        """
        K_train = self._compute_covariance(X_train, X_train)
        K_test_train = self._compute_covariance(X_test, X_train)
        K_test = self._compute_covariance(X_test, X_test)

        K_noisy = K_train + noise_var * np.eye(K_train.shape[0])

        try:
            # Standard GP solve: K_noisy = L L^T, alpha = K_noisy^{-1} y.
            L = np.linalg.cholesky(K_noisy)
            alpha = np.linalg.solve(L.T, np.linalg.solve(L, y_train))

            mean_pred = K_test_train @ alpha

            v = np.linalg.solve(L, K_test_train.T)
            var_pred = np.diag(K_test) - np.sum(v**2, axis=0)
        except np.linalg.LinAlgError:
            # The learned kernel is not guaranteed to be positive definite;
            # fall back to a pseudo-inverse when the factorization fails.
            K_inv = np.linalg.pinv(K_noisy)
            mean_pred = K_test_train @ K_inv @ y_train
            var_pred = np.diag(K_test - K_test_train @ K_inv @ K_test_train.T)

        var_pred = np.maximum(var_pred, 1e-8)
        return mean_pred, var_pred

    def log_marginal_likelihood(self, X: np.ndarray, y: np.ndarray,
                                noise_var: float = 1e-6) -> float:
        """
        Compute log marginal likelihood for model selection

        Evaluates ``-0.5 y^T K^{-1} y - 0.5 log|K| - 0.5 n log(2 pi)`` with
        ``K`` the noisy training covariance.

        Parameters:
        -----------
        X : np.ndarray
            Input data
        y : np.ndarray
            Target values
        noise_var : float
            Observation noise variance

        Returns:
        --------
        float
            Log marginal likelihood
        """
        y = np.asarray(y, dtype=float)
        K = self._compute_covariance(X, X)
        K_noisy = K + noise_var * np.eye(K.shape[0])
        normalization = -0.5 * len(y) * np.log(2 * np.pi)

        try:
            L = np.linalg.cholesky(K_noisy)
            # With z = L^{-1} y we have y^T K^{-1} y = z^T z. Using y^T z
            # instead would compute y^T L^{-1} y, which is not the
            # quadratic form of the Gaussian density.
            z = np.linalg.solve(L, y)
            data_fit = -0.5 * np.sum(z * z)
            # log|K| = 2 * sum(log(diag(L))), so -0.5 log|K| is this sum.
            complexity = -np.sum(np.log(np.diag(L)))
        except np.linalg.LinAlgError:
            sign, logdet = np.linalg.slogdet(K_noisy)
            K_inv = np.linalg.pinv(K_noisy)

            data_fit = -0.5 * np.sum(y * (K_inv @ y))
            # A non-positive determinant has no real log; use a large penalty.
            complexity = -0.5 * logdet if sign > 0 else -1e6

        return float(data_fit + complexity + normalization)

    def get_covariance_matrix(self, X: np.ndarray) -> np.ndarray:
        """Get the covariance matrix for given inputs"""
        return self._compute_covariance(X, X)

    def update_parameters(self, gradient_dict: dict, learning_rate: float = 0.001):
        """Update model parameters using computed gradients

        Performs one gradient-descent step ``param -= learning_rate * grad``
        for every entry of ``gradient_dict`` whose key names an existing
        attribute; unknown names are skipped silently.
        """
        for param_name, gradient in gradient_dict.items():
            if hasattr(self, param_name):
                current_param = getattr(self, param_name)
                updated_param = current_param - learning_rate * gradient
                setattr(self, param_name, updated_param)
|
|
|
|
|
class CHGOptimizer:
    """
    Optimizer for CHG model parameters using numerical gradients

    The loss is the negative log marginal likelihood reported by the model.
    Gradients are estimated with central finite differences, which costs two
    likelihood evaluations per scalar parameter.

    Parameters:
    -----------
    model : CHG
        CHG model instance to optimize
    learning_rate : float
        Learning rate for parameter updates
    """

    def __init__(self, model: "CHG", learning_rate: float = 0.001):
        self.model = model
        self.lr = learning_rate

    def compute_gradients(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
        """Compute numerical gradients for all model parameters

        Parameters:
        -----------
        X : np.ndarray
            Input data passed through to ``model.log_marginal_likelihood``
        y : np.ndarray
            Target values passed through to ``model.log_marginal_likelihood``
        noise_var : float
            Observation noise variance

        Returns:
        --------
        dict
            Maps each parameter name to an array of the parameter's shape
            holding d(-log marginal likelihood)/d(parameter). The model's
            parameters are left exactly as they were on entry, even if a
            likelihood evaluation raises.
        """
        gradients = {}
        eps = 1e-5

        # NOTE: the visible CHG._compute_covariance does not read W_v, so its
        # gradient comes out as zero; it stays in the list so the returned
        # keys are unchanged.
        for param_name in ['W_q', 'W_k', 'W_v', 'W_ff1', 'W_ff2', 'W_heads', 'scale']:
            original = getattr(self.model, param_name)
            # Perturb a private copy so the caller's array is never modified.
            work = np.array(original, dtype=float)
            grad = np.zeros_like(work)

            try:
                setattr(self.model, param_name, work)
                for idx in np.ndindex(work.shape):
                    saved = work[idx]

                    work[idx] = saved + eps
                    loss_plus = -self.model.log_marginal_likelihood(X, y, noise_var)

                    work[idx] = saved - eps
                    loss_minus = -self.model.log_marginal_likelihood(X, y, noise_var)

                    # Restore the stored value rather than undoing the
                    # arithmetic, which would accumulate rounding drift.
                    work[idx] = saved
                    grad[idx] = (loss_plus - loss_minus) / (2 * eps)
            finally:
                setattr(self.model, param_name, original)

            gradients[param_name] = grad

        return gradients

    def step(self, X: np.ndarray, y: np.ndarray, noise_var: float = 1e-6):
        """Perform one optimization step"""
        gradients = self.compute_gradients(X, y, noise_var)
        self.model.update_parameters(gradients, self.lr)
|
|
|
|
|
def run_chg_experiment():
    """
    Run a simple experiment to demonstrate CHG functionality

    Builds a synthetic 3-D regression problem (sum of squares plus a sine
    term, with Gaussian noise on the training targets), predicts on 25 test
    points with an untrained CHG model and prints RMSE, MAE, the empirical
    coverage of the 95% interval and the training log marginal likelihood.

    Returns:
    --------
    Tuple
        Model, predictions, and variances
    """
    # Seed before constructing the model: CHG draws its initial weights from
    # the global NumPy RNG, so seeding afterwards leaves them unreproducible.
    np.random.seed(42)

    model = CHG(input_dim=3, hidden_dim=24, num_heads=4)

    # Synthetic training and test data.
    X_train = np.random.randn(80, 3)
    y_train = np.sum(X_train**2, axis=1) + 0.3 * np.sin(2 * X_train[:, 0]) + 0.1 * np.random.randn(80)

    X_test = np.random.randn(25, 3)
    y_test = np.sum(X_test**2, axis=1) + 0.3 * np.sin(2 * X_test[:, 0])

    pred_mean, pred_var = model.fit_predict(X_train, y_train, X_test)

    # Point-prediction errors.
    rmse = np.sqrt(np.mean((pred_mean - y_test)**2))
    mae = np.mean(np.abs(pred_mean - y_test))

    # Fraction of test targets inside the +/- 1.96 sigma band.
    pred_std = np.sqrt(pred_var)
    lower = pred_mean - 1.96 * pred_std
    upper = pred_mean + 1.96 * pred_std
    coverage = np.mean((y_test >= lower) & (y_test <= upper))

    print("CHG Performance:")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"Coverage: {coverage:.4f}")
    print(f"Log Marginal Likelihood: {model.log_marginal_likelihood(X_train, y_train):.4f}")

    return model, pred_mean, pred_var