#!/usr/bin/env python
# coding: utf-8

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, random_split
import pandas as pd
from tqdm import tqdm
import time

from .Utilities import LanguageDataset


class Seq2Seq():
    """
    Base class for Seq2Seq text-generation models. It is inherited by thin wrappers
    around transformers such as GPT2 and FlanT5 below.

    Attributes: device, gpu, model, model_name, tokenizer, max_length,
        train_loader, valid_loader, results.
    Methods: load_data, summary, to_pt.
    """
    def __init__(self, gpu=0, max_length=0, model_path=None):
        # Load Seq2Seq to device based on available hardware
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        elif torch.backends.mps.is_available():  # Apple Silicon
            self.device = torch.device('mps')
        else:
            self.device = torch.device('cpu')
        # GPU that the model will run on
        self.gpu = gpu
        # Model specs
        if model_path:
            self.model = torch.load(model_path).to(self.device)
        else:
            self.model = None
        self.model_name = ""
        self.tokenizer = None
        self.max_length = max_length
        # Training specs
        self.train_loader = None
        self.valid_loader = None
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])

    def load_data(self, df, batch_size, train_ratio=0.8):
        self.batch_size = batch_size
        dataset = LanguageDataset(df, self.tokenizer)
        # Split according to train_ratio (default 80/20)
        train_size = int(train_ratio * len(dataset))
        valid_size = len(dataset) - train_size
        train_data, valid_data = random_split(dataset, [train_size, valid_size])
        self.max_length = dataset.max_length
        self.train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True)
        self.valid_loader = DataLoader(valid_data, batch_size=self.batch_size)
| """ Return training results """ | |
| def summary(self): | |
| return self.results | |
| """ Save model to path """ | |
| def to_pt(self, path): | |
| torch.save(self.model, path) | |
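
# Expected workflow for a Seq2Seq subclass (a sketch; `df` and its schema are whatever
# Utilities.LanguageDataset expects, which is assumed here rather than documented):
#
#     model = GPT2(gpu=0, model_name='gpt2')
#     model.load_data(df, batch_size=8)
#     model.train(num_epochs=3)
#     print(model.summary())
#     model.to_pt('gpt2_finetuned.pt')
#
# The training loops below also assume each LanguageDataset item is a dict whose
# 'input_ids' (and, for FlanT5, 'labels') tensors have shape (1, max_length),
# which is why every batch is .squeeze(1)'d before being moved to the device.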


class GPT2(Seq2Seq):
    """
    This is the GPT2 implementation of Seq2Seq.
    """
    def __init__(self, gpu, model_name, batch_size=16):
        super().__init__(gpu, max_length=0)
        from transformers import GPT2Tokenizer, GPT2LMHeadModel
        self.model_name = model_name
        self.model = GPT2LMHeadModel.from_pretrained(self.model_name).to(self.device)
        self.tokenizer = GPT2Tokenizer.from_pretrained(self.model_name)
        # GPT-2 has no pad token by default, so reuse the end-of-sequence token
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.batch_size = batch_size  # default; overridden by load_data()

    def train(self, num_epochs=3, train_ratio=0.8):
        # Note: the Hugging Face model computes the cross-entropy loss internally
        # when `labels` are passed, so `criterion` is kept only for reference.
        criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
        optimizer = optim.Adam(self.model.parameters(), lr=5e-4)
        # Re-init the results dataframe for this training run
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])
        # The training loop
        for epoch in range(num_epochs):
            start_time = time.time()  # Start the timer for the epoch

            # Training
            ## Put the model in 'learning mode'
            self.model.train()
            epoch_training_loss = 0
            train_iterator = tqdm(self.train_loader,
                                  desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}")
            for batch in train_iterator:
                optimizer.zero_grad()
                inputs = batch['input_ids'].squeeze(1).to(self.device)
                targets = inputs.clone()  # causal LM: the labels are the inputs themselves
                outputs = self.model(input_ids=inputs, labels=targets)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                train_iterator.set_postfix({'Training Loss': loss.item()})
                epoch_training_loss += loss.item()
            avg_epoch_training_loss = epoch_training_loss / len(train_iterator)

            # Validation
            ## Put the model in evaluation mode
            self.model.eval()
            epoch_validation_loss = 0
            total_loss = 0
            valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}")
            with torch.no_grad():
                for batch in valid_iterator:
                    inputs = batch['input_ids'].squeeze(1).to(self.device)
                    targets = inputs.clone()
                    outputs = self.model(input_ids=inputs, labels=targets)
                    loss = outputs.loss
                    total_loss += loss.item()
                    valid_iterator.set_postfix({'Validation Loss': loss.item()})
                    epoch_validation_loss += loss.item()
            avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader)

            end_time = time.time()  # End the timer for the epoch
            epoch_duration_sec = end_time - start_time  # Duration in seconds
            new_row = {'transformer': self.model_name,
                       'batch_size': self.batch_size,
                       'gpu': self.gpu,
                       'epoch': epoch + 1,
                       'training_loss': avg_epoch_training_loss,
                       'validation_loss': avg_epoch_validation_loss,
                       'epoch_duration_sec': epoch_duration_sec}
            self.results.loc[len(self.results)] = new_row
            print(f"Epoch: {epoch + 1}, Validation Loss: {total_loss / len(self.valid_loader)}")

    def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2):
        # Encode the string to tokens
        input_ids = self.tokenizer.encode(input_str, return_tensors='pt').to(self.device)
        # Feed the tokens to the model and sample output tokens
        output = self.model.generate(
            input_ids,
            max_length=self.max_length,
            num_return_sequences=1,
            do_sample=True,
            top_k=top_k,
            top_p=top_p,
            temperature=temperature,
            repetition_penalty=repetition_penalty
        )
        # Decode the tokens back to a string
        decoded_output = self.tokenizer.decode(output[0], skip_special_tokens=True)
        return decoded_output
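
# Generation sketch for GPT2 (hypothetical prompt; assumes load_data() has already been
# called on this instance so that self.max_length is non-zero, since generate_text()
# passes it as max_length to model.generate):
#
#     print(gpt2.generate_text("Once upon a time", top_k=50, top_p=0.92, temperature=0.8))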


class FlanT5(Seq2Seq):
    """
    This is the T5 implementation of Seq2Seq, designed to support T5/FlanT5 models of various sizes.
    """
    def __init__(self, gpu, model_name, batch_size=16):
        super().__init__(gpu, max_length=0)
        from transformers import T5ForConditionalGeneration, T5Tokenizer
        self.model_name = model_name
        self.model = T5ForConditionalGeneration.from_pretrained(self.model_name).to(self.device)
        self.tokenizer = T5Tokenizer.from_pretrained(self.model_name)
        # Unlike GPT-2, the T5 tokenizer already defines a dedicated <pad> token,
        # so no override is needed here.
        self.batch_size = batch_size  # default; overridden by load_data()

    def train(self, num_epochs=3, train_ratio=0.8):
        # Note: the Hugging Face model computes the cross-entropy loss internally
        # when `labels` are passed, so `criterion` is kept only for reference.
        criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
        optimizer = optim.Adam(self.model.parameters(), lr=5e-4)
        # Re-init the results dataframe for this training run
        self.results = pd.DataFrame(columns=['epoch', 'transformer', 'batch_size', 'gpu',
                                             'training_loss', 'validation_loss', 'epoch_duration_sec'])
        # The training loop
        for epoch in range(num_epochs):
            start_time = time.time()  # Start the timer for the epoch

            # Training
            ## Put the model in 'learning mode'
            self.model.train()
            epoch_training_loss = 0
            train_iterator = tqdm(self.train_loader,
                                  desc=f"Training Epoch {epoch + 1}/{num_epochs} Batch Size: {self.batch_size}, Transformer: {self.model_name}")
            for batch in train_iterator:
                optimizer.zero_grad()
                inputs = batch['input_ids'].squeeze(1).to(self.device)
                targets = batch['labels'].squeeze(1).to(self.device)
                outputs = self.model(input_ids=inputs, labels=targets)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                train_iterator.set_postfix({'Training Loss': loss.item()})
                epoch_training_loss += loss.item()
            avg_epoch_training_loss = epoch_training_loss / len(train_iterator)

            # Validation
            ## Put the model in evaluation mode
            self.model.eval()
            epoch_validation_loss = 0
            total_loss = 0
            valid_iterator = tqdm(self.valid_loader, desc=f"Validation Epoch {epoch + 1}/{num_epochs}")
            with torch.no_grad():
                for batch in valid_iterator:
                    inputs = batch['input_ids'].squeeze(1).to(self.device)
                    targets = batch['labels'].squeeze(1).to(self.device)
                    outputs = self.model(input_ids=inputs, labels=targets)
                    loss = outputs.loss
                    total_loss += loss.item()
                    valid_iterator.set_postfix({'Validation Loss': loss.item()})
                    epoch_validation_loss += loss.item()
            avg_epoch_validation_loss = epoch_validation_loss / len(self.valid_loader)

            end_time = time.time()  # End the timer for the epoch
            epoch_duration_sec = end_time - start_time  # Duration in seconds
            new_row = {'transformer': self.model_name,
                       'batch_size': self.batch_size,
                       'gpu': self.gpu,
                       'epoch': epoch + 1,
                       'training_loss': avg_epoch_training_loss,
                       'validation_loss': avg_epoch_validation_loss,
                       'epoch_duration_sec': epoch_duration_sec}
            self.results.loc[len(self.results)] = new_row
            print(f"Epoch: {epoch + 1}, Validation Loss: {total_loss / len(self.valid_loader)}")

    def generate_text(self, input_str, top_k=16, top_p=0.95, temperature=1.0, repetition_penalty=1.2):
        # Encode the input string into tensors via the FlanT5 tokenizer
        input_ids = self.tokenizer.encode(input_str, return_tensors='pt',
                                          max_length=self.max_length, truncation=True).to(self.device)
        # Run the tensors through the model to get output token ids
        output_ids = self.model.generate(input_ids,
                                         max_length=self.max_length,
                                         do_sample=True,
                                         top_k=top_k,
                                         top_p=top_p,
                                         temperature=temperature,
                                         repetition_penalty=repetition_penalty)
        # Decode the output token ids to text via the tokenizer
        output_str = self.tokenizer.decode(output_ids[0], skip_special_tokens=True)
        return output_str
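

if __name__ == "__main__":
    # Minimal generation smoke test (a sketch: the model name and prompt are illustrative
    # and no fine-tuning data is loaded). max_length is set manually because it is
    # otherwise only populated by load_data(). Run with `python -m <package>.<module>`
    # so that the relative import of Utilities resolves.
    flan = FlanT5(gpu=0, model_name='google/flan-t5-small')
    flan.max_length = 64
    print(flan.generate_text("Translate English to German: How old are you?"))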