import torch.nn.functional as F
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import math
import torch
import numpy as np
import pandas as pd
import time
import transformers
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from copy import deepcopy, copy
from pprint import pprint
import shutil
import datetime
import re
import json
from pathlib import Path
from itertools import chain


# Fetching the pre-trained model and tokenizer
class initializer:
    def __init__(self, MODEL_NAME, **config):
        self.MODEL_NAME = MODEL_NAME
        model = config.get("model")
        tokenizer = config.get("tokenizer")

        # Model
        self.model = model.from_pretrained(MODEL_NAME,
                                           return_dict=True,
                                           output_attentions=False)
        # Tokenizer
        self.tokenizer = tokenizer.from_pretrained(MODEL_NAME, do_lower_case=True)


config = {
    "model": AutoModelForSequenceClassification,
    "tokenizer": AutoTokenizer
}

# Pre-trained model initializer (uncased SciBERT)
initializer_model_scibert = initializer('allenai/scibert_scivocab_uncased', **config)
# initializer_model = initializer('bert-base-uncased', **config)

# Mapping of raw label values to binary targets
LABEL_MAP = {'negative': 0, 'not included': 0, '0': 0, 0: 0, 'excluded': 0,
             'positive': 1, 'included': 1, '1': 1, 1: 1}


class SLR_DataSet(Dataset):
    def __init__(self, treat_text=None, etailment_txt=None, LABEL_MAP=None, NA=None, **args):
        self.tokenizer = args.get('tokenizer')
        self.data = args.get('data').reset_index()
        self.max_seq_length = args.get("max_seq_length", 512)
        self.INPUT_NAME = args.get("input", 'x')
        self.LABEL_NAME = args.get("output", None)
        self.treat_text = treat_text
        self.etailment_txt = etailment_txt
        self.LABEL_MAP = LABEL_MAP
        self.NA = NA

        if self.INPUT_NAME not in self.data.columns:
            self.data[self.INPUT_NAME] = np.nan

    # Tokenizing and processing text
    def encode_text(self, example):
        comment_text = example[self.INPUT_NAME]
        if self.treat_text is not None:
            comment_text = self.treat_text(comment_text)

        if self.LABEL_NAME is None:
            labels = None
        elif pd.isna(example[self.LABEL_NAME]) and self.NA is not None:
            # Missing labels fall back to the NA placeholder when one is given
            labels = self.NA
        else:
            try:
                labels = self.LABEL_MAP[example[self.LABEL_NAME]]
            except KeyError:
                labels = -1
                # raise TypeError(f"Label {example[self.LABEL_NAME]} is not in LABEL_MAP")

        if self.etailment_txt:
            tensor_data = self.tokenize((comment_text, self.etailment_txt), labels)
        else:
            tensor_data = self.tokenize(comment_text, labels)

        return tensor_data

    def tokenize(self, comment_text, labels):
        encoding = self.tokenizer.encode_plus(
            comment_text,
            add_special_tokens=True,
            max_length=self.max_seq_length,
            return_token_type_ids=True,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )

        if labels is not None:
            return ((encoding["input_ids"].flatten(),
                     encoding["attention_mask"].flatten(),
                     encoding["token_type_ids"].flatten()),
                    torch.tensor([labels], dtype=torch.long))
        else:
            return ((encoding["input_ids"].flatten(),
                     encoding["attention_mask"].flatten(),
                     encoding["token_type_ids"].flatten()),
                    torch.empty(0))

    def __len__(self):
        return len(self.data)

    # Returning a single tokenized example
    def __getitem__(self, index: int):
        data_row = self.data.iloc[index]
        tensor_data = self.encode_text(data_row)
        return tensor_data


from tqdm import tqdm
import gc
from IPython.display import clear_output
from collections import namedtuple

# Lightweight containers for model outputs
features = namedtuple('features', ['bert', 'feature_map'])
Output = namedtuple('Output', ['loss', 'features', 'logit'])
bert_tuple = namedtuple('bert', ['hidden_states', 'attentions'])
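# --- Usage sketch (illustrative only) ---
# A minimal example of wiring SLR_DataSet to a DataLoader. The DataFrame, its
# column names ('abstract', 'label') and the batch size are hypothetical
# stand-ins; the tokenizer and LABEL_MAP come from the code above.
_example_df = pd.DataFrame({
    'abstract': ["Deep learning for screening titles and abstracts.",
                 "An unrelated study on crop rotation."],
    'label': ['included', 'excluded'],
})

_example_dataset = SLR_DataSet(data=_example_df,
                               tokenizer=initializer_model_scibert.tokenizer,
                               input='abstract',
                               output='label',
                               LABEL_MAP=LABEL_MAP,
                               max_seq_length=512)

_example_loader = DataLoader(_example_dataset, batch_size=2, shuffle=True)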
class loop:
    @classmethod
    def train_loop(cls, model, device, optimizer,
                   data_train_loader, scheduler=None, data_valid_loader=None,
                   epochs=4, print_info=1000000000, metrics=True,
                   log=None, metrics_print=True):

        # Reset the metrics table and prepare the model
        table.reset()
        model.to(device)
        model.train()

        # Task epochs (inner epochs)
        for epoch in range(0, epochs):
            train_loss, _, out = cls.batch_loop(data_train_loader, model, optimizer, device)

            if scheduler is not None:
                for sched in scheduler:
                    sched.step()

            if (epoch % print_info == 0):
                if metrics:
                    labels = cls.map_batch(out[1]).to(int).squeeze()
                    logits = cls.map_batch(out[0]).squeeze()

                    # plot() is an external helper (defined elsewhere) that
                    # computes a metrics dict for the given decision threshold
                    train_metrics, _ = plot(logits, labels, 0.9)
                    del labels, logits

                    train_metrics['Loss'] = torch.Tensor(train_loss).mean().item()

                    if not isinstance(log, type(None)):
                        log({"train_" + x: y for x, y in train_metrics.items()})

                    table(train_metrics, epoch, "Train")
                else:
                    print("Loss: ", torch.Tensor(train_loss).mean().item())

                if data_valid_loader:
                    valid_loss, _, out = cls.eval_loop(data_valid_loader, model, device=device)

                    if metrics:
                        global out2
                        out2 = out
                        labels = cls.map_batch(out[1]).to(int).squeeze()
                        logits = cls.map_batch(out[0]).squeeze()

                        valid_metrics, _ = plot(logits, labels, 0.9)
                        valid_metrics['Loss'] = torch.Tensor(valid_loss).mean().item()
                        del labels, logits

                        if not isinstance(log, type(None)):
                            log({"valid_" + x: y for x, y in valid_metrics.items()})

                        table(valid_metrics, epoch, "Valid")

                        if metrics_print:
                            print(table.data_frame().round(4))
                    else:
                        print("Valid Loss: ", torch.Tensor(valid_loss).mean().item())

        # Metrics table collected during training (None when nothing was logged)
        return table.data_frame() if len(table.data) else None

    @classmethod
    def batch_loop(cls, loader, model, optimizer, device):
        all_loss = []
        features_lst = []
        attention_lst = []
        logits = []
        outputs = []

        # Training batch loop
        for inner_step, batch in enumerate(tqdm(loader, desc="Train validation | ", ncols=80)):
            input, output = batch
            input = tuple(t.to(device) for t in input)
            if isinstance(output, torch.Tensor):
                output = output.to(device)

            optimizer.zero_grad()

            # Predictions
            loss, feature, logit = model(input, output)

            # Compute gradients
            loss.backward()

            # Update parameters
            optimizer.step()

            input = tuple(t.to("cpu") for t in input)
            if isinstance(output, torch.Tensor):
                output = output.to("cpu")

            if isinstance(loss, torch.Tensor):
                all_loss.append(loss.to('cpu').detach().clone())
            if isinstance(logit, torch.Tensor):
                logits.append(logit.to('cpu').detach().clone())
            if isinstance(output, torch.Tensor):
                outputs.append(output.to('cpu').detach().clone())
            if len(feature.feature_map) != 0:
                features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map])

            del batch, input, output, loss, feature, logit

        gc.collect()
        torch.cuda.empty_cache()

        return Output(all_loss, features(None, features_lst), (logits, outputs))

    @classmethod
    def eval_loop(cls, loader, model, device, attention=False, hidden_states=False):
        all_loss = []
        features_lst = []
        attention_lst = []
        hidden_states_lst = []
        logits = []
        outputs = []

        model.eval()
        with torch.no_grad():
            # Evaluation batch loop
            for inner_step, batch in enumerate(tqdm(loader, desc="Test validation | ", ncols=80)):
                input, output = batch
                input = tuple(t.to(device) for t in input)

                if output.numel() != 0:
                    # Predictions with labels (loss is computed)
                    loss, feature, logit = model(input, output.to(device),
                                                 attention=attention,
                                                 hidden_states=hidden_states)
                else:
                    # Predictions without labels
                    loss, feature, logit = model(input,
                                                 attention=attention,
                                                 hidden_states=hidden_states)

                input = tuple(t.to("cpu") for t in input)
                if isinstance(output, torch.Tensor):
                    output = output.to("cpu")

                if isinstance(loss, torch.Tensor):
                    all_loss.append(loss.to('cpu').detach().clone())
                if isinstance(logit, torch.Tensor):
                    logits.append(logit.to('cpu').detach().clone())

                try:
                    if not isinstance(feature.bert.attentions, type(None)):
                        attention_lst.append([x.to('cpu').detach().clone()
                                              for x in feature.bert.attentions])
                except AttributeError:
                    attention_lst = None

                try:
                    if not isinstance(feature.bert.hidden_states, type(None)):
                        hidden_states_lst.append([x.to('cpu').detach().clone()
                                                  for x in feature.bert.hidden_states])
                except AttributeError:
                    hidden_states_lst = None

                if isinstance(output, torch.Tensor):
                    outputs.append(output.to('cpu').detach().clone())
                if len(feature.feature_map) != 0:
                    features_lst.append([x.to('cpu').detach().clone() for x in feature.feature_map])

                del batch, input, output, loss, feature, logit

        gc.collect()
        torch.cuda.empty_cache()

        return Output(all_loss,
                      features(bert_tuple(hidden_states_lst, attention_lst), features_lst),
                      (logits, outputs))

    # Concatenate a list of batched tensors into a single tensor
    @staticmethod
    def map_batch(features):
        features = torch.cat(features, dim=0)
        return features.detach().clone()
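# --- Batch format sketch (illustrative only) ---
# Each batch yielded by the hypothetical loader built above is
# ((input_ids, attention_mask, token_type_ids), labels), which is exactly the
# structure that batch_loop and eval_loop unpack.
_example_inputs, _example_labels = next(iter(_example_loader))
print([t.shape for t in _example_inputs])  # three tensors of shape [batch, max_seq_length]
print(_example_labels.shape)               # [batch, 1] when labels are present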
class table:
    data = []
    index = []

    @torch.no_grad()
    def __init__(self, data, epochs, name):
        self.index.append((epochs, name))
        self.data.append(data)

    @classmethod
    @torch.no_grad()
    def data_frame(cls):
        clear_output()
        index = pd.MultiIndex.from_tuples(cls.index, names=["Epochs", "Data"])
        data = pd.DataFrame(cls.data, index=index)
        return data

    @classmethod
    @torch.no_grad()
    def reset(cls):
        cls.data = []
        cls.index = []


# Wrapper around the pre-trained encoder
class Encoder(nn.Module):
    def __init__(self, layers, freeze_bert, model):
        super(Encoder, self).__init__()

        # Dummy parameter used to track the module's device
        self.dummy_param = nn.Parameter(torch.empty(0))

        # Pre-trained model
        self.model = deepcopy(model)

        # Freezing the pre-trained parameters
        if freeze_bert:
            for param in self.model.parameters():
                param.requires_grad = False

        # Selecting hidden layers of the pre-trained model
        old_model_encoder = self.model.encoder.layer
        new_model_encoder = nn.ModuleList()

        for i in layers:
            new_model_encoder.append(old_model_encoder[i])

        self.model.encoder.layer = new_model_encoder

    # Feed forward
    def forward(self, output_attentions=False, output_hidden_states=False, **x):
        return self.model(output_attentions=output_attentions,
                          output_hidden_states=output_hidden_states,
                          return_dict=True,
                          **x)
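# --- Encoder usage sketch (illustrative only) ---
# The wrapped model is assumed to be a base BERT-style encoder that exposes
# `.encoder.layer` and returns a 'pooler_output'; here the `.bert` submodule of
# the sequence-classification model loaded above is used as an example, and
# keeping only the first four transformer layers is an arbitrary choice.
_example_encoder = Encoder(layers=range(4),
                           freeze_bert=True,
                           model=initializer_model_scibert.model.bert)
print(len(_example_encoder.model.encoder.layer))  # -> 4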
# Complete model
class SLR_Classifier(nn.Module):
    def __init__(self, **data):
        super(SLR_Classifier, self).__init__()

        # Dummy parameter used to track the module's device
        self.dummy_param = nn.Parameter(torch.empty(0))

        # Loss function: binary cross-entropy with logits, reduced to the mean
        self.loss_fn = nn.BCEWithLogitsLoss(reduction='mean',
                                            pos_weight=torch.FloatTensor([data.get("pos_weight", 2.5)]))

        # Pre-trained encoder
        self.Encoder = Encoder(layers=data.get("bert_layers", range(12)),
                               freeze_bert=data.get("freeze_bert", False),
                               model=data.get("model"))

        # Feature-map layer
        self.feature_map = nn.Sequential(
            # nn.LayerNorm(self.Encoder.model.config.hidden_size),
            nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
            # nn.Dropout(data.get("drop", 0.5)),
            nn.Linear(self.Encoder.model.config.hidden_size, 200),
            nn.Dropout(data.get("drop", 0.5)),
        )

        # Classifier layer
        self.classifier = nn.Sequential(
            # nn.LayerNorm(self.Encoder.model.config.hidden_size),
            # nn.Dropout(data.get("drop", 0.5)),
            # nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
            # nn.Dropout(data.get("drop", 0.5)),
            nn.Tanh(),
            nn.Linear(200, 1)
        )

        # Initializing the feature-map linear layer
        nn.init.normal_(self.feature_map[1].weight, mean=0, std=0.00001)
        nn.init.zeros_(self.feature_map[1].bias)

    # Feed forward
    def forward(self, input, output=None, attention=False, hidden_states=False):
        input_ids, attention_mask, token_type_ids = input

        predict = self.Encoder(output_attentions=attention,
                               output_hidden_states=hidden_states,
                               **{"input_ids": input_ids,
                                  "attention_mask": attention_mask,
                                  "token_type_ids": token_type_ids})

        feature_maped = self.feature_map(predict['pooler_output'])
        logit = self.classifier(feature_maped)

        if not isinstance(output, type(None)):
            # Loss function
            loss = self.loss_fn(logit.to(torch.float), output.to(torch.float))
            return Output(loss, features(predict, feature_maped), logit)
        else:
            return Output(None, features(predict, feature_maped), logit)

    def fit(self, optimizer, data_train_loader, scheduler=None, data_valid_loader=None,
            epochs=4, print_info=1000000000, metrics=True, log=None, metrics_print=True):

        return loop.train_loop(self, device=self.dummy_param.device,
                               optimizer=optimizer, scheduler=scheduler,
                               data_train_loader=data_train_loader,
                               data_valid_loader=data_valid_loader,
                               epochs=epochs, print_info=print_info,
                               metrics=metrics, log=log,
                               metrics_print=metrics_print)

    def evaluate(self, loader, attention=False, hidden_states=False):
        all_loss, feature, (logits, outputs) = loop.eval_loop(loader, self,
                                                              self.dummy_param.device,
                                                              attention=attention,
                                                              hidden_states=hidden_states)

        logits = loop.map_batch(logits)

        if len(outputs) != 0:
            outputs = loop.map_batch(outputs)

        return Output(np.mean(all_loss), feature, (logits, outputs))
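# --- End-to-end training sketch (illustrative only) ---
# A minimal sketch of the full pipeline under the same assumptions as above:
# the base encoder is the `.bert` submodule of the loaded model, the loader is
# the hypothetical _example_loader built earlier, and metrics are disabled
# because the plot() helper used by train_loop is defined elsewhere.
_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

_example_clf = SLR_Classifier(model=initializer_model_scibert.model.bert,
                              bert_layers=range(12),
                              freeze_bert=False,
                              pos_weight=2.5,
                              drop=0.5).to(_device)

_example_optimizer = torch.optim.AdamW(_example_clf.parameters(), lr=2e-5)

# fit() resolves the device from dummy_param, so the model is moved first
_example_clf.fit(optimizer=_example_optimizer,
                 data_train_loader=_example_loader,
                 epochs=1,
                 metrics=False)

# evaluate() returns Output(mean loss, features, (logits, labels))
_example_eval = _example_clf.evaluate(_example_loader)
print(_example_eval.logit[0].shape)  # logits for every example in the loader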