"""
Do not run or modify this file.
For running: DiffEqnSolver.py
For modifying: Settings.py
"""
import datetime
import time

import numpy as np
import tensorflow as tf

import DataUtils
import Settings
from DataUtils import choices_to_init_weight_matrix
from DataUtils import tf_diff_sqrt, tf_diff_log, our_tanh, spike
from Settings import implicit_function, d_eps

tf.compat.v1.disable_eager_execution()


def new_weight_matrix(n_rows, n_cols, mean=0.0, name=None):
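    """Create a trainable weight matrix of shape [n_rows, n_cols], initialized
    from a normal distribution with the given mean and standard deviation
    Settings.w_matrix_stddev."""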
    initial = tf.random.normal(shape=[n_rows, n_cols], mean=mean, stddev=Settings.w_matrix_stddev)
    if name is not None:
        return tf.Variable(initial, name=name)
    return tf.Variable(initial)


def new_bias(n_cols, name=None):
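    """Create a trainable bias row vector of shape [1, n_cols], initialized to zeros."""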
    initial = tf.zeros(shape=[1, n_cols])
    if name is not None:
        return tf.Variable(initial, name=name)
    return tf.Variable(initial)


def operate_on_tensors(tensor_A, tensor_B, fn_set, use_both_for_unary=True):
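    """Apply every operator in fn_set to the two child tensors and stack the
    results along a new last axis. Unary operators act on tensor_A plus
    tensor_B if use_both_for_unary is True, and on tensor_A alone otherwise."""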
    # print('op on tensors. input shapes: {}, {}'.format(tensor_A.shape, tensor_B.shape))
    if use_both_for_unary:
        w2 = 1.0
    else:
        w2 = 0.0
    answer_vector = []
    for operator_i in fn_set:
        if operator_i == 'id':
            answer_vector.extend([tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0]])
            # print("id vector shape: {}".format(answer_vector[-1].shape))
        elif operator_i == 'add':
            answer_vector.extend([tensor_A[:, :, 0] + tensor_B[:, :, 0]])
        elif operator_i == 'sin':
            answer_vector.extend([tf.sin(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
        elif operator_i == 'cos':
            answer_vector.extend([tf.cos(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
        elif operator_i == 'sqrt':
            answer_vector.extend([tf_diff_sqrt(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
        elif operator_i == 'mul':
            answer_vector.extend([tf.multiply(tensor_A[:, :, 0], tensor_B[:, :, 0])])
        elif operator_i == 'div':
            answer_vector.extend([tf.math.divide_no_nan(tensor_A[:, :, 0], tensor_B[:, :, 0])])
        elif operator_i == 'log':
            answer_vector.extend([tf_diff_log(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0])])
        elif operator_i == 'exp':
            answer_vector.extend([tf.exp(our_tanh(tensor_A[:, :, 0] + w2 * tensor_B[:, :, 0], factor=np.log(50000)))])
        else:
            answer_vector.extend([None])
    return tf.stack(answer_vector, axis=-1)


def sm_no_const_selector(nonflat_input, flat_input, initial_weights):
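    """Select among the stacked operator outputs. Returns the softmax-weighted
    combination (differentiable), a 'flattened' combination that keeps only
    weights within 0.01 of the maximum (renormalized), plus the pre-softmax,
    post-softmax, and flattened weight tensors themselves."""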
    # print("sm_no_const_selector---")
    # print("initial_weights: {}".format(initial_weights.shape))
    # print("nonflat_input: {}".format(nonflat_input.shape))
    pre_sm_weights = new_weight_matrix(int(nonflat_input.shape[-1]), 1)
    post_sm_weights = tf.math.softmax(pre_sm_weights + initial_weights, axis=0)
    # print("post_sm_weights: {}".format(post_sm_weights.shape))
    sm_result = tf.matmul(nonflat_input, post_sm_weights)
    # print("sm_result: {}".format(sm_result.shape))
    flat_weights = tf.multiply(post_sm_weights,
                               tf.cast(tf.greater(post_sm_weights,
                                                  tf.reduce_max(post_sm_weights) - 0.01), tf.float32))
    flat_weights = tf.divide(flat_weights, tf.reduce_sum(flat_weights))
    flat_result = tf.matmul(flat_input, flat_weights)
    return sm_result, flat_result, pre_sm_weights + initial_weights, post_sm_weights, flat_weights


def collect_op_inputs_str(weight_w, weight_b, input_strs):
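    """Render a weighted sum of input strings plus a bias as a readable
    expression, dropping terms whose weights are smaller than Settings.big_eps
    and wrapping multi-term sums in parentheses."""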
    num_inputs = weight_w.shape[0]
    # print("num_inputs: {}. input_strs length: {}".format(num_inputs, len(input_strs)))
    # print(weight_w)
    # print(input_strs)
    temp_answer = ''
    has_one = False
    has_more_than_one = False
    for row in range(num_inputs):
        if np.abs(weight_w[row][0]) > Settings.big_eps and input_strs[row] != '0':
            if has_one:
                temp_answer += ' + '
                has_more_than_one = True
            if np.abs(weight_w[row][0] - 1) < Settings.big_eps:
                temp_answer += '{}'.format(input_strs[row])
            else:
                temp_answer += '{:.4f}*({})'.format(weight_w[row][0], input_strs[row])
            has_one = True
    # print('weight_b[-1]: {}'.format(weight_b))
    if np.abs(weight_b[-1][0]) > Settings.big_eps:
        if has_one:
            temp_answer += ' + '
            has_more_than_one = True
        temp_answer += '{:.4f}'.format(weight_b[-1][0])
    if len(temp_answer) == 0:
        temp_answer = '0'
    if has_more_than_one:
        return '(' + temp_answer + ')'
    return temp_answer


def collect_minimal_op_inputs_str(weight_w, input_strs):
    num_inputs = weight_w.shape[0]
    temp_answer = ''
    has_one = False
    has_more_than_one = False
    for row in range(num_inputs):
        if has_one:
            temp_answer += ' + '
            has_more_than_one = True
        temp_answer += '{}'.format(input_strs[row])
        has_one = True
    if len(temp_answer) == 0:
        temp_answer = '0'
    if has_more_than_one:
        return '(' + temp_answer + ')'
    return temp_answer


def operation_to_str_best(weight_w, weight_b, weight_sm, input_strs1, input_strs2, fn_set,
                          digits=None, unary_both=True, minimal=False):
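    """Render one operator node as a string: build the string form of every
    operator in fn_set applied to the two child strings, then combine them
    according to the (flattened) selection weights weight_sm and the node's
    output weight and bias."""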
    if input_strs2 is None:
        temp_answer = collect_op_inputs_str(weight_w, weight_b, input_strs1)
        return [temp_answer]
    answer = ['0' for _ in fn_set]
    temp_answer1 = input_strs1
    temp_answer2 = input_strs2
    # Set up temp_answer. Don't change this value!
    if unary_both:
        if temp_answer1 == '0' and temp_answer2 == '0':
            temp_answer = '0'
        elif temp_answer1 == '0':
            temp_answer = str(temp_answer2)
        elif temp_answer2 == '0':
            temp_answer = str(temp_answer1)
        else:
            temp_answer = '({} + {})'.format(temp_answer1, temp_answer2)
    else:
        temp_answer = str(temp_answer1)
    if 'id' in fn_set:
        fn_index = fn_set.index('id')
        answer[fn_index] = temp_answer
    if 'sin' in fn_set:
        fn_index = fn_set.index('sin')
        if temp_answer == '0':
            answer[fn_index] = '0'
        else:
            answer[fn_index] = 'sin({})'.format(temp_answer)
    if 'cos' in fn_set:
        fn_index = fn_set.index('cos')
        answer[fn_index] = 'cos({})'.format(temp_answer)
    if 'sqrt' in fn_set:
        fn_index = fn_set.index('sqrt')
        answer[fn_index] = '(abs({}))^(0.5)'.format(temp_answer)
    if 'log' in fn_set:
        fn_index = fn_set.index('log')
        if temp_answer == '0':
            answer[fn_index] = 'log(0.0001)'
        else:
            answer[fn_index] = 'log({})'.format(temp_answer)
    if 'mul' in fn_set:
        fn_index = fn_set.index('mul')
        if temp_answer1 == '0' or temp_answer2 == '0':
            prod_answer = '0'
        else:
            prod_answer = '({} * {})'.format(temp_answer1, temp_answer2)
        answer[fn_index] = prod_answer
    if 'add' in fn_set:
        fn_index = fn_set.index('add')
        if temp_answer1 == '0' and temp_answer2 == '0':
            sum_answer = '0'
        elif temp_answer1 == '0':
            sum_answer = str(temp_answer2)
        elif temp_answer2 == '0':
            sum_answer = str(temp_answer1)
        else:
            sum_answer = '({} + {})'.format(temp_answer1, temp_answer2)
        answer[fn_index] = sum_answer
    if 'sub' in fn_set:
        fn_index = fn_set.index('sub')
        temp_answer1 = input_strs1
        temp_answer2 = input_strs2
        if temp_answer1 == '0' and temp_answer2 == '0':
            diff_answer = '0'
        elif temp_answer1 == '0':
            diff_answer = "-{}".format(temp_answer2)
        elif temp_answer2 == '0':
            diff_answer = temp_answer1
        else:
            diff_answer = '({} - {})'.format(temp_answer1, temp_answer2)
        answer[fn_index] = diff_answer
    if 'max' in fn_set:
        fn_index = fn_set.index('max')
        answer[fn_index] = 'max({}, {})'.format(temp_answer1, temp_answer2)
    if 'min' in fn_set:
        fn_index = fn_set.index('min')
        answer[fn_index] = 'min({}, {})'.format(temp_answer1, temp_answer2)
    if 'div' in fn_set:
        fn_index = fn_set.index('div')
        if temp_answer2 == '0':
            temp_answer2 = '0.001'
        if temp_answer1 == '0':
            div_answer = '0'
        else:
            div_answer = '({} / ({}))'.format(temp_answer1, temp_answer2)
        answer[fn_index] = div_answer
    if 'exp' in fn_set:
        fn_index = fn_set.index('exp')
        answer[fn_index] = 'exp({})'.format(temp_answer)
    new_answer = [collect_op_inputs_str(weight_sm, np.zeros([1, 1]), answer)]
    # print('New answer: {}'.format(new_answer))
    # print('weight w, weight b: {}, {}'.format(weight_w, weight_b))
    if minimal:
        ret_val = collect_minimal_op_inputs_str(weight_w, new_answer)
    else:
        ret_val = collect_op_inputs_str(weight_w, weight_b, new_answer)
    return ret_val


def flattened_sm_result(input_x, sm_applied_weights, our_w, our_b):
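    """Hard-select the dominant operator output: zero out softmax weights more
    than 0.01 below the maximum, renormalize them, and apply the affine output
    transform our_w * result + our_b."""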
    # print('Create operator node. Input shapes: {}, {}'.format(input_1.shape, input_2.shape))
    new_sm_weights = tf.multiply(sm_applied_weights,
                                 tf.cast(tf.greater(sm_applied_weights,
                                                    tf.reduce_max(sm_applied_weights) - 0.01), tf.float32))
    new_sm_weights = tf.divide(new_sm_weights, tf.reduce_sum(new_sm_weights))
    sm_result = tf.matmul(input_x, new_sm_weights)
    final_result = tf.multiply(sm_result, our_w) + our_b
    # print(' Final result shape: {}'.format(final_result.shape))
    return final_result, new_sm_weights


class SFL:
    def __init__(self, var_names=None):
        self.name = "Symbolic Function Learner"
        self.short_name = "SFL"

        # mode: in ["sr", "de", "lr"]
        self.mode = Settings.mode

        # main hyperparameters of the symbolic expression
        self.n_tree_layers = Settings.n_tree_layers
        self.function_set = Settings.function_set.copy()
        self.n_input_variables = Settings.num_features
        self.n_dims_per_variable = Settings.num_dims_per_feature
        self.n_dims_in_output = Settings.n_dims_in_output
        assert self.n_dims_in_output in [1, self.n_dims_per_variable]

        # use_both_for_unary: decide how to handle two input children
        # for a unary operator.
        #   True: add the two inputs.
        #   False: keep first input; discard second input.
        self.use_both_for_unary = Settings.use_both_for_unary

        # Use a softmax on the leaf layer?
        self.sm_leaf_layer = Settings.use_leaf_sm

        # data_x, data_y: input (x, y) values over which we are training.
        # For symbolic regression, they are the same as fixed_x, fixed_y.
        # For differential equations, they are random values.
        self.data_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
                                                         self.n_input_variables], name="data_x")
        self.data_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="data_y")
        # fixed_x, fixed_y: the set of points that must be satisfied
        # by the function that is learned. These are used to compute
        # the residual error in the cost function.
        self.fixed_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
                                                          self.n_input_variables], name="fixed_x")
        self.fixed_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="fixed_y")
        # To initialize operators in the SFL with a warm start before training
        self.init_op_weights = tf.compat.v1.placeholder("float", [len(self.function_set), 2 ** self.n_tree_layers - 1],
                                                        name="init_op_weights")
        self.init_op_weight_matrix = np.zeros(shape=[len(self.function_set), 2 ** self.n_tree_layers - 1])

        # To initialize variable choices in the SFL with a warm start before training
        num_var_input_choices = self.n_input_variables
        self.init_var_weights = tf.compat.v1.placeholder("float", [num_var_input_choices, 2 ** self.n_tree_layers],
                                                         name="init_var_weights")
        self.init_var_weight_matrix = np.zeros(shape=[num_var_input_choices, 2 ** self.n_tree_layers])

        # variables can have default or custom names
        if self.n_input_variables == 1 and var_names is None:
            self.var_names = ['x']
        elif var_names is None:
            self.var_names = ['x{}'.format(i + 1) for i in range(self.n_input_variables)]
        else:
            self.var_names = var_names

        self.learn_rate = Settings.learn_rate
        self.y_gold = self.data_y
        self.g_error = tf.Variable(0.0)
        self.g_error_not_flat = tf.Variable(0.0)
        self.mse = tf.Variable(0.0)
        self.mse_not_flat = tf.Variable(0.0)
        self.spike_error = tf.Variable(0.0)
        self.ivp_error = tf.Variable(0.0)
        self.ivp_error_not_flat = tf.Variable(0.0)
        self.total_error = tf.Variable(0.0)

        if self.mode == "de":
            self.ivp_lambda = Settings.ivp_lambda
        else:
            self.ivp_lambda = 0

        self.train_accuracy_log = []
        self.valid_accuracy_log = []
        self.test_accuracy_log = []
        self.seen_eqns = []
        self.seen_minimal_eqns = []
        self.log_iters = []
        self.best_accuracy_so_far = 9999999
        self.best_formula_so_far = ""
        self.best_iter = 0

        self.y_hat = None
        self.y_hat_p1 = None
        self.y_hat_pp1 = None
        self.y_hat_p2 = None
        self.y_hat_pp2 = None
        self.y_hat_p3 = None
        self.y_hat_pp3 = None
        self.y_hat_pp12 = None
        self.implicit_g = None
        self.y_hat_not_flat = None
        self.y_hat_p_not_flat = None
        self.y_hat_pp_not_flat = None
        self.implicit_g_not_flat = None

        self.W_matrices = []
        self.b_matrices = []
        self.non_sm_weights = []
        self.leaf_sm_weights = []
        self.sm_W_matrices = []
        self.sm_applied_W_matrices = []
        self.flattened_W_matrices = []
        self.use_both_for_unary = Settings.use_both_for_unary

        self.init = None
        self.sess = None

        self.build_sfl()
        self.reset(var_names)

    def build_sfl(self):
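        """Build the computation graph as a binary tree of operator nodes.
        The leaf layer maps the input features to scalars through learned
        affine (optionally softmax-selected) weights; each internal node
        applies every operator in the function set to its two children and
        picks among them with a softmax. Both a soft (differentiable) output
        and a flattened (hard-selected) output are constructed."""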
        self.data_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
                                                         self.n_input_variables], name="data_x")
        self.data_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="data_y")
        self.fixed_x = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable,
                                                          self.n_input_variables], name="fixed_x")
        self.fixed_y = tf.compat.v1.placeholder("float", [None, self.n_dims_per_variable, 1], name="fixed_y")

        # To initialize operators in the SFL with a warm start before training
        self.init_op_weights = tf.compat.v1.placeholder("float", [len(self.function_set), 2 ** self.n_tree_layers - 1],
                                                        name="init_op_weights")

        # To initialize variable choices in the SFL with a warm start before training
        # Right now, only one variable is supported.
        num_var_input_choices = self.n_input_variables
        self.init_var_weights = tf.compat.v1.placeholder("float", [num_var_input_choices, 2 ** self.n_tree_layers],
                                                         name="init_var_weights")

        self.g_error = tf.Variable(0.0)
        self.g_error_not_flat = tf.Variable(0.0)
        self.mse = tf.Variable(0.0)
        self.mse_not_flat = tf.Variable(0.0)
        self.spike_error = tf.Variable(0.0)
        self.ivp_error = tf.Variable(0.0)
        self.ivp_error_not_flat = tf.Variable(0.0)
        self.total_error = tf.Variable(0.0)

        self.W_matrices = []
        self.b_matrices = []
        self.non_sm_weights = []
        self.leaf_sm_weights = []
        self.sm_W_matrices = []
        self.sm_applied_W_matrices = []
        self.flattened_W_matrices = []
        previous_output = []
        previous_flat_output = []
        weight_layer = []
        bias_layer = []

        if Settings.show_output:
            print("Setting up {} model.".format(self.name))
            print(" {} tree layers.".format(self.n_tree_layers))
            print(" {} features of {} component(s) each.".format(self.n_input_variables, self.n_dims_per_variable))
            print(" {} component(s) in output.".format(self.n_dims_in_output))
            print(" {} operators: {}.".format(len(self.function_set),
                                              self.function_set))

        # Set up leaf layer
        for i in range(2 ** (Settings.n_tree_layers - 1)):
            if self.sm_leaf_layer:
                num_leaf_weights = 1
            else:
                num_leaf_weights = self.n_input_variables
            new_weights1 = new_weight_matrix(num_leaf_weights, 1, mean=0.0)
            new_b1 = new_bias(1)
            new_weights2 = new_weight_matrix(num_leaf_weights, 1, mean=0.0)
            new_b2 = new_bias(1)
            # print("self.data_x.shape: {}".format(self.data_x.shape))
            if self.sm_leaf_layer:
                new_sm_weights1 = new_weight_matrix(self.n_input_variables, 1, mean=0.0)
                new_sm_weights2 = new_weight_matrix(self.n_input_variables, 1, mean=0.0)
                input_1 = tf.matmul(self.data_x, tf.math.softmax(new_sm_weights1, axis=0))
                input_2 = tf.matmul(self.data_x, tf.math.softmax(new_sm_weights2, axis=0))
                # todo: ugh
                # new_weights1 = tf.constant([[1.0]])
                # new_weights2 = tf.constant([[1.0]])
            else:
                input_1 = self.data_x
                input_2 = self.data_x
            # print("input_1.shape: {}".format(input_1.shape))
            result_1 = tf.matmul(input_1, new_weights1) + new_b1
            result_2 = tf.matmul(input_2, new_weights2) + new_b2
            # print("result_1.shape: {}".format(result_1.shape))
            weight_layer.extend([new_weights1, new_weights2])
            bias_layer.extend([new_b1, new_b2])
            if self.sm_leaf_layer:
                self.leaf_sm_weights.extend([tf.math.softmax(new_sm_weights1, axis=0),
                                             tf.math.softmax(new_sm_weights2, axis=0)])
            self.non_sm_weights.extend([new_weights1, new_weights2, new_b1, new_b2])
            # self.non_sm_weights.extend([new_weights1, new_weights2])
            previous_output.extend([result_1, result_2])
            previous_flat_output.extend([result_1, result_2])
        self.W_matrices.append(weight_layer)
        self.b_matrices.append(bias_layer)
        self.sm_W_matrices.append([])
        self.sm_applied_W_matrices.append([])
        self.flattened_W_matrices.append([])

        current_node = 0
        # Set up parent layers, one at a time going up
        for j in range(Settings.n_tree_layers):
            sm_weight_layer = []
            sm_applied_weight_layer = []
            flattened_weight_layer = []
            weight_layer = []
            bias_layer = []
            new_output = []
            new_flat_output = []
            result_layer = []
            flattened_result_layer = []
            for i in range(2 ** (Settings.n_tree_layers - j - 1)):
                current_input_1 = previous_output[2 * i]
                current_input_2 = previous_output[2 * i + 1]
                nonflatten_input = operate_on_tensors(current_input_1, current_input_2, self.function_set,
                                                      use_both_for_unary=self.use_both_for_unary)
                current_flat_input_1 = previous_flat_output[2 * i]
                current_flat_input_2 = previous_flat_output[2 * i + 1]
                flatten_input = operate_on_tensors(current_flat_input_1, current_flat_input_2,
                                                   self.function_set,
                                                   use_both_for_unary=self.use_both_for_unary)
                init_op_weights = tf.reshape(self.init_op_weights[:, current_node], [-1, 1])
                sm_r, flat_r, pre_sm_w, post_sm_w, flat_w = sm_no_const_selector(nonflatten_input,
                                                                                 flatten_input,
                                                                                 init_op_weights)
                new_w = new_weight_matrix(1, 1, mean=1.0)
                new_b = new_bias(1)
                # self.non_sm_weights.extend([new_b])
                sm_r = tf.math.multiply(sm_r, new_w) + new_b
                flat_r = tf.multiply(flat_r, new_w) + new_b
                sm_weight_layer.extend([pre_sm_w])
                sm_applied_weight_layer.extend([post_sm_w])
                flattened_weight_layer.extend([flat_w])
                new_output.extend([sm_r])
                new_flat_output.extend([flat_r])
                weight_layer.extend([new_w])
                bias_layer.extend([new_b])
                # self.non_sm_weights.extend([new_w, new_b])
                result_layer.extend([sm_r])
                flattened_result_layer.extend([flat_r])
                current_node += 1
            self.sm_W_matrices.extend([sm_weight_layer])
            self.sm_applied_W_matrices.extend([sm_applied_weight_layer])
            self.flattened_W_matrices.extend([flattened_weight_layer])
            self.W_matrices.extend([weight_layer])
            self.b_matrices.extend([bias_layer])
            previous_output = new_output
            previous_flat_output = new_flat_output

        if self.mode == "lr":
            self.y_hat_not_flat = spike(previous_output[-1])
            self.y_hat = spike(previous_flat_output[-1])
        else:
            self.y_hat_not_flat = our_tanh(previous_output[-1], factor=10000)
            self.y_hat = our_tanh(previous_flat_output[-1], factor=10000)

    def reset(self, var_names=None):
        tf.compat.v1.reset_default_graph()
        self.build_sfl()
        if var_names is not None:
            self.var_names = var_names
        self.log_iters = []
        self.train_accuracy_log = []
        self.valid_accuracy_log = []
        self.test_accuracy_log = []
        self.seen_eqns = []
        self.seen_minimal_eqns = []

        self.setup_derivative_values()
        self.setup_err_values(non_const=Settings.non_const)

        # TODO: really need to sort out the whole fixed_x, fixed_y thing
        if self.mode == "de":
            self.ivp_error_not_flat, self.ivp_error = self.setup_ivp_values(self.fixed_x, self.fixed_y)

        if self.mode == "de":
            self.total_error = self.total_error + self.g_error
        if Settings.non_const:
            self.total_error = self.total_error + self.spike_error
        self.total_error = self.total_error + self.mse + self.ivp_lambda * self.ivp_error

        sum_of_squares = tf.reduce_sum([tf.reduce_sum(tf.square(reg_w)) for reg_w in self.non_sm_weights])
        sum_of_squares_minus_max = sum_of_squares - tf.reduce_sum([tf.reduce_max(tf.square(reg_w))
                                                                   for reg_w in self.non_sm_weights])
        self.regularization_penalty = tf.reduce_mean([tf.reduce_sum(tf.abs(reg_w))
                                                      for reg_w in self.non_sm_weights])
        # self.regularization_penalty += sum_of_squares_minus_max

        self.loss_function1 = self.mse_not_flat + self.g_error_not_flat + self.spike_error + self.ivp_lambda * self.ivp_error_not_flat
        self.loss_function2 = self.mse + self.g_error + self.spike_error + self.ivp_lambda * self.ivp_error
        self.loss_function2 += self.regularization_penalty * 0.05  # 0.1
        self.loss_function3 = self.mse + self.g_error + self.spike_error + self.ivp_lambda * self.ivp_error
        self.loss_function3 += self.regularization_penalty * 0.9  # 1.0

        self.opt = tf.compat.v1.train.AdamOptimizer(self.learn_rate)
        self.train_step_1 = self.opt.minimize(self.loss_function1)
        self.train_step_2 = self.opt.minimize(self.loss_function2)
        self.train_step_3 = self.opt.minimize(self.loss_function3)

        self.init = tf.compat.v1.global_variables_initializer()
        self.sess = tf.compat.v1.Session()
        self.sess.run(self.init)

        self.best_accuracy_so_far = 9999999
        self.best_formula_so_far = ""
        self.best_iter = 0

    def setup_err_values(self, non_const=False):
        if self.mode == "de":
            self.g_error = tf.reduce_mean(tf.math.square(self.implicit_g))
            self.g_error_not_flat = tf.reduce_mean(tf.math.square(self.implicit_g_not_flat))
        else:
            self.g_error = tf.Variable(0.0)
            self.g_error_not_flat = tf.Variable(0.0)
        self.mse = tf.reduce_mean(tf.math.squared_difference(self.y_hat, self.data_y))
        self.mse_not_flat = tf.reduce_mean(tf.math.squared_difference(self.y_hat_not_flat, self.data_y))
        if non_const:
            self.spike_error = tf.reduce_mean(spike(self.y_hat_p1))
            # tf.reduce_sum(spike(self.y_hat_p1) + spike(self.y_hat_p2) + spike(self.y_hat_p3))

    def setup_ivp_values(self, fixed_x_ph, fixed_y_ph):
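        """Return (non-flat, flat) squared-error terms penalizing mismatch
        with the fixed points that the learned function must satisfy: the
        values at (fixed_x, fixed_y), and, when Settings provides them,
        first-derivative values approximated by central differences with
        step d_eps."""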
        y_hat_err_not_flat = tf.Variable(0.0)
        y_hat_err = tf.Variable(0.0)
        if fixed_x_ph is not None:
            y_hat_err_not_flat = tf.reduce_mean(tf.math.squared_difference(fixed_y_ph,
                                                                           self.eval_formula(fixed_x_ph, flat=False)))
            y_hat_err = tf.reduce_mean(tf.math.squared_difference(fixed_y_ph, self.eval_formula(fixed_x_ph)))
        eye = tf.eye(self.n_input_variables)
        u1 = eye[:, 0]
        if Settings.fixed_x_p1 is not None and len(Settings.fixed_x_p1) > 0:
            fixed_x_p1 = tf.constant(np.reshape(Settings.fixed_x_p1,
                                                [-1, Settings.num_dims_per_feature, Settings.num_features]),
                                     dtype="float32")
            fixed_y_p1 = tf.constant(np.reshape(Settings.fixed_y_p1,
                                                [-1, Settings.n_dims_in_output, 1]),
                                     dtype="float32")
            y_p1_fixed_hat = self.eval_formula(fixed_x_p1 + d_eps * u1 / 2)
            y_p1_fixed_hat -= self.eval_formula(fixed_x_p1 - d_eps * u1 / 2)
            y_p1_fixed_hat = y_p1_fixed_hat / d_eps
            y_hat_err_not_flat += tf.reduce_mean(tf.math.squared_difference(fixed_y_p1, y_p1_fixed_hat))
            y_hat_err += tf.reduce_mean(tf.math.squared_difference(fixed_y_p1, y_p1_fixed_hat))
        if self.n_input_variables > 1:
            u2 = eye[:, 1]
            if Settings.fixed_x_p2 is not None and len(Settings.fixed_x_p2) > 0:
                fixed_x_p2 = tf.constant(np.reshape(Settings.fixed_x_p2,
                                                    [-1, Settings.num_dims_per_feature, Settings.num_features]),
                                         dtype="float32")
                fixed_y_p2 = tf.constant(np.reshape(Settings.fixed_y_p2,
                                                    [-1, Settings.n_dims_in_output, 1]),
                                         dtype="float32")
                y_p2_fixed_hat = self.eval_formula(fixed_x_p2 + d_eps * u2 / 2)
                y_p2_fixed_hat -= self.eval_formula(fixed_x_p2 - d_eps * u2 / 2)
                y_p2_fixed_hat = y_p2_fixed_hat / d_eps
                y_hat_err_not_flat += tf.reduce_mean(tf.math.squared_difference(fixed_y_p2, y_p2_fixed_hat))
                y_hat_err += tf.reduce_mean(tf.math.squared_difference(fixed_y_p2, y_p2_fixed_hat))
        return y_hat_err_not_flat, y_hat_err

    def get_formula_string(self, digits=None):
        eval_dict = {self.init_op_weights: self.init_op_weight_matrix,
                     self.init_var_weights: self.init_var_weight_matrix}
        inputs = []
        for i in range(len(self.W_matrices[0])):
            w_matrix = self.W_matrices[0][i].eval(session=self.sess)
            b_vector = self.b_matrices[0][i].eval(session=self.sess)
            if self.sm_leaf_layer:
                sm_vector = self.leaf_sm_weights[i].eval(session=self.sess)
                print("sm_vector: {}".format(sm_vector))
                new_answer = [collect_op_inputs_str(sm_vector, np.zeros([1, 1]), self.var_names)]
                new_input = collect_op_inputs_str(w_matrix, b_vector, new_answer)
            else:
                new_input = collect_op_inputs_str(w_matrix, b_vector, self.var_names)
            inputs.extend([new_input])
        for layer_i in range(1, len(self.W_matrices)):
            sm_applied_this_layer = self.flattened_W_matrices[layer_i]
            w_this_layer = self.W_matrices[layer_i]
            b_this_layer = self.b_matrices[layer_i]
            new_inputs = []
            for iii in range(0, len(w_this_layer)):
                new_inputs.extend([operation_to_str_best(w_this_layer[iii].eval(self.sess),
                                                         b_this_layer[iii].eval(self.sess),
                                                         sm_applied_this_layer[iii].eval(session=self.sess,
                                                                                         feed_dict=eval_dict),
                                                         inputs[2 * iii],
                                                         inputs[2 * iii + 1],
                                                         self.function_set,
                                                         unary_both=self.use_both_for_unary)])
            inputs = new_inputs
        if isinstance(inputs[0], list):
            return inputs[0][0]
        return inputs[0]

    def get_minimal_formula_string(self):
        eval_dict = {self.init_op_weights: self.init_op_weight_matrix,
                     self.init_var_weights: self.init_var_weight_matrix}
        inputs = []
        for i in range(len(self.W_matrices[0])):
            # w_matrix = self.W_matrices[0][i].eval(self.sess)
            # inputs.extend([collect_minimal_op_inputs_str(w_matrix, self.var_names)])
            inputs.append("A{}".format(i + 1))
        for layer_i in range(1, len(self.W_matrices)):
            sm_applied_this_layer = self.flattened_W_matrices[layer_i]
            w_this_layer = self.W_matrices[layer_i]
            new_inputs = []
            for iii in range(0, len(sm_applied_this_layer)):
                new_inputs.extend([operation_to_str_best(w_this_layer[iii].eval(self.sess),
                                                         None,
                                                         sm_applied_this_layer[iii].eval(session=self.sess,
                                                                                         feed_dict=eval_dict),
                                                         inputs[2 * iii],
                                                         inputs[2 * iii + 1],
                                                         self.function_set,
                                                         unary_both=self.use_both_for_unary,
                                                         minimal=True)])
            inputs = new_inputs
        if isinstance(inputs[0], list):
            return inputs[0][0]
        return inputs[0]

    def eval_formula(self, input_x, flat=True):
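        """Re-evaluate the learned expression tree on an arbitrary tensor
        input_x, reusing the trained weights. With flat=True the hard-selected
        (flattened) operator weights are used; otherwise the soft softmax
        weights are used."""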
        inputs = []
        for i in range(len(self.W_matrices[0])):
            w_matrix = self.W_matrices[0][i]
            b_vector = self.b_matrices[0][i]
            if self.sm_leaf_layer:
                post_sm_weights = self.leaf_sm_weights[i]
                sm_result = tf.matmul(input_x, post_sm_weights)
                result = tf.multiply(sm_result, w_matrix) + b_vector
            else:
                result = tf.matmul(input_x, w_matrix) + b_vector
            inputs.extend([result])
        for layer_i in range(1, len(self.W_matrices)):
            sm_flat_this_layer = self.flattened_W_matrices[layer_i]
            sm_applied_this_layer = self.sm_applied_W_matrices[layer_i]
            w_this_layer = self.W_matrices[layer_i]
            b_this_layer = self.b_matrices[layer_i]
            new_inputs = []
            for iii in range(0, len(w_this_layer)):
                post_sm_weights = sm_applied_this_layer[iii]
                flat_sm_weights = sm_flat_this_layer[iii]
                op_result = operate_on_tensors(inputs[2 * iii],
                                               inputs[2 * iii + 1],
                                               self.function_set,
                                               use_both_for_unary=self.use_both_for_unary)
                if flat:
                    # result, flat_sm_weights = flattened_sm_result(op_result,
                    #                                               post_sm_weights,
                    #                                               w_this_layer[iii],
                    #                                               b_this_layer[iii])
                    sm_result = tf.matmul(op_result, flat_sm_weights)
                else:
                    sm_result = tf.matmul(op_result, post_sm_weights)
                result = tf.multiply(sm_result, w_this_layer[iii]) + b_this_layer[iii]
                new_inputs.extend([result])
            inputs = new_inputs
        if self.mode == "lr":
            return spike(inputs[0])
        return inputs[0]

    def setup_derivative_values(self):
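        """Approximate first and second partial derivatives of the learned
        function with central finite differences, e.g.
        y'(x) ~ (y(x + h/2) - y(x - h/2)) / h with h = d_eps, and plug them
        into the implicit function g that defines the differential equation."""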
        d2_eps = 1e-2
        eye = tf.eye(self.n_input_variables)
        u1 = eye[:, 0]
        if self.n_input_variables > 1:
            u2 = eye[:, 1]
        if self.n_input_variables > 2:
            u3 = eye[:, 2]
        # u = []
        # for i in range(self.n_input_variables):
        #     u_i = eye[:, i]

        # dy / dx1
        self.y_hat_p1 = self.eval_formula(self.data_x + d_eps * u1 / 2)
        self.y_hat_p1 -= self.eval_formula(self.data_x - d_eps * u1 / 2)
        self.y_hat_p1 = self.y_hat_p1 / d_eps
        # d^2y / dx1^2
        self.y_hat_pp1 = self.eval_formula(self.data_x + d2_eps * u1)
        self.y_hat_pp1 -= (2 * self.eval_formula(self.data_x))
        self.y_hat_pp1 += self.eval_formula(self.data_x - d2_eps * u1)
        self.y_hat_pp1 /= (d2_eps ** 2)

        if self.n_input_variables > 1:
            # dy / dx2
            self.y_hat_p2 = self.eval_formula(self.data_x + d_eps * u2 / 2)
            self.y_hat_p2 -= self.eval_formula(self.data_x - d_eps * u2 / 2)
            self.y_hat_p2 = self.y_hat_p2 / d_eps
            # d^2y / dx2^2
            self.y_hat_pp2 = self.eval_formula(self.data_x + d2_eps * u2)
            self.y_hat_pp2 -= (2 * self.eval_formula(self.data_x))
            self.y_hat_pp2 += self.eval_formula(self.data_x - d2_eps * u2)
            self.y_hat_pp2 /= (d2_eps ** 2)
            # d^2y / dx1 dx2 (four-point central difference; the two mixed
            # corners enter with minus signs and the two aligned corners
            # with plus signs)
            self.y_hat_pp12 = self.eval_formula(self.data_x + d2_eps * (u1 + u2))
            self.y_hat_pp12 -= self.eval_formula(self.data_x - d2_eps * (u1 - u2))
            self.y_hat_pp12 -= self.eval_formula(self.data_x - d2_eps * (u2 - u1))
            self.y_hat_pp12 += self.eval_formula(self.data_x + d2_eps * (-u1 - u2))
            self.y_hat_pp12 /= (4 * d2_eps ** 2)
        else:
            self.y_hat_p2 = None
            self.y_hat_pp2 = None
            self.y_hat_pp12 = None

        if self.n_input_variables > 2:
            # dy / dx3
            self.y_hat_p3 = self.eval_formula(self.data_x + d_eps * u3 / 2)
            self.y_hat_p3 -= self.eval_formula(self.data_x - d_eps * u3 / 2)
            self.y_hat_p3 = self.y_hat_p3 / d_eps
        else:
            self.y_hat_p3 = None

        self.y_hat_p_not_flat = self.eval_formula(self.data_x + d_eps * u1 / 2, flat=False)
        self.y_hat_p_not_flat -= self.eval_formula(self.data_x - d_eps * u1 / 2, flat=False)
        self.y_hat_p_not_flat = self.y_hat_p_not_flat / d_eps
        self.y_hat_pp_not_flat = self.eval_formula(self.data_x + d_eps * u1, flat=False)
        self.y_hat_pp_not_flat -= 2 * self.eval_formula(self.data_x, flat=False)
        self.y_hat_pp_not_flat += self.eval_formula(self.data_x - d_eps * u1, flat=False)
        self.y_hat_pp_not_flat = self.y_hat_pp_not_flat / d_eps ** 2

        self.implicit_g = our_tanh(implicit_function(self.data_x, self.y_hat,
                                                     [self.y_hat_p1, self.y_hat_p2, self.y_hat_p3],
                                                     [self.y_hat_pp1, self.y_hat_pp2, self.y_hat_pp12]))
        self.implicit_g_not_flat = our_tanh(implicit_function(self.data_x, self.y_hat_not_flat,
                                                              [self.y_hat_p_not_flat, self.y_hat_p2, self.y_hat_p3],
                                                              [self.y_hat_pp_not_flat, self.y_hat_pp2,
                                                               self.y_hat_pp12]))

    def soft_reset(self):
        """ Like reset, but does not erase records of training history.
        It only restarts training from a new random initialization. """
        self.init = tf.compat.v1.global_variables_initializer()
        self.saver = tf.compat.v1.train.Saver()
        self.sess = tf.compat.v1.Session()
        self.sess.run(self.init)
        self.best_accuracy_so_far = 9999999
        self.best_formula_so_far = ""
        self.best_iter = 0

    # Not needed, but don't touch
    def set_init_op_weight_matrix(self, init_op_weight_matrix):
        self.init_op_weight_matrix = init_op_weight_matrix

    # Not needed, but don't touch
    def set_init_var_weight_matrix(self, init_var_weight_matrix):
        self.init_var_weight_matrix = init_var_weight_matrix

    # Not 100% tested
    def make_y_multi_safe(self, old_y):
        if isinstance(old_y, list):
            new_y = np.array(old_y)
            # np.reshape returns a new array rather than modifying in place,
            # so keep the result
            new_y = new_y.reshape([-1, self.n_dims_in_output, 1])
        else:
            new_y = old_y.copy()
        if len(new_y.shape) == 1:
            assert (self.n_dims_in_output == 1)
            new_y = [[[y_value] for _ in range(self.n_dims_per_variable)] for y_value in new_y]
            new_y = np.array(new_y)
        elif len(new_y.shape) == 2:
            assert (self.n_dims_in_output == 1)
            new_y = [[y_value for _ in range(self.n_dims_per_variable)] for y_value in new_y]
            new_y = np.array(new_y)
        elif new_y.shape[1] < self.n_dims_per_variable:
            assert (self.n_dims_in_output == 1)
            new_y = [[y_value[0] for _ in range(self.n_dims_per_variable)] for y_value in new_y]
            new_y = np.array(new_y)
        return new_y

    def get_simple_formula(self, digits=None):
        full_formula = self.get_formula_string()
        return DataUtils.simplify_formula(full_formula, digits=digits)

    # todo: want total or mean square error?
    def test(self, x, y=None):
        test_dict = {self.data_x: x,
                     self.init_op_weights: self.init_op_weight_matrix,
                     self.init_var_weights: self.init_var_weight_matrix}
        if y is not None:
            test_dict[self.data_y] = y
        return self.sess.run(self.total_error, feed_dict=test_dict)

    # Runs the training process for a limited number of train steps.
    # Returns the best formula found during that run.
    # If init_op_weight_matrix is given, ops start off initialized accordingly.
    # If it is None, ops are initialized randomly.
    # If it is 0, ops have no initialization.
    # Same for init_var_weight_matrix.
    def train(self, x, y=None, init_op_weight_matrix=None, init_var_weight_matrix=None,
              test_x=None, test_y=None):
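        """Run one training session of Settings.num_train_steps_in_repeat_mode
        Adam steps, switching between the three loss functions (with
        increasing regularization) at Settings.t1_fraction and t2_fraction of
        the schedule. Periodically logs formulas, error values, and plots.
        Returns the simplified formula string found at the end of training."""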
        n_rounds = Settings.num_train_steps_in_repeat_mode
        batch_size = min(Settings.max_training_batch_size, int(len(x) / 2))
        train_set_size = len(x)
        train_x = np.array(x, dtype=np.float32)
        if self.mode in ["de"]:
            y = [0 for _ in range(x.shape[0])]
            if test_x is not None:
                test_y = [0 for _ in range(test_x.shape[0])]
        # elif self.mode in ["sr", "lr"]:
        #     y = DataUtils.true_function(x)
        #     if test_x is not None:
        #         test_y = DataUtils.true_function(test_x)
        train_y = self.make_y_multi_safe(y)
        if test_y is not None:
            test_y = self.make_y_multi_safe(test_y)
        if init_op_weight_matrix is not None:
            self.set_init_op_weight_matrix(init_op_weight_matrix)
        if init_var_weight_matrix is not None:
            self.set_init_var_weight_matrix(init_var_weight_matrix)
        target_y = self.y_hat
        show_gt = False
        if Settings.show_output:
            print("Starting actual training!")
        start_time = time.time()
        old_time = time.time()
        time_spent_training = 0
        time_getting_formulas = 0
        time_getting_scores = 0
        time_plotting = 0
        other_time = 0

        for i in range(1, n_rounds + 1):
            mini_start_time = time.time()
            train_batch_x, train_batch_y, valid_batch_x, valid_batch_y = DataUtils.get_samples(train_set_size,
                                                                                               batch_size,
                                                                                               train_x, train_y)
            other_time += time.time() - mini_start_time
            training_dict = {self.data_x: train_batch_x,
                             self.data_y: train_batch_y,
                             self.init_op_weights: self.init_op_weight_matrix,
                             self.init_var_weights: self.init_var_weight_matrix}
            valid_batch_dict = {self.data_x: valid_batch_x,
                                self.data_y: valid_batch_y,
                                self.init_op_weights: self.init_op_weight_matrix,
                                self.init_var_weights: self.init_var_weight_matrix}
            test_dict = {self.data_x: test_x, self.data_y: test_y,
                         self.init_op_weights: self.init_op_weight_matrix,
                         self.init_var_weights: self.init_var_weight_matrix}

            # Actual training happens here.
            mini_start_time = time.time()
            if i < n_rounds * Settings.t1_fraction:
                self.sess.run(self.train_step_1, feed_dict=training_dict)
            elif i < n_rounds * Settings.t2_fraction:
                self.sess.run(self.train_step_2, feed_dict=training_dict)
            else:
                self.sess.run(self.train_step_3, feed_dict=training_dict)
            time_spent_training += (time.time() - mini_start_time)

            # Save formulas, accuracy, etc.
            if (i % Settings.plot_frequency == 0 or i % Settings.output_freq == 0) and Settings.keep_logs:
                # Save the current formula to make a list of all formulas seen.
                current_formula = "(Formula not saved)"
                if Settings.save_all_formulas:
                    mini_start_time = time.time()
                    current_formula = self.get_simple_formula(digits=4)
                    time_getting_formulas += (time.time() - mini_start_time)
                    if current_formula not in self.seen_eqns:
                        self.seen_eqns.append(current_formula)
                # Get results from validation set.
                mini_start_time = time.time()
                [valid_acc, y_pr_v] = self.sess.run([self.total_error, target_y], feed_dict=valid_batch_dict)
                # Get results from test set.
                if test_x is not None:
                    [test_acc, y_pr_test] = self.sess.run([self.total_error, target_y], feed_dict=test_dict)
                y_gold_v = valid_batch_y.reshape([-1, self.n_dims_per_variable, 1])[0].tolist()
                y_hat_v = y_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                time_getting_scores += (time.time() - mini_start_time)

                mini_start_time = time.time()
                [valid_acc, g_pr_v] = self.sess.run([self.total_error, self.implicit_g], feed_dict=valid_batch_dict)
                g_hat_val = g_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                g_hat_1d_val = [y_value[0][0] for y_value in g_hat_val]
                g_tru_1d_val = [y_value[0][0] for y_value in valid_batch_y]
                g_hat_1d_test = None
                g_tru_1d_test = None
                [yp_v, ypp_v] = self.sess.run([self.y_hat_p1, self.y_hat_pp1], feed_dict=valid_batch_dict)
                y_p1_v = yp_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                y_pp1_v = ypp_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                [yp2_v, ypp2_v] = self.sess.run([self.y_hat_p2, self.y_hat_pp2], feed_dict=valid_batch_dict)
                y_p2_v = yp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                y_pp2_v = ypp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                time_getting_scores += (time.time() - mini_start_time)

                if test_x is not None:
                    mini_start_time = time.time()
                    [test_acc, g_pr_test] = self.sess.run([self.total_error, self.implicit_g], feed_dict=test_dict)
                    g_hat_test = g_pr_test.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    g_hat_1d_test = [g_value[0][0] for g_value in g_hat_test]
                    g_tru_1d_test = [g_value[0][0] for g_value in test_y]
                    time_getting_scores += (time.time() - mini_start_time)

                # Update best formula seen based on validation error.
                if Settings.save_all_formulas:
                    if valid_acc < self.best_accuracy_so_far:
                        self.best_accuracy_so_far = valid_acc
                        self.best_formula_so_far = current_formula
                        self.best_iter = i

                # We can only make plots using y values if y is 1d.
                if self.n_dims_in_output == 1:
                    mini_start_time = time.time()
                    y_hat_1d_val = [y_value[0][0] for y_value in y_hat_v]
                    y_tru_1d_val = [y_value[0][0] for y_value in valid_batch_y]
                    y_hat_1d_test = None
                    y_tru_1d_test = None
                    if test_x is not None:
                        y_hat_test = y_pr_test.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                        y_hat_1d_test = [y_value[0][0] for y_value in y_hat_test]
                        y_tru_1d_test = [y_value[0][0] for y_value in test_y]
                    other_time += (time.time() - mini_start_time)
                    if self.mode in ["sr", "lr"]:
                        # Plot predicted y value against actual y value.
                        mini_start_time = time.time()
                        DataUtils.plot_predicted_vs_actual(y_hat_1d_val, y_tru_1d_val,
                                                           y_hat_1d_test, y_tru_1d_test,
                                                           self.name,
                                                           set_name="Iteration {}".format(i))
                        time_plotting += (time.time() - mini_start_time)
                        # DataUtils.plot_2d_curve(x_1d_val, y_tru_1d_val, y_hat_1d_val, None, None, None)
                    # If x is also 1d, we can plot the function itself.
                    if self.n_input_variables == 1:
                        # Plot the actual function we learned.
                        mini_start_time = time.time()
                        x_1d_val = [x_value[0][0] for x_value in valid_batch_x]
                        x_1d_test = None
                        if test_x is not None:
                            x_1d_test = [x_value[0][0] for x_value in test_x]
                        other_time += (time.time() - mini_start_time)
                        mini_start_time = time.time()
                        DataUtils.plot_1d_curve(x_1d_val, y_tru_1d_val, y_hat_1d_val,
                                                x_1d_test, y_tru_1d_test, y_hat_1d_test,
                                                file_suffix="_y",
                                                title="Learned function: Iteration {}".format(i),
                                                show_ground_truth=show_gt)
                        time_plotting += (time.time() - mini_start_time)
                        # Plot the g output values, in the implicit case.
                        if test_x is not None:
                            mini_start_time = time.time()
                            DataUtils.plot_1d_curve(x_1d_val, g_tru_1d_val, g_hat_1d_val,
                                                    x_1d_test, g_tru_1d_test, g_hat_1d_test,
                                                    file_suffix="_g",
                                                    title="Output of g: Iteration {}".format(i))
                            time_plotting += time.time() - mini_start_time
                    elif self.n_input_variables == 2:
                        # Plot the actual function we learned.
                        mini_start_time = time.time()
                        plot2d_x1 = np.arange(Settings.test_scope[0], Settings.test_scope[1], 0.1)
                        plot2d_x2 = np.arange(Settings.test_scope[0], Settings.test_scope[1], 0.1)
                        plot2d_x1_m, plot2d_x2_m = np.meshgrid(plot2d_x1, plot2d_x2)
                        plot2d_x1 = np.reshape(plot2d_x1_m, [-1, 1, 1])
                        plot2d_x2 = np.reshape(plot2d_x2_m, [-1, 1, 1])
                        plot2d_x1x2 = np.concatenate([plot2d_x1, plot2d_x2], axis=-1)
                        [plot2d_y, plot2d_g] = self.sess.run([target_y, self.implicit_g],
                                                             feed_dict={self.data_x: plot2d_x1x2,
                                                                        self.init_op_weights: self.init_op_weight_matrix,
                                                                        self.init_var_weights: self.init_var_weight_matrix})
                        if self.mode == "sr":
                            plot2d_g = DataUtils.true_function(plot2d_x1x2)
                        plot2d_y_m = np.reshape(plot2d_y, plot2d_x1_m.shape)
                        plot2d_g_m = np.reshape(plot2d_g, plot2d_x1_m.shape)
                        DataUtils.plot_2d_curve(plot2d_x1_m, plot2d_x2_m, plot2d_y_m, plot2d_g_m)
                        time_plotting += (time.time() - mini_start_time)

                if Settings.keep_logs:
                    mini_start_time = time.time()
                    self.train_accuracy_log.append(self.test(train_x, train_y))
                    # self.valid_accuracy_log.append(valid_acc)
                    self.valid_accuracy_log.append(self.test(valid_batch_x, valid_batch_y))
                    # self.log_iters.append(i)
                    if len(self.log_iters) == 0:
                        self.log_iters.append(i)
                    else:
                        self.log_iters.append(self.log_iters[-1] + Settings.plot_frequency)
                    accuracies_to_plot = [self.train_accuracy_log,
                                          self.valid_accuracy_log]
                    accuracy_type_names = ["Training Error", "Validation Error"]
                    if test_x is not None:
                        self.test_accuracy_log.append(test_acc)
                        accuracies_to_plot.append(self.test_accuracy_log)
                        accuracy_type_names.append("Test Error")
                    time_getting_scores += (time.time() - mini_start_time)
                    mini_start_time = time.time()
                    DataUtils.plot_accuracy_over_time(self.log_iters, accuracies_to_plot, accuracy_type_names)
                    time_plotting += time.time() - mini_start_time

            if i % Settings.output_freq == 0 and Settings.show_output:
                if not Settings.keep_logs:
                    # Get results from validation set.
                    mini_start_time = time.time()
                    [valid_acc, y_pr_v] = self.sess.run([self.total_error, target_y], feed_dict=valid_batch_dict)
                    # Get results from test set.
                    if test_x is not None:
                        [test_acc, y_pr_test] = self.sess.run([self.total_error, target_y], feed_dict=test_dict)
                    y_hat_v = y_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    g_pr_v = self.sess.run(self.implicit_g, feed_dict=valid_batch_dict)
                    g_hat_val = g_pr_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    [yp1_v, ypp1_v] = self.sess.run([self.y_hat_p1, self.y_hat_pp1],
                                                    feed_dict={self.data_x: valid_batch_x,
                                                               self.init_op_weights: self.init_op_weight_matrix,
                                                               self.init_var_weights: self.init_var_weight_matrix})
                    y_p1_v = yp1_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    y_pp1_v = ypp1_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    [yp2_v, ypp2_v] = self.sess.run([self.y_hat_p2, self.y_hat_pp2],
                                                    feed_dict={self.data_x: valid_batch_x,
                                                               self.init_op_weights: self.init_op_weight_matrix,
                                                               self.init_var_weights: self.init_var_weight_matrix})
                    y_p2_v = yp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    y_pp2_v = ypp2_v.reshape([-1, self.n_dims_per_variable, 1]).tolist()
                    time_getting_scores += (time.time() - mini_start_time)

                print()
                print('Iteration {}:'.format(i))
                mini_start_time = time.time()
                formula_as_string = self.get_formula_string(digits=4)
                dotdot = ""
                if len(formula_as_string) > Settings.max_formula_output_length:
                    dotdot = " ..."
                print("# Current Model: {}{}".format(formula_as_string[:Settings.max_formula_output_length], dotdot))
                simple_formula = self.get_simple_formula(digits=4)
                dotdot = ""
                if len(simple_formula) > Settings.max_formula_output_length:
                    dotdot = " ..."
                print("# AKA: {}{}".format(simple_formula[:Settings.max_formula_output_length], dotdot))
                minimal_eqn = self.get_minimal_formula_string()
                print("# Simple: {}".format(minimal_eqn))
                if minimal_eqn not in self.seen_minimal_eqns:
                    self.seen_minimal_eqns.append(minimal_eqn)
                if "**" in simple_formula:
                    print("(Has a power)")
                time_getting_formulas += (time.time() - mini_start_time)
                print(" Length: {} ({})".format(len(formula_as_string), len("{}".format(simple_formula))))
                print(" Train batch size: {}".format(train_batch_x.shape))
                print(" Valid batch size: {}".format(valid_batch_x.shape))
                print(" # Mnml eqns seen: {}".format(len(self.seen_minimal_eqns)))
                iters_per_min = Settings.output_freq * 60 / (time.time() - old_time)
                print(' Iters per minute: {:.2f}'.format(iters_per_min))
                total_time = time.time() - start_time
                print(' Time so far: {:.2f} minutes'.format(total_time / 60.0))
                print(' ({:.1%} training, {:.1%} scoring, {:.1%} formulas)'.format(
                    time_spent_training / total_time,
                    time_getting_scores / total_time,
                    time_getting_formulas / total_time))
                print(' ({:.1%} plotting, {:.1%} other)'.format(time_plotting / total_time,
                                                                other_time / total_time))
                print(' Est. time left: {:.2f} minutes'.format((n_rounds - i) / iters_per_min))
                print('Error values:')
                mini_start_time = time.time()
                curr_errs = self.sess.run([self.g_error, self.ivp_error,
                                           self.spike_error, self.total_error],
                                          feed_dict=valid_batch_dict)
                if self.mode == "de":
                    print(' g-err Valid: {}'.format(curr_errs[0]))
                    print(' IVP Valid: {}'.format(curr_errs[1]))
                if Settings.non_const:
                    print(' Spike err: {}'.format(curr_errs[2]))
                print(' Tot. Val.: {}'.format(curr_errs[3]))
                if np.abs(curr_errs[3] - (curr_errs[0] + self.ivp_lambda * curr_errs[1] + curr_errs[2])) > 1e-4:
                    print("Something is wrong.")
                # Hope we don't get nans, but break out if we do.
                nans = np.isnan(curr_errs[0])
                if nans:
                    break
                if test_x is not None:
                    print(' Tot. Test: {}'.format(test_acc))
                time_getting_scores += (time.time() - mini_start_time)

                print('Performance on sample validation data:')
                print_str = ""
                for feature_i in range(Settings.num_features):
                    print_str += "{}\t\t".format(self.var_names[feature_i])
                print_str += "|\t"
                # print_str += "y_tru\t"
                if self.mode == "de":
                    print_str += "g_hat\t"
                elif self.mode == "sr" or self.mode == "lr":
                    print_str += "y_tru\t"
                print_str += "y_hat\t"
                if self.mode == "de":
                    print_str += "y_p1\t"
                    print_str += "y_pp1\t"
                    print_str += "y_p2\t"
                    print_str += "y_pp2\t"
                print(print_str)
                line_len = len(print_str) + 16
                print("=" * line_len)
                var_range = range(self.n_input_variables)
                num_pts_to_show = 5
                if self.mode in ["sr", "lr"]:
                    y_tru_v = valid_batch_y[:num_pts_to_show, :, :]
                    # y_tru_v = DataUtils.predict_from_formula(Settings.true_eqn, valid_batch_x[:num_pts_to_show, :, :])
                for datapoint_i in range(min(valid_batch_x.shape[0], num_pts_to_show)):
                    comps_to_show = range(self.n_dims_per_variable)
                    if self.n_dims_per_variable > 9:
                        comps_to_show = [0, 1, 2, -1]
                    for component_j in comps_to_show:
                        if component_j == -1:
                            print(" ... ")
                        print_str = ""
                        for var_k in var_range:
                            x_ijk = valid_batch_x[datapoint_i, component_j, var_k]
                            print_str += "{:.3f}\t".format(x_ijk)
                        print_str += "|\t"
                        # print_str += "{:.3f}\t".format(valid_batch_y[datapoint_i, component_j, 0])
                        if self.mode == "de":
                            print_str += "{:.3f}\t".format(g_hat_val[datapoint_i][component_j][0])
                        elif self.mode in ["sr", "lr"]:
                            print_str += "{:.3f}\t".format(y_tru_v[datapoint_i][0][0])
                        print_str += "{:.3f}\t".format(y_hat_v[datapoint_i][component_j][0])
                        if self.mode == "de":
                            print_str += "{:.3f}\t".format(y_p1_v[datapoint_i][component_j][0])
                            print_str += "{:.3f}\t".format(y_pp1_v[datapoint_i][component_j][0])
                            print_str += "{:.3f}\t".format(y_p2_v[datapoint_i][component_j][0])
                            print_str += "{:.3f}\t".format(y_pp2_v[datapoint_i][component_j][0])
                        print(print_str)
                    print("-" * line_len)
                print()
                old_time = time.time()

        if Settings.show_output:
            print('Finished training at {:%H:%M:%S}.\n'.format(datetime.datetime.now()))
            end_time = time.time()
            total_time = end_time - start_time
            print('Took {:.2f} seconds to finish.'.format(total_time))
            print(' ({:.1%} training, {:.1%} scoring, {:.1%} formulas)'.format(time_spent_training / total_time,
                                                                               time_getting_scores / total_time,
                                                                               time_getting_formulas / total_time))
            print(' ({:.1%} plotting, {:.1%} other)'.format(time_plotting / total_time,
                                                            other_time / total_time))
            print('Average of {:.2f} training steps per minute.'.format(
                60 * n_rounds / total_time))
            print('Average of {:.2f} minutes per 10000 training steps.'.format(
                10000 * total_time / (60 * n_rounds)))
            print()
            if Settings.save_all_formulas:
                print("Best formula had accuracy {:.3f} and was seen at iteration {}:".format(
                    self.best_accuracy_so_far,
                    self.best_iter))
                print("{}".format(self.best_formula_so_far)[:1000])
            else:
                final_acc = self.sess.run(self.total_error, feed_dict={self.data_x: train_x,
                                                                       self.data_y: train_y,
                                                                       self.init_op_weights: self.init_op_weight_matrix,
                                                                       self.init_var_weights: self.init_var_weight_matrix})
                print("Final formula had accuracy {:.3f}:".format(final_acc))
                print("{}".format(self.get_simple_formula(digits=4))[:1000])
            print()

        return self.get_simple_formula(digits=4)

    def repeat_train(self, x, y=None,
                     num_repeats=Settings.num_train_repeat_processes,
                     test_x=None, test_y=None,
                     verbose=True):
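        """Run num_repeats independent training sessions from fresh random
        initializations, keeping the formula with the lowest validation error.
        Returns (best_formula, best_iter, best_err)."""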
        # we still reduce the train set size even if there is only 1 repeat
        train_set_size = int(len(x) * Settings.quick_train_fraction + 0.1)
        x = np.array(x)
        if y is not None:
            y = np.array(y)
        # y is overridden with zeros: in the implicit (g = 0) formulation,
        # the target values are all zero.
        y = np.zeros((x.shape[0], 1, 1))
        sample = np.random.choice(range(x.shape[0]), size=train_set_size, replace=False)
        train_x = x[sample][:]
        if y is not None:
            train_y = y[sample]
        out_sample = [aaa for aaa in range(x.shape[0]) if aaa not in sample]
        valid_x = x[out_sample][:]
        if y is not None:
            valid_y = y[out_sample]
            valid_y = self.make_y_multi_safe(valid_y)

        best_formula = ""
        best_iter = 0
        best_validation = 999999
        best_err = 999999
        old_time = time.time()
        if verbose:
            print("Beginning {} repeat sessions of {} iterations each.".format(num_repeats,
                                                                               Settings.num_train_steps_in_repeat_mode))
            print()
        start_time = time.time()
        old_time = start_time

        for train_iter in range(1, 1 + num_repeats):
            if verbose:
                print("Repeated train session {} of {}.".format(train_iter, num_repeats))
            self.soft_reset()
            self.set_init_op_weight_matrix(choices_to_init_weight_matrix(Settings.initialize_ops,
                                                                         self.function_set))
            self.set_init_var_weight_matrix(choices_to_init_weight_matrix(np.zeros([2 ** self.n_tree_layers]),
                                                                          self.var_names))
            self.train(train_x, train_y, test_x=test_x, test_y=test_y)
            valid_err = self.test(valid_x, valid_y)
            current_time = time.time()
            if verbose:
                # print(self.get_simple_formula())
                print("Attained validation error: {:.5f}".format(valid_err))
            if valid_err < best_validation:
                best_validation = valid_err
                best_formula = self.get_simple_formula()
                best_iter = train_iter
                if test_x is not None:
                    safe_test_y = self.make_y_multi_safe(test_y)
                    best_err = self.test(test_x, safe_test_y)
                else:
                    best_err = valid_err
                if verbose:
                    print(">>> New best model!")
                    print(best_formula)
            if verbose:
                iters_per_minute = 60.0 / (current_time - old_time)
                print("Took {:.2f} minutes.".format((current_time - old_time) / 60))
                print("Est. {:.2f} minutes remaining.".format((num_repeats - train_iter) / iters_per_minute))
                print()
            old_time = current_time

        if verbose:
            print("Total time for repeat process: {:.2f} minutes.".format((time.time() - start_time) / 60))
        return best_formula, best_iter, best_err