| import sympy |
| import numpy as np |
| from sklearn.metrics import r2_score, mean_squared_error |
| from sklearn.metrics import mean_absolute_error |
| from scipy.optimize import minimize |
| import math |
| import re |
|
|
| |
class Expression:
    """Symbolic expression over ``x_1 .. x_n`` with fit-able constants ``C``.

    The expression is parsed with SymPy (unevaluated, so every textual ``C``
    stays a separate placeholder) and then translated into a string that can
    be evaluated against NumPy columns:

    * ``x_i`` becomes ``x[i-1]`` (variables are 1-indexed in the input),
    * a ``C`` used directly as an exponent (``**C``) is pinned to ``2``,
    * every remaining ``C`` becomes ``constants[k]``, numbered left to right.

    Attributes set by ``__init__``:
        original_expression: the input string exactly as given.
        sympy_expression: the parsed SymPy object.
        max_var: largest ``i`` among the ``x_i`` symbols found (0 if none).
        computable_expression: the NumPy-evaluable string described above.
        constant_count: number of ``constants[k]`` slots.
        best_constants: current constant values; starts as all ``1.0`` and
            is overwritten by a successful ``fit_constants``.
    """

    # Names made available to the evaluated expression.
    SAFE_FUNCTIONS = {
        'sqrt': np.sqrt,
        'log': np.log,
        'exp': np.exp,
        'sin': np.sin,
        'cos': np.cos,
        'tan': np.tan,
        'asin': np.arcsin,
        'abs': np.abs,
        'pow': np.power,
    }

    # Number of operands each operator token consumes.
    OPERATOR_ARITY = {
        '+': 2,
        '-': 2,
        '*': 2,
        '/': 2,
        '**': 2,
        'sin': 1,
        'cos': 1,
        'tan': 1,
        'log': 1,
        'sqrt': 1,
        'exp': 1,
    }

    # SymPy constructor for each operator token.
    OPERATOR_FUNCS = {
        '+': sympy.Add,
        '-': lambda x, y: x - y,
        '*': sympy.Mul,
        '/': lambda x, y: x / y,
        '**': sympy.Pow,
        'sin': sympy.sin,
        'cos': sympy.cos,
        'tan': sympy.tan,
        'log': sympy.log,
        'sqrt': sympy.sqrt,
        'exp': sympy.exp,
    }

    # Tokens recognised by parse_prefix.
    _UNARY_OPS = frozenset(
        {'sin', 'cos', 'tan', 'exp', 'log', 'sqrt', 'abs', 'asin'}
    )
    # Binary token -> infix operator text ('^' is treated as a power).
    _BINARY_OPS = {
        '+': '+', '-': '-', '*': '*', '/': '/', '**': '**', '^': '**',
    }

    # str(sympy_expression) spells these differently from SAFE_FUNCTIONS
    # (abs(x) prints as "Abs(x)", the constants as "pi" and "E"); without
    # the aliases such expressions could never be evaluated.
    _PRINTER_ALIASES = {
        'Abs': np.abs,
        'pi': np.pi,
        'E': np.e,
    }

    def parse_prefix(self, tokens):
        """Parse a prefix-notation token list into a SymPy expression.

        Examples:
            ['*', 'x_1', '+', 'x_2', 'C']  ->  x_1 * (x_2 + C)
            ['-', 'x_1', 'x_2']            ->  x_1 - x_2

        Args:
            tokens: sequence of string tokens (operators, variables,
                constants, numbers) in prefix order.

        Returns:
            The unevaluated SymPy expression.

        Raises:
            ValueError: if ``tokens`` is empty, an operator lacks operands,
                or operands are left over once all tokens are consumed.
        """
        if not tokens:
            raise ValueError("Empty token list")

        stack = []
        # Scanning right-to-left means an operator's operands are already on
        # the stack when the operator is reached.
        for token in reversed(tokens):
            if token in self._UNARY_OPS:
                if not stack:
                    raise ValueError(f"Not enough operands for {token}")
                stack.append(f"{token}({stack.pop()})")
            elif token in self._BINARY_OPS:
                if len(stack) < 2:
                    raise ValueError(f"Not enough operands for {token}")
                # The LEFT operand was pushed last (it sits right after the
                # operator in prefix order), so it is popped first. Getting
                # this backwards silently flips '-', '/' and '**'.
                left = stack.pop()
                right = stack.pop()
                stack.append(f"({left}){self._BINARY_OPS[token]}({right})")
            else:
                # Operand: variable, constant placeholder or number.
                stack.append(token)

        if len(stack) != 1:
            raise ValueError(
                f"Invalid prefix expression, {len(stack)} elements remaining"
            )

        return sympy.sympify(stack[0], evaluate=False)

    def __init__(self, expression, is_prefix=False):
        """Parse ``expression`` and build its NumPy-evaluable form.

        Args:
            expression: infix string, or a whitespace-separated prefix token
                string when ``is_prefix`` is true.
            is_prefix: interpret ``expression`` as prefix notation.

        Raises:
            ValueError: if the expression cannot be parsed.
        """
        self.original_expression = expression
        try:
            if is_prefix:
                tokens = expression.replace('^', '**').split()
                self.sympy_expression = self.parse_prefix(tokens)
            else:
                self.sympy_expression = sympy.sympify(
                    expression, evaluate=False
                )
        except Exception as e:
            raise ValueError(f"Failed to parse expression: {e}") from e

        # Highest variable index in use; symbols such as "x_foo" are ignored.
        self.max_var = 0
        for symbol in self.sympy_expression.free_symbols:
            if symbol.name.startswith('x_'):
                try:
                    index = int(symbol.name.split('_')[1])
                except ValueError:
                    continue
                self.max_var = max(self.max_var, index)

        computable_expression = str(self.sympy_expression)

        # x_i -> x[i-1]; the word boundaries keep x_1 from matching x_10.
        for i in range(1, self.max_var + 1):
            computable_expression = re.sub(
                rf'\bx_{i}\b', f'x[{i-1}]', computable_expression
            )

        # A placeholder used directly as an exponent is fixed to 2 instead
        # of being fitted.
        computable_expression = re.sub(
            r'\*\*C\b', '**2', computable_expression
        )

        # Number the remaining placeholders left to right. Matching C as a
        # whole word keeps other identifiers containing a capital C intact.
        pieces = re.split(r'\bC\b', computable_expression)
        self.constant_count = len(pieces) - 1
        self.best_constants = [1.0] * self.constant_count
        self.computable_expression = pieces[0] + ''.join(
            f'constants[{index}]{tail}'
            for index, tail in enumerate(pieces[1:])
        )

    def __str__(self):
        return (
            f"Expression: {self.original_expression}, "
            f"Best constants: {self.best_constants}"
        )

    def sympy_str(self):
        """Return the string representation of the SymPy expression."""
        return str(self.sympy_expression)

    def is_valid_on_dataset(self, X, test_constants_list=None):
        """Check that the expression is finite on every row of ``X``.

        Args:
            X (np.ndarray): input data, shape (n_samples, n_features).
            test_constants_list (list of lists): constant sets to try.
                Defaults to a single set of all ones, e.g. pass
                ``[[1.0]*n, [0.5]*n, [2.0]*n]`` to test more thoroughly.

        Returns:
            bool: True if every evaluation is finite; False if any value is
            nan/inf or the evaluation raises.
        """
        if test_constants_list is None:
            test_constants_list = [[1.0] * self.constant_count]

        try:
            for constants in test_constants_list:
                results = self.evaluate(X, constants)
                if not np.all(np.isfinite(results)):
                    return False
            return True
        except Exception:
            return False

    def evaluate(self, X, constants=None):
        """Evaluate the expression on each row of ``X``.

        Args:
            X: array-like of shape (n_samples, n_features). A 1-D input is
                treated as a single sample.
            constants: values for the ``constants[k]`` slots; defaults to
                ``self.best_constants``.

        Returns:
            np.ndarray of floats with one value per sample. If the
            expression cannot be evaluated (unknown name, missing column,
            too few constants, ...) an all-NaN array of length n_samples
            is returned instead of raising.
        """
        if constants is None:
            constants = self.best_constants

        # Normalise X before anything can fail, so the NaN fallback always
        # knows how many samples to report.
        if not isinstance(X, np.ndarray):
            X = np.array(X)
        if X.ndim == 1:
            X = X.reshape(1, -1)
        n_samples = X.shape[0] if X.ndim > 0 else 1

        try:
            env = {
                **self._PRINTER_ALIASES,
                **self.SAFE_FUNCTIONS,
                "constants": np.array(constants),
                "x": [X[:, i] for i in range(X.shape[1])],
                "__builtins__": None,
            }
            # SECURITY: eval() runs the expression text as Python. Builtins
            # are disabled, but this is not a sandbox -- only evaluate
            # expressions from trusted sources.
            # errstate scopes the floating-point error mode to this call,
            # so the caller's np.seterr configuration is left untouched.
            with np.errstate(all='warn'):
                raw = eval(self.computable_expression, env)
            result = np.asarray(raw, dtype=float)
        except Exception:
            return np.full(n_samples, np.nan)

        # An expression without variables yields a scalar; expand it so the
        # return shape is always one value per sample.
        if result.ndim == 0:
            result = np.full(n_samples, float(result))
        return result

    def fit_constants(self, X, y):
        """Fit the constants to ``(X, y)`` and return the resulting R^2.

        Minimises the mean squared error with L-BFGS-B, each constant
        bounded to [-2, 2], starting from ``self.best_constants``. On
        success ``self.best_constants`` is updated.

        Args:
            X: array-like of shape (n_samples, n_features).
            y: array-like of target values.

        Returns:
            The R^2 score of the fitted expression, or ``-np.inf`` when the
            optimiser does not report success, the predictions are not all
            finite, or scoring fails.
        """
        X = np.array(X)
        y = np.array(y)

        if self.constant_count == 0:
            # Nothing to fit: just score the expression as it stands.
            try:
                y_pred = self.evaluate(X)
                if not np.all(np.isfinite(y_pred)):
                    return -np.inf
                if np.all(y_pred == y_pred[0]) and len(np.unique(y)) > 1:
                    # A flat prediction explains none of a varying target.
                    return 0.0
                return r2_score(y, y_pred)
            except Exception:
                return -np.inf

        def loss(current_constants):
            """Mean squared error, or inf when any prediction is non-finite."""
            y_pred = self.evaluate(X, current_constants)
            if not np.all(np.isfinite(y_pred)):
                return np.inf
            return np.mean((y - y_pred) ** 2)

        bounds = [(-2., 2.)] * self.constant_count

        initial_guess = (
            self.best_constants
            if self.best_constants
            and len(self.best_constants) == self.constant_count
            else [.0] * self.constant_count
        )
        initial_guess = np.array(initial_guess, dtype=float).flatten()

        result = minimize(
            loss,
            x0=initial_guess,
            method='L-BFGS-B',
            bounds=bounds,
        )

        if not result.success:
            return -np.inf

        self.best_constants = result.x.tolist()

        try:
            y_pred = self.evaluate(X)
            if not np.all(np.isfinite(y_pred)):
                return -np.inf

            if len(np.unique(y)) == 1:
                # R^2 is undefined for a constant target; score it as a
                # hit-or-miss instead.
                return 1.0 if np.allclose(y_pred, y[0]) else 0.0

            return r2_score(y, y_pred)
        except Exception:
            return -np.inf
|
|
| |
|
|
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
| |
| |
| |
|
|
|
|
| |
| |
|
|
| |
| |
| |
| |
| |
| |
| |