# ML-SLRC / ML_SLRC.py
from copy import deepcopy
import gc
import random
import re
import unicodedata

import numpy as np
import torch
from torch import nn
from torch.optim import Adam
from torch.utils.data import Dataset, DataLoader, TensorDataset, RandomSampler
from torchmetrics import functional as fn
from sklearn.model_selection import train_test_split
# Pre-trained model
class Encoder(nn.Module):
def __init__(self, layers, freeze_bert, model):
super(Encoder, self).__init__()
# Dummy Parameter
self.dummy_param = nn.Parameter(torch.empty(0))
# Pre-trained model
self.model = deepcopy(model)
# Freezing bert parameters
if freeze_bert:
for param in self.model.parameters():
                param.requires_grad = False
# Selecting hidden layers of the pre-trained model
old_model_encoder = self.model.encoder.layer
new_model_encoder = nn.ModuleList()
for i in layers:
new_model_encoder.append(old_model_encoder[i])
self.model.encoder.layer = new_model_encoder
# Feed forward
def forward(self, **x):
return self.model(**x)['pooler_output']
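# Usage sketch (illustrative only, not executed): wrapping a Hugging Face
# BERT-style model and keeping a subset of its transformer layers. The
# `transformers` import and the model name are assumptions, not part of this file.
#
#   from transformers import AutoModel, AutoTokenizer
#   bert = AutoModel.from_pretrained("bert-base-uncased")
#   tok = AutoTokenizer.from_pretrained("bert-base-uncased")
#   encoder = Encoder(layers=range(4), freeze_bert=True, model=bert)
#   enc = tok("an abstract to encode", return_tensors="pt")
#   pooled = encoder(**enc)  # pooler output, shape (1, hidden_size)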
# Complete model
class SLR_Classifier(nn.Module):
def __init__(self, **data):
super(SLR_Classifier, self).__init__()
# Dummy Parameter
self.dummy_param = nn.Parameter(torch.empty(0))
# Loss function
# Binary Cross Entropy with logits reduced to mean
self.loss_fn = nn.BCEWithLogitsLoss(reduction = 'mean',
pos_weight=torch.FloatTensor([data.get("pos_weight", 2.5)]))
# Pre-trained model
self.Encoder = Encoder(layers = data.get("bert_layers", range(12)),
freeze_bert = data.get("freeze_bert", False),
model = data.get("model"),
)
# Feature Map Layer
self.feature_map = nn.Sequential(
# nn.LayerNorm(self.Encoder.model.config.hidden_size),
nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
# nn.Dropout(data.get("drop", 0.5)),
nn.Linear(self.Encoder.model.config.hidden_size, 200),
nn.Dropout(data.get("drop", 0.5)),
)
# Classifier Layer
self.classifier = nn.Sequential(
# nn.LayerNorm(self.Encoder.model.config.hidden_size),
# nn.Dropout(data.get("drop", 0.5)),
# nn.BatchNorm1d(self.Encoder.model.config.hidden_size),
# nn.Dropout(data.get("drop", 0.5)),
nn.Tanh(),
nn.Linear(200, 1)
)
# Initializing layer parameters
nn.init.normal_(self.feature_map[1].weight, mean=0, std=0.00001)
nn.init.zeros_(self.feature_map[1].bias)
# Feed forward
def forward(self, input_ids, attention_mask, token_type_ids, labels):
predict = self.Encoder(**{"input_ids":input_ids,
"attention_mask":attention_mask,
"token_type_ids":token_type_ids})
feature = self.feature_map(predict)
logit = self.classifier(feature)
predict = torch.sigmoid(logit)
# Loss function
loss = self.loss_fn(logit.to(torch.float), labels.to(torch.float).unsqueeze(1))
return [loss, [feature, logit], predict]
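# Usage sketch (illustrative only, not executed): the classifier consumes the four
# batch tensors produced by SLR_DataSet / MetaTask below; `bert` stands for any
# BERT-style encoder with a pooler, loaded elsewhere (an assumption).
#
#   clf = SLR_Classifier(model=bert, bert_layers=range(4), pos_weight=2.5, drop=0.5)
#   loss, (feature, logit), prob = clf(input_ids, attention_mask, token_type_ids, labels)
#   loss.backward()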
# Undesirable patterns within texts
patterns = {
'CONCLUSIONS AND IMPLICATIONS':'',
'BACKGROUND AND PURPOSE':'',
'EXPERIMENTAL APPROACH':'',
'KEY RESULTS AEA':'',
'©':'',
'®':'',
'μ':'',
'(C)':'',
'OBJECTIVE:':'',
'MATERIALS AND METHODS:':'',
'SIGNIFICANCE:':'',
'BACKGROUND:':'',
'RESULTS:':'',
'METHODS:':'',
'CONCLUSIONS:':'',
'AIM:':'',
'STUDY DESIGN:':'',
'CLINICAL RELEVANCE:':'',
'CONCLUSION:':'',
'HYPOTHESIS:':'',
'Questions/Purposes:':'',
'Introduction:':'',
'PURPOSE:':'',
'PATIENTS AND METHODS:':'',
'FINDINGS:':'',
'INTERPRETATIONS:':'',
'FUNDING:':'',
'PROGRESS:':'',
'CONTEXT:':'',
'MEASURES:':'',
'DESIGN:':'',
'BACKGROUND AND OBJECTIVES:':'',
'<p>':'',
'</p>':'',
'<<ETX>>':'',
'+/-':'',
'\(.+\)':'',
'\[.+\]':'',
' \d ':'',
'<':'',
'>':'',
'- ':'',
' +':' ',
', ,':',',
',,':',',
'%':' percent',
'per cent':' percent'
}
patterns = {x.lower():y for x,y in patterns.items()}
LABEL_MAP = {'negative': 0,
'not included':0,
'0':0,
0:0,
'excluded':0,
'positive': 1,
'included':1,
'1':1,
1:1,
}
class SLR_DataSet(Dataset):
def __init__(self,treat_text =None, **args):
self.tokenizer = args.get('tokenizer')
self.data = args.get('data')
self.max_seq_length = args.get("max_seq_length", 512)
self.INPUT_NAME = args.get("input", 'x')
self.LABEL_NAME = args.get("output", 'y')
self.treat_text = treat_text
# Tokenizing and processing text
def encode_text(self, example):
comment_text = example[self.INPUT_NAME]
if self.treat_text:
comment_text = self.treat_text(comment_text)
        # Map the label to {0, 1}; labels outside LABEL_MAP fall back to -1
        try:
            labels = LABEL_MAP[str(example[self.LABEL_NAME]).lower()]
        except KeyError:
            labels = -1
encoding = self.tokenizer.encode_plus(
            (comment_text, "It is a great text."),
add_special_tokens=True,
max_length=self.max_seq_length,
return_token_type_ids=True,
padding="max_length",
truncation=True,
return_attention_mask=True,
return_tensors='pt',
)
return tuple((
encoding["input_ids"].flatten(),
encoding["attention_mask"].flatten(),
encoding["token_type_ids"].flatten(),
torch.tensor([torch.tensor(labels).to(int)])
))
def __len__(self):
return len(self.data)
# Returning data
def __getitem__(self, index: int):
# print(index)
data_row = self.data.reset_index().iloc[index]
temp_data = self.encode_text(data_row)
return temp_data
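# Usage sketch (illustrative only, not executed): `df` is assumed to be a pandas
# DataFrame whose column names match the `input`/`output` arguments, and `tok`
# a BERT-style tokenizer.
#
#   ds = SLR_DataSet(data=df, tokenizer=tok, max_seq_length=512,
#                    input="text", output="label",
#                    treat_text=treat_text(patterns))
#   loader = DataLoader(ds, batch_size=8, shuffle=True)
#   input_ids, attention_mask, token_type_ids, labels = next(iter(loader))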
class Learner(nn.Module):
def __init__(self, **args):
"""
:param args:
"""
super(Learner, self).__init__()
self.inner_print = args.get('inner_print')
self.inner_batch_size = args.get('inner_batch_size')
self.outer_update_lr = args.get('outer_update_lr')
self.inner_update_lr = args.get('inner_update_lr')
self.inner_update_step = args.get('inner_update_step')
self.inner_update_step_eval = args.get('inner_update_step_eval')
self.model = args.get('model')
self.device = args.get('device')
# Outer optimizer
self.outer_optimizer = Adam(self.model.parameters(), lr=self.outer_update_lr)
self.model.train()
def forward(self, batch_tasks, training = True, valid_train = True):
"""
batch = [(support TensorDataset, query TensorDataset),
(support TensorDataset, query TensorDataset),
(support TensorDataset, query TensorDataset),
(support TensorDataset, query TensorDataset)]
# support = TensorDataset(all_input_ids, all_attention_mask, all_segment_ids, all_label_ids)
"""
task_accs = []
task_f1 = []
task_recall = []
sum_gradients = []
num_task = len(batch_tasks)
num_inner_update_step = self.inner_update_step if training else self.inner_update_step_eval
# Outer loop tasks
for task_id, task in enumerate(batch_tasks):
support = task[0]
query = task[1]
name = task[2]
# Copying model
fast_model = deepcopy(self.model)
fast_model.to(self.device)
# Inner trainer optimizer
inner_optimizer = Adam(fast_model.parameters(), lr=self.inner_update_lr)
# Creating training data loaders
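            # The last batch is dropped only when it would hold a single example,
            # since BatchNorm1d in the feature map cannot normalize a training
            # batch of size 1.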
if len(support) % self.inner_batch_size == 1 :
support_dataloader = DataLoader(support, sampler=RandomSampler(support),
batch_size=self.inner_batch_size,
drop_last=True)
else:
support_dataloader = DataLoader(support, sampler=RandomSampler(support),
batch_size=self.inner_batch_size,
drop_last=False)
# steps_per_epoch=len(support) // self.inner_batch_size
# total_training_steps = steps_per_epoch * 5
# warmup_steps = total_training_steps // 3
#
# scheduler = get_linear_schedule_with_warmup(
# inner_optimizer,
# num_warmup_steps=warmup_steps,
# num_training_steps=total_training_steps
# )
fast_model.train()
# Inner loop training epoch (support set)
if valid_train:
print('----Task',task_id,":", name, '----')
for i in range(0, num_inner_update_step):
all_loss = []
# Inner loop training batch (support set)
for inner_step, batch in enumerate(support_dataloader):
batch = tuple(t.to(self.device) for t in batch)
input_ids, attention_mask, token_type_ids, label_id = batch
                    # Feed forward
loss, _, _ = fast_model(input_ids, attention_mask, token_type_ids=token_type_ids, labels = label_id)
# Computing gradients
loss.backward()
# torch.nn.utils.clip_grad_norm_(fast_model.parameters(), max_norm=1)
# Updating inner training parameters
inner_optimizer.step()
inner_optimizer.zero_grad()
# Appending losses
all_loss.append(loss.item())
del batch, input_ids, attention_mask, label_id
torch.cuda.empty_cache()
if valid_train:
if (i+1) % self.inner_print == 0:
print("Inner Loss: ", np.mean(all_loss))
fast_model.to(torch.device('cpu'))
# Inner training phase weights
if training:
meta_weights = list(self.model.parameters())
fast_weights = list(fast_model.parameters())
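            # The meta-gradient for each parameter is approximated by the
            # difference (meta weights - fast weights), a first-order,
            # Reptile-style update direction accumulated over tasks.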
# Appending gradients
gradients = []
for i, (meta_params, fast_params) in enumerate(zip(meta_weights, fast_weights)):
gradient = meta_params - fast_params
if task_id == 0:
sum_gradients.append(gradient)
else:
sum_gradients[i] += gradient
if valid_train:
# Inner test (query set)
fast_model.to(self.device)
fast_model.eval()
with torch.no_grad():
# Data loader
query_dataloader = DataLoader(query, sampler=None, batch_size=len(query))
                    query_batch = next(iter(query_dataloader))
query_batch = tuple(t.to(self.device) for t in query_batch)
q_input_ids, q_attention_mask, q_token_type_ids, q_label_id = query_batch
                    # Feed forward
_, _, pre_label_id = fast_model(q_input_ids, q_attention_mask, q_token_type_ids, labels = q_label_id)
# Predictions
pre_label_id = pre_label_id.detach().cpu().squeeze()
# Labels
q_label_id = q_label_id.detach().cpu()
# Calculating metrics
acc = fn.accuracy(pre_label_id, q_label_id).item()
                    recall = fn.recall(pre_label_id, q_label_id).item()
f1 = fn.f1_score(pre_label_id, q_label_id).item()
# appending metrics
task_accs.append(acc)
task_f1.append(f1)
task_recall.append(recall)
fast_model.to(torch.device('cpu'))
del fast_model, inner_optimizer
torch.cuda.empty_cache()
print("\n")
print("f1:",np.mean(task_f1))
print("recall:",np.mean(task_recall))
# Updating outer training parameters
if training:
# Mean of gradients
for i in range(0,len(sum_gradients)):
sum_gradients[i] = sum_gradients[i] / float(num_task)
# Indexing parameters to model
for i, params in enumerate(self.model.parameters()):
params.grad = sum_gradients[i]
# Updating parameters
self.outer_optimizer.step()
self.outer_optimizer.zero_grad()
del sum_gradients
gc.collect()
torch.cuda.empty_cache()
if valid_train:
return np.mean(task_accs)
else:
return np.array(0)
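# Usage sketch (illustrative only, not executed): one meta-training pass over a
# batch of tasks. `clf` and `meta_tasks` are assumed to be built as in the other
# sketches; the identity collate_fn keeps each (support, query, name) task intact.
#
#   learner = Learner(model=clf, device=torch.device("cuda"),
#                     outer_update_lr=1e-5, inner_update_lr=1e-5,
#                     inner_batch_size=8, inner_update_step=4,
#                     inner_update_step_eval=4, inner_print=2)
#   task_loader = DataLoader(meta_tasks, batch_size=2, shuffle=True,
#                            collate_fn=lambda tasks: tasks)
#   for batch_of_tasks in task_loader:
#       mean_acc = learner(batch_of_tasks, training=True)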
# Creating Meta Tasks
class MetaTask(Dataset):
def __init__(self, examples, num_task, k_support, k_query,
tokenizer, training=True, max_seq_length=512,
treat_text =None, **args):
"""
:param samples: list of samples
:param num_task: number of training tasks.
:param k_support: number of classes support samples per task
:param k_query: number of classes query sample per task
"""
self.examples = examples
self.num_task = num_task
self.k_support = k_support
self.k_query = k_query
self.tokenizer = tokenizer
self.max_seq_length = max_seq_length
self.treat_text = treat_text
# Randomly generating tasks
self.create_batch(self.num_task, training)
# Creating batch
def create_batch(self, num_task, training):
self.supports = [] # support set
self.queries = [] # query set
self.task_names = [] # Name of task
self.supports_indexs = [] # index of supports
self.queries_indexs = [] # index of queries
self.num_task=num_task
# Available tasks
domains = self.examples['domain'].unique()
# If not training, create all tasks
if not(training):
self.task_names = domains
num_task = len(self.task_names)
self.num_task=num_task
for b in range(num_task): # For each task,
total_per_class = self.k_support + self.k_query
task_size = 2*self.k_support + 2*self.k_query
# Select a task at random
if training:
domain = random.choice(domains)
self.task_names.append(domain)
else:
domain = self.task_names[b]
# Task data
domainExamples = self.examples[self.examples['domain'] == domain]
# Minimal label quantity
min_per_class = min(domainExamples['label'].value_counts())
if total_per_class > min_per_class:
total_per_class = min_per_class
# Select k_support + k_query task examples
# Sample (n) from each label(class)
selected_examples = domainExamples.groupby("label").sample(total_per_class, replace = False)
# Split data into support (training) and query (testing) sets
s, q = train_test_split(selected_examples,
stratify= selected_examples["label"],
test_size= 2*self.k_query/task_size,
shuffle=True)
            # Shuffling the support and query rows
s = s.sample(frac=1)
q = q.sample(frac=1)
# Appending indexes
if not(training):
self.supports_indexs.append(s.index)
self.queries_indexs.append(q.index)
# Creating list of support (training) and query (testing) tasks
self.supports.append(s.to_dict('records'))
self.queries.append(q.to_dict('records'))
# Creating task tensors
def create_feature_set(self, examples):
all_input_ids = torch.empty(len(examples), self.max_seq_length, dtype = torch.long)
all_attention_mask = torch.empty(len(examples), self.max_seq_length, dtype = torch.long)
all_token_type_ids = torch.empty(len(examples), self.max_seq_length, dtype = torch.long)
all_label_ids = torch.empty(len(examples), dtype = torch.long)
for _id, e in enumerate(examples):
all_input_ids[_id], all_attention_mask[_id], all_token_type_ids[_id], all_label_ids[_id] = self.encode_text(e)
return TensorDataset(
all_input_ids,
all_attention_mask,
all_token_type_ids,
all_label_ids
)
# Data encoding
def encode_text(self, example):
comment_text = example["text"]
if self.treat_text:
comment_text = self.treat_text(comment_text)
labels = LABEL_MAP[example["label"]]
encoding = self.tokenizer.encode_plus(
(comment_text, "It is a great text."),
add_special_tokens=True,
max_length=self.max_seq_length,
return_token_type_ids=True,
padding="max_length",
truncation=True,
return_attention_mask=True,
return_tensors='pt',
)
return tuple((
encoding["input_ids"].flatten(),
encoding["attention_mask"].flatten(),
encoding["token_type_ids"].flatten(),
torch.tensor([torch.tensor(labels).to(int)])
))
# Returns data upon calling
def __getitem__(self, index):
support_set = self.create_feature_set(self.supports[index])
query_set = self.create_feature_set(self.queries[index])
name = self.task_names[index]
return support_set, query_set, name
def __len__(self):
return self.num_task
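# Usage sketch (illustrative only, not executed): `train_df` is assumed to be a
# pandas DataFrame with 'domain', 'text' and 'label' columns; each task samples
# k_support + k_query examples per class from a single domain.
#
#   meta_tasks = MetaTask(examples=train_df, num_task=8, k_support=10, k_query=10,
#                         tokenizer=tok, training=True, max_seq_length=512,
#                         treat_text=treat_text(patterns))
#   support_set, query_set, name = meta_tasks[0]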
class treat_text:
def __init__(self, patterns):
self.patterns = patterns
def __call__(self,text):
text = unicodedata.normalize("NFKD",str(text))
text = multiple_replace(self.patterns,text.lower())
        text = re.sub(r'(\(.+\))|(\[.+\])|( \d )|(<)|(>)|(- )', '', text)
        text = re.sub(r'( +)', ' ', text)
        text = re.sub(r'(, ,)|(,,)', ',', text)
        text = re.sub(r'(%)|(per cent)', ' percent', text)
return text
# Regex multiple replace function
def multiple_replace(mapping, text):
    # Build a single regex that matches any of the keys as literal text
    regex = re.compile("(%s)" % "|".join(map(re.escape, mapping.keys())))
    # Replace each match with the value mapped to the matched key
    return regex.sub(lambda mo: mapping[mo.group(0)], text)
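# Usage sketch (illustrative only, not executed): cleaning an abstract before
# tokenization with the `patterns` table defined above.
#
#   cleaner = treat_text(patterns)
#   clean = cleaner("BACKGROUND: Survival improved by 30% (n = 12).")
#   # Section headers, bracketed spans and stray symbols are stripped,
#   # '%' becomes ' percent', and the text is lower-cased.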