# NOTE(review): the three lines below are residue from the hosting page
# (Hugging Face Spaces status banner), not part of the program. Commented
# out so the module parses.
# Spaces:
# Runtime error
# Runtime error
# -*- coding: utf-8 -*-
"""Finetuning Language Models - Can I Patent This?.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1x9XfLKvGNBsajOK8rztsZnCoD2ucGfO6

# Finetuning Language Models - Can I Patent This?

Using the [Harvard USPTO patent dataset](https://github.com/suzgunmirac/hupd),
we will fine-tune a DistilBERT model obtained from Hugging Face that can predict
whether a patent is accepted or rejected based off of its abstract and claims.
"""
import gc | |
import argparse | |
import numpy as np | |
import torch | |
from torch.utils.data import DataLoader | |
from torch.optim import AdamW | |
from datasets import load_dataset, load_from_disk | |
from transformers import DistilBertForSequenceClassification, DistilBertTokenizer, DistilBertConfig | |
# Initializing global variables.
# Directory where fine-tuned model checkpoints and tokenizers are written.
file_path = '/app/models/content/'
# Mapping from the HUPD 'decision' field to integer class labels.
decision_to_str = {'REJECTED': 0, 'ACCEPTED': 1, 'PENDING': 2, 'CONT-REJECTED': 3, 'CONT-ACCEPTED': 4, 'CONT-PENDING': 5}
# Loss function applied to model logits vs. integer labels during training.
criterion = torch.nn.CrossEntropyLoss()
def create_dataloaders(dataset_dict, section):
    """Tokenize the train/validation splits and wrap them in DataLoaders.

    Args:
        dataset_dict: a ``datasets.DatasetDict`` with ``'train'`` and
            ``'validation'`` splits, each containing a text column named
            ``section`` and an integer ``'decision'`` label column.
        section: name of the text column to tokenize
            (e.g. ``'abstract'`` or ``'claims'`` — TODO confirm against caller).

    Returns:
        ``(train_loader, val_loader, tokenizer)`` where the loaders yield
        batches of 8 with ``'input_ids'``, ``'attention_mask'`` and
        ``'decision'`` tensors.
    """
    # Initializing the tokenizer.
    model_name = 'distilbert-base-uncased'
    tokenizer = DistilBertTokenizer.from_pretrained(model_name, do_lower_case=True)
    train_set, val_set = dataset_dict['train'], dataset_dict['validation']
    # Tokenize both splits; padding to max_length keeps tensor shapes uniform
    # across batches.
    train_set = train_set.map(
        lambda e: tokenizer(e[section], truncation=True, padding='max_length'),
        batched=True)
    val_set = val_set.map(
        lambda e: tokenizer(e[section], truncation=True, padding='max_length'),
        batched=True)
    # Expose only the tensors the training loop actually consumes.
    train_set.set_format(type='torch',
                         columns=['input_ids', 'attention_mask', 'decision'])
    val_set.set_format(type='torch',
                       columns=['input_ids', 'attention_mask', 'decision'])
    train_loader = DataLoader(train_set, batch_size=8, shuffle=True)
    val_loader = DataLoader(val_set, batch_size=8, shuffle=False)
    return train_loader, val_loader, tokenizer
def measure_accuracy(outputs, labels):
    """Count correct predictions in a batch.

    Args:
        outputs: ``(num_samples, num_classes)`` array of logits/scores.
        labels: array of integer class labels (any shape; flattened here).

    Returns:
        ``(num_correct, num_samples)`` tuple.
    """
    preds = np.argmax(outputs, axis=1).flatten()
    labels = labels.flatten()
    correct = np.sum(preds == labels)
    return correct, len(labels)
def validation(model, val_loader):
    """Evaluate ``model`` on ``val_loader`` and return accuracy as a percentage.

    Relies on the module-level ``device`` set in the ``__main__`` block.
    Leaves the model in eval mode; ``train()`` switches it back.
    """
    model.eval()
    total_correct = 0
    total_samples = 0
    for batch in val_loader:
        input_ids = batch['input_ids'].to(device)
        # BUGFIX: the attention mask was formatted into the batch but never
        # passed to the model, so padding tokens were attended to during
        # evaluation — inconsistent with training. Pass it explicitly.
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['decision'].to(device)
        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask,
                            labels=labels)
        logits = outputs.logits
        num_correct, num_samples = measure_accuracy(logits.cpu().numpy(),
                                                    labels.cpu().numpy())
        total_correct += num_correct
        total_samples += num_samples
        # Free per-batch tensors promptly to keep GPU memory bounded.
        del input_ids, attention_mask, labels, logits
        gc.collect()
        torch.cuda.empty_cache()
    return (total_correct / total_samples) * 100
def train(device, model, tokenizer, train_loader, val_loader, section):
    """Fine-tune ``model`` for 5 epochs, checkpointing on best validation accuracy.

    Args:
        device: torch device ('cuda' or 'cpu').
        model: a Hugging Face sequence-classification model.
        tokenizer: tokenizer to save alongside the best checkpoint.
        train_loader: DataLoader yielding 'input_ids', 'attention_mask'
            and 'decision' tensors.
        val_loader: validation DataLoader with the same format.
        section: text column name; used to build the checkpoint paths under
            the module-level ``file_path``.

    Returns:
        The trained model — the final epoch's weights, not necessarily the
        best checkpoint (that one lives on disk).
    """
    model.train()
    # Define optimizer.
    optim = AdamW(model.parameters(), lr=5e-5)
    num_epochs = 5
    best_val_acc = 0
    for epoch in range(num_epochs):
        for batch in train_loader:
            optim.zero_grad()
            input_ids = batch['input_ids'].to(device, non_blocking=True)
            attention_mask = batch['attention_mask'].to(device, non_blocking=True)
            labels = batch['decision'].to(device, non_blocking=True)
            # Loss is computed externally with the module-level ``criterion``;
            # omitting ``labels=`` from the forward call avoids computing (and
            # discarding) the model's internal loss, as the original code did.
            logits = model(input_ids, attention_mask=attention_mask).logits
            loss = criterion(logits, labels)
            loss.backward()
            optim.step()
            # Free per-batch tensors promptly to keep GPU memory bounded.
            del input_ids, attention_mask, labels
            gc.collect()
            torch.cuda.empty_cache()
        # Evaluate once per epoch; keep the best checkpoint on disk.
        val_acc = validation(model, val_loader)
        if best_val_acc < val_acc:
            best_val_acc = val_acc
            model.save_pretrained(file_path + section + '/')
            tokenizer.save_pretrained(file_path + section + '_model_tokenizer/')
        model.train()  # validation() left the model in eval mode
    return model
if __name__ == '__main__':
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    parser = argparse.ArgumentParser()
    # required=True: previously a missing flag left section=None and crashed
    # later inside the tokenization map; fail fast with a clear message instead.
    parser.add_argument('--section', type=str, required=True,
                        help="Dataset text column to fine-tune on (e.g. 'abstract' or 'claims').")
    args = parser.parse_args()
    section = args.section
    # Load the preprocessed DatasetDict saved by the data-preparation step.
    dataset_dict = load_from_disk(file_path + 'dataset_dict')
    train_loader, val_loader, tokenizer = create_dataloaders(dataset_dict, section)
    del dataset_dict
    gc.collect()
    torch.cuda.empty_cache()
    # Defining the model.
    # BUGFIX: the HF config field is ``num_labels`` — ``num_classes`` is not a
    # DistilBertConfig parameter and was silently ignored (the code only worked
    # because num_labels happens to default to 2).
    config = DistilBertConfig(num_labels=2, output_hidden_states=False)
    model = DistilBertForSequenceClassification(config=config)
    model.to(device)
    # Train the model, then report final validation accuracy.
    model = train(device, model, tokenizer, train_loader, val_loader, section)
    val_acc = validation(model, val_loader)
    print(f'*** Accuracy on the validation set ({section}): {val_acc}')