from utils.dataset_loader import get_dataset
from nets.dense import Net
from nets.deep_dense import dmodel
# The star import deliberately stays ahead of the third-party imports so that
# nothing it exports can shadow the explicit names imported below.
from PINN.pinns import *
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import os
import numpy as np
from torch import nn, tensor
import pandas as pd
# NOTE(review): the module name was missing in SOURCE (`import as px`);
# `px.scatter_3d` in plot3d implies plotly.express -- confirm.
import plotly.express as px
from sklearn.linear_model import SGDRegressor
from sklearn.feature_selection import SelectFromModel


class SCI():  # Scaled Computing Interface
    """Scaled computing interface.

    Args:
        hidden_dim (int, optional): Max dimension of the hidden linear layer.
            Defaults to 200. Should be >80 in the non-1d case.
        dropout (bool, optional): LEGACY, don't use. Defaults to True.
        epochs (int, optional): Optionally specify epochs here, but better in
            train. Defaults to 10.
        dataset (str, optional): Dataset to be selected from ./data. Defaults
            to 'test.pkl'. If the name does not exist, a new dataset is
            generated with the following parameters.
        sample_size (int, optional): Samples to be generated (note: BEFORE
            applying boundary conditions). Defaults to 1000.
        source (str, optional): Source from which data will be generated.
            Better not to change. Defaults to 'dataset.csv'.
        boundary_conditions (list, optional): If specified, the whole dataset
            is cut rectangularly. Input list is [ymin, ymax, xmin, xmax].
            Defaults to None.
    """

    def __init__(self, hidden_dim: int = 200, dropout: bool = True, epochs: int = 10,
                 dataset: str = 'test.pkl', sample_size: int = 1000,
                 source: str = 'dataset.csv', boundary_conditions: list = None):
        """Load the dataset through get_dataset and store the hyper-parameters.

        Args:
            hidden_dim (int, optional): Max dimension of the hidden linear
                layer. Defaults to 200.
            dropout (bool, optional): LEGACY, don't use. Defaults to True.
            epochs (int, optional): Number of epochs. Defaults to 10.
            dataset (str, optional): Dataset name. Defaults to 'test.pkl'.
            sample_size (int, optional): Samples to be generated.
                Defaults to 1000.
            source (str, optional): Data source. Defaults to 'dataset.csv'.
            boundary_conditions (list, optional): [ymin, ymax, xmin, xmax].
                Defaults to None.
        """
        self.type: str = 'legacy'
        self.seed: int = 449
        self.dim = hidden_dim
        self.dropout = dropout
        self.df = get_dataset(sample_size=sample_size, source=source, name=dataset,
                              boundary_conditions=boundary_conditions)
        self.epochs = epochs
        # Number of indexes used for the last data selection (0 = not set).
        self.len_idx = 0
        # Truthy when X must be reshaped to a single feature column.
        self.input_dim_for_check = 0

    def feature_gen(self, base: bool = True, fname: str = None, index: int = None,
                    func=None) -> None:
        """Generate new feature columns in ``self.df``.

        If ``base`` is true, the built-in features ('P_sqrt', 'j', 'B',
        'nu_t') are added. A custom feature can be added by giving the new
        column name, the positional index of the parent column and a function
        applied elementwise to that column.

        Args:
            base (bool, optional): Add the built-in features. Defaults to True.
            fname (str, optional): Name of the new column. Defaults to None.
            index (int, optional): Index of the parent column. Defaults to None.
            func (callable, optional): Elementwise function. Defaults to None.
        """
        if base:
            # Order matters: 'B' reads the last column, which at that point is
            # the 'j' column added on the previous line.
            self.df['P_sqrt'] = self.df.iloc[:, 1].apply(lambda x: x ** 0.5)
            self.df['j'] = self.df.iloc[:, 1] / (self.df.iloc[:, 3] * self.df.iloc[:, 4])
            self.df['B'] = self.df.iloc[:, -1].apply(lambda x: x ** 2).apply(lambda x: 1 if x > 1 else x)
            self.df['nu_t'] = self.df.iloc[:, 7] ** 2 / (2 * self.df.iloc[:, 6] * self.df.P)
        # `index is not None` (rather than truthiness) so that column 0 is a
        # valid parent column.
        if fname and index is not None and func is not None:
            self.df[fname] = self.df.iloc[:, index].apply(func)

    def feature_importance(self, X: pd.DataFrame, Y: pd.Series, verbose: int = 1):
        """Get feature importance by SGD regression and score selection.

        The selection threshold is '1.25*mean'. Pass X as
        ``self.df.iloc[:, (columns of choice)]`` and Y as
        ``self.df.iloc[:, (column of choice)]``.

        Args:
            X (pd.DataFrame): Candidate feature columns.
            Y (pd.Series): Target column.
            verbose (int, optional): Whether to print the report. Defaults to 1.

        Returns:
            str: Report mapping each column name to its fitted coefficient.
        """
        mod = SGDRegressor()
        # NOTE(review): the `.fit(np.array(X)` part was missing in SOURCE and
        # is reconstructed from the surviving `,np.array(Y))` -- confirm.
        selector = SelectFromModel(mod, threshold='1.25*mean').fit(np.array(X), np.array(Y))
        report = f'\n Report of feature importance: {dict(zip(X.columns,selector.estimator_.coef_))}'
        if verbose:
            print(report)
            for column, kept in zip(X.columns, selector.get_support()):
                if kept:
                    print('-rank 1 PASSED:', column)
                else:
                    print('-rank 0 REJECT:', column)
        return report

    def _build_loader(self, idx, columns_idx, batch_size):
        """Wrap self.X / self.Y in a DataLoader and record selection metadata."""
        print('Shapes for debug: (X,Y)', self.X.shape, self.Y.shape)
        # NOTE(review): the constructor names were missing in SOURCE
        # (`train_data =, self.Y)` / `Xtrain =,batch_size=...`); TensorDataset
        # and DataLoader are reconstructed from the docstring's
        # "Torch native dataloader" -- confirm.
        train_data = torch.utils.data.TensorDataset(self.X, self.Y)
        loader = torch.utils.data.DataLoader(train_data, batch_size=batch_size)
        self.input_dim = self.X.size(-1)
        self.indexes = idx if idx else columns_idx
        self.column_names = [self.df.columns[i] for i in self.indexes]
        return loader

    def data_flow(self, columns_idx: tuple = (1, 3, 3, 5), idx: tuple = None,
                  split_idx: int = 800) -> torch.utils.data.DataLoader:
        """Data prep pipeline.

        It is called automatically by compile; don't call it in your code.

        Args:
            columns_idx (tuple, optional): Column slices (X is
                ``[c0:c1]``, Y is ``[c2:c3]``). Defaults to (1,3,3,5).
            idx (tuple, optional): 2|3 column indexes; the last one is the
                target. Defaults to None. Use either idx or columns_idx
                (for F:R->R idx, for F:R->R2 columns_idx).
            split_idx (int): Number of leading rows used for training.

        Returns:
            Torch native dataloader.
        """
        batch_size = 2
        self.split_idx = split_idx
        if idx is not None:
            self.len_idx = len(idx)
            if len(idx) == 2:
                self.X = tensor(self.df.iloc[:, idx[0]].values[:split_idx]).float()
                self.Y = tensor(self.df.iloc[:, idx[1]].values[:split_idx]).float()
                batch_size = 1
            else:
                self.X = tensor(self.df.iloc[:, [*idx[:-1]]].values[:split_idx, :]).float()
                self.Y = tensor(self.df.iloc[:, idx[2]].values[:split_idx]).float()
        else:
            self.X = tensor(self.df.iloc[:, columns_idx[0]:columns_idx[1]].values[:split_idx, :]).float()
            self.Y = tensor(self.df.iloc[:, columns_idx[2]:columns_idx[3]].values[:split_idx]).float()
        return self._build_loader(idx, columns_idx, batch_size)

    def init_seed(self, seed):
        """Initialize the torch random seed (optional)."""
        torch.manual_seed(seed)

    def train_epoch(self, X, model, loss_function, optim):
        """Run one optimisation pass over the loader ``X``.

        Appends the last batch loss to ``self.loss_history`` and the last
        batch normalised APE (or None when it exceeds 1) to
        ``self.ape_history``.
        """
        loss = None
        ape_norm = None
        for i, data in enumerate(X):
            Y_pred = model(data[0])
            loss = loss_function(Y_pred, data[1])

            loss.backward()
            optim.step()
            optim.zero_grad()

            target = data[1].detach().numpy()
            # 0.1 in the denominator mitigates division by zero.
            ape_norm = abs(np.mean((Y_pred.detach().numpy() - target) / (target + 0.1)))
            if (i + 1) % 200 == 0:
                print(f'Iter {i+1} APE =', ape_norm)
        if loss is None:
            # Empty loader: nothing to record.
            return
        # NOTE(review): the appended value was missing in SOURCE
        # (`self.loss_history.append(`); the scalar batch loss is assumed.
        # Indentation was also lost -- recording once per epoch is assumed.
        self.loss_history.append(loss.item())
        self.ape_history.append(None if ape_norm > 1 else ape_norm)

    def compile(self, columns: tuple = None, idx: tuple = None,
                optim: torch.optim = torch.optim.AdamW, loss: nn = nn.L1Loss,
                model: nn.Module = dmodel, custom: bool = False,
                lr: float = 0.0001) -> None:
        """Build the data loader, model, loss and optimizer. Has defaults.

        Args:
            columns (tuple, optional): Column slices passed to data_flow as
                ``columns_idx``. Defaults to None (data_flow then uses
                (1,3,3,5)).
            idx (tuple, optional): Column indexes passed to data_flow.
                Takes precedence over ``columns``. Defaults to None.
            optim: torch optimizer class. Defaults to AdamW.
            loss: torch loss class (nn). Defaults to L1Loss.
            model: Model class. Instantiated without arguments when
                ``custom`` is true.
            custom (bool, optional): Use ``model()`` as is. Defaults to False.
            lr (float, optional): Learning rate. Defaults to 0.0001.
        """
        self.columns = columns
        self.len_idx = len(columns) if columns else 0
        # Reset so that an earlier 1-D compile does not force a reshape now.
        self.input_dim_for_check = 0

        if idx:
            self.Xtrain = self.data_flow(idx=idx)
        elif self.columns:
            self.Xtrain = self.data_flow(columns_idx=self.columns)
        else:
            self.Xtrain = self.data_flow()

        if custom:
            self.model = model()
            self.loss_function = loss()
            self.optim = optim(self.model.parameters(), lr=lr)
            if self.len_idx == 2:
                self.input_dim_for_check = 1
        else:
            # The original test `(self.len_idx != 2 or 3)` was always true, so
            # the generic Net below always replaced the 1-D / 2-D models.
            if self.columns or self.len_idx not in (2, 3):
                self.model = Net(input_dim=self.input_dim, hidden_dim=self.dim).float()
            elif self.len_idx == 2:
                self.model = model(in_features=1, hidden_features=self.dim).float()
                self.input_dim_for_check = 1
            else:
                self.model = Net(input_dim=2, hidden_dim=self.dim).float()
            self.optim = optim(self.model.parameters(), lr=lr)
            self.loss_function = loss()

        if self.input_dim_for_check:
            self.X = self.X.reshape(-1, 1)

    def train(self, epochs: int = 10) -> None:
        """Train the model.

        If the model is an sklearn instance, ``.fit()`` is used instead and
        ``epochs`` is ignored.

        Args:
            epochs (int, optional): Number of epochs. Defaults to 10.
        """
        if 'sklearn' in str(self.model.__class__):
            # NOTE(review): the call head was missing in SOURCE
            # (`:,np.array(self.Y))`); `self.model.fit(np.array(self.X)` is
            # reconstructed from the docstring -- confirm.
            self.model.fit(np.array(self.X), np.array(self.Y))
            plt.scatter(self.X, self.model.predict(self.X))
            plt.scatter(self.X, self.Y)
            plt.xlabel('Xreal')
            plt.ylabel('Ypred/Yreal')
            print('Sklearn model fitted successfully')
            return

        self.model.train()
        self.loss_history = []
        self.ape_history = []
        self.epochs = epochs
        for _ in range(self.epochs):
            self.train_epoch(self.Xtrain, self.model, self.loss_function, self.optim)
        plt.plot(self.loss_history, label='loss_history')
        plt.legend()

    def save(self, name: str = '') -> None:
        """Save the model under ``name``."""
        # NOTE(review): the call head was missing in SOURCE (`:,name)`);
        # `torch.save(self.model` is assumed -- confirm.
        torch.save(self.model, name)

    def onnx_export(self, path: str = './models/model.onnx'):
        """Export the model to ONNX, tracing it with ``self.X``."""
        torch.onnx.export(self.model, self.X, path)

    def jit_export(self, path: str = './models/'):
        """Export a properly defined model to jit.

        Args:
            path (str, optional): Path to models. Defaults to './models/'.
        """
        # NOTE(review): the call head was missing in SOURCE (`,path)`);
        # scripting the model and saving it is assumed -- confirm.
        torch.jit.save(torch.jit.script(self.model), path)

    def inference(self, X: tensor, model_name: str = None) -> np.ndarray:
        """Inference of a (pre-)trained model.

        Args:
            X (tensor): Your data in the domain of train.
            model_name (str, optional): File name inside ./models to load.
                When None, or when no such file exists, the in-memory model
                is used.

        Returns:
            np.ndarray: Predictions.
        """
        if model_name is None:
            # No directory lookup needed (and ./models may not exist).
            self.model.eval()
            return self.model(X).detach().numpy()
        if model_name in os.listdir('./models'):
            # NOTE: torch.load unpickles the file; only load trusted models.
            model = torch.load(f'./models/{model_name}')
            model.eval()
            return model(X).detach().numpy()
        return self.model(X).detach().numpy()

    def plot(self):
        """Automatic 2d plot of predictions against real values."""
        self.model.eval()
        preds = self.model(self.X).detach().numpy()
        print(self.Y.shape, preds.shape, self.X.shape)
        if self.X.shape[-1] != preds.shape[-1]:
            print('Size mismatch, try 3d plot, plotting by first dim of largest tensor')
            X = self.X if len(self.X.shape) == 1 else self.X[:, 0]
            plt.scatter(X, preds, label='predicted', s=2)
            if len(self.Y.shape) != 1:
                plt.scatter(X, self.Y[:, 1], s=1, label='real')
            else:
                plt.scatter(X, self.Y, s=1, label='real')
            plt.xlabel(rf'${self.column_names[0]}$')
            plt.ylabel(rf'${self.column_names[1]}$')
        else:
            plt.scatter(self.X, preds, s=2, label='predicted')
            plt.scatter(self.X, self.Y, s=1, label='real')
            plt.xlabel(r'$X$')
            plt.ylabel(r'$Y$')
        plt.legend()

    def plot3d(self, colX=0, colY=1):
        """Plot inputs and predicted data in mesh format.

        Args:
            colX (int, optional): Column of X used for the x axis.
            colY (int, optional): Column of X used for the y axis.

        Returns:
            plotly plot
        """
        X = self.X
        self.model.eval()
        x = X[:, colX].numpy().ravel()
        y = X[:, colY].numpy().ravel()
        z = self.model(X).detach().numpy().ravel()
        surf = px.scatter_3d(
            x=x, y=y,
            z=self.df.iloc[:, self.indexes[-1]].values[:self.split_idx],
            opacity=0.3,
            labels={'x': f'{self.column_names[colX]}',
                    'y': f'{self.column_names[colY]}',
                    'z': f'{self.column_names[-1]}'},
            title='Mesh prediction plot',
        )
        surf.update_traces(marker_size=3)
        surf.update_layout(plot_bgcolor='#888888')
        surf.add_mesh3d(x=x, y=y, z=z, opacity=0.7, colorscale='sunsetdark', intensity=z)
        # The return was commented out in SOURCE although the docstring
        # promises a plot; returning it is harmless for callers ignoring it.
        return surf

    def performance(self, c=0.4) -> dict:
        """Automatic APE based performance if applicable, else returns nan.

        Equivalent to ``performance_super`` with its default parameters
        (rows 1..23 as real data, columns 1:8).

        Args:
            c (float, optional): ZDE mitigation constant. Defaults to 0.4.

        Returns:
            dict: {'Generator_Accuracy, %':np.mean(a),'APE_abs, %':abs_ape,'Model_APE, %': ape}
        """
        return self.performance_super(c=c)

    def performance_super(self, c=0.4, real_data_column_index: tuple = (1, 8),
                          real_data_samples: int = 23,
                          generated_length: int = 1000) -> dict:
        """Performance by custom parameters. APE loss.

        Args:
            c (float, optional): ZDE mitigation constant. Defaults to 0.4.
            real_data_column_index (tuple, optional): Column slice bounds.
                Defaults to (1,8).
            real_data_samples (int, optional): Number of real rows (taken
                from row 1 on). Defaults to 23.
            generated_length (int, optional): Defaults to 1000.
                NOTE(review): not used by the computation -- confirm intent.

        Returns:
            dict: {'Generator_Accuracy, %':np.mean(a),'APE_abs, %':abs_ape,'Model_APE, %': ape}
        """
        first_col = real_data_column_index[0]
        last_col = real_data_column_index[1]
        real = self.df.iloc[1:real_data_samples + 1, first_col:last_col].values
        generated = self.df.iloc[real_data_samples + 1:, first_col:last_col]
        target = self.Y.numpy()[1:]

        a = []
        for _ in range(1000):
            sampled = generated.sample(real_data_samples).values
            a.append(100 - abs(np.mean(real - sampled) / (target + c)) * 100)
        gen_acc = np.mean(a)
        ape = (100 - abs(np.mean(self.model(self.X).detach().numpy() - target) * 100))
        abs_ape = ape * gen_acc / 100
        return {'Generator_Accuracy, %': gen_acc, 'APE_abs, %': abs_ape, 'Model_APE, %': ape}


class RCI(SCI):
    """Computing interface that rescales the dataset before fitting.

    NOTE(review): the class header and the beginning of this docstring were
    missing in SOURCE; they are reconstructed from ``super(RCI, self)`` and
    the surviving "Parent: SCI()" line -- confirm.

    Parent:
        SCI()
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to SCI.__init__."""
        # The original called the parent without arguments, silently
        # discarding everything the caller passed.
        super().__init__(*args, **kwargs)

    def data_flow(self, columns_idx: tuple = (1, 3, 3, 5), idx: tuple = None,
                  split_idx: int = 800) -> torch.utils.data.DataLoader:
        """Data prep pipeline.

        Multiplies every column but the first by row 17 of
        'data/dataset.csv', then selects absolute-valued X and Y tensors.

        Args:
            columns_idx (tuple, optional): Column slices (X is
                ``[c0:c1]``, Y is ``[c2:c3]``). Defaults to (1,3,3,5).
            idx (tuple, optional): 2|3 column indexes. Defaults to None.
                Use either idx or columns_idx (for F:R->R idx, for F:R->R2
                columns_idx).
            split_idx (int): Number of leading rows used for training.

        Returns:
            Torch native dataloader.
        """
        batch_size = 2
        # Rescale only once: data_flow runs on every compile, and repeating
        # the in-place multiplication would compound the scale.
        if not getattr(self, '_real_scaled', False):
            real_scale = pd.read_csv('data/dataset.csv').iloc[17, 1:].to_numpy()
            self.df.iloc[:, 1:] = self.df.iloc[:, 1:] * real_scale
            self._real_scaled = True
        self.split_idx = split_idx
        if idx is not None:
            self.len_idx = len(idx)
            if len(idx) == 2:
                self.X = tensor(self.df.iloc[:, idx[0]].values[:split_idx].astype(float)).float()
                self.Y = tensor(self.df.iloc[:, idx[1]].values[:split_idx].astype(float)).float()
                batch_size = 1
            else:
                self.X = tensor(self.df.iloc[:, [idx[0], idx[1]]].values[:split_idx, :].astype(float)).float()
                self.Y = tensor(self.df.iloc[:, idx[2]].values[:split_idx].astype(float)).float()
        else:
            self.X = tensor(self.df.iloc[:, columns_idx[0]:columns_idx[1]].values[:split_idx, :].astype(float)).float()
            self.Y = tensor(self.df.iloc[:, columns_idx[2]:columns_idx[3]].values[:split_idx].astype(float)).float()
        self.Y = self.Y.abs()
        self.X = self.X.abs()
        return self._build_loader(idx, columns_idx, batch_size)

    def compile(self, columns: tuple = None, idx: tuple = (3, 1),
                optim: torch.optim = torch.optim.AdamW, loss: nn = nn.L1Loss,
                model: nn.Module = PINNd_p, lr: float = 0.001) -> None:
        """Build the data loader, model, loss and optimizer. Has defaults.

        Args:
            columns (tuple, optional): Column slices for feature fitting.
                Only used when ``idx`` is falsy. Defaults to None.
            idx (tuple, optional): Indexes to be selected. Default (3,1).
            optim: torch optimizer class.
            loss: torch loss class (nn).
            model: Model class, instantiated without arguments.
            lr (float, optional): Learning rate. Defaults to 0.001.
        """
        self.columns = columns
        self.len_idx = len(columns) if columns else 0

        if idx:
            self.Xtrain = self.data_flow(idx=idx)
        elif self.columns:
            self.Xtrain = self.data_flow(columns_idx=self.columns)
        else:
            self.Xtrain = self.data_flow()

        self.model = model().float()
        self.input_dim_for_check = self.X.size(-1)
        self.optim = optim(self.model.parameters(), lr=lr)
        self.loss_function = loss()

        if self.input_dim_for_check == 1:
            self.X = self.X.reshape(-1, 1)

    def plot(self):
        """Plot a 2d plot of prediction vs real values.

        Also stores the predictions in ``self.preds`` (used by performance).
        """
        self.model.eval()
        if 'PINN' in str(self.model.__class__):
            # PINN models are evaluated sample by sample; collect the pieces
            # and join once instead of growing an array with np.append.
            pieces = [np.ravel(self.model(sample).detach().numpy()) for sample in self.X]
            self.preds = np.concatenate(pieces).astype(float) if pieces else np.array([])
        else:
            # The original left self.preds undefined for other models.
            self.preds = self.model(self.X).detach().numpy()
        print(self.Y.shape, self.preds.shape, self.X.shape)
        if self.X.shape[-1] != self.preds.shape[-1]:
            print('Size mismatch, try 3d plot, plotting by first dim of largest tensor')
            # Use the first feature column when X is at least 2-D.
            X = self.X[:, 0] if self.X.dim() > 1 else self.X
            plt.scatter(X, self.preds, label='predicted', s=2)
            marker = 2 if self.Y.shape[-1] != 1 else 1
            sns.scatterplot(x=X, y=self.Y, s=marker, label='real')
            plt.xlabel(rf'${self.column_names[0]}$')
            plt.ylabel(rf'${self.column_names[1]}$')
        else:
            sns.scatterplot(x=self.X, y=self.preds, s=2, label='predicted')
            sns.scatterplot(x=self.X, y=self.Y, s=1, label='real')
            plt.xlabel(r'$X$')
            plt.ylabel(r'$Y$')
        plt.legend()

    def performance(self, c=0.4) -> dict:
        """RCI performance. APE errors.

        Requires ``self.preds``, which is produced by plot().

        Args:
            c (float, optional): Correction constant to mitigate division by
                0 error. Defaults to 0.4.

        Returns:
            dict: {'Generator_Accuracy, %':np.mean(a),'APE_abs, %':abs_ape,'Model_APE, %': ape}

        Raises:
            AttributeError: If plot() has not been called yet.
        """
        if not hasattr(self, 'preds'):
            raise AttributeError('self.preds is not set; call plot() before performance()')

        # Min-max normalisation does not depend on the loop, so do it once.
        columns = self.df.iloc[:, 1:8]
        dfcopy = (columns - columns.min()) / (columns.max() - columns.min())
        real = dfcopy.iloc[1:24, 1:].values
        generated = dfcopy.iloc[24:, 1:]

        a = []
        for _ in range(1000):
            sampled = generated.sample(23).values
            a.append(100 - abs(np.mean(real - sampled) / (real + c)) * 100)
        gen_acc = np.mean(a)
        ape = (100 - abs(np.mean(self.preds - self.Y.numpy()) * 100))
        abs_ape = ape * gen_acc / 100
        return {'Generator_Accuracy, %': gen_acc, 'APE_abs, %': abs_ape, 'Model_APE, %': ape}