""" @author: Caglar Aytekin contact: caglar@deepcause.ai """ import torch import torch.nn as nn import random import numpy as np import pandas as pd import copy class CustomEncodingFunction(torch.autograd.Function): @staticmethod def forward(ctx, x, tau,alpha): ctx.save_for_backward(x, tau) # Perform the tanh operation on (x + tau) y = torch.tanh(x + tau) # The actual forward output : binarized output forward_output = alpha * (2 * torch.round((y + 1) / 2) - 1) + (1-alpha)*y return forward_output @staticmethod def backward(ctx, grad_output): x, tau = ctx.saved_tensors # Use the derivative of tanh for the backward pass: 1 - tanh^2(x + tau) grad_input = grad_output * (1 - torch.tanh(x + tau) ** 2) return grad_input, grad_input,None # Assuming tau also requires gradient # Wrapping the custom function in a nn.Module for easier use class EncodingLayer(nn.Module): def __init__(self): super(EncodingLayer, self).__init__() def forward(self, x, tau,alpha): return CustomEncodingFunction.apply(x, tau,alpha) class LEURN(nn.Module): def __init__(self, preprocessor,depth,droprate): """ Initializes the model. 
        Parameters:
        - preprocessor: A class containing useful info about the dataset
          - Including: attribute names, categorical features details, suggested embedding size
            for each category, output type, output dimension, transformation information
        - depth: Depth of the network
        - droprate: dropout rate
        """
        super(LEURN, self).__init__()
        # Find categorical indices and category numbers for each
        self.alpha = 1.0  # encoding hardness; 1.0 = fully binarized encodings
        self.preprocessor = preprocessor
        self.attribute_names = preprocessor.attribute_names
        self.label_encoders = preprocessor.encoders_for_nn
        # category_details entries look like (column_index, n_categories)
        self.categorical_indices = [info[0] for info in preprocessor.category_details]
        self.num_categories = [info[1] for info in preprocessor.category_details]
        # If embedding_size is integer, cast it to all categories
        if isinstance(preprocessor.suggested_embeddings, int):
            embedding_sizes = [preprocessor.suggested_embeddings] * len(self.categorical_indices)
        else:
            assert len(preprocessor.suggested_embeddings) == len(self.categorical_indices), "Length of embedding_size must match number of categorical features"
            embedding_sizes = preprocessor.suggested_embeddings
        self.embedding_sizes = embedding_sizes
        # Embedding layers for categorical features, initialized uniformly in (-1, 1)
        # to match the (-1, 1) range the encoding layers work in.
        self.embeddings = nn.ModuleList([
            nn.Embedding(num_categories, embedding_dim)
            for num_categories, embedding_dim in zip(self.num_categories, embedding_sizes)
        ])
        for embedding_now in self.embeddings:
            nn.init.uniform_(embedding_now.weight, -1.0, 1.0)
        self.total_embedding_size = sum(embedding_sizes)  # number of categorical features for NN
        self.non_cat_input_dim = len(self.attribute_names) - len(self.categorical_indices)  # Number of numerical features for NN
        self.nn_input_dim = self.total_embedding_size + self.non_cat_input_dim  # Number of features for NN
        # LAYERS
        self.tau_initial = nn.Parameter(torch.zeros(1, self.nn_input_dim))  # Initial tau as a learnable parameter
        self.layers = nn.ModuleList()
        self.depth = depth
        self.output_type = preprocessor.output_type
        for d_now in range(depth):
            # Each iteration adds an encoding layer
            # followed by a dropout and then a linear layer.
            # The d-th linear layer consumes the concatenation of all d+1
            # encodings produced so far, hence the growing input width.
            self.layers.append(EncodingLayer())
            self.layers.append(nn.Dropout1d(droprate))
            linear_layer = nn.Linear((d_now + 1) * self.nn_input_dim, self.nn_input_dim)
            self._init_weights(linear_layer, d_now + 1)  # special layer initialization
            self.layers.append(linear_layer)
        # Final stage: dropout and linear layer
        self.final_dropout = nn.Dropout1d(droprate)
        self.final_linear = nn.Linear(depth * self.nn_input_dim, self.preprocessor.output_dim)
        self._init_weights(self.final_linear, depth)

    def set_alpha(self, alpha):
        """Method to update the dynamic parameter."""
        self.alpha = alpha

    def _init_weights(self, layer, depth_now):
        # Custom initialization.
        # Considering the binary (-1,1) nature of the input,
        # when we initialize layer in (-1/dim, 1/dim) range, output is bounded at (-1,1).
        # Knowing our input is roughly at (-1,1) range, this serves as good initialization for tau.
        if not (self.embedding_sizes == []):
            # One 1/size entry per embedding dimension, so each categorical
            # feature contributes at most 1 in absolute value overall.
            init_tensor = torch.tensor([1 / size for size in self.embedding_sizes for _ in range(size)])
            # NOTE(review): the source is corrupted/truncated at this point.
            # Between "init_tensor.shape[0]" and "input_value" a large span is
            # missing: the remainder of _init_weights, the network's forward()
            # method, and the head of find_boundaries (which evidently loops over
            # per-feature thresholds, given the code that survives below).
            # The fused tokens are preserved verbatim; do NOT treat the next
            # statement as valid code.
            if init_tensor.shape[0] input_value and threshold < upper_boundaries[feature_index]:
                upper_boundaries[feature_index] = threshold
            # If the threshold is less than the input value and greater than the current lower boundary, update the lower boundary
            if threshold < input_value and threshold > lower_boundaries[feature_index]:
                lower_boundaries[feature_index] = threshold
        # Convert boundaries to a list of tuples [(lower, upper), ...] for each feature
        boundaries = list(zip(lower_boundaries.tolist(), upper_boundaries.tolist()))
        # Cache per-dimension bounds for the explanation/sampling helpers.
        self.upper_boundaries = upper_boundaries
        self.lower_boundaries = lower_boundaries
        return boundaries

    def categories_within_boundaries(self):
        """ For each categorical feature, checks if embedding weights fall within
        the specified upper and lower boundaries.
        Returns a dictionary with categorical feature indices as keys and lists
        of category indices that fall within the boundaries.
""" categories_within_bounds = {} emb_st=0 for cat_index, emb_layer in zip(range(len(self.categorical_indices)), self.embeddings): # Extract upper and lower boundaries for this categorical feature lower_bound=self.lower_boundaries[emb_st:emb_st+self.embedding_sizes[cat_index]] upper_bound=self.upper_boundaries[emb_st:emb_st+self.embedding_sizes[cat_index]] emb_st=emb_st+self.embedding_sizes[cat_index] # Initialize list to hold categories that fall within boundaries categories_within = [] # Iterate over each embedding vector in the layer for i, weight in enumerate(emb_layer.weight): # Check if the embedding weight falls within the boundaries if torch.all(weight >= lower_bound) and torch.all(weight <= upper_bound): categories_within.append(i) # Using index i as category identifier # Store the categories that fall within the boundaries for this feature categories_within_bounds[cat_index] = categories_within return categories_within_bounds def global_importance(self): final_layer_weight=torch.clone(self.final_linear.weight).detach().numpy() importances=np.sum(np.abs(final_layer_weight),0) importances=importances.reshape(importances.shape[0]//self.nn_input_dim,self.nn_input_dim) importances=np.sum(importances,0) importances_features=[] st=0 for i in range(len(self.attribute_names)): try: importances_features.append(np.sum(importances[st:st+self.embedding_sizes[i]])) st=st+self.embedding_sizes[i] except: st=st+1 return np.argsort(importances_features)[::-1],np.sort(importances_features)[::-1] def influence_matrix(self): """ Finds ADG from how each feature effects other's threshold via weight matrices """ def create_block_sum_matrix(sizes, matrix): L = len(sizes) # Initialize the output matrix with zeros, using PyTorch block_sum_matrix = torch.zeros((L, L)) # Define the starting row and column indices for slicing start_row = 0 for i, row_size in enumerate(sizes): start_col = 0 for j, col_size in enumerate(sizes): # Calculate the sum of the current block using PyTorch 
                    block_sum = torch.sum(matrix[start_row:start_row + row_size, start_col:start_col + col_size])
                    block_sum_matrix[i, j] = block_sum
                    # Update the starting column index for the next block in the row
                    start_col += col_size
                # Update the starting row index for the next block in the column
                start_row += row_size
            return block_sum_matrix

        def add_ones_until_target(initial_list, target_sum):
            # Continue adding 1s until the sum of the list equals the target sum
            # (pads the categorical embedding sizes with size-1 numerical features).
            while sum(initial_list) < target_sum:
                initial_list.append(1)
            return initial_list

        # Layers repeat as (EncodingLayer, Dropout, Linear) triples, so the
        # Linear of stage i/3 sits at index i + 2.
        for i in range(0, self.depth * 3, 3):
            # encode, drop and find next tau
            weight_now = self.layers[i + 2].weight
            weight_now_reshaped = weight_now.reshape((weight_now.shape[0], weight_now.shape[1] // self.nn_input_dim, self.nn_input_dim))  # shape: output x depth x input
            if i == 0:
                # effects=np.sum(np.abs(weight_now_reshaped.numpy()),axis=1)/self.depth #shape: output x input
                effects = torch.sum(torch.abs(weight_now_reshaped), dim=1) / self.depth
            else:
                # Accumulate the depth-averaged |W| over all stages.
                effects = effects + torch.sum(torch.abs(weight_now_reshaped), dim=1) / self.depth
        effects = effects.t()  # shape: input x output
        # Pad embedding sizes with 1s for numerical features, then sum per-feature blocks.
        modified_list = add_ones_until_target(copy.deepcopy(self.embedding_sizes), effects.shape[0])
        effects = create_block_sum_matrix(modified_list, effects)
        return effects

    def explain_without_causal_effects(self, x):
        """ Explains decisions of the neural network for input sample.
        For numericals, extracts upper and lower boundaries on the sample
        For categoricals displays possible categories
        Also calculates contributions of each feature to final result
        """
        self.find_boundaries(x)  # find upper, lower boundaries for all nn inputs
        # find valid categories for categorical features
        valid_categories = self.categories_within_boundaries()
        # numerical boundaries (embedding dims come first in the flat layout,
        # numericals occupy the tail starting at sum(embedding_sizes))
        upper_numerical = self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy()
        lower_numerical = self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy()
        # Find contribution from each feature in final linear layer, distribute bias evenly.
        # NOTE(review): self.encodings (and self.output below) are read here but set
        # elsewhere — presumably cached by the forward pass, which is missing from
        # this chunk; confirm explain* is only called after a forward.
        contributions = self.encodings * self.final_linear.weight + self.final_linear.bias.unsqueeze(dim=-1) / self.final_linear.weight.shape[1]
        # Fold the depth axis so contributions align with the nn_input_dim dims.
        contributions = contributions.detach().resize_((contributions.shape[0], contributions.shape[1] // self.nn_input_dim, self.nn_input_dim))
        contributions = torch.sum(contributions, dim=1)
        # Initialize an empty list to store the summed contributions
        summed_contributions = []
        # Initialize start index for slicing
        start_idx = 0
        # Sum contribution of each categorical within respective embedding
        for size in self.embedding_sizes:
            # Calculate end index for the current chunk
            end_idx = start_idx + size
            # Sum the contributions in the current chunk
            chunk_sum = contributions[:, start_idx:end_idx].sum(dim=1, keepdim=True)
            # Append the summed chunk to the list
            summed_contributions.append(chunk_sum)
            # Update the start index for the next chunk
            start_idx = end_idx
        # If there are remaining elements not covered by embedding_sizes, add them as is (numerical features)
        if start_idx < contributions.shape[1]:
            remaining = contributions[:, start_idx:]
            summed_contributions.append(remaining)
        # Concatenate the summed contributions back into a tensor
        summed_contributions = torch.cat(summed_contributions, dim=1)
        # This is to handle multi-class explanations, for binary this is 0 automatically
        # Note: multi-output regression is not supported yet.
        # This will just return largest regressed value's explanations
        highest_index = torch.argmax(summed_contributions.sum(dim=1))
        # This is contribution from each feature
        result = summed_contributions[highest_index]
        self.result = result
        # Explanation and Contribution formats are in ordered format (categoricals first, numericals later)
        # Bring them to original format in user input
        # Combine categoricals and numericals explanations and contributions
        Explanation = [None] * (len(self.categorical_indices) + len(upper_numerical))
        Contribution = np.zeros((len(self.categorical_indices) + len(upper_numerical),))
        # Fill in the categorical entries
        for j, cat_index in enumerate(self.categorical_indices):
            Explanation[cat_index] = valid_categories[j]
            Contribution[cat_index] = result[j].numpy()
        # INVERSE TRANSFORM PART 1 -------------------------------------------------------------------------------------------
        # Inverse transform upper and lower_numericals back to the original feature scale
        len_num = len(upper_numerical)
        if len_num > 0:
            upper_numerical = self.preprocessor.scaler.inverse_transform(upper_numerical.reshape(1, -1))
            lower_numerical = self.preprocessor.scaler.inverse_transform(lower_numerical.reshape(1, -1))
            if len_num > 1:
                upper_numerical = np.squeeze(upper_numerical)
                lower_numerical = np.squeeze(lower_numerical)
        upper_iter = iter(upper_numerical)
        lower_iter = iter(lower_numerical)
        cnt = 0
        for i in range(len(Explanation)):
            if Explanation[i] is None:
                # Note the denormalization here
                Explanation[i] = next(lower_iter), next(upper_iter)
                # NOTE(review): `j` here is the last index of the categorical loop
                # above — i.e. numerical contributions start right after the
                # categorical ones in `result`. Verify j is defined when
                # categorical_indices is non-empty but Explanation has no None gap.
                if len(self.categorical_indices) > 0:
                    Contribution[i] = result[j + cnt + 1].numpy()
                else:
                    Contribution[i] = result[cnt].numpy()
                cnt = cnt + 1
        attribute_names_list = []
        revised_explanations_list = []
        contributions_list = []
        # Process each feature to fill lists
        for idx, attr_name in enumerate(self.attribute_names):
            if isinstance(Explanation[idx], list):  # Categorical
                # INVERSE TRANSFORM PART 2 -------------------------------------------------------------------------------------------
                # Inverse transform categoricals: map encoder values back to category names
                category_names = [key for key, value in self.label_encoders[attr_name].items() if value in Explanation[idx]]
                revised_explanation = " ,OR, ".join(category_names)
            elif isinstance(Explanation[idx], tuple):  # Numerical
                revised_explanation = f"{Explanation[idx][0].item()} to {Explanation[idx][1].item()}"
            else:
                revised_explanation = "Unknown"  # shouldn't really happen
            # Append to lists
            attribute_names_list.append(attr_name)
            revised_explanations_list.append(revised_explanation)
            contributions_list.append(Contribution[idx] if idx < len(Contribution) else None)
        # Construct DataFrame
        Explanation_df = pd.DataFrame({
            'Name': attribute_names_list,
            'Category': revised_explanations_list,
            'Contribution': contributions_list
        })
        result = self.preprocessor.inverse_transform_y(self.output)
        # Explanation_df['Result'] = [result] * len(Explanation_df)
        return copy.deepcopy(Explanation_df), self.output.clone(), copy.deepcopy(result), copy.deepcopy(Explanation)

    def explain(self, x, include_causal_analysis=False):
        """ Fixes all features but one, sweeps that feature across its own categories,
        reports the average change from other categories to current one.
""" def update_intervals(available_intervals, incoming_interval): updated_intervals = [] for interval in available_intervals: if incoming_interval[1] <= interval[0] or incoming_interval[0] >= interval[1]: # The incoming interval does not overlap, keep the interval as is updated_intervals.append(interval) else: # There is some overlap, possibly split the interval if incoming_interval[0] > interval[0]: # Add the left part that doesn't overlap updated_intervals.append((interval[0], incoming_interval[0])) if incoming_interval[1] < interval[1]: # Add the right part that doesn't overlap updated_intervals.append((incoming_interval[1], interval[1])) return updated_intervals def sample_from_intervals(available_intervals): if not available_intervals: return None # Choose a random interval chosen_interval = random.choice(available_intervals) # Sample a random point within this interval return random.uniform(chosen_interval[0], chosen_interval[1]) Explanation_df,output,result,Explanation=self.explain_without_causal_effects(x) if include_causal_analysis: # Causal analysis causal_effect=np.zeros((x.shape[-1],)) numerical_cnt=0 for idx, attr_name in enumerate(self.attribute_names): if isinstance(Explanation[idx], list): # Categorical all_category_names = [value for key, value in self.label_encoders[attr_name].items()] sweeped_category_names = [value for key, value in self.label_encoders[attr_name].items() if value in Explanation[idx]] if list(set(all_category_names)-set(sweeped_category_names)) == []: is_category_empty=True else: is_category_empty=False cnt=0 while is_category_empty==False: new_x=x.clone() next_category=list(set(all_category_names)-set(sweeped_category_names))[0] new_x[0,idx]=float(next_category) Explanation_df_new,output_new,result_new,Explanation_new=self.explain_without_causal_effects(new_x) sweeped_category_names = sweeped_category_names+[value for key, value in self.label_encoders[attr_name].items() if value in Explanation_new[idx]] if 
list(set(all_category_names)-set(sweeped_category_names)) == []: is_category_empty=True else: is_category_empty=False causal_effect[idx]=causal_effect[idx]+(output-output_new).detach().numpy()[0,0] cnt=cnt+1 if cnt>0: causal_effect[idx]=causal_effect[idx]/cnt else: search_complete=False # Initial available interval . we know -100,100 from initial setting up lower, upper bounds available_intervals = [(-100, 100)] # Example incoming intervals #numerical boundaries self.explain_without_causal_effects(x) upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy() lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy() incoming_interval = (lower_numerical[numerical_cnt],upper_numerical[numerical_cnt]) available_intervals = update_intervals(available_intervals, incoming_interval) cnt=0 while not(search_complete): new_sample=sample_from_intervals(available_intervals) new_x=x.clone() new_x[0,idx]=new_sample Explanation_df_new,output_new,result_new,Explanation_new=self.explain_without_causal_effects(new_x) causal_effect[idx]=causal_effect[idx]+(output-output_new).detach().numpy()[0,0] cnt=cnt+1 upper_numerical=self.upper_boundaries[sum(self.embedding_sizes):].detach().numpy() lower_numerical=self.lower_boundaries[sum(self.embedding_sizes):].detach().numpy() incoming_interval = (lower_numerical[numerical_cnt],upper_numerical[numerical_cnt]) available_intervals = update_intervals(available_intervals, incoming_interval) if available_intervals == []: search_complete=True if cnt>0: causal_effect[idx]=causal_effect[idx]/cnt numerical_cnt=numerical_cnt+1 Explanation_df['Causal Effects'] = causal_effect return Explanation_df,output,result def sample_from_boundaries(self): """ Assumes higher and lower boundaries are already extracted (eg self.explain is run on one input already) Samples a value for each feature within the specified upper and lower boundaries stored in the class instance. 
        For numericals, samples a value, for categoricals samples a category from possible categories
        Returns:
        - A tensor containing sampled values within the given boundaries for each feature.
        """
        # First sample from categories
        categories_within_bounds = self.categories_within_boundaries()
        try:
            sampled_indices = [random.choice(categories) for categories in categories_within_bounds.values()]
        except:
            # NOTE(review): random.choice raises IndexError on an empty category
            # list; this retry recomputes the same dict and leaves sampled_indices
            # unset, so the code below would hit a NameError — confirm intent.
            categories_within_bounds = self.categories_within_boundaries()
        # Then from numericals
        samples = []
        cnt = 0
        for lower, upper in zip(self.lower_boundaries[sum(self.embedding_sizes):], self.upper_boundaries[sum(self.embedding_sizes):]):
            # Sample from a uniform distribution between lower and upper boundaries,
            # clamped to the (-1, 1) working range of the network inputs.
            upper = torch.minimum(upper.detach(), torch.from_numpy(np.array(1.0).astype('float32')))
            lower = torch.maximum(lower.detach(), torch.from_numpy(np.array(-1.0).astype('float32')))
            sample = lower + (upper - lower) * torch.rand(1)
            samples.append(sample)
            cnt = cnt + 1
        # Combine categoricals and numericals
        # Initialize an empty list to hold the combined samples
        combined_samples = [None] * (len(self.categorical_indices) + len(samples))
        # Fill in the categorical samples
        for i, cat_index in enumerate(self.categorical_indices):
            combined_samples[cat_index] = torch.tensor([sampled_indices[i]], dtype=torch.float)
        # Fill in the numerical samples
        num_samples_iter = iter(samples)
        for i in range(len(combined_samples)):
            if combined_samples[i] is None:
                combined_samples[i] = next(num_samples_iter)
        # Combine into a single tensor
        combined_tensor = torch.cat(combined_samples, dim=-1)
        return combined_tensor.unsqueeze(dim=0)

    def generate(self):
        """ Generates a data sample from learned network """
        def sample_with_tau(tau, max_bound, min_bound):
            # Sample according to tau, lower and upper bounds
            sampled = torch.zeros((self.nn_input_dim))
            st = 0  # running offset inside the flat (embeddings-first) input layout
            # Randomly pick from valid categories
            for embedding in self.embeddings:
                categories_within = []
                # Iterate over each embedding vector in the layer
                for i, weight in enumerate(embedding.weight):
                    # Check if the embedding weight falls within the boundaries
                    if torch.all(weight >= min_bound[st:st + embedding.weight.shape[-1]]) and torch.all(weight <= max_bound[st:st + embedding.weight.shape[-1]]):
                        categories_within.append(i)  # Using index i as category identifier
                feature_now = embedding.weight[np.random.choice(categories_within), :]
                cnt = 0
                # Binarize the chosen embedding against the current thresholds (-tau).
                for j in range(st, st + embedding.weight.shape[-1]):
                    if feature_now[cnt] > -tau[0, j]:
                        sampled[j] = 1.0
                    elif feature_now[cnt] <= -tau[0, j]:
                        sampled[j] = -1.0
                    cnt = cnt + 1
                st = st + embedding.weight.shape[-1]
            # Randomly sample for numericals
            for i in range(st, self.nn_input_dim):
                if -tau[0, i] > max_bound[i]:
                    # In this case you have to pick -1
                    sampled[i] = -1.0
                elif -tau[0, i] <= min_bound[i]:
                    # In this case you have to pick 1
                    sampled[i] = 1.0
                else:
                    # Threshold falls inside the bounds: either side is feasible.
                    sampled[i] = (torch.randint(low=0, high=2, size=(1,)) * 2 - 1).float()
            return sampled

        def bound_update(tau, max_bound, min_bound, sampled):
            # Tighten per-dimension bounds given the binarized sample: each
            # threshold -tau constrains the underlying value from one side.
            for i in range(self.nn_input_dim):
                if sampled[i] > 0:
                    # means input is larger than -tau, so -tau might set a lower bound
                    if -tau[0, i] > min_bound[i]:
                        min_bound[i] = -tau[0, i]
                elif sampled[i] <= 0:
                    # means input is smaller than -tau, so -tau might set an upper bound
                    # NOTE(review): the source chunk ends mid-statement below; the
                    # remainder of bound_update and of generate() is not visible here.
                    if -tau[0, i]