import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Map raw label values (strings or ints) onto the binary targets used by the model
LABEL_MAP = {'negative': 0, 'not included': 0, '0': 0, 0: 0, 'excluded': 0,
             'positive': 1, 'included': 1, '1': 1, 1: 1}


class SLR_DataSet(Dataset):
    def __init__(self, treat_text=None, etailment_txt=None, LABEL_MAP=None, NA=None, **args):
        self.tokenizer = args.get('tokenizer')
        self.data = args.get('data').reset_index()
        self.max_seq_length = args.get("max_seq_length", 512)
        self.INPUT_NAME = args.get("input", 'x')
        self.LABEL_NAME = args.get("output", None)
        self.treat_text = treat_text
        self.etailment_txt = etailment_txt
        self.LABEL_MAP = LABEL_MAP
        self.NA = NA

        if self.INPUT_NAME not in self.data.columns:
            self.data[self.INPUT_NAME] = np.nan

    # Tokenizing and processing text
    def encode_text(self, example):
        comment_text = example[self.INPUT_NAME]
        if self.treat_text is not None:
            comment_text = self.treat_text(comment_text)

        if self.LABEL_NAME is None:
            labels = None
        elif pd.isna(example[self.LABEL_NAME]) and self.NA is not None:
            labels = self.NA
        else:
            try:
                labels = self.LABEL_MAP[example[self.LABEL_NAME]]
            except KeyError:
                # Label value is not present in LABEL_MAP
                # raise TypeError(f"Label {example[self.LABEL_NAME]} is not in LABEL_MAP")
                labels = -1

        if self.etailment_txt:
            tensor_data = self.tokenize((comment_text, self.etailment_txt), labels)
        else:
            tensor_data = self.tokenize(comment_text, labels)
        return tensor_data

    def tokenize(self, comment_text, labels):
        encoding = self.tokenizer.encode_plus(
            comment_text,
            add_special_tokens=True,
            max_length=self.max_seq_length,
            return_token_type_ids=True,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        if labels is not None:
            return ((encoding["input_ids"].flatten(),
                     encoding["attention_mask"].flatten(),
                     encoding["token_type_ids"].flatten()),
                    torch.tensor([labels], dtype=torch.long))
        else:
            return ((encoding["input_ids"].flatten(),
                     encoding["attention_mask"].flatten(),
                     encoding["token_type_ids"].flatten()),
                    torch.empty(0))

    def __len__(self):
        return len(self.data)

    # Returning data
    def __getitem__(self, index: int):
        data_row = self.data.iloc[index]
        tensor_data = self.encode_text(data_row)
        return tensor_data
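# Example usage (a minimal, illustrative sketch: the "bert-base-uncased" checkpoint
# and the 'abstract'/'decision' column names are assumptions, not requirements of
# this module):
#
#   from transformers import AutoTokenizer
#
#   tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
#   df = pd.DataFrame({"abstract": ["A randomized trial of ...", "An unrelated essay ..."],
#                      "decision": ["included", "excluded"]})
#   dataset = SLR_DataSet(data=df, tokenizer=tokenizer,
#                         input="abstract", output="decision",
#                         LABEL_MAP=LABEL_MAP, max_seq_length=128)
#   loader = DataLoader(dataset, batch_size=2, shuffle=True)
#   (input_ids, attention_mask, token_type_ids), labels = next(iter(loader))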
from tqdm import tqdm
import gc
from IPython.display import clear_output
from collections import namedtuple

features = namedtuple('features', ['bert', 'feature_map'])
Output = namedtuple('Output', ['loss', 'features', 'logit'])
bert_tuple = namedtuple('bert', ['hidden_states', 'attentions'])


class loop:
    @classmethod
    def train_loop(cls, model, device, optimizer, data_train_loader, scheduler=None,
                   data_valid_loader=None, epochs=4, print_info=1000000000,
                   metrics=True, log=None, metrics_print=True):
        # Reset the metrics table and prepare the model
        table.reset()
        model.to(device)
        model.train()

        # Task epochs (inner epochs)
        for epoch in range(0, epochs):
            train_loss, _, out = cls.batch_loop(data_train_loader, model, optimizer, device)

            if scheduler is not None:
                for sched in scheduler:
                    sched.step()

            if epoch % print_info == 0:
                if metrics:
                    labels = cls.map_batch(out[1]).to(int).squeeze()
                    logits = cls.map_batch(out[0]).squeeze()
                    # `plot` is an external metrics helper defined elsewhere in this project
                    train_metrics, _ = plot(logits, labels, 0.9)
                    del labels, logits
                    train_metrics['Loss'] = torch.Tensor(train_loss).mean().item()
                    if log is not None:
                        log({"train_" + x: y for x, y in train_metrics.items()})
                    table(train_metrics, epoch, "Train")
                else:
                    print("Loss: ", torch.Tensor(train_loss).mean().item())

                if data_valid_loader:
                    valid_loss, _, out = cls.eval_loop(data_valid_loader, model, device=device)
                    if metrics:
                        global out2  # debugging handle kept from the original code
                        out2 = out
                        labels = cls.map_batch(out[1]).to(int).squeeze()
                        logits = cls.map_batch(out[0]).squeeze()
                        valid_metrics, _ = plot(logits, labels, 0.9)
                        valid_metrics['Loss'] = torch.Tensor(valid_loss).mean().item()
                        del labels, logits
                        if log is not None:
                            log({"valid_" + x: y for x, y in valid_metrics.items()})
                        table(valid_metrics, epoch, "Valid")
                        if metrics_print:
                            print(table.data_frame().round(4))
                    else:
                        print("Valid Loss: ", torch.Tensor(valid_loss).mean().item())

        return table.data_frame()

    @classmethod
    def batch_loop(cls, loader, model, optimizer, device):
        all_loss = []
        features_lst = []
        logits = []
        outputs = []

        # Training batch loop
        for inner_step, batch in enumerate(tqdm(loader, desc="Train validation | ", ncols=80)):
            input, output = batch
            input = tuple(t.to(device) for t in input)
            if isinstance(output, torch.Tensor):
                output = output.to(device)

            optimizer.zero_grad()

            # Predictions
            loss, feature, logit = model(input, output)

            # Compute gradients and update parameters
            loss.backward()
            optimizer.step()

            input = tuple(t.to("cpu") for t in input)
            if isinstance(output, torch.Tensor):
                output = output.to("cpu")

            if isinstance(loss, torch.Tensor):
                all_loss.append(loss.to('cpu').detach().clone())
            if isinstance(logit, torch.Tensor):
                logits.append(logit.to('cpu').detach().clone())
            if isinstance(output, torch.Tensor):
                outputs.append(output.to('cpu').detach().clone())
            if len(feature.feature_map) != 0:
                features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map])

            del batch, input, output, loss, feature, logit

        gc.collect()
        torch.cuda.empty_cache()

        return Output(all_loss, features(None, features_lst), (logits, outputs))

    @classmethod
    def eval_loop(cls, loader, model, device, attention=False, hidden_states=False):
        all_loss = []
        features_lst = []
        attention_lst = []
        hidden_states_lst = []
        logits = []
        outputs = []

        model.eval()
        with torch.no_grad():
            # Evaluation batch loop
            for inner_step, batch in enumerate(tqdm(loader, desc="Test validation | ", ncols=80)):
                input, output = batch
                input = tuple(t.to(device) for t in input)

                # Predictions (with or without ground-truth labels)
                if output.numel() != 0:
                    loss, feature, logit = model(input, output.to(device),
                                                 attention=attention, hidden_states=hidden_states)
                else:
                    loss, feature, logit = model(input, attention=attention, hidden_states=hidden_states)

                input = tuple(t.to("cpu") for t in input)
                if isinstance(output, torch.Tensor):
                    output = output.to("cpu")

                if isinstance(loss, torch.Tensor):
                    all_loss.append(loss.to('cpu').detach().clone())
                if isinstance(logit, torch.Tensor):
                    logits.append(logit.to('cpu').detach().clone())

                try:
                    if feature.bert.attentions is not None:
                        attention_lst.append([x.to('cpu').detach().clone() for x in feature.bert.attentions])
                except AttributeError:
                    attention_lst = None
                try:
                    if feature.bert.hidden_states is not None:
                        hidden_states_lst.append([x.to('cpu').detach().clone() for x in feature.bert.hidden_states])
                except AttributeError:
                    hidden_states_lst = None

                if isinstance(output, torch.Tensor):
                    outputs.append(output.to('cpu').detach().clone())
                if len(feature.feature_map) != 0:
                    features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map])

                del batch, input, output, loss, feature, logit

        gc.collect()
        torch.cuda.empty_cache()

        return Output(all_loss,
                      features(bert_tuple(hidden_states_lst, attention_lst), features_lst),
                      (logits, outputs))

    # Concatenate per-batch predictions (e.g. for metrics or a t-SNE of the feature maps)
    @staticmethod
    def map_batch(features):
        features = torch.cat(features, dim=0)
        return features.detach().clone()
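# Example of consuming a `loop.eval_loop` Output (an illustrative sketch — `model`,
# `loader` and `device` are assumed to already exist, and the t-SNE projection is
# just one possible use of the pooled 200-d feature maps):
#
#   out = loop.eval_loop(loader, model, device)
#   mean_loss = torch.stack(out.loss).mean() if out.loss else None
#   logits, labels = out.logit                     # lists of per-batch tensors
#   scores = torch.sigmoid(loop.map_batch(logits).squeeze(-1))
#   feature_map = torch.cat([torch.stack(batch) for batch in out.features.feature_map])
#   # from sklearn.manifold import TSNE
#   # coords = TSNE(n_components=2).fit_transform(feature_map.numpy())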
class table:
    data = []
    index = []

    @torch.no_grad()
    def __init__(self, data, epochs, name):
        # Each instantiation appends one row of metrics to the class-level store
        self.index.append((epochs, name))
        self.data.append(data)

    @classmethod
    @torch.no_grad()
    def data_frame(cls):
        clear_output()
        index = pd.MultiIndex.from_tuples(cls.index, names=["Epochs", "Data"])
        data = pd.DataFrame(cls.data, index=index)
        return data

    @classmethod
    @torch.no_grad()
    def reset(cls):
        cls.data = []
        cls.index = []


from copy import deepcopy


# Pre-trained model wrapper
class Encoder(nn.Module):
    def __init__(self, layers, freeze_bert, model):
        super(Encoder, self).__init__()

        # Dummy parameter used to track the module's device
        self.dummy_param = nn.Parameter(torch.empty(0))

        # Pre-trained model
        self.model = deepcopy(model)

        # Freezing bert parameters
        if freeze_bert:
            for param in self.model.parameters():
                param.requires_grad = False

        # Selecting hidden layers of the pre-trained model
        old_model_encoder = self.model.encoder.layer
        new_model_encoder = nn.ModuleList()
        for i in layers:
            new_model_encoder.append(old_model_encoder[i])
        self.model.encoder.layer = new_model_encoder

    # Feed forward
    def forward(self, output_attentions=False, output_hidden_states=False, **x):
        return self.model(output_attentions=output_attentions,
                          output_hidden_states=output_hidden_states,
                          return_dict=True,
                          **x)
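# Example: keeping only a subset of a pre-trained BERT's encoder blocks (a sketch;
# the "bert-base-uncased" checkpoint and the choice of the first 4 layers are
# illustrative assumptions):
#
#   from transformers import AutoModel
#
#   bert = AutoModel.from_pretrained("bert-base-uncased")
#   encoder = Encoder(layers=range(4), freeze_bert=True, model=bert)
#   print(len(encoder.model.encoder.layer))   # -> 4 transformer blocks remain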
# Complete model
class SLR_Classifier(nn.Module):
    def __init__(self, **data):
        super(SLR_Classifier, self).__init__()

        # Dummy parameter used to track the module's device
        self.dummy_param = nn.Parameter(torch.empty(0))

        # Loss function: binary cross-entropy with logits, reduced to the mean
        self.loss_fn = nn.BCEWithLogitsLoss(reduction='mean',
                                            pos_weight=torch.FloatTensor([data.get("pos_weight", 2.5)]))

        # Pre-trained model
        self.Encoder = Encoder(layers=data.get("bert_layers", range(12)),
                               freeze_bert=data.get("freeze_bert", False),
                               model=data.get("model"))

        # Feature map layer
        self.feature_map = nn.Sequential(
            # nn.LayerNorm(self.Encoder.model.config.hidden_size),
            nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
            nn.Linear(self.Encoder.model.config.hidden_size, 200),
            nn.Dropout(data.get("drop", 0.5)),
        )

        # Classifier layer
        self.classifier = nn.Sequential(
            nn.Tanh(),
            nn.Linear(200, 1)
        )

        # Initializing layer parameters
        nn.init.normal_(self.feature_map[1].weight, mean=0, std=0.00001)
        nn.init.zeros_(self.feature_map[1].bias)

    # Feed forward
    def forward(self, input, output=None, attention=False, hidden_states=False):
        input_ids, attention_mask, token_type_ids = input

        predict = self.Encoder(output_attentions=attention,
                               output_hidden_states=hidden_states,
                               **{"input_ids": input_ids,
                                  "attention_mask": attention_mask,
                                  "token_type_ids": token_type_ids})

        feature_maped = self.feature_map(predict['pooler_output'])
        logit = self.classifier(feature_maped)

        if output is not None:
            # Loss against the ground-truth labels
            loss = self.loss_fn(logit.to(torch.float), output.to(torch.float))
            return Output(loss, features(predict, feature_maped), logit)
        else:
            return Output(None, features(predict, feature_maped), logit)

    def fit(self, optimizer, data_train_loader, scheduler=None, data_valid_loader=None,
            epochs=4, print_info=1000000000, metrics=True, log=None, metrics_print=True):
        return loop.train_loop(self,
                               device=self.dummy_param.device,
                               optimizer=optimizer,
                               scheduler=scheduler,
                               data_train_loader=data_train_loader,
                               data_valid_loader=data_valid_loader,
                               epochs=epochs,
                               print_info=print_info,
                               metrics=metrics,
                               log=log,
                               metrics_print=metrics_print)

    def evaluate(self, loader, attention=False, hidden_states=False):
        all_loss, feature, (logits, outputs) = loop.eval_loop(loader, self, self.dummy_param.device,
                                                              attention=attention, hidden_states=hidden_states)
        logits = loop.map_batch(logits)
        if len(outputs) != 0:
            outputs = loop.map_batch(outputs)
        return Output(np.mean(all_loss), feature, (logits, outputs))
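if __name__ == "__main__":
    # Minimal smoke test (an illustrative sketch, not part of the original training pipeline).
    # It downloads the "bert-base-uncased" checkpoint, so it needs network access; the toy
    # DataFrame, column names and hyperparameters below are assumptions for demonstration only.
    from transformers import AutoModel, AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
    bert = AutoModel.from_pretrained("bert-base-uncased")

    df = pd.DataFrame({"abstract": ["A randomized trial of ...", "An unrelated essay ..."],
                       "decision": ["included", "excluded"]})
    dataset = SLR_DataSet(data=df, tokenizer=tokenizer,
                          input="abstract", output="decision",
                          LABEL_MAP=LABEL_MAP, max_seq_length=64)
    loader = DataLoader(dataset, batch_size=2)

    model = SLR_Classifier(model=bert, bert_layers=range(4), freeze_bert=False, drop=0.5)
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # One optimisation pass over the toy loader, then an evaluation pass
    train_out = loop.batch_loop(loader, model, optimizer, device)
    print("train loss:", torch.stack(train_out.loss).mean().item())

    eval_out = model.evaluate(loader)
    print("eval loss:", eval_out.loss)
    print("inclusion probabilities:", torch.sigmoid(eval_out.logit[0]).squeeze(-1))

    # model.fit(...) runs the full loop.train_loop, which additionally relies on the
    # external `plot` metrics helper used elsewhere in this project.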