""" @author: Caglar Aytekin contact: caglar@deepcause.ai """ import numpy as np from sklearn.preprocessing import LabelEncoder, MinMaxScaler import warnings from sklearn.model_selection import train_test_split import torch import pandas as pd pd.set_option('display.max_rows', None) # None means show all rows pd.set_option('display.max_columns', None) # None means show all columns pd.set_option('display.width', None) # Use appropriate width to display columns pd.set_option('display.max_colwidth', None) # Show full content of each column warnings.filterwarnings("ignore") def split_and_processing(X,y,categoricals,output_type,attribute_names): #If every entryin a column of a dataframe is None drop it columns_to_keep_mask = ~X.isna().all() X = X.dropna(axis=1, how='all') # Update the categoricals list to reflect the columns not dropped categoricals = [cat for cat, keep in zip(categoricals, columns_to_keep_mask) if keep] attribute_names= [cat for cat, keep in zip(attribute_names, columns_to_keep_mask) if keep] # Split into train and remaining X_train, X_remaining, y_train, y_remaining = train_test_split(X, y, test_size=0.2, random_state=42) # Split remaining into validation and test X_val, X_test, y_val, y_test = train_test_split(X_remaining, y_remaining, test_size=0.5, random_state=42) # Initialize preprocessor preprocessor=DataProcessor(categoricals,output_type) #Fit and transform for training set X_train=torch.from_numpy(preprocessor.fit_transform_X(X_train).values).float() y_train=torch.from_numpy(preprocessor.fit_transform_y(y_train)).float() if output_type<2: y_train=y_train.unsqueeze(dim=-1) else: y_train=y_train.long() #Transform for validation and test set X_val=torch.from_numpy(preprocessor.transform_X(X_val).values).float() y_val=torch.from_numpy(preprocessor.transform_y(y_val)).float() if output_type<2: y_val=y_val.unsqueeze(dim=-1) else: y_val=y_val.long() X_test=torch.from_numpy(preprocessor.transform_X(X_test).values).float() y_test=torch.from_numpy(preprocessor.transform_y(y_test)).float() if output_type<2: y_test=y_test.unsqueeze(dim=-1) else: y_test=y_test.long() preprocessor.attribute_names=attribute_names preprocessor.output_type=output_type #Determine class no if output_type==0: output_dim=y_train.shape[1] elif output_type==1: output_dim=1 else: output_dim=len(np.unique(y_train)) preprocessor.output_dim=output_dim return X_train,X_val,X_test,y_train,y_val,y_test,preprocessor class DataProcessor: def __init__(self, categoricals, output_type): self.categoricals = categoricals self.output_type = output_type self.label_encoders = {} self.scaler = MinMaxScaler(feature_range=(-1, 1)) self.target_scaler = MinMaxScaler(feature_range=(-1, 1)) self.most_common_categories = {} self.target_encoder = None # For binary and multiclass self.unique_targets = None # To store unique targets for binary classification self.category_details=[] self.suggested_embeddings=None self.encoders_for_nn={} def fit_transform_X(self, X): # Convert all numerical columns to float precision X.iloc[:, ~np.array(self.categoricals)] = X.iloc[:, ~np.array(self.categoricals)].astype(float) X.iloc[:, np.array(self.categoricals)] = X.iloc[:, np.array(self.categoricals)].astype(str) X_transformed = X.copy() for i, is_categorical in enumerate(self.categoricals): if is_categorical: encoder = LabelEncoder() X_transformed.iloc[:, i] = encoder.fit_transform(X.iloc[:, i]) self.label_encoders[i] = encoder self.encoders_for_nn[X_transformed.columns[i]] = dict(zip(encoder.classes_, encoder.transform(encoder.classes_))) self.most_common_categories[i] = X.iloc[:, i].mode()[0] self.category_details.append((i, len(encoder.classes_))) else: # Fill missing values with the median for numerical columns X_transformed.iloc[:, i] = X.iloc[:, i].fillna(X.iloc[:, i].median()) # Scale numerical features numerical_features = X_transformed.iloc[:, ~np.array(self.categoricals)] if numerical_features.shape[-1]>0: self.scaler.fit(numerical_features) X_transformed.iloc[:, ~np.array(self.categoricals)] = self.scaler.transform(numerical_features) self.suggested_embeddings=[max(2, int(np.log2(x[1]))) for x in self.category_details] return X_transformed.astype(float) def transform_X(self, X): X.iloc[:, np.array(self.categoricals)] = X.iloc[:, np.array(self.categoricals)].astype(str) X_transformed = X.copy() for i, is_categorical in enumerate(self.categoricals): if is_categorical: encoder = self.label_encoders[i] # Transform categories, replace unseen with most common category X_transformed.iloc[:, i] = X.iloc[:, i].map(lambda x: x if x in encoder.classes_ else self.most_common_categories[i]) X_transformed.iloc[:, i] = encoder.transform(X_transformed.iloc[:, i]) else: X_transformed.iloc[:, i] = X.iloc[:, i].fillna(X.iloc[:, i].mean()) # Scale numerical features numerical_features = X_transformed.iloc[:, ~np.array(self.categoricals)] if numerical_features.shape[-1]>0: X_transformed.iloc[:, ~np.array(self.categoricals)] = self.scaler.transform(numerical_features) return X_transformed.astype(float) def inverse_transform_X(self, sample): #inverse transform from pytorch tensor sample=sample.detach().numpy() sample_inverse_transformed = pd.DataFrame(sample.copy()) #Handle numerical features numerical_features_indices = np.where(~np.array(self.categoricals))[0] if len(numerical_features_indices)>0: sample_inverse_transformed.iloc[:,numerical_features_indices] = self.scaler.inverse_transform(sample[:,numerical_features_indices]) for i, is_categorical in enumerate(self.categoricals): if is_categorical: encoder = self.label_encoders[i] sample_inverse_transformed.iloc[:, i] = encoder.inverse_transform(sample[:, i].astype('int')) sample_inverse_transformed.columns = self.attribute_names return sample_inverse_transformed def fit_transform_y(self, y): if self.output_type == 0: # Regression y_transformed = self.target_scaler.fit_transform(y.values.reshape(-1, 1)).flatten() elif self.output_type == 1: # Binary classification self.unique_targets = y.unique() mapping = {category: idx for idx, category in enumerate(self.unique_targets)} y_transformed = y.map(mapping).astype(int).values elif self.output_type == 2: # Multiclass classification self.target_encoder = LabelEncoder() y_transformed = self.target_encoder.fit_transform(y) else: raise ValueError("Invalid output type") return y_transformed def transform_y(self, y): if self.output_type == 0: # Regression y_transformed = self.target_scaler.transform(y.values.reshape(-1, 1)).flatten() elif self.output_type == 1: # Binary classification mapping = {category: idx for idx, category in enumerate(self.unique_targets)} y_transformed = y.map(mapping).astype(int).values elif self.output_type == 2: # Multiclass classification y_transformed = self.target_encoder.transform(y) else: raise ValueError("Invalid output type") return y_transformed def inverse_transform_y(self, nn_output): if self.output_type == 0: # Regression y_transformed=nn_output.squeeze().detach().numpy() return self.target_scaler.inverse_transform(y_transformed.reshape(-1, 1)).flatten() elif self.output_type == 1: # Binary classification y_transformed=int(np.round(torch.sigmoid(nn_output).squeeze().detach().numpy())) inverse_mapping = {idx: category for idx, category in enumerate(self.unique_targets)} return inverse_mapping[y_transformed] elif self.output_type == 2: # Multiclass classification y_transformed=int(np.round(torch.argmax(nn_output).squeeze().detach().numpy())) return self.target_encoder.inverse_transform([y_transformed]) else: raise ValueError("Invalid output type")