|
import torch |
|
import transformers |
|
from torch.utils.data import Dataset, DataLoader |
|
from transformers import RobertaModel, RobertaTokenizer, BertModel, BertTokenizer |
|
import pandas as pd |
|
|
|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") |
|
|
|
MAX_LEN = 128 |
|
BATCH_SIZE = 20 |
|
text_col_name = 'sentence' |
|
|
|
def scoring_data_prep(dataset): |
|
out = [] |
|
target = [] |
|
mask = [] |
|
|
|
for i in range(len(dataset)): |
|
rec = dataset[i] |
|
out.append(rec['ids'].reshape(-1,MAX_LEN)) |
|
mask.append(rec['mask'].reshape(-1,MAX_LEN)) |
|
|
|
out_stack = torch.cat(out, dim = 0) |
|
mask_stack = torch.cat(mask, dim =0 ) |
|
out_stack = out_stack.to(device, dtype = torch.long) |
|
mask_stack = mask_stack.to(device, dtype = torch.long) |
|
|
|
return out_stack, mask_stack |
|
|
|
class Triage(Dataset): |
|
""" |
|
This is a subclass of torch packages Dataset class. It processes input to create ids, masks and targets required for model training. |
|
""" |
|
|
|
def __init__(self, dataframe, tokenizer, max_len, text_col_name): |
|
self.len = len(dataframe) |
|
self.data = dataframe |
|
self.tokenizer = tokenizer |
|
self.max_len = max_len |
|
self.text_col_name = text_col_name |
|
|
|
|
|
def __getitem__(self, index): |
|
title = str(self.data[self.text_col_name][index]) |
|
title = " ".join(title.split()) |
|
inputs = self.tokenizer.encode_plus( |
|
title, |
|
None, |
|
add_special_tokens=True, |
|
max_length=self.max_len, |
|
pad_to_max_length=True, |
|
return_token_type_ids=True, |
|
truncation=True, |
|
) |
|
ids = inputs["input_ids"] |
|
mask = inputs["attention_mask"] |
|
|
|
return { |
|
"ids": torch.tensor(ids, dtype=torch.long), |
|
"mask": torch.tensor(mask, dtype=torch.long), |
|
|
|
} |
|
|
|
def __len__(self): |
|
return self.len |
|
|
|
class BERTClass(torch.nn.Module): |
|
def __init__(self, num_class, task): |
|
super(BERTClass, self).__init__() |
|
self.num_class = num_class |
|
if task =="sustanability": |
|
self.l1 = RobertaModel.from_pretrained("roberta-base") |
|
else: |
|
self.l1 = BertModel.from_pretrained("ProsusAI/finbert") |
|
self.pre_classifier = torch.nn.Linear(768, 768) |
|
self.dropout = torch.nn.Dropout(0.3) |
|
self.classifier = torch.nn.Linear(768, self.num_class) |
|
self.history = dict() |
|
|
|
def forward(self, input_ids, attention_mask): |
|
output_1 = self.l1(input_ids=input_ids, attention_mask=attention_mask) |
|
hidden_state = output_1[0] |
|
pooler = hidden_state[:, 0] |
|
pooler = self.pre_classifier(pooler) |
|
pooler = torch.nn.ReLU()(pooler) |
|
pooler = self.dropout(pooler) |
|
output = self.classifier(pooler) |
|
return output |
|
|
|
def do_predict(model, tokenizer, test_df): |
|
test_set = Triage(test_df, tokenizer, MAX_LEN, text_col_name) |
|
test_params = {'batch_size' : BATCH_SIZE, 'shuffle': False, 'num_workers':0} |
|
test_loader = DataLoader(test_set, **test_params) |
|
out_stack, mask_stack = scoring_data_prep(dataset = test_set) |
|
n = 0 |
|
combined_output = [] |
|
model.eval() |
|
with torch.no_grad(): |
|
while n < test_df.shape[0]: |
|
output = model(out_stack[n:n+BATCH_SIZE,:],mask_stack[n:n+BATCH_SIZE,:]) |
|
n = n + BATCH_SIZE |
|
combined_output.append(output) |
|
combined_output = torch.cat(combined_output, dim = 0) |
|
preds = torch.argsort(combined_output, axis = 1, descending = True) |
|
preds = preds.to('cpu') |
|
actual_predictions = [i[0] for i in preds.tolist()] |
|
combined_output = combined_output.to('cpu') |
|
prob_predictions= [i[1] for i in combined_output.tolist()] |
|
return (actual_predictions, prob_predictions) |
|
|