Spaces:
Running
Running
import inspect | |
import unittest | |
from unittest.mock import patch | |
import numpy as np | |
from pysr import PySRRegressor | |
from pysr.sr import run_feature_selection, _handle_feature_selection | |
from sklearn.utils.estimator_checks import check_estimator | |
import sympy | |
from sympy import lambdify | |
import pandas as pd | |
import warnings | |
class TestPipeline(unittest.TestCase): | |
def setUp(self): | |
# Using inspect, | |
# get default niterations from PySRRegressor, and double them: | |
default_niterations = ( | |
inspect.signature(PySRRegressor.__init__).parameters["niterations"].default | |
) | |
default_populations = ( | |
inspect.signature(PySRRegressor.__init__).parameters["populations"].default | |
) | |
self.default_test_kwargs = dict( | |
model_selection="accuracy", | |
niterations=default_niterations * 2, | |
populations=default_populations * 2, | |
) | |
self.rstate = np.random.RandomState(0) | |
self.X = self.rstate.randn(100, 5) | |
def test_linear_relation(self): | |
y = self.X[:, 0] | |
model = PySRRegressor(**self.default_test_kwargs) | |
model.fit(self.X, y) | |
print(model.equations_) | |
self.assertLessEqual(model.get_best()["loss"], 1e-4) | |
def test_multiprocessing(self): | |
y = self.X[:, 0] | |
model = PySRRegressor(**self.default_test_kwargs, procs=2, multithreading=False) | |
model.fit(self.X, y) | |
print(model.equations_) | |
self.assertLessEqual(model.equations_.iloc[-1]["loss"], 1e-4) | |
def test_multioutput_custom_operator_quiet_custom_complexity(self): | |
y = self.X[:, [0, 1]] ** 2 | |
model = PySRRegressor( | |
unary_operators=["square_op(x) = x^2"], | |
extra_sympy_mappings={"square_op": lambda x: x**2}, | |
complexity_of_operators={"square_op": 2, "plus": 1}, | |
binary_operators=["plus"], | |
verbosity=0, | |
**self.default_test_kwargs, | |
procs=0, | |
# Test custom operators with constraints: | |
nested_constraints={"square_op": {"square_op": 3}}, | |
constraints={"square_op": 10}, | |
) | |
model.fit(self.X, y) | |
equations = model.equations_ | |
print(equations) | |
self.assertIn("square_op", model.equations_[0].iloc[-1]["equation"]) | |
self.assertLessEqual(equations[0].iloc[-1]["loss"], 1e-4) | |
self.assertLessEqual(equations[1].iloc[-1]["loss"], 1e-4) | |
test_y1 = model.predict(self.X) | |
test_y2 = model.predict(self.X, index=[-1, -1]) | |
mse1 = np.average((test_y1 - y) ** 2) | |
mse2 = np.average((test_y2 - y) ** 2) | |
self.assertLessEqual(mse1, 1e-4) | |
self.assertLessEqual(mse2, 1e-4) | |
bad_y = model.predict(self.X, index=[0, 0]) | |
bad_mse = np.average((bad_y - y) ** 2) | |
self.assertGreater(bad_mse, 1e-4) | |
def test_multioutput_weighted_with_callable_temp_equation(self): | |
X = self.X.copy() | |
y = X[:, [0, 1]] ** 2 | |
w = self.rstate.rand(*y.shape) | |
w[w < 0.5] = 0.0 | |
w[w >= 0.5] = 1.0 | |
# Double equation when weights are 0: | |
y = (2 - w) * y | |
# Thus, pysr needs to use the weights to find the right equation! | |
model = PySRRegressor( | |
unary_operators=["sq(x) = x^2"], | |
binary_operators=["plus"], | |
extra_sympy_mappings={"sq": lambda x: x**2}, | |
**self.default_test_kwargs, | |
procs=0, | |
temp_equation_file=True, | |
delete_tempfiles=False, | |
) | |
model.fit(X.copy(), y, weights=w) | |
# These tests are flaky, so don't fail test: | |
try: | |
np.testing.assert_almost_equal( | |
model.predict(X.copy())[:, 0], X[:, 0] ** 2, decimal=4 | |
) | |
except AssertionError: | |
print("Error in test_multioutput_weighted_with_callable_temp_equation") | |
print("Model equations: ", model.sympy()[0]) | |
print("True equation: x0^2") | |
try: | |
np.testing.assert_almost_equal( | |
model.predict(X.copy())[:, 1], X[:, 1] ** 2, decimal=4 | |
) | |
except AssertionError: | |
print("Error in test_multioutput_weighted_with_callable_temp_equation") | |
print("Model equations: ", model.sympy()[1]) | |
print("True equation: x1^2") | |
def test_empty_operators_single_input_warm_start(self): | |
X = self.rstate.randn(100, 1) | |
y = X[:, 0] + 3.0 | |
regressor = PySRRegressor( | |
unary_operators=[], | |
binary_operators=["plus"], | |
**self.default_test_kwargs, | |
) | |
self.assertTrue("None" in regressor.__repr__()) | |
regressor.fit(X, y) | |
self.assertTrue("None" not in regressor.__repr__()) | |
self.assertTrue(">>>>" in regressor.__repr__()) | |
self.assertLessEqual(regressor.equations_.iloc[-1]["loss"], 1e-4) | |
np.testing.assert_almost_equal(regressor.predict(X), y, decimal=1) | |
# Test if repeated fit works: | |
regressor.set_params(niterations=0, warm_start=True) | |
# This should exit immediately, and use the old equations | |
regressor.fit(X, y) | |
self.assertLessEqual(regressor.equations_.iloc[-1]["loss"], 1e-4) | |
np.testing.assert_almost_equal(regressor.predict(X), y, decimal=1) | |
# Tweak model selection: | |
regressor.set_params(model_selection="best") | |
self.assertEqual(regressor.get_params()["model_selection"], "best") | |
self.assertTrue("None" not in regressor.__repr__()) | |
self.assertTrue(">>>>" in regressor.__repr__()) | |
def test_noisy(self): | |
y = self.X[:, [0, 1]] ** 2 + self.rstate.randn(self.X.shape[0], 1) * 0.05 | |
model = PySRRegressor( | |
# Test that passing a single operator works: | |
unary_operators="sq(x) = x^2", | |
binary_operators="plus", | |
extra_sympy_mappings={"sq": lambda x: x**2}, | |
**self.default_test_kwargs, | |
procs=0, | |
denoise=True, | |
) | |
model.fit(self.X, y) | |
self.assertLessEqual(model.get_best()[1]["loss"], 1e-2) | |
self.assertLessEqual(model.get_best()[1]["loss"], 1e-2) | |
def test_pandas_resample_with_nested_constraints(self): | |
X = pd.DataFrame( | |
{ | |
"T": self.rstate.randn(500), | |
"x": self.rstate.randn(500), | |
"unused_feature": self.rstate.randn(500), | |
} | |
) | |
true_fn = lambda x: np.array(x["T"] + x["x"] ** 2 + 1.323837) | |
y = true_fn(X) | |
noise = self.rstate.randn(500) * 0.01 | |
y = y + noise | |
# We also test y as a pandas array: | |
y = pd.Series(y) | |
# Resampled array is a different order of features: | |
Xresampled = pd.DataFrame( | |
{ | |
"unused_feature": self.rstate.randn(100), | |
"x": self.rstate.randn(100), | |
"T": self.rstate.randn(100), | |
} | |
) | |
model = PySRRegressor( | |
unary_operators=[], | |
binary_operators=["+", "*", "/", "-"], | |
**self.default_test_kwargs, | |
denoise=True, | |
nested_constraints={"/": {"+": 1, "-": 1}, "+": {"*": 4}}, | |
) | |
model.fit(X, y, Xresampled=Xresampled) | |
self.assertNotIn("unused_feature", model.latex()) | |
self.assertIn("T", model.latex()) | |
self.assertIn("x", model.latex()) | |
self.assertLessEqual(model.get_best()["loss"], 1e-1) | |
fn = model.get_best()["lambda_format"] | |
X2 = pd.DataFrame( | |
{ | |
"T": self.rstate.randn(100), | |
"unused_feature": self.rstate.randn(100), | |
"x": self.rstate.randn(100), | |
} | |
) | |
self.assertLess(np.average((fn(X2) - true_fn(X2)) ** 2), 1e-1) | |
self.assertLess(np.average((model.predict(X2) - true_fn(X2)) ** 2), 1e-1) | |
def test_high_dim_selection_early_stop(self): | |
X = pd.DataFrame({f"k{i}": self.rstate.randn(10000) for i in range(10)}) | |
Xresampled = pd.DataFrame({f"k{i}": self.rstate.randn(100) for i in range(10)}) | |
y = X["k7"] ** 2 + np.cos(X["k9"]) * 3 | |
model = PySRRegressor( | |
unary_operators=["cos"], | |
select_k_features=3, | |
early_stop_condition=1e-4, # Stop once most accurate equation is <1e-4 MSE | |
maxsize=12, | |
**self.default_test_kwargs, | |
) | |
model.set_params(model_selection="accuracy") | |
model.fit(X, y, Xresampled=Xresampled) | |
self.assertLess(np.average((model.predict(X) - y) ** 2), 1e-4) | |
# Again, but with numpy arrays: | |
model.fit(X.values, y.values, Xresampled=Xresampled.values) | |
self.assertLess(np.average((model.predict(X.values) - y.values) ** 2), 1e-4) | |
class TestBest(unittest.TestCase): | |
def setUp(self): | |
self.rstate = np.random.RandomState(0) | |
self.X = self.rstate.randn(10, 2) | |
self.y = np.cos(self.X[:, 0]) ** 2 | |
self.model = PySRRegressor( | |
niterations=1, | |
extra_sympy_mappings={}, | |
output_jax_format=False, | |
model_selection="accuracy", | |
equation_file="equation_file.csv", | |
) | |
self.model.fit(self.X, self.y) | |
equations = pd.DataFrame( | |
{ | |
"equation": ["1.0", "cos(x0)", "square(cos(x0))"], | |
"loss": [1.0, 0.1, 1e-5], | |
"complexity": [1, 2, 3], | |
} | |
) | |
equations["complexity loss equation".split(" ")].to_csv( | |
"equation_file.csv.bkup", sep="|" | |
) | |
self.model.refresh() | |
self.equations_ = self.model.equations_ | |
def test_best(self): | |
self.assertEqual(self.model.sympy(), sympy.cos(sympy.Symbol("x0")) ** 2) | |
def test_index_selection(self): | |
self.assertEqual(self.model.sympy(-1), sympy.cos(sympy.Symbol("x0")) ** 2) | |
self.assertEqual(self.model.sympy(2), sympy.cos(sympy.Symbol("x0")) ** 2) | |
self.assertEqual(self.model.sympy(1), sympy.cos(sympy.Symbol("x0"))) | |
self.assertEqual(self.model.sympy(0), 1.0) | |
def test_best_tex(self): | |
self.assertEqual(self.model.latex(), "\\cos^{2}{\\left(x_{0} \\right)}") | |
def test_best_lambda(self): | |
X = self.X | |
y = self.y | |
for f in [self.model.predict, self.equations_.iloc[-1]["lambda_format"]]: | |
np.testing.assert_almost_equal(f(X), y, decimal=4) | |
class TestFeatureSelection(unittest.TestCase): | |
def setUp(self): | |
self.rstate = np.random.RandomState(0) | |
def test_feature_selection(self): | |
X = self.rstate.randn(20000, 5) | |
y = X[:, 2] ** 2 + X[:, 3] ** 2 | |
selected = run_feature_selection(X, y, select_k_features=2) | |
self.assertEqual(sorted(selected), [2, 3]) | |
def test_feature_selection_handler(self): | |
X = self.rstate.randn(20000, 5) | |
y = X[:, 2] ** 2 + X[:, 3] ** 2 | |
var_names = [f"x{i}" for i in range(5)] | |
selected_X, selection = _handle_feature_selection( | |
X, | |
select_k_features=2, | |
variable_names=var_names, | |
y=y, | |
) | |
self.assertTrue((2 in selection) and (3 in selection)) | |
selected_var_names = [var_names[i] for i in selection] | |
self.assertEqual(set(selected_var_names), set("x2 x3".split(" "))) | |
np.testing.assert_array_equal( | |
np.sort(selected_X, axis=1), np.sort(X[:, [2, 3]], axis=1) | |
) | |
class TestMiscellaneous(unittest.TestCase): | |
"""Test miscellaneous functions.""" | |
def setUp(self): | |
# Allows all scikit-learn exception messages to be read. | |
self.maxDiff = None | |
def test_deprecation(self): | |
"""Ensure that deprecation works as expected. | |
This should give a warning, and sets the correct value. | |
""" | |
with self.assertWarns(FutureWarning): | |
model = PySRRegressor(fractionReplaced=0.2) | |
# This is a deprecated parameter, so we should get a warning. | |
# The correct value should be set: | |
self.assertEqual(model.fraction_replaced, 0.2) | |
def test_size_warning(self): | |
"""Ensure that a warning is given for a large input size.""" | |
model = PySRRegressor(max_evals=10000, populations=2) | |
X = np.random.randn(10001, 2) | |
y = np.random.randn(10001) | |
with warnings.catch_warnings(): | |
warnings.simplefilter("error") | |
with self.assertRaises(Exception) as context: | |
model.fit(X, y) | |
self.assertIn("more than 10,000", str(context.exception)) | |
def test_feature_warning(self): | |
"""Ensure that a warning is given for large number of features.""" | |
model = PySRRegressor() | |
X = np.random.randn(100, 10) | |
y = np.random.randn(100) | |
with warnings.catch_warnings(): | |
warnings.simplefilter("error") | |
with self.assertRaises(Exception) as context: | |
model.fit(X, y) | |
self.assertIn("with 10 features or more", str(context.exception)) | |
def test_scikit_learn_compatibility(self): | |
"""Test PySRRegressor compatibility with scikit-learn.""" | |
model = PySRRegressor( | |
max_evals=10000, verbosity=0, progress=False | |
) # Return early. | |
# TODO: Add deterministic option so that we can test these. | |
# (would require backend changes, and procs=0 for serialism.) | |
check_generator = check_estimator(model, generate_only=True) | |
tests_requiring_determinism = [ | |
"check_regressors_int", # PySR is not deterministic, so fails this. | |
"check_regressor_data_not_an_array", | |
"check_supervised_y_2d", | |
"check_regressors_int", | |
"check_fit_idempotent", | |
] | |
exception_messages = [] | |
for (_, check) in check_generator: | |
try: | |
with warnings.catch_warnings(): | |
warnings.simplefilter("ignore") | |
check(model) | |
print("Passed", check.func.__name__) | |
except Exception as e: | |
error_message = str(e) | |
failed_tolerance_check = "Not equal to tolerance" in error_message | |
if ( | |
failed_tolerance_check | |
and check.func.__name__ in tests_requiring_determinism | |
): | |
# Skip test as PySR is not deterministic. | |
print( | |
"Failed", | |
check.func.__name__, | |
"which is an allowed failure, as the test requires determinism.", | |
) | |
else: | |
exception_messages.append( | |
f"{check.func.__name__}: {error_message}\n" | |
) | |
print("Failed", check.func.__name__, "with:") | |
# Add a leading tab to error message, which | |
# might be multi-line: | |
print( | |
"\n".join( | |
[(" " * 4) + row for row in error_message.split("\n")] | |
) | |
) | |
# If any checks failed don't let the test pass. | |
self.assertEqual([], exception_messages) | |