SFL / SymbolicFunctionLearner.py
amirhosseinkarami's picture
Update SymbolicFunctionLearner.py
f738fe0
"""""""""""""""""""""""""""""""""
Do not run or modify this file.
For running: DiffEqnSolver.py
For modifying: Settings.py
"""""""""""""""""""""""""""""""""
import datetime
import time
import numpy as np
import tensorflow as tf
import DataUtils
import Settings
from DataUtils import choices_to_init_weight_matrix
from DataUtils import tf_diff_sqrt, tf_diff_log, our_tanh, spike
from Settings import implicit_function, d_eps
tf.compat.v1.disable_eager_execution()
def new_weight_matrix(n_rows, n_cols, mean=0.0, name=None):
initial = tf.random.normal(shape=[n_rows, n_cols], mean=mean, stddev=Settings.w_matrix_stddev)
if name is not None:
return tf.Variable(initial, name=name)
return tf.Variable(initial)
def new_bias(n_cols, name=None):
initial = tf.zeros(shape=[1, n_cols])
if name is not None:
return tf.Variable(initial, name=name)
return tf.Variable(initial)
def operate_on_tensors(tensor_A, tensor_B, fn_set, use_both_for_unary=True):
# print('op on tensors. input shapes: {}, {}'.format(tensor_A.shape, tensor_B.shape))
if use_both_for_unary:
w2 = 1.0
else:
w2 = 0.0
answer_vector = []
for operator_i in fn_set:
if operator_i == 'id':
answer_vector.extend([tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0]])
# print("id vector shape: {}".format(answer_vector[-1].shape))
elif operator_i == 'add':
answer_vector.extend([tensor_A[:, :, 0] + tensor_B[:, :, 0]])
elif operator_i == 'sin':
answer_vector.extend([tf.sin(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
elif operator_i == 'cos':
answer_vector.extend([tf.cos(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
elif operator_i == 'sqrt':
answer_vector.extend([tf_diff_sqrt(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
elif operator_i == 'mul':
answer_vector.extend([tf.multiply(tensor_A[:, :, 0], tensor_B[:, :, 0])])
elif operator_i == 'div':
answer_vector.extend([tf.math.divide_no_nan(tensor_A[:, :, 0], tensor_B[:, :, 0])])
elif operator_i == 'log':
answer_vector.extend([tf_diff_log(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
elif operator_i == 'exp':
answer_vector.extend([tf.exp(our_tanh(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0], factor=np.log(50000)))])
else:
answer_vector.extend([None])
return tf.stack(answer_vector, axis=-1)
def sm_no_const_selector(nonflat_input, flat_input, initial_weights):
# print("sm_no_const_selector---")
# print("initial_weights: {}".format(initial_weights.shape))
# print("nonflat_input: {}".format(nonflat_input.shape))
pre_sm_weights = new_weight_matrix(int(nonflat_input.shape[-1]), 1)
post_sm_weights = tf.math.softmax(pre_sm_weights + initial_weights, axis=0)
# print("post_sm_weights: {}".format(post_sm_weights.shape))
sm_result = tf.matmul(nonflat_input, post_sm_weights)
# print("sm_result: {}".format(sm_result.shape))
flat_weights = tf.multiply(post_sm_weights,
tf.cast(tf.greater(post_sm_weights,
tf.reduce_max(post_sm_weights) - 0.01), tf.float32))
flat_weights = tf.divide(flat_weights, tf.reduce_sum(flat_weights))
flat_result = tf.matmul(flat_input, flat_weights)
return sm_result, flat_result, pre_sm_weights+initial_weights, post_sm_weights, flat_weights
def collect_op_inputs_str(weight_w, weight_b, input_strs):
num_inputs = weight_w.shape[0]
# print("num_inputs: {}. input_strs length: {}".format(num_inputs, len(input_strs)))
# print(weight_w)
# print(input_strs)
temp_answer = ''
has_one = False
has_more_than_one = False
for row in range(num_inputs):
if np.abs(weight_w[row][0]) > Settings.big_eps and input_strs[row] != '0':
if has_one:
temp_answer += ' + '
has_more_than_one = True
if np.abs(weight_w[row][0] - 1) < Settings.big_eps:
temp_answer += '{}'.format(input_strs[row])
else:
temp_answer += '{:.4f}*({})'.format(weight_w[row][0], input_strs[row])
has_one = True
# print('weight_b[-1]: {}'.format(weight_b))
if np.abs(weight_b[-1][0]) > Settings.big_eps:
if has_one:
temp_answer += ' + '
has_more_than_one = True
temp_answer += '{:.4f}'.format(weight_b[-1][0])
if len(temp_answer) == 0:
temp_answer = '0'
if has_more_than_one:
return '(' + temp_answer + ')'
return temp_answer
def collect_minimal_op_inputs_str(weight_w, input_strs):
num_inputs = weight_w.shape[0]
temp_answer = ''
has_one = False
has_more_than_one = False
for row in range(num_inputs):
if has_one:
temp_answer += ' + '
has_more_than_one = True
temp_answer += '{}'.format(input_strs[row])
has_one = True
if len(temp_answer) == 0:
temp_answer = '0'
if has_more_than_one:
return '(' + temp_answer + ')'
return temp_answer
def operation_to_str_best(weight_w, weight_b, weight_sm, input_strs1, input_strs2, fn_set,
digits=None, unary_both=True, minimal=False):
if input_strs2 is None:
temp_answer = collect_op_inputs_str(weight_w, weight_b, input_strs1)
return [temp_answer]
answer = ['0' for _ in fn_set]
temp_answer1 = input_strs1
temp_answer2 = input_strs2
# Set up temp answer. Don't change this value!
if unary_both:
if temp_answer1 == '0' and temp_answer2 == '0':
temp_answer = '0'
elif temp_answer1 == '0':
temp_answer = str(temp_answer2)
elif temp_answer2 == '0':
temp_answer = str(temp_answer1)
else:
temp_answer = '({} + {})'.format(temp_answer1, temp_answer2)
else:
temp_answer = str(temp_answer1)
if 'id' in fn_set:
fn_index = fn_set.index('id')
answer[fn_index] = temp_answer
if 'sin' in fn_set:
fn_index = fn_set.index('sin')
if temp_answer == '0':
answer[fn_index] = '0'
else:
answer[fn_index] = 'sin({})'.format(temp_answer)
if 'cos' in fn_set:
fn_index = fn_set.index('cos')
answer[fn_index] = 'cos({})'.format(temp_answer)
if 'sqrt' in fn_set:
fn_index = fn_set.index('sqrt')
answer[fn_index] = '(abs({}))^(0.5)'.format(temp_answer)
if 'log' in fn_set:
fn_index = fn_set.index('log')
if temp_answer == '0':
answer[fn_index] = 'log(0.0001)'
else:
answer[fn_index] = 'log({})'.format(temp_answer)
if 'mul' in fn_set:
fn_index = fn_set.index('mul')
if temp_answer1 == '0' or temp_answer2 == '0':
prod_answer = '0'
else:
prod_answer = '({} * {})'.format(temp_answer1, temp_answer2)
answer[fn_index] = prod_answer
if 'add' in fn_set:
fn_index = fn_set.index('add')
if temp_answer1 == '0' and temp_answer2 == '0':
sum_answer = '0'
elif temp_answer1 == '0':
sum_answer = str(temp_answer2)
elif temp_answer2 == '0':
sum_answer = str(temp_answer1)
else:
sum_answer = '({} + {})'.format(temp_answer1, temp_answer2)
answer[fn_index] = sum_answer
if 'sub' in fn_set:
fn_index = fn_set.index('sub')
temp_answer1 = input_strs1
temp_answer2 = input_strs2
if temp_answer1 == '0' and temp_answer2 == '0':
diff_answer = '0'
elif temp_answer1 == '0':
diff_answer = "-{}".format(temp_answer2)
elif temp_answer2 == '0':
diff_answer = temp_answer1
else:
diff_answer = '({} - {})'.format(temp_answer1, temp_answer2)
answer[fn_index] = diff_answer
if 'max' in fn_set:
fn_index = fn_set.index('max')
answer[fn_index] = 'max({}, {})'.format(temp_answer1, temp_answer2)
if 'min' in fn_set:
fn_index = fn_set.index('min')
answer[fn_index] = 'min({}, {})'.format(temp_answer1, temp_answer2)
if 'div' in fn_set:
fn_index = fn_set.index('div')
if temp_answer2 == '0':
temp_answer2 = '0.001'
if temp_answer1 == '0':
div_answer = '0'
else:
div_answer = '({} / ({}))'.format(temp_answer1, temp_answer2)
answer[fn_index] = div_answer
if 'exp' in fn_set:
fn_index = fn_set.index('exp')
answer[fn_index] = 'exp({})'.format(temp_answer)
new_answer = [collect_op_inputs_str(weight_sm, np.zeros([1, 1]), answer)]
# print('New answer: {}'.format(new_answer))
# print('weight w, weight b: {}, {}'.format(weight_w, weight_b))
if minimal:
ret_val = collect_minimal_op_inputs_str(weight_w, new_answer)
else:
ret_val = collect_op_inputs_str(weight_w, weight_b, new_answer)
return ret_val
def flattened_sm_result(input_x, sm_applied_weights, our_w, our_b):
# print('Create operator node. Input shapes: {}, {}'.format(input_1.shape, input_2.shape))
new_sm_weights = tf.multiply(sm_applied_weights,
tf.cast(tf.greater(sm_applied_weights,
tf.reduce_max(sm_applied_weights) - 0.01), tf.float32))
new_sm_weights = tf.divide(new_sm_weights, tf.reduce_sum(new_sm_weights))
sm_result = tf.matmul(input_x, new_sm_weights)
final_result = tf.multiply(sm_result, our_w) + our_b
# print(' Final result shape: {}'.format(final_result.shape))
return final_result, new_sm_weights
class SFL:
def __init__(self, var_names=None):
self.name = "Symbolic Function Learner"
self.short_name = "SFL"
# mode: in ["sr", "de", "lr"]
self.mode = Settings.mode
# main hyperparameters of the symbolic expression
self.n_tree_layers = Settings.n_tree_layers
self.function_set = Settings.function_set.copy()
self.n_input_variables = Settings.num_features
self.n_dims_per_variable = Settings.num_dims_per_feature
self.n_dims_in_output = Settings.n_dims_in_output
assert self.n_dims_in_output in [1, self.n_dims_per_variable]
# use_both_for_unary: decide how to handle two input children
# for a unary operator.
# True: add the two inputs.
# False: keep first input; discard second input.
self.use_both_for_unary = Settings.use_both_for_unary
# Use a softmax on leaf layer?
self.sm_leaf_layer = Settings.use_leaf_sm
# data_x,y: Input (x, y) values over which we are training.
# For symbolic regression, it's the same as fixed_x,y.
# For differential equations, it's random values.
self.data_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
self.n_input_variables], name="data_x")
self.data_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="data_y")
# Fixed_x,y: these are the set of points that must be satisfied
# by the function that is learned. These are used to compute
# the residual error in the cost function.
self.fixed_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
self.n_input_variables], name="data_x")
self.fixed_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="data_y")
# To initialize operators in the SFL with a warm start before training
self.init_op_weights = tf.compat.v1.placeholder("float", [len(self.function_set), 2 ** self.n_tree_layers - 1],
name="init_op_weights")
self.init_op_weight_matrix = np.zeros(shape=[len(self.function_set), 2 ** self.n_tree_layers - 1])
# To initialize variable choices in the SFL with a warm start before training
num_var_input_choices = self.n_input_variables
self.init_var_weights = tf.compat.v1.placeholder("float", [num_var_input_choices, 2 ** self.n_tree_layers],
name="init_var_weights")
self.init_var_weight_matrix = np.zeros(shape=[num_var_input_choices, 2 ** self.n_tree_layers])
# variables can have default or custom names
if self.n_input_variables == 1 and var_names is None:
self.var_names = ['x']
elif var_names is None:
self.var_names = ['x{}'.format(i + 1) for i in range(self.n_input_variables)]
else:
self.var_names = var_names
self.learn_rate = Settings.learn_rate
self.y_gold = self.data_y
self.g_error = tf.Variable(0.0)
self.g_error_not_flat = tf.Variable(0.0)
self.mse = tf.Variable(0.0)
self.mse_not_flat = tf.Variable(0.0)
self.spike_error = tf.Variable(0.0)
self.ivp_error = tf.Variable(0.0)
self.ivp_error_not_flat = tf.Variable(0.0)
self.total_error = tf.Variable(0.0)
if self.mode == "de":
self.ivp_lambda = Settings.ivp_lambda
else:
self.ivp_lambda = 0
self.train_accuracy_log = []
self.valid_accuracy_log = []
self.test_accuracy_log = []
self.seen_eqns = []
self.seen_minimal_eqns = []
self.log_iters = []
self.best_accuracy_so_far = 9999999
self.best_formula_so_far = ""
self.best_iter = 0
self.y_hat = None
self.y_hat_p1 = None
self.y_hat_pp1 = None
self.y_hat_p2 = None
self.y_hat_pp2 = None
self.y_hat_p3 = None
self.y_hat_pp3 = None
self.y_hat_pp12 = None
self.implicit_g = None
self.y_hat_not_flat = None
self.y_hat_p_not_flat = None
self.y_hat_pp_not_flat = None
self.implicit_g_not_flat = None
self.W_matrices = []
self.b_matrices = []
self.non_sm_weights = []
self.leaf_sm_weights = []
self.sm_W_matrices = []
self.sm_applied_W_matrices = []
self.flattened_W_matrices = []
self.use_both_for_unary = Settings.use_both_for_unary
self.init = None
self.sess = None
self.build_sfl()
self.reset(var_names)
def build_sfl(self):
self.data_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
self.n_input_variables], name="data_x")
self.data_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="data_y")
self.fixed_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
self.n_input_variables], name="fixed_x")
self.fixed_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="fixed_y")
# To initialize operators in the SFL with a warm start before training
self.init_op_weights = tf.compat.v1.placeholder("float", [len(self.function_set), 2 ** self.n_tree_layers - 1],
name="init_op_weights")
# To initialize variable choices in the SFL with a warm start before training
# Right now, only one variable is supported.
num_var_input_choices = self.n_input_variables
self.init_var_weights = tf.compat.v1.placeholder("float", [num_var_input_choices, 2 ** self.n_tree_layers],
name="init_var_weights")
self.g_error = tf.Variable(0.0)
self.g_error_not_flat = tf.Variable(0.0)
self.mse = tf.Variable(0.0)
self.mse_not_flat = tf.Variable(0.0)
self.spike_error = tf.Variable(0.0)
self.ivp_error = tf.Variable(0.0)
self.ivp_error_not_flat = tf.Variable(0.0)
self.total_error = tf.Variable(0.0)
self.W_matrices = []
self.b_matrices = []
self.non_sm_weights = []
self.leaf_sm_weights = []
self.sm_W_matrices = []
self.sm_applied_W_matrices = []
self.flattened_W_matrices = []
previous_output = []
previous_flat_output = []
weight_layer = []
bias_layer = []
if Settings.show_output:
print("Setting up {} model.".format(self.name))
print(" {} tree layers.".format(self.n_tree_layers))
print(" {} features of {} component(s) each.".format(self.n_input_variables, self.n_dims_per_variable))
print(" {} component(s) in output.".format(self.n_dims_in_output))
print(" {} operators: {}.".format(len(self.function_set),
self.function_set))
# Set up leaf layer
for i in range(2 ** (Settings.n_tree_layers - 1)):
if self.sm_leaf_layer:
num_leaf_weights = 1
else:
num_leaf_weights = self.n_input_variables
new_weights1 = new_weight_matrix(num_leaf_weights, 1, mean=0.0)
new_b1 = new_bias(1)
new_weights2 = new_weight_matrix(num_leaf_weights, 1, mean=0.0)
new_b2 = new_bias(1)
# print("self.data_x.shape: {}".format(self.data_x.shape))
if self.sm_leaf_layer:
new_sm_weights1 = new_weight_matrix(self.n_input_variables, 1, mean=0.0)
new_sm_weights2 = new_weight_matrix(self.n_input_variables, 1, mean=0.0)
input_1 = tf.matmul(self.data_x, tf.math.softmax(new_sm_weights1, axis=0))
input_2 = tf.matmul(self.data_x, tf.math.softmax(new_sm_weights2, axis=0))
# todo: ugh
# new_weights1 = tf.constant([[1.0]])
# new_weights2 = tf.constant([[1.0]])
else:
input_1 = self.data_x
input_2 = self.data_x
# print("input_1.shape: {}".format(input_1.shape))
result_1 = tf.matmul(input_1, new_weights1) + new_b1
result_2 = tf.matmul(input_2, new_weights2) + new_b2
# print("result_1.shape: {}".format(result_1.shape))
weight_layer.extend([new_weights1, new_weights2])
bias_layer.extend([new_b1, new_b2])
if self.sm_leaf_layer:
self.leaf_sm_weights.extend([tf.math.softmax(new_sm_weights1, axis=0),
tf.math.softmax(new_sm_weights2, axis=0)])
self.non_sm_weights.extend([new_weights1, new_weights2, new_b1, new_b2])
# self.non_sm_weights.extend([new_weights1, new_weights2])
previous_output.extend([result_1, result_2])
previous_flat_output.extend([result_1, result_2])
self.W_matrices.append(weight_layer)
self.b_matrices.append(bias_layer)
self.sm_W_matrices.append([])
self.sm_applied_W_matrices.append([])
self.flattened_W_matrices.append([])
current_node = 0
# Set up parent layers, one at a time going up
for j in range(Settings.n_tree_layers):
sm_weight_layer = []
sm_applied_weight_layer = []
flattened_weight_layer = []
weight_layer = []
bias_layer = []
new_output = []
new_flat_output = []
result_layer = []
flattened_result_layer = []
for i in range(2 ** (Settings.n_tree_layers - j - 1)):
current_input_1 = previous_output[2 * i]
current_input_2 = previous_output[2 * i + 1]
nonflatten_input = operate_on_tensors(current_input_1, current_input_2, self.function_set,
use_both_for_unary=self.use_both_for_unary)
current_flat_input_1 = previous_flat_output[2 * i]
current_flat_input_2 = previous_flat_output[2 * i + 1]
flatten_input = operate_on_tensors(current_flat_input_1, current_flat_input_2,
self.function_set,
use_both_for_unary=self.use_both_for_unary)
init_op_weights = tf.reshape(self.init_op_weights[:, current_node], [-1, 1])
sm_r, flat_r, pre_sm_w, post_sm_w, flat_w = sm_no_const_selector(nonflatten_input,
flatten_input,
init_op_weights)
new_w = new_weight_matrix(1, 1, mean=1.0)
new_b = new_bias(1)
# self.non_sm_weights.extend([new_b])
sm_r = tf.math.multiply(sm_r, new_w) + new_b
flat_r = tf.multiply(flat_r, new_w) + new_b
sm_weight_layer.extend([pre_sm_w])
sm_applied_weight_layer.extend([post_sm_w])
flattened_weight_layer.extend([flat_w])
new_output.extend([sm_r])
new_flat_output.extend([flat_r])
weight_layer.extend([new_w])
bias_layer.extend([new_b])
""" self.non_sm_weights.extend([new_w, new_b])"""
result_layer.extend([sm_r])
flattened_result_layer.extend([flat_r])
current_node += 1
self.sm_W_matrices.extend([sm_weight_layer])
self.sm_applied_W_matrices.extend([sm_applied_weight_layer])
self.flattened_W_matrices.extend([flattened_weight_layer])
self.W_matrices.extend([weight_layer])
self.b_matrices.extend([bias_layer])
previous_output = new_output
previous_flat_output = new_flat_output
if self.mode == "lr":
self.y_hat_not_flat = spike(previous_output[-1])
self.y_hat = spike(previous_flat_output[-1])
else:
self.y_hat_not_flat = our_tanh(previous_output[-1], factor=10000)
self.y_hat = our_tanh(previous_flat_output[-1], factor=10000)
def reset(self, var_names=None):
tf.compat.v1.reset_default_graph()
self.build_sfl()
if var_names is not None:
self.var_names = var_names
self.log_iters = []
self.train_accuracy_log = []
self.valid_accuracy_log = []
self.test_accuracy_log = []
self.seen_eqns = []
self.seen_minimal_eqns = []
self.setup_derivative_values()
self.setup_err_values(non_const=Settings.non_const)
# TODO: really need to sort out the whole fixed_x, fixed_y thing
if self.mode == "de":
self.ivp_error_not_flat, self.ivp_error = self.setup_ivp_values(self.fixed_x, self.fixed_y)
if self.mode == "de":
self.total_error = self.total_error + self.g_error
if Settings.non_const:
self.total_error = self.total_error + self.spike_error
self.total_error = self.total_error + self.mse + self.ivp_lambda * self.ivp_error
sum_of_squares = tf.reduce_sum([tf.reduce_sum(tf.square(reg_w)) for reg_w in self.non_sm_weights])
sum_of_squares_minus_max = sum_of_squares - tf.reduce_sum([tf.reduce_max(tf.square(reg_w))
for reg_w in self.non_sm_weights])
self.regularization_penalty = tf.reduce_mean([tf.reduce_sum(tf.abs(reg_w))
for reg_w in self.non_sm_weights])
# self.regularization_penalty += sum_of_squares_minus_max
self.loss_function1 = self.mse_not_flat + self.g_error_not_flat + self.spike_error + self.ivp_lambda * self.ivp_error_not_flat
self.loss_function2 = self.mse + self.g_error + self.spike_error + self.ivp_lambda * self.ivp_error
self.loss_function2 += self.regularization_penalty * 0.05 # 0.1
self.loss_function3 = self.mse + self.g_error + self.spike_error + self.ivp_lambda * self.ivp_error
self.loss_function3 += self.regularization_penalty * 0.9 # 1.0
self.opt = tf.compat.v1.train.AdamOptimizer(self.learn_rate)
self.train_step_1 = self.opt.minimize(self.loss_function1)
self.train_step_2 = self.opt.minimize(self.loss_function2)
self.train_step_3 = self.opt.minimize(self.loss_function3)
self.init = tf.compat.v1.global_variables_initializer()
self.sess = tf.compat.v1.Session()
self.sess.run(self.init)
self.best_accuracy_so_far = 9999999
self.best_formula_so_far = ""
self.best_iter = 0
def setup_err_values(self, non_const=False):
if self.mode == "de":
self.g_error = tf.reduce_mean(tf.math.square(self.implicit_g))
self.g_not_flat = tf.reduce_mean(tf.math.square(self.implicit_g_not_flat))
else:
self.g_error = tf.Variable(0.0)
self.g_error_not_flat = tf.Variable(0.0)
self.mse = tf.reduce_mean(tf.math.squared_difference(self.y_hat, self.data_y))
self.mse_not_flat = tf.reduce_mean(tf.math.squared_difference(self.y_hat_not_flat, self.data_y))
if non_const:
self.spike_error = tf.reduce_mean(spike(self.y_hat_p1))
# tf.reduce_sum(spike(self.y_hat_p1) + spike(self.y_hat_p2) + spike(self.y_hat_p3))
def setup_ivp_values(self, fixed_x_ph, fixed_y_ph):
y_hat_err_not_flat = tf.Variable(0.0)
y_hat_err = tf.Variable(0.0)
if fixed_x_ph is not None:
y_hat_err_not_flat = tf.reduce_mean(tf.math.squared_difference(fixed_y_ph,
self.eval_formula(fixed_x_ph, flat=False)))
y_hat_err = tf.reduce_mean(tf.math.squared_difference(fixed_y_ph, self.eval_formula(fixed_x_ph)))
eye = tf.eye(self.n_input_variables)
u1 = eye[:, 0]
if Settings.fixed_x_p1 is not None and len(Settings.fixed_x_p1) > 0:
fixed_x_p1 = tf.constant(np.reshape(Settings.fixed_x_p1,
[-1, Settings.num_dims_per_feature, Settings.num_features]),
dtype="float32")
fixed_y_p1 = tf.constant(np.reshape(Settings.fixed_y_p1,
[-1, Settings.n_dims_in_output, 1]),
dtype="float32")
y_p1_fixed_hat = self.eval_formula(fixed_x_p1 + d_eps * u1 / 2)
y_p1_fixed_hat -= self.eval_formula(fixed_x_p1 - d_eps * u1 / 2)
y_p1_fixed_hat = y_p1_fixed_hat / d_eps
y_hat_err_not_flat += tf.reduce_mean(tf.math.squared_difference(fixed_y_p1, y_p1_fixed_hat))
y_hat_err += tf.reduce_mean(tf.math.squared_difference(fixed_y_p1, y_p1_fixed_hat))
if self.n_input_variables > 1:
u2 = eye[:, 1]
if Settings.fixed_x_p2 is not None and len(Settings.fixed_x_p2) > 0:
fixed_x_p2 = tf.constant(np.reshape(Settings.fixed_x_p2,
[-1, Settings.num_dims_per_feature, Settings.num_features]),
dtype="float32")
fixed_y_p2 = tf.constant(np.reshape(Settings.fixed_y_p2,
[-1, Settings.n_dims_in_output, 1]),
dtype="float32")
y_p2_fixed_hat = self.eval_formula(fixed_x_p2 + d_eps * u2 / 2)
y_p2_fixed_hat -= self.eval_formula(fixed_x_p2 - d_eps * u2 / 2)
y_p2_fixed_hat = y_p2_fixed_hat / d_eps
y_hat_err_not_flat += tf.reduce_mean(tf.math.squared_difference(fixed_y_p2, y_p2_fixed_hat))
y_hat_err += tf.reduce_mean(tf.math.squared_difference(fixed_y_p2, y_p2_fixed_hat))
return y_hat_err_not_flat, y_hat_err
def get_formula_string(self, digits=None):
eval_dict = {self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix}
inputs = []
for i in range(len(self.W_matrices[0])):
w_matrix = self.W_matrices[0][i].eval(session=self.sess)
b_vector = self.b_matrices[0][i].eval(session=self.sess)
if self.sm_leaf_layer:
sm_vector = self.leaf_sm_weights[i].eval(session=self.sess)
print("sm_vector: {}".format(sm_vector))
new_answer = [collect_op_inputs_str(sm_vector, np.zeros([1, 1]), self.var_names)]
new_input = collect_op_inputs_str(w_matrix, b_vector, new_answer)
else:
new_input = collect_op_inputs_str(w_matrix, b_vector, self.var_names)
inputs.extend([new_input])
for layer_i in range(1, len(self.W_matrices)):
sm_applied_this_layer = self.flattened_W_matrices[layer_i]
w_this_layer = self.W_matrices[layer_i]
b_this_layer = self.b_matrices[layer_i]
new_inputs = []
for iii in range(0, len(w_this_layer)):
new_inputs.extend([operation_to_str_best(w_this_layer[iii].eval(self.sess),
b_this_layer[iii].eval(self.sess),
sm_applied_this_layer[iii].eval(session=self.sess,
feed_dict=eval_dict),
inputs[2 * iii],
inputs[2 * iii + 1],
self.function_set,
unary_both=self.use_both_for_unary)])
inputs = new_inputs
if isinstance(inputs[0], list):
return inputs[0][0]
return inputs[0]
def get_minimal_formula_string(self):
eval_dict = {self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix}
inputs = []
for i in range(len(self.W_matrices[0])):
# w_matrix = self.W_matrices[0][i].eval(self.sess)
# inputs.extend([collect_minimal_op_inputs_str(w_matrix, self.var_names)])
inputs.append("A{}".format(i+1))
for layer_i in range(1, len(self.W_matrices)):
sm_applied_this_layer = self.flattened_W_matrices[layer_i]
w_this_layer = self.W_matrices[layer_i]
new_inputs = []
for iii in range(0, len(sm_applied_this_layer)):
new_inputs.extend([operation_to_str_best(w_this_layer[iii].eval(self.sess),
None,
sm_applied_this_layer[iii].eval(session=self.sess,
feed_dict=eval_dict),
inputs[2 * iii],
inputs[2 * iii + 1],
self.function_set,
unary_both=self.use_both_for_unary,
minimal=True)])
inputs = new_inputs
if isinstance(inputs[0], list):
return inputs[0][0]
return inputs[0]
def eval_formula(self, input_x, flat=True):
inputs = []
for i in range(len(self.W_matrices[0])):
w_matrix = self.W_matrices[0][i]
b_vector = self.b_matrices[0][i]
if self.sm_leaf_layer:
post_sm_weights = self.leaf_sm_weights[i]
sm_result = tf.matmul(input_x, post_sm_weights)
result = tf.multiply(sm_result, w_matrix) + b_vector
else:
result = tf.matmul(input_x, w_matrix) + b_vector
inputs.extend([result])
for layer_i in range(1, len(self.W_matrices)):
sm_flat_this_layer = self.flattened_W_matrices[layer_i]
sm_applied_this_layer = self.sm_applied_W_matrices[layer_i]
w_this_layer = self.W_matrices[layer_i]
b_this_layer = self.b_matrices[layer_i]
new_inputs = []
for iii in range(0, len(w_this_layer)):
post_sm_weights = sm_applied_this_layer[iii]
flat_sm_weights = sm_flat_this_layer[iii]
op_result = operate_on_tensors(inputs[2 * iii],
inputs[2 * iii + 1],
self.function_set,
use_both_for_unary=self.use_both_for_unary)
if flat:
# result, flat_sm_weights = flattened_sm_result(op_result,
# post_sm_weights,
# w_this_layer[iii],
# b_this_layer[iii])
sm_result = tf.matmul(op_result, flat_sm_weights)
else:
sm_result = tf.matmul(op_result, post_sm_weights)
result = tf.multiply(sm_result, w_this_layer[iii]) + b_this_layer[iii]
new_inputs.extend([result])
inputs = new_inputs
if self.mode == "lr":
return spike(inputs[0])
return inputs[0]
def setup_derivative_values(self):
d2_eps = 1e-2
eye = tf.eye(self.n_input_variables)
u1 = eye[:, 0]
if self.n_input_variables > 1:
u2 = eye[:, 1]
if self.n_input_variables > 2:
u3 = eye[:, 2]
# u = []
# for i in range(self.n_input_variables):
# u_i = eye[:, i]
# dy / dx1
self.y_hat_p1 = self.eval_formula(self.data_x + d_eps * u1 / 2)
self.y_hat_p1 -= self.eval_formula(self.data_x - d_eps * u1 / 2)
self.y_hat_p1 = self.y_hat_p1 / d_eps
# d^2y / dx1^2
self.y_hat_pp1 = self.eval_formula(self.data_x + d2_eps * u1)
self.y_hat_pp1 -= (2 * self.eval_formula(self.data_x))
self.y_hat_pp1 += self.eval_formula(self.data_x - d2_eps * u1)
self.y_hat_pp1 /= (d2_eps ** 2)
if self.n_input_variables > 1:
# dy / dx2
self.y_hat_p2 = self.eval_formula(self.data_x + d_eps * u2 / 2)
self.y_hat_p2 -= self.eval_formula(self.data_x - d_eps * u2 / 2)
self.y_hat_p2 = self.y_hat_p2 / d_eps
# d^2y / dx2^2
self.y_hat_pp2 = self.eval_formula(self.data_x + d2_eps * u2)
self.y_hat_pp2 -= (2 * self.eval_formula(self.data_x))
self.y_hat_pp2 += self.eval_formula(self.data_x - d2_eps * u2)
self.y_hat_pp2 /= (d2_eps ** 2)
# d^2y / dx1 dx2
self.y_hat_pp12 = self.eval_formula(self.data_x + d2_eps * (u1 + u2))
self.y_hat_pp12 -= self.eval_formula(self.data_x - d2_eps * (u1 - u2))
self.y_hat_pp12 -= self.eval_formula(self.data_x - d2_eps * (u2 - u1))
self.y_hat_pp12 -= self.eval_formula(self.data_x + d2_eps * (-u1 - u2))
self.y_hat_pp12 /= (4 * d2_eps ** 2)
else:
self.y_hat_p2 = None
self.y_hat_pp2 = None
self.y_hat_pp12 = None
if self.n_input_variables > 2:
# dy / dx2
self.y_hat_p3 = self.eval_formula(self.data_x + d_eps * u3 / 2)
self.y_hat_p3 -= self.eval_formula(self.data_x - d_eps * u3 / 2)
self.y_hat_p3 = self.y_hat_p3 / d_eps
else:
self.y_hat_p3 = None
self.y_hat_p_not_flat = self.eval_formula(self.data_x + d_eps * u1 / 2, flat=False)
self.y_hat_p_not_flat -= self.eval_formula(self.data_x - d_eps * u1 / 2, flat=False)
self.y_hat_p_not_flat = self.y_hat_p_not_flat / d_eps
self.y_hat_pp_not_flat = self.eval_formula(self.data_x + d_eps * u1, flat=False)
self.y_hat_pp_not_flat -= 2 * self.eval_formula(self.data_x, flat=False)
self.y_hat_pp_not_flat += self.eval_formula(self.data_x - d_eps * u1, flat=False)
self.y_hat_pp_not_flat = self.y_hat_pp_not_flat / d_eps ** 2
self.implicit_g = our_tanh(implicit_function(self.data_x, self.y_hat,
[self.y_hat_p1, self.y_hat_p2, self.y_hat_p3],
[self.y_hat_pp1, self.y_hat_pp2, self.y_hat_pp12]))
self.implicit_g_not_flat = our_tanh(implicit_function(self.data_x, self.y_hat_not_flat,
[self.y_hat_p_not_flat, self.y_hat_p2, self.y_hat_p3],
[self.y_hat_pp_not_flat, self.y_hat_pp2,
self.y_hat_pp12]))
""" Like reset, but does not erase records of training history.
It only restarts training from a new random initialization. """
def soft_reset(self):
self.init = tf.compat.v1.global_variables_initializer()
self.saver = tf.compat.v1.train.Saver()
self.sess = tf.compat.v1.Session()
self.sess.run(self.init)
self.best_accuracy_so_far = 9999999
self.best_formula_so_far = ""
self.best_iter = 0
# Not needed, but don't touch
def set_init_op_weight_matrix(self, init_op_weight_matrix):
self.init_op_weight_matrix = init_op_weight_matrix
# Not needed, but don't touch
def set_init_var_weight_matrix(self, init_var_weight_matrix):
self.init_var_weight_matrix = init_var_weight_matrix
# Not 100% tested
def make_y_multi_safe(self, old_y):
if isinstance(old_y, list):
new_y = np.array(old_y)
new_y.reshape([-1, self.n_dims_in_output, 1])
else:
new_y = old_y.copy()
if len(new_y.shape) == 1:
assert (self.n_dims_in_output == 1)
new_y = [[[y_value] for _ in range(self.n_dims_per_variable)] for y_value in new_y]
new_y = np.array(new_y)
elif len(new_y.shape) == 2:
assert (self.n_dims_in_output == 1)
new_y = [[y_value for _ in range(self.n_dims_per_variable)] for y_value in new_y]
new_y = np.array(new_y)
elif new_y.shape[1] < self.n_dims_per_variable:
assert (self.n_dims_in_output == 1)
new_y = [[y_value[0] for _ in range(self.n_dims_per_variable)] for y_value in new_y]
new_y = np.array(new_y)
return new_y
def get_simple_formula(self, digits=None):
full_formula = self.get_formula_string()
return DataUtils.simplify_formula(full_formula, digits=digits)
# todo: want total or mean square error?
def test(self, x, y=None):
test_dict = {self.data_x: x,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix}
if y is not None:
test_dict[self.data_y] = y
return self.sess.run(self.total_error, feed_dict=test_dict)
# Runs train process a number of times on a limited number of train steps.
# Returns the best formula found during that experience.
# If init_ops is given, it will start off with ops initialized accordingly.
# If it is None, then ops will be initialized randomly.
# If it is 0, then ops will have no initialization.
# Same with init_vars.
def train(self, x, y=None, init_op_weight_matrix=None, init_var_weight_matrix=None,
test_x=None, test_y=None):
n_rounds = Settings.num_train_steps_in_repeat_mode
batch_size = min(Settings.max_training_batch_size, int(len(x) / 2))
train_set_size = len(x)
train_x = np.array(x, dtype=np.float32)
if self.mode in ["de"]:
y = [0 for _ in range(x.shape[0])]
if test_x is not None:
test_y = [0 for _ in range(test_x.shape[0])]
# elif self.mode == ["sr", "lr"]:
# y = DataUtils.true_function(x)
# if test_x is not None:
# test_y = DataUtils.true_function(test_x)
train_y = self.make_y_multi_safe(y)
if test_y is not None:
test_y = self.make_y_multi_safe(test_y)
if init_op_weight_matrix is not None:
self.set_init_op_weight_matrix(init_op_weight_matrix)
if init_var_weight_matrix is not None:
self.set_init_var_weight_matrix(init_var_weight_matrix)
target_y = self.y_hat
show_gt = False
if Settings.show_output:
print("Starting actual training!")
start_time = time.time()
old_time = time.time()
time_spent_training = 0
time_getting_formulas = 0
time_getting_scores = 0
time_plotting = 0
other_time = 0
for i in range(1, n_rounds + 1):
mini_start_time = time.time()
train_batch_x, train_batch_y, valid_batch_x, valid_batch_y = DataUtils.get_samples(train_set_size,
batch_size,
train_x, train_y)
other_time += time.time() - mini_start_time
training_dict = {self.data_x: train_batch_x,
self.data_y: train_batch_y,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix}
valid_batch_dict = {self.data_x: valid_batch_x,
self.data_y: valid_batch_y,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix}
test_dict = {self.data_x: test_x, self.data_y: test_y,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix}
""" Actual training happens here """
mini_start_time = time.time()
if i < n_rounds * Settings.t1_fraction:
self.sess.run(self.train_step_1, feed_dict=training_dict)
elif i < n_rounds * Settings.t2_fraction:
self.sess.run(self.train_step_2, feed_dict=training_dict)
else:
self.sess.run(self.train_step_3, feed_dict=training_dict)
time_spent_training += (time.time() - mini_start_time)
""" Save formulas, accuracy, etc. """
if (i % Settings.plot_frequency == 0 or i % Settings.output_freq == 0) and Settings.keep_logs:
# Save current formula to make list of all formulas seen
current_formula = "(Formula not saved)"
if Settings.save_all_formulas:
mini_start_time = time.time()
current_formula = self.get_simple_formula(digits=4)
time_getting_formulas += (time.time() - mini_start_time)
if current_formula not in self.seen_eqns:
self.seen_eqns.append(current_formula)
# Get results from validation set.
mini_start_time = time.time()
[valid_acc, y_pr_v] = self.sess.run([self.total_error, target_y], feed_dict=valid_batch_dict)
# Get results from test set.
if test_x is not None:
[test_acc, y_pr_test] = self.sess.run([self.total_error, target_y], feed_dict=test_dict)
y_gold_v = valid_batch_y.reshape([-1, self.n_dims_per_variable, 1])[0].tolist()
y_hat_v = y_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
time_getting_scores += (time.time() - mini_start_time)
mini_start_time = time.time()
[valid_acc, g_pr_v] = self.sess.run([self.total_error, self.implicit_g], feed_dict=valid_batch_dict)
g_hat_val = g_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
g_hat_1d_val = [y_value[0][0] for y_value in g_hat_val]
g_tru_1d_val = [y_value[0][0] for y_value in valid_batch_y]
g_hat_1d_test = None
g_tru_1d_test = None
[yp_v, ypp_v] = self.sess.run([self.y_hat_p1, self.y_hat_pp1], feed_dict=valid_batch_dict)
y_p1_v = yp_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
y_pp1_v = ypp_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
[yp2_v, ypp2_v] = self.sess.run([self.y_hat_p2, self.y_hat_pp2], feed_dict=valid_batch_dict)
y_p2_v = yp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
y_pp2_v = ypp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
time_getting_scores += (time.time() - mini_start_time)
if test_x is not None:
mini_start_time = time.time()
[test_acc, g_pr_test] = self.sess.run([self.total_error, self.implicit_g], feed_dict=test_dict)
g_hat_test = g_pr_test.reshape([-1, self.n_dims_per_variable, 1]).tolist()
g_hat_1d_test = [g_value[0][0] for g_value in g_hat_test]
g_tru_1d_test = [g_value[0][0] for g_value in test_y]
time_getting_scores += (time.time() - mini_start_time)
# Update best formula seen based on validation error.
if Settings.save_all_formulas:
if valid_acc < self.best_accuracy_so_far:
self.best_accuracy_so_far = valid_acc
self.best_formula_so_far = current_formula
self.best_iter = i
# We only can make plots using y values if y is 1d.
if self.n_dims_in_output == 1:
mini_start_time = time.time()
y_hat_1d_val = [y_value[0][0] for y_value in y_hat_v]
y_tru_1d_val = [y_value[0][0] for y_value in valid_batch_y]
y_hat_1d_test = None
y_tru_1d_test = None
if test_x is not None:
y_hat_test = y_pr_test.reshape([-1, self.n_dims_per_variable, 1]).tolist()
y_hat_1d_test = [y_value[0][0] for y_value in y_hat_test]
y_tru_1d_test = [y_value[0][0] for y_value in test_y]
other_time += (time.time() - mini_start_time)
if self.mode in ["sr", "lr"]:
# Plot predicted y value against actual y value.
mini_start_time = time.time()
DataUtils.plot_predicted_vs_actual(y_hat_1d_val, y_tru_1d_val,
y_hat_1d_test, y_tru_1d_test,
self.name,
set_name="Iteration {}".format(i))
time_plotting += (time.time() - mini_start_time)
# DataUtils.plot_2d_curve(x_1d_val, y_tru_1d_val, y_hat_1d_val, None, None, None)
# If x is also 1d, we can plot the function itself.
if self.n_input_variables == 1:
# Plot the actual function we learned.
mini_start_time = time.time()
x_1d_val = [x_value[0][0] for x_value in valid_batch_x]
x_1d_test = None
if test_x is not None:
x_1d_test = [x_value[0][0] for x_value in test_x]
other_time += (time.time() - mini_start_time)
mini_start_time = time.time()
DataUtils.plot_1d_curve(x_1d_val, y_tru_1d_val, y_hat_1d_val,
x_1d_test, y_tru_1d_test, y_hat_1d_test,
file_suffix="_y",
title="Learned function: Iteration {}".format(i),
show_ground_truth=show_gt)
time_plotting += (time.time() - mini_start_time)
# Plot the g output values, in implicit case
if test_x is not None:
mini_start_time = time.time()
DataUtils.plot_1d_curve(x_1d_val, g_tru_1d_val, g_hat_1d_val,
x_1d_test, g_tru_1d_test, g_hat_1d_test,
file_suffix="_g",
title="Output of g: Iteration {}".format(i))
time_plotting += time.time() - mini_start_time
elif self.n_input_variables == 2:
# Plot the actual function we learned.
mini_start_time = time.time()
plot2d_x1 = np.arange(Settings.test_scope[0], Settings.test_scope[1], 0.1)
plot2d_x2 = np.arange(Settings.test_scope[0], Settings.test_scope[1], 0.1)
plot2d_x1_m, plot2d_x2_m = np.meshgrid(plot2d_x1, plot2d_x2)
plot2d_x1 = np.reshape(plot2d_x1_m, [-1, 1, 1])
plot2d_x2 = np.reshape(plot2d_x2_m, [-1, 1, 1])
plot2d_x1x2 = np.concatenate([plot2d_x1, plot2d_x2], axis=-1)
[plot2d_y, plot2d_g] = self.sess.run([target_y, self.implicit_g],
feed_dict={self.data_x: plot2d_x1x2,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix})
if self.mode == "sr":
plot2d_g = DataUtils.true_function(plot2d_x1x2)
plot2d_y_m = np.reshape(plot2d_y, plot2d_x1_m.shape)
plot2d_g_m = np.reshape(plot2d_g, plot2d_x1_m.shape)
DataUtils.plot_2d_curve(plot2d_x1_m, plot2d_x2_m, plot2d_y_m, plot2d_g_m)
time_plotting += (time.time() - mini_start_time)
if Settings.keep_logs:
mini_start_time = time.time()
self.train_accuracy_log.append(self.test(train_x, train_y))
# self.valid_accuracy_log.append(valid_acc)
self.valid_accuracy_log.append(self.test(valid_batch_x, valid_batch_y))
# self.log_iters.append(i)
if len(self.log_iters) == 0:
self.log_iters.append(i)
else:
self.log_iters.append(self.log_iters[-1] + Settings.plot_frequency)
accuracies_to_plot = [self.train_accuracy_log,
self.valid_accuracy_log]
accuracy_type_names = ["Training Error", "Validation Error"]
if test_x is not None:
self.test_accuracy_log.append(test_acc)
accuracies_to_plot.append(self.test_accuracy_log)
accuracy_type_names.append("Test Error")
time_getting_scores += (time.time() - mini_start_time)
mini_start_time = time.time()
DataUtils.plot_accuracy_over_time(self.log_iters, accuracies_to_plot, accuracy_type_names)
time_plotting += time.time() - mini_start_time
if i % Settings.output_freq == 0 and Settings.show_output:
if not Settings.keep_logs:
# Get results from validation set.
mini_start_time = time.time()
[valid_acc, y_pr_v] = self.sess.run([self.total_error, target_y], feed_dict=valid_batch_dict)
# Get results from test set.
if test_x is not None:
[test_acc, y_pr_test] = self.sess.run([self.total_error, target_y], feed_dict=test_dict)
y_hat_v = y_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
g_pr_v = self.sess.run(self.implicit_g, feed_dict=valid_batch_dict)
g_hat_val = g_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
[yp1_v, ypp1_v] = self.sess.run([self.y_hat_p1, self.y_hat_pp1],
feed_dict={self.data_x: valid_batch_x,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix})
y_p1_v = yp1_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
y_pp1_v = ypp1_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
[yp2_v, ypp2_v] = self.sess.run([self.y_hat_p2, self.y_hat_pp2],
feed_dict={self.data_x: valid_batch_x,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix})
y_p2_v = yp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
y_pp2_v = ypp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
time_getting_scores += (time.time() - mini_start_time)
print()
print('Iteration {}:'.format(i))
mini_start_time = time.time()
formula_as_string = self.get_formula_string(digits=4)
dotdot = ""
if len(formula_as_string) > Settings.max_formula_output_length:
dotdot = " ..."
print("# Current Model: {}{}".format(formula_as_string[:Settings.max_formula_output_length], dotdot))
simple_formula = self.get_simple_formula(digits=4)
dotdot = ""
if len(simple_formula) > Settings.max_formula_output_length:
dotdot = " ..."
print("# AKA: {}{}".format(simple_formula[:Settings.max_formula_output_length], dotdot))
minimal_eqn = self.get_minimal_formula_string()
print("# Simple: {}".format(minimal_eqn))
if minimal_eqn not in self.seen_minimal_eqns:
self.seen_minimal_eqns.append(minimal_eqn)
if "**" in simple_formula:
print("(Has a power)")
time_getting_formulas += (time.time() - mini_start_time)
print(
" Length: {} ({})".format(len(formula_as_string), len("{}".format(simple_formula))))
print(" Train batch size: {}".format(train_batch_x.shape))
print(" Valid batch size: {}".format(valid_batch_x.shape))
print(" # Mnml eqns seen: {}".format(len(self.seen_minimal_eqns)))
iters_per_min = Settings.output_freq * 60 / (time.time() - old_time)
print(' Iters per minute: {:.2f}'.format(iters_per_min))
total_time = time.time() - start_time
print(' Time so far: {:.2f} minutes'.format(total_time / 60.0))
print(' ({:.1%} training, {:.1%} scoring, {:.1%} formulas)'.format(
time_spent_training / total_time,
time_getting_scores / total_time,
time_getting_formulas / total_time))
print(' ({:.1%} plotting, {:.1%} other)'.format(time_plotting / total_time,
other_time / total_time))
print(' Est. time left: {:.2f} minutes'.format((n_rounds - i) / iters_per_min))
print('Error values:')
mini_start_time = time.time()
curr_errs = self.sess.run([self.g_error, self.ivp_error,
self.spike_error, self.total_error],
feed_dict=valid_batch_dict)
if self.mode == "de":
print(' g-err Valid: {}'.format(curr_errs[0]))
print(' IVP Valid: {}'.format(curr_errs[1]))
if Settings.non_const:
print(' Spike err: {}'.format(curr_errs[2]))
print(' Tot. Val.: {}'.format(curr_errs[3]))
if np.abs(curr_errs[3] - (curr_errs[0] + self.ivp_lambda * curr_errs[1] + curr_errs[2])) > 1e-4:
print("Something is wrong.")
# Hope we don't get nans, but break out if we do.
nans = np.isnan(curr_errs[0])
if nans:
break
if test_x is not None:
print(' Tot. Test: {}'.format(test_acc))
time_getting_scores += (time.time() - mini_start_time)
print('Performance on sample validation data:')
print_str = ""
for feature_i in range(Settings.num_features):
print_str += "{}\t\t".format(self.var_names[feature_i])
print_str += "|\t"
# print_str += "y_tru\t"
if self.mode == "de":
print_str += "g_hat\t"
elif self.mode == "sr" or self.mode == "lr":
print_str += "y_tru\t"
print_str += "y_hat\t"
if self.mode == "de":
print_str += "y_p1\t"
print_str += "y_pp1\t"
print_str += "y_p2\t"
print_str += "y_pp2\t"
print(print_str)
line_len = len(print_str) + 16
print("=" * line_len)
var_range = range(self.n_input_variables)
num_pts_to_show = 5
if self.mode in ["sr", "lr"]:
y_tru_v = valid_batch_y[:num_pts_to_show, :, :]
# y_tru_v = DataUtils.predict_from_formula(Settings.true_eqn, valid_batch_x[:num_pts_to_show, :, :])
for datapoint_i in range(min(valid_batch_x.shape[0], num_pts_to_show)):
comps_to_show = range(self.n_dims_per_variable)
if self.n_dims_per_variable > 9:
comps_to_show = [0, 1, 2, -1]
for component_j in comps_to_show:
if component_j == -1:
print(" ... ")
print_str = ""
for var_k in var_range:
x_ijk = valid_batch_x[datapoint_i, component_j, var_k]
print_str += "{:.3f}\t".format(x_ijk)
print_str += "|\t"
# print_str += "{:.3f}\t".format(valid_batch_y[datapoint_i, component_j, 0])
if self.mode == "de":
print_str += "{:.3f}\t".format(g_hat_val[datapoint_i][component_j][0])
elif self.mode in ["sr", "lr"]:
print_str += "{:.3f}\t".format(y_tru_v[datapoint_i][0][0])
print_str += "{:.3f}\t".format(y_hat_v[datapoint_i][component_j][0])
if self.mode == "de":
print_str += "{:.3f}\t".format(y_p1_v[datapoint_i][component_j][0])
print_str += "{:.3f}\t".format(y_pp1_v[datapoint_i][component_j][0])
print_str += "{:.3f}\t".format(y_p2_v[datapoint_i][component_j][0])
print_str += "{:.3f}\t".format(y_pp2_v[datapoint_i][component_j][0])
print(print_str)
print("-" * line_len)
print()
old_time = time.time()
if Settings.show_output:
print('Finished training at {:%H:%M:%S}.\n'.format(datetime.datetime.now()))
end_time = time.time()
total_time = end_time - start_time
print('Took {:.2f} seconds to finish.'.format(total_time))
print(' ({:.1%} training, {:.1%} scoring, {:.1%} formulas)'.format(time_spent_training / total_time,
time_getting_scores / total_time,
time_getting_formulas / total_time))
print(' ({:.1%} plotting, {:.1%} other)'.format(time_plotting / total_time,
other_time / total_time))
print('Average of {:.2f} training steps per minute.'.format(
60 * n_rounds / total_time))
print('Average of {:.2f} minutes per 10000 training steps.'.format(
10000 * total_time / (60 * n_rounds)))
print()
if Settings.save_all_formulas:
print("Best formula had accuracy {:.3f} and was seen at iteration {}:".format(
self.best_accuracy_so_far,
self.best_iter))
print("{}".format(self.best_formula_so_far)[:1000])
else:
final_acc = self.sess.run(self.total_error, feed_dict={self.data_x: train_x,
self.data_y: train_y,
self.init_op_weights: self.init_op_weight_matrix,
self.init_var_weights: self.init_var_weight_matrix})
print("Final formula had accuracy {:.3f}:".format(final_acc))
print("{}".format(self.get_simple_formula(digits=4))[:1000])
print()
return self.get_simple_formula(digits=4)
def repeat_train(self, x, y=None,
num_repeats=Settings.num_train_repeat_processes,
test_x=None, test_y=None,
verbose=True):
# we still reduce train set size if only 1 repeat
train_set_size = int(len(x) * Settings.quick_train_fraction + 0.1)
x = np.array(x)
if y is not None:
y = np.array(y)
# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
y = np.zeros((x.shape[0], 1, 1))
sample = np.random.choice(range(x.shape[0]), size=train_set_size, replace=False)
train_x = x[sample][:]
if y is not None:
train_y = y[sample]
out_sample = [aaa for aaa in range(x.shape[0]) if aaa not in sample]
valid_x = x[out_sample][:]
if y is not None:
valid_y = y[out_sample]
valid_y = self.make_y_multi_safe(valid_y)
best_formula = ""
best_iter = 0
best_validation = 999999
best_err = 999999
old_time = time.time()
if verbose:
print("Beginning {} repeat sessions of {} iterations each.".format(num_repeats,
Settings.num_train_steps_in_repeat_mode))
print()
start_time = time.time()
old_time = start_time
for train_iter in range(1, 1 + num_repeats):
if verbose:
print("Repeated train session {} of {}.".format(train_iter, num_repeats))
self.soft_reset()
self.set_init_op_weight_matrix(choices_to_init_weight_matrix(Settings.initialize_ops,
self.function_set))
self.set_init_var_weight_matrix(choices_to_init_weight_matrix(np.zeros([2 ** self.n_tree_layers]),
self.var_names))
self.train(train_x, train_y, test_x=test_x, test_y=test_y)
valid_err = self.test(valid_x, valid_y)
current_time = time.time()
if verbose:
# print(self.get_simple_formula())
print("Attained validation error: {:.5f}".format(valid_err))
if valid_err < best_validation:
best_validation = valid_err
best_formula = self.get_simple_formula()
best_iter = train_iter
if test_x is not None:
safe_test_y = self.make_y_multi_safe(test_y)
best_err = self.test(test_x, safe_test_y)
else:
best_err = valid_err
if verbose:
print(">>> New best model!")
print(best_formula)
if verbose:
iters_per_minute = 60.0 / (current_time - old_time)
print("Took {:.2f} minutes.".format((current_time - old_time) / 60))
print("Est. {:.2f} minutes remaining.".format((num_repeats - train_iter) / iters_per_minute))
print()
old_time = current_time
if verbose:
print("Total time for repeat process: {:.2f} minutes.".format((time.time() - start_time) / 60))
return best_formula, best_iter, best_err