Gagan Bhatia
Update model.py
7123542
import shutil
from getpass import getpass
from pathlib import Path
import torch
import pandas as pd
from huggingface_hub import HfApi, Repository
from transformers import (
AdamW,
T5ForConditionalGeneration,
T5TokenizerFast as T5Tokenizer,
MT5Tokenizer,
MT5ForConditionalGeneration,
ByT5Tokenizer,
)
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl
from pytorch_lightning.loggers import MLFlowLogger, WandbLogger
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
from pytorch_lightning import LightningDataModule
from pytorch_lightning import LightningModule
from datasets import load_metric
from tqdm.auto import tqdm
# from dagshub.pytorch_lightning import DAGsHubLogger
# NOTE: both calls below run as a side effect of importing this module.
# Ask PyTorch's CUDA caching allocator to release memory it is holding but not using.
torch.cuda.empty_cache()
# Seed the random number generators through Lightning with a fixed seed (42).
pl.seed_everything(42)
class DataModule(Dataset):
    """
    Map-style torch dataset that tokenizes one (input_text, output_text) row per item.

    Despite the name, this is a ``torch.utils.data.Dataset`` rather than a
    Lightning data module.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        tokenizer: "T5Tokenizer",
        source_max_token_len: int = 512,
        target_max_token_len: int = 512,
    ):
        """
        :param data: frame with "input_text" and "output_text" columns
        :param tokenizer: callable tokenizer that also exposes ``pad_token_id``
        :param source_max_token_len: length every source sequence is padded/truncated to
        :param target_max_token_len: length every target sequence is padded/truncated to
        """
        self.data = data
        self.target_max_token_len = target_max_token_len
        self.source_max_token_len = source_max_token_len
        self.tokenizer = tokenizer

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def _encode(self, text, max_length: int):
        """Tokenize ``text`` to exactly ``max_length`` tokens, returned as "pt" tensors."""
        return self.tokenizer(
            text,
            max_length=max_length,
            padding="max_length",
            truncation=True,
            return_attention_mask=True,
            add_special_tokens=True,
            return_tensors="pt",
        )

    def __getitem__(self, index: int):
        """
        Tokenize row ``index`` and return the tensors used by the training steps.

        :return: dict with the raw strings ("keywords", "text"), the flattened
            source ids/mask ("keywords_input_ids", "keywords_attention_mask"),
            the flattened target mask ("labels_attention_mask") and "labels",
            where every padding position has been replaced by -100.
        """
        row = self.data.iloc[index]
        source = self._encode(row["input_text"], self.source_max_token_len)
        target = self._encode(row["output_text"], self.target_max_token_len)

        labels = target["input_ids"]
        # Mask padding with -100 so those positions are excluded from the loss.
        # The pad id is read from the tokenizer instead of assuming it is 0.
        labels[labels == self.tokenizer.pad_token_id] = -100

        return dict(
            keywords=row["input_text"],
            text=row["output_text"],
            keywords_input_ids=source["input_ids"].flatten(),
            keywords_attention_mask=source["attention_mask"].flatten(),
            labels=labels.flatten(),
            labels_attention_mask=target["attention_mask"].flatten(),
        )
class PLDataModule(LightningDataModule):
    """Lightning data module serving a training frame and a held-out frame."""

    def __init__(
        self,
        train_df: pd.DataFrame,
        test_df: pd.DataFrame,
        tokenizer: T5Tokenizer,
        source_max_token_len: int = 512,
        target_max_token_len: int = 512,
        batch_size: int = 4,
        split: float = 0.1,
        num_workers: int = 2,
    ):
        """
        :param train_df: rows used for the training loader
        :param test_df: rows used for both the validation and the test loader
        :param tokenizer: tokenizer handed to each dataset
        :param source_max_token_len: max token length of the source text
        :param target_max_token_len: max token length of the target text
        :param batch_size: batch size of every loader
        :param split: stored on the instance; not read anywhere in this class
        :param num_workers: worker count of every loader
        """
        super().__init__()
        self.train_df, self.test_df = train_df, test_df
        self.split = split
        self.batch_size = batch_size
        self.target_max_token_len = target_max_token_len
        self.source_max_token_len = source_max_token_len
        self.tokenizer = tokenizer
        self.num_workers = num_workers

    def setup(self, stage=None):
        """Build the train and held-out datasets from the stored frames."""
        token_limits = (self.source_max_token_len, self.target_max_token_len)
        self.train_dataset = DataModule(self.train_df, self.tokenizer, *token_limits)
        self.test_dataset = DataModule(self.test_df, self.tokenizer, *token_limits)

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        loader_options = dict(batch_size=self.batch_size, num_workers=self.num_workers)
        return DataLoader(self.train_dataset, shuffle=True, **loader_options)

    def test_dataloader(self):
        """Ordered loader over the held-out dataset."""
        loader_options = dict(batch_size=self.batch_size, num_workers=self.num_workers)
        return DataLoader(self.test_dataset, shuffle=False, **loader_options)

    def val_dataloader(self):
        """Ordered loader for validation; it reads the same dataset as the test loader."""
        loader_options = dict(batch_size=self.batch_size, num_workers=self.num_workers)
        return DataLoader(self.test_dataset, shuffle=False, **loader_options)
class LightningModel(LightningModule):
    """PyTorch Lightning wrapper that fine-tunes a T5-family seq2seq model."""

    def __init__(
        self,
        tokenizer,
        model,
        learning_rate,
        adam_epsilon,
        weight_decay,
        output: str = "outputs",
    ):
        """
        initiates a PyTorch Lightning Model
        Args:
            tokenizer : T5 tokenizer
            model : T5 model
            learning_rate : learning rate passed to AdamW
            adam_epsilon : epsilon passed to AdamW
            weight_decay : weight decay applied to every parameter that is not
                a bias or a layer-norm weight
            output (str, optional): output directory name; stored on the
                instance but not read by this class. Defaults to "outputs".
        """
        super().__init__()
        self.model = model
        self.tokenizer = tokenizer
        self.output = output
        self.learning_rate = learning_rate
        self.adam_epsilon = adam_epsilon
        self.weight_decay = weight_decay

    def forward(self, input_ids, attention_mask, decoder_attention_mask, labels=None):
        """Run the wrapped model and return ``(loss, logits)``."""
        output = self.model(
            input_ids,
            attention_mask=attention_mask,
            labels=labels,
            decoder_attention_mask=decoder_attention_mask,
        )
        return output.loss, output.logits

    def _shared_step(self, batch, stage: str):
        """Compute the loss for ``batch`` and log it as ``"<stage>_loss"``."""
        loss, _ = self(
            input_ids=batch["keywords_input_ids"],
            attention_mask=batch["keywords_attention_mask"],
            decoder_attention_mask=batch["labels_attention_mask"],
            labels=batch["labels"],
        )
        self.log(f"{stage}_loss", loss, prog_bar=True, logger=True)
        return loss

    # NOTE: Lightning documents the second positional argument of the step
    # hooks as the batch index; the name ``batch_size`` is kept unchanged for
    # compatibility and the value is not used.
    def training_step(self, batch, batch_size):
        """training step: logs and returns "train_loss"."""
        return self._shared_step(batch, "train")

    def validation_step(self, batch, batch_size):
        """validation step: logs and returns "val_loss"."""
        return self._shared_step(batch, "val")

    def test_step(self, batch, batch_size):
        """test step: logs and returns "test_loss"."""
        return self._shared_step(batch, "test")

    def configure_optimizers(self):
        """
        Build AdamW with two parameter groups: one decayed, one not.

        Biases and layer-norm weights are excluded from weight decay. T5-style
        models name their norm layers ``layer_norm`` / ``final_layer_norm``
        (not ``LayerNorm``), so both spellings are matched.
        """
        no_decay = ("bias", "LayerNorm.weight", "layer_norm.weight")
        decayed, not_decayed = [], []
        for name, param in self.model.named_parameters():
            if any(marker in name for marker in no_decay):
                not_decayed.append(param)
            else:
                decayed.append(param)
        optimizer_grouped_parameters = [
            {"params": decayed, "weight_decay": self.weight_decay},
            {"params": not_decayed, "weight_decay": 0.0},
        ]
        optimizer = AdamW(
            optimizer_grouped_parameters, lr=self.learning_rate, eps=self.adam_epsilon
        )
        # Kept as an attribute in case external code reads it.
        self.opt = optimizer
        return [optimizer]
class Summarization:
    """Train, load, run, evaluate and upload a T5-family summarization model."""

    # model_type values accepted by from_pretrained() and load_model()
    _SUPPORTED_MODEL_TYPES = ("t5", "mt5", "byt5")
    # (key in the metric result, label prefix used in the evaluate() report)
    _ROUGE_LABELS = (
        ("rouge1", "Rouge_1"),
        ("rouge2", "Rouge_2"),
        ("rougeL", "Rouge_L"),
        ("rougeLsum", "rougeLsum"),
    )
    # (report label, attribute name) pairs, in report order
    _ROUGE_LEVELS = (("Low", "low"), ("Mid", "mid"), ("High", "high"))
    _ROUGE_SCORES = (("Precision", "precision"), ("recall", "recall"), ("F1", "fmeasure"))

    def __init__(self) -> None:
        """initiates Summarization class"""
        pass

    def _load_components(self, model_type, name_or_path) -> None:
        """Set ``self.tokenizer`` and ``self.model`` for ``model_type`` from ``name_or_path``."""
        if model_type not in self._SUPPORTED_MODEL_TYPES:
            raise ValueError(
                f"unsupported model_type {model_type!r}; "
                f"expected one of {self._SUPPORTED_MODEL_TYPES}"
            )
        if model_type == "mt5":
            tokenizer_cls, model_cls = MT5Tokenizer, MT5ForConditionalGeneration
        elif model_type == "byt5":
            tokenizer_cls, model_cls = ByT5Tokenizer, T5ForConditionalGeneration
        else:
            tokenizer_cls, model_cls = T5Tokenizer, T5ForConditionalGeneration
        self.tokenizer = tokenizer_cls.from_pretrained(f"{name_or_path}")
        self.model = model_cls.from_pretrained(f"{name_or_path}", return_dict=True)

    def from_pretrained(self, model_type="t5", model_name="t5-base") -> None:
        """
        loads T5/MT5/ByT5 model for training/finetuning
        Args:
            model_type (str, optional): "t5", "mt5" or "byt5". Defaults to "t5".
            model_name (str, optional): exact model architecture name, "t5-base" or "t5-large". Defaults to "t5-base".
        Raises:
            ValueError: if model_type is not one of the supported values.
        """
        self._load_components(model_type, model_name)

    def train(
        self,
        train_df: pd.DataFrame,
        eval_df: pd.DataFrame,
        source_max_token_len: int = 512,
        target_max_token_len: int = 512,
        batch_size: int = 8,
        max_epochs: int = 5,
        use_gpu: bool = True,
        outputdir: str = "models",
        early_stopping_patience_epochs: int = 0,  # 0 to disable early stopping feature
        learning_rate: float = 0.0001,
        adam_epsilon: float = 0.01,
        num_workers: int = 2,
        weight_decay: float = 0.0001,
    ):
        """
        trains T5/MT5 model on custom dataset

        Metrics are logged to the W&B project and the MLflow tracking URI
        hard-coded below. This method does not write the model to disk; call
        save_model() afterwards.

        Args:
            train_df (pd.DataFrame): training dataframe. Dataframe must have 2 columns --> "input_text" and "output_text"
            eval_df (pd.DataFrame): validation dataframe. Dataframe must have 2 columns --> "input_text" and
            "output_text"
            source_max_token_len (int, optional): max token length of source text. Defaults to 512.
            target_max_token_len (int, optional): max token length of target text. Defaults to 512.
            batch_size (int, optional): batch size. Defaults to 8.
            max_epochs (int, optional): max number of epochs. Defaults to 5.
            use_gpu (bool, optional): if True and CUDA is available, all GPUs are used; otherwise CPU. Defaults to True.
            outputdir (str, optional): directory name handed to the Lightning module. Defaults to "models".
            early_stopping_patience_epochs (int, optional): monitors val_loss on epoch end and stops training,
            if val_loss does not improve after the specified number of epochs. set 0 to disable early stopping.
            Defaults to 0 (disabled)
            learning_rate (float, optional): AdamW learning rate. Defaults to 0.0001.
            adam_epsilon (float, optional): AdamW epsilon. Defaults to 0.01.
            num_workers (int, optional): dataloader worker count. Defaults to 2.
            weight_decay (float, optional): AdamW weight decay. Defaults to 0.0001.
        """
        self.target_max_token_len = target_max_token_len
        self.data_module = PLDataModule(
            train_df,
            eval_df,
            self.tokenizer,
            batch_size=batch_size,
            source_max_token_len=source_max_token_len,
            target_max_token_len=target_max_token_len,
            num_workers=num_workers,
        )
        self.T5Model = LightningModel(
            tokenizer=self.tokenizer,
            model=self.model,
            output=outputdir,
            learning_rate=learning_rate,
            adam_epsilon=adam_epsilon,
            weight_decay=weight_decay,
        )
        mlflow_logger = MLFlowLogger(
            experiment_name="Summarization",
            tracking_uri="https://dagshub.com/gagan3012/summarization.mlflow",
        )
        wandb_logger = WandbLogger(project="summarization-dagshub")
        # logger = DAGsHubLogger(metrics_path='reports/training_metrics.txt')
        early_stop_callback = None
        if early_stopping_patience_epochs > 0:
            early_stop_callback = [
                EarlyStopping(
                    monitor="val_loss",
                    min_delta=0.00,
                    patience=early_stopping_patience_epochs,
                    verbose=True,
                    mode="min",
                )
            ]
        # -1 asks Lightning for every visible GPU; 0 means CPU only.
        gpus = -1 if use_gpu and torch.cuda.is_available() else 0
        trainer = Trainer(
            logger=[wandb_logger, mlflow_logger],
            callbacks=early_stop_callback,
            max_epochs=max_epochs,
            gpus=gpus,
            progress_bar_refresh_rate=5,
        )
        trainer.fit(self.T5Model, self.data_module)

    def load_model(
        self, model_type: str = "t5", model_dir: str = "models", use_gpu: bool = False
    ):
        """
        loads a checkpoint for inferencing/prediction
        Args:
            model_type (str, optional): "t5", "mt5" or "byt5". Defaults to "t5".
            model_dir (str, optional): path to model directory. Defaults to "models".
            use_gpu (bool, optional): if True, model uses gpu for inferencing/prediction. Defaults to False.
        Raises:
            ValueError: if model_type is not one of the supported values.
            Exception: if use_gpu is True but no CUDA device is available.
        """
        self._load_components(model_type, model_dir)
        if use_gpu:
            if not torch.cuda.is_available():
                raise Exception(
                    "exception ---> no gpu found. set use_gpu=False, to use CPU"
                )
            self.device = torch.device("cuda")
        else:
            self.device = torch.device("cpu")
        self.model = self.model.to(self.device)

    def save_model(self, model_dir="models"):
        """
        Save tokenizer and model to dir
        :param model_dir: target directory
        :return: None; tokenizer and model files are written to model_dir
        """
        path = f"{model_dir}"
        self.tokenizer.save_pretrained(path)
        self.model.save_pretrained(path)

    def predict(
        self,
        source_text: str,
        max_length: int = 512,
        num_return_sequences: int = 1,
        num_beams: int = 2,
        top_k: int = 50,
        top_p: float = 0.95,
        do_sample: bool = True,
        repetition_penalty: float = 2.5,
        length_penalty: float = 1.0,
        early_stopping: bool = True,
        skip_special_tokens: bool = True,
        clean_up_tokenization_spaces: bool = True,
    ):
        """
        generates prediction for T5/MT5 model
        Args:
            source_text (str): any text for generating predictions
            max_length (int, optional): max token length of prediction. Defaults to 512.
            num_return_sequences (int, optional): number of sequences requested from generate();
                only the first one is decoded and returned. Defaults to 1.
            num_beams (int, optional): number of beams. Defaults to 2.
            top_k (int, optional): Defaults to 50.
            top_p (float, optional): Defaults to 0.95.
            do_sample (bool, optional): forwarded to generate(); top_k/top_p only take
                effect when it is True. Defaults to True.
            repetition_penalty (float, optional): Defaults to 2.5.
            length_penalty (float, optional): Defaults to 1.0.
            early_stopping (bool, optional): Defaults to True.
            skip_special_tokens (bool, optional): Defaults to True.
            clean_up_tokenization_spaces (bool, optional): Defaults to True.
        Returns:
            str: the first generated sequence, decoded to text
        """
        input_ids = self.tokenizer.encode(
            source_text, return_tensors="pt", add_special_tokens=True
        )
        # Follow the model's own device: self.device only exists after
        # load_model(), but predict() must also work after from_pretrained()/train().
        input_ids = input_ids.to(self.model.device)
        generated_ids = self.model.generate(
            input_ids=input_ids,
            num_beams=num_beams,
            max_length=max_length,
            repetition_penalty=repetition_penalty,
            length_penalty=length_penalty,
            early_stopping=early_stopping,
            do_sample=do_sample,
            top_p=top_p,
            top_k=top_k,
            num_return_sequences=num_return_sequences,
        )
        preds = self.tokenizer.decode(
            generated_ids[0],
            skip_special_tokens=skip_special_tokens,
            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
        )
        return preds

    @classmethod
    def _flatten_rouge(cls, results):
        """Flatten aggregated ROUGE results into a ``"<metric> <level> <score>"`` dict."""
        report = {}
        for key, label in cls._ROUGE_LABELS:
            aggregate = results[key]
            for level_label, level_attr in cls._ROUGE_LEVELS:
                score = getattr(aggregate, level_attr)
                for score_label, score_attr in cls._ROUGE_SCORES:
                    report[f"{label} {level_label} {score_label}"] = getattr(
                        score, score_attr
                    )
        return report

    def evaluate(self, test_df: pd.DataFrame, metrics: str = "rouge"):
        """
        Predict every "input_text" row and score it against "output_text".

        Args:
            test_df (pd.DataFrame): frame with "input_text" and "output_text" columns
            metrics (str, optional): metric name passed to load_metric. The report
                reads the rouge1/rouge2/rougeL/rougeLsum keys of the result, so
                only "rouge" is usable. Defaults to "rouge".
        Returns:
            dict: low/mid/high precision, recall and F1 for each ROUGE variant
        """
        metric = load_metric(metrics)
        input_text = test_df["input_text"]
        references = test_df["output_text"].to_list()
        # Sampling is switched off so scores are reproducible run to run.
        predictions = [self.predict(x, do_sample=False) for x in tqdm(input_text)]
        results = metric.compute(predictions=predictions, references=references)
        return self._flatten_rouge(results)

    def upload(self, hf_username, model_name):
        """
        Push the current tokenizer and model to a Hugging Face Hub repository.

        Prompts for the account password, clones (creating if needed) the
        repository into ./model, writes a README plus the model files into that
        clone and pushes it.

        Args:
            hf_username: Hub user name
            model_name: repository name
        """
        hf_password = getpass("Enter your HuggingFace password")
        repo_dir = "./model"
        # Remove a stale clone so the repository can be cloned afresh.
        if Path(repo_dir).exists():
            shutil.rmtree(repo_dir)
        token = HfApi().login(username=hf_username, password=hf_password)
        del hf_password
        model_url = HfApi().create_repo(token=token, name=model_name, exist_ok=True)
        model_repo = Repository(
            repo_dir,
            clone_from=model_url,
            use_auth_token=token,
            git_email=f"{hf_username}@users.noreply.huggingface.co",
            git_user=hf_username,
        )
        readme_txt = f"\n---\nSummarisation model {model_name}\n".strip()
        (Path(model_repo.local_dir) / "README.md").write_text(readme_txt)
        # Save into the clone itself; otherwise the push would carry only the README.
        self.save_model(model_dir=model_repo.local_dir)
        commit_url = model_repo.push_to_hub()
        print("Check out your model at:")
        print(commit_url)
        print(f"https://huggingface.co/{hf_username}/{model_name}")