import time
import datetime
import torch
import numpy as np
import tqdm
import random
from torch import nn
from transformers import RobertaTokenizer, RobertaModel, AdamW, RobertaConfig
from sklearn.model_selection import train_test_split
from transformers import BertTokenizer, BertForSequenceClassification, get_linear_schedule_with_warmup
from torch.utils.data import TensorDataset, random_split, DataLoader, RandomSampler, SequentialSampler


class BERTClassifier():

    def __init__(self, model_name="bert-base-uncased", tokenizer_name="bert-base-uncased") -> None:
        print(f'Loading BERT tokenizer: {tokenizer_name}...')
        self.model_name = model_name
        self.tokenizer = BertTokenizer.from_pretrained(tokenizer_name, do_lower_case=True)
        if model_name.endswith('.model'):
            # A previously saved checkpoint: load it and re-save a CPU copy to the same path.
            self.model = torch.load(model_name)
            torch.save(self.model.cpu(), model_name)
        else:
            self.model = BertForSequenceClassification.from_pretrained(
                self.model_name,
                num_labels=14,
                output_attentions=False,
                output_hidden_states=False
            )
        self.device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
        self.model.to(self.device)

    def tokenizeText(self, sentence: str):
        # return self.tokenizer.encode(sentence, add_special_tokens=True)
        encoded_dict = self.tokenizer.encode_plus(
            sentence,
            add_special_tokens=True,
            max_length=64,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt')
        return encoded_dict['input_ids'], encoded_dict['attention_mask']

    def tokenizeSentences(self, sentences: list, labels: list):
        input_ids = []
        attention_masks = []
        for sent in sentences:
            input_id, attention_mask = self.tokenizeText(sent)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)
        input_ids = torch.cat(input_ids, dim=0)
        attention_masks = torch.cat(attention_masks, dim=0)
        # TensorDataset requires tensors, so convert the label list.
        labels = torch.as_tensor(labels)
        dataset = TensorDataset(input_ids, attention_masks, labels)
        # 90/10 train/validation split.
        train_size = int(0.9 * len(dataset))
        val_size = len(dataset) - train_size
        return random_split(dataset, [train_size, val_size])

    def flat_accuracy(self, preds, labels):
        pred_flat = np.argmax(preds, axis=1).flatten()
        labels_flat = labels.flatten()
        return np.sum(pred_flat == labels_flat) / len(labels_flat)

    def format_time(self, elapsed):
        # Round to the nearest second.
        elapsed_rounded = int(round((elapsed)))
        # Format as hh:mm:ss
        return str(datetime.timedelta(seconds=elapsed_rounded))

    def trainModel(self, sentences: list, labels: list, epochs=4, batch_size=32):
        optimizer = AdamW(self.model.parameters(), lr=2e-5, eps=1e-8)
        train_dataset, val_dataset = self.tokenizeSentences(sentences, labels)
        train_dataloader = DataLoader(
            train_dataset,
            sampler=RandomSampler(train_dataset),
            batch_size=batch_size
        )
        # Built for completeness; `train` below does not run a validation pass.
        validation_dataloader = DataLoader(
            val_dataset,
            sampler=SequentialSampler(val_dataset),
            batch_size=batch_size
        )
        total_steps = len(train_dataloader) * epochs
        # Create the learning rate scheduler.
        scheduler = get_linear_schedule_with_warmup(optimizer,
                                                    num_warmup_steps=0,  # Default value in run_glue.py
                                                    num_training_steps=total_steps)
        self.train(train_dataloader, optimizer, scheduler, epochs)
        torch.save(self.model, f"Bert_GoEmotions_BS{batch_size}_E{epochs}.model")

    def train(self, train_dataloader, optimizer, scheduler, epochs):
        # This training code is based on the `run_glue.py` script here:
        # https://github.com/huggingface/transformers/blob/5bfcd0485ece086ebcbed2d008813037968a9e58/examples/run_glue.py#L128

        # Measure the total training time for the whole run.
        total_t0 = time.time()

        # For each epoch...
        for epoch_i in range(epochs):
            print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
            print('Training...')

            # Measure how long the training epoch takes.
            t0 = time.time()

            # Reset the total loss for this epoch.
            total_train_loss = 0

            # Put the model into training mode. Don't be misled--the call to
            # `train` just changes the *mode*, it doesn't *perform* the training.
            # `dropout` and `batchnorm` layers behave differently during training
            # vs. test (source: https://stackoverflow.com/questions/51433378/what-does-model-train-do-in-pytorch)
            self.model.train()

            # For each batch of training data...
            for step, batch in enumerate(train_dataloader):

                # Progress update every 40 batches.
                if step % 40 == 0 and step != 0:
                    # Calculate elapsed time in minutes.
                    elapsed = self.format_time(time.time() - t0)
                    # Report progress.
                    print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_dataloader), elapsed))

                # Unpack this training batch from our dataloader.
                #
                # As we unpack the batch, we'll also copy each tensor to the GPU using the
                # `to` method.
                #
                # `batch` contains three pytorch tensors:
                #   [0]: input ids
                #   [1]: attention masks
                #   [2]: labels
                b_input_ids = batch[0].to(self.device)
                b_input_mask = batch[1].to(self.device)
                b_labels = batch[2].to(self.device)

                # Always clear any previously calculated gradients before performing a
                # backward pass. PyTorch doesn't do this automatically because
                # accumulating the gradients is "convenient while training RNNs".
                # (source: https://stackoverflow.com/questions/48001598/why-do-we-need-to-call-zero-grad-in-pytorch)
                self.model.zero_grad()

                # Perform a forward pass (evaluate the model on this training batch).
                # The documentation for this `model` function is here:
                # https://huggingface.co/transformers/v2.2.0/model_doc/bert.html#transformers.BertForSequenceClassification
                # It returns different numbers of parameters depending on what arguments
                # are given and what flags are set. For our usage here, it returns
                # the loss (because we provided labels) and the "logits"--the model
                # outputs prior to activation.
                output = self.model(b_input_ids,
                                    token_type_ids=None,
                                    attention_mask=b_input_mask,
                                    labels=b_labels)
                loss = output.loss
                logits = output.logits

                # Accumulate the training loss over all of the batches so that we can
                # calculate the average loss at the end. `loss` is a Tensor containing a
                # single value; the `.item()` function just returns the Python value
                # from the tensor.
                total_train_loss += loss.item()

                # Perform a backward pass to calculate the gradients.
                loss.backward()

                # Clip the norm of the gradients to 1.0.
                # This is to help prevent the "exploding gradients" problem.
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)

                # Update parameters and take a step using the computed gradient.
                # The optimizer dictates the "update rule"--how the parameters are
                # modified based on their gradients, the learning rate, etc.
                optimizer.step()

                # Update the learning rate.
                scheduler.step()

            # Calculate the average loss over all of the batches.
            avg_train_loss = total_train_loss / len(train_dataloader)

            # Measure how long this epoch took.
            training_time = self.format_time(time.time() - t0)

            print("")
            print("  Average training loss: {0:.2f}".format(avg_train_loss))
            print("  Training epoch took: {:}".format(training_time))

        print("")
        print("Training complete!")
        print("Total training took {:} (h:mm:ss)".format(self.format_time(time.time() - total_t0)))

    def evaluate(self, sentences: list):
        input_ids = []
        attention_masks = []
        for sent in sentences:
            input_id, attention_mask = self.tokenizeText(sent)
            input_ids.append(input_id)
            attention_masks.append(attention_mask)
        input_ids = torch.cat(input_ids, dim=0)
        attention_masks = torch.cat(attention_masks, dim=0)
        # Dummy labels: TensorDataset needs a third tensor, but the values are ignored below.
        labels = torch.zeros(len(sentences))

        batch_size = 32
        prediction_data = TensorDataset(input_ids, attention_masks, labels)
        prediction_sampler = SequentialSampler(prediction_data)
        prediction_dataloader = DataLoader(prediction_data, sampler=prediction_sampler, batch_size=batch_size)

        # Put the model in evaluation mode (disables dropout).
        self.model.eval()

        predictions = []
        for batch in prediction_dataloader:
            batch = tuple(t.to(self.device) for t in batch)
            b_input_ids, b_input_mask, _ = batch
            with torch.no_grad():
                outputs = self.model(b_input_ids,
                                     token_type_ids=None,
                                     attention_mask=b_input_mask)
            logits = outputs[0]
            logits = logits.detach().cpu().numpy()
            predictions.append(logits)

        # Concatenate the per-batch logits so every sentence gets a prediction,
        # then return the argmax class index for each one.
        flat_predictions = np.concatenate(predictions, axis=0)
        return [int(pred.argmax()) for pred in flat_predictions]
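

# The block below is a minimal usage sketch, not part of the original class.
# It assumes hypothetical example sentences and integer label ids in [0, 13]
# (the classifier is configured with num_labels=14, e.g. for a reduced
# GoEmotions label set, as the saved filename in `trainModel` suggests).
if __name__ == "__main__":
    example_sentences = ["I love this!", "This is terrible."]  # hypothetical data
    example_labels = [3, 7]                                     # hypothetical label ids in [0, 13]

    classifier = BERTClassifier()
    # Fine-tune on the toy data; this also writes a "Bert_GoEmotions_BS2_E1.model" checkpoint.
    classifier.trainModel(example_sentences, example_labels, epochs=1, batch_size=2)

    # `evaluate` returns one predicted label index per input sentence.
    print(classifier.evaluate(["What a wonderful surprise!"]))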