Spaces:
Runtime error
Runtime error
import math | |
from typing import Optional, Tuple | |
from transformers import AdamW, get_linear_schedule_with_warmup, AutoConfig | |
from transformers import BertForPreTraining, BertModel, RobertaModel, AlbertModel, AlbertForMaskedLM, RobertaForMaskedLM | |
import torch | |
import torch.nn as nn | |
import pytorch_lightning as pl | |
from sklearn.metrics import f1_score | |
from dataclasses import dataclass | |
class BERTAlignModel(pl.LightningModule): | |
def __init__(self, model='bert-base-uncased', using_pretrained=True, *args, **kwargs) -> None: | |
super().__init__() | |
# Already defined in lightning: self.device | |
self.save_hyperparameters() | |
self.model = model | |
if 'muppet' in model: | |
assert using_pretrained == True, "Only support pretrained muppet!" | |
self.base_model = RobertaModel.from_pretrained(model) | |
self.mlm_head = RobertaForMaskedLM(AutoConfig.from_pretrained(model)).lm_head | |
elif 'roberta' in model: | |
if using_pretrained: | |
self.base_model = RobertaModel.from_pretrained(model) | |
self.mlm_head = RobertaForMaskedLM.from_pretrained(model).lm_head | |
else: | |
self.base_model = RobertaModel(AutoConfig.from_pretrained(model)) | |
self.mlm_head = RobertaForMaskedLM(AutoConfig.from_pretrained(model)).lm_head | |
elif 'albert' in model: | |
if using_pretrained: | |
self.base_model = AlbertModel.from_pretrained(model) | |
self.mlm_head = AlbertForMaskedLM.from_pretrained(model).predictions | |
else: | |
self.base_model = AlbertModel(AutoConfig.from_pretrained(model)) | |
self.mlm_head = AlbertForMaskedLM(AutoConfig.from_pretrained(model)).predictions | |
elif 'bert' in model: | |
if using_pretrained: | |
self.base_model = BertModel.from_pretrained(model) | |
self.mlm_head = BertForPreTraining.from_pretrained(model).cls.predictions | |
else: | |
self.base_model = BertModel(AutoConfig.from_pretrained(model)) | |
self.mlm_head = BertForPreTraining(AutoConfig.from_pretrained(model)).cls.predictions | |
elif 'electra' in model: | |
self.generator = BertModel(AutoConfig.from_pretrained('prajjwal1/bert-small')) | |
self.generator_mlm = BertForPreTraining(AutoConfig.from_pretrained('prajjwal1/bert-small')).cls.predictions | |
self.base_model = BertModel(AutoConfig.from_pretrained('bert-base-uncased')) | |
self.discriminator_predictor = ElectraDiscriminatorPredictions(self.base_model.config) | |
self.bin_layer = nn.Linear(self.base_model.config.hidden_size, 2) | |
self.tri_layer = nn.Linear(self.base_model.config.hidden_size, 3) | |
self.reg_layer = nn.Linear(self.base_model.config.hidden_size, 1) | |
self.dropout = nn.Dropout(p=0.1) | |
self.need_mlm = True | |
self.is_finetune = False | |
self.mlm_loss_factor = 0.5 | |
self.softmax = nn.Softmax(dim=-1) | |
def forward(self, batch): | |
if 'electra' in self.model: | |
return self.electra_forward(batch) | |
base_model_output = self.base_model( | |
input_ids = batch['input_ids'], | |
attention_mask = batch['attention_mask'], | |
token_type_ids = batch['token_type_ids'] if 'token_type_ids' in batch.keys() else None | |
) | |
prediction_scores = self.mlm_head(base_model_output.last_hidden_state) ## sequence_output for mlm | |
seq_relationship_score = self.bin_layer(self.dropout(base_model_output.pooler_output)) ## pooled output for classification | |
tri_label_score = self.tri_layer(self.dropout(base_model_output.pooler_output)) | |
reg_label_score = self.reg_layer(base_model_output.pooler_output) | |
total_loss = None | |
if 'mlm_label' in batch.keys(): ### 'mlm_label' and 'align_label' when training | |
ce_loss_fct = nn.CrossEntropyLoss(reduction='sum') | |
masked_lm_loss = ce_loss_fct(prediction_scores.view(-1, self.base_model.config.vocab_size), batch['mlm_label'].view(-1)) #/ self.con vocabulary | |
next_sentence_loss = ce_loss_fct(seq_relationship_score.view(-1, 2), batch['align_label'].view(-1)) / math.log(2) | |
tri_label_loss = ce_loss_fct(tri_label_score.view(-1, 3), batch['tri_label'].view(-1)) / math.log(3) | |
reg_label_loss = self.mse_loss(reg_label_score.view(-1), batch['reg_label'].view(-1), reduction='sum') | |
masked_lm_loss_num = torch.sum(batch['mlm_label'].view(-1) != -100) | |
next_sentence_loss_num = torch.sum(batch['align_label'].view(-1) != -100) | |
tri_label_loss_num = torch.sum(batch['tri_label'].view(-1) != -100) | |
reg_label_loss_num = torch.sum(batch['reg_label'].view(-1) != -100.0) | |
return ModelOutput( | |
loss=total_loss, | |
all_loss=[masked_lm_loss, next_sentence_loss, tri_label_loss, reg_label_loss] if 'mlm_label' in batch.keys() else None, | |
loss_nums=[masked_lm_loss_num, next_sentence_loss_num, tri_label_loss_num, reg_label_loss_num] if 'mlm_label' in batch.keys() else None, | |
prediction_logits=prediction_scores, | |
seq_relationship_logits=seq_relationship_score, | |
tri_label_logits=tri_label_score, | |
reg_label_logits=reg_label_score, | |
hidden_states=base_model_output.hidden_states, | |
attentions=base_model_output.attentions | |
) | |
def electra_forward(self, batch): | |
if 'mlm_label' in batch.keys(): | |
ce_loss_fct = nn.CrossEntropyLoss() | |
generator_output = self.generator_mlm(self.generator( | |
input_ids = batch['input_ids'], | |
attention_mask = batch['attention_mask'], | |
token_type_ids = batch['token_type_ids'] if 'token_type_ids' in batch.keys() else None | |
).last_hidden_state) | |
masked_lm_loss = ce_loss_fct(generator_output.view(-1, self.generator.config.vocab_size), batch['mlm_label'].view(-1)) | |
hallucinated_tokens = batch['input_ids'].clone() | |
hallucinated_tokens[batch['mlm_label']!=-100] = torch.argmax(generator_output, dim=-1)[batch['mlm_label']!=-100] | |
replaced_token_label = (batch['input_ids'] == hallucinated_tokens).long()#.type(torch.LongTensor) #[batch['mlm_label'] == -100] = -100 | |
replaced_token_label[batch['mlm_label']!=-100] = (batch['mlm_label'] == hallucinated_tokens)[batch['mlm_label']!=-100].long() | |
replaced_token_label[batch['input_ids'] == 0] = -100 ### ignore paddings | |
base_model_output = self.base_model( | |
input_ids = hallucinated_tokens if 'mlm_label' in batch.keys() else batch['input_ids'], | |
attention_mask = batch['attention_mask'], | |
token_type_ids = batch['token_type_ids'] if 'token_type_ids' in batch.keys() else None | |
) | |
hallu_detect_score = self.discriminator_predictor(base_model_output.last_hidden_state) | |
seq_relationship_score = self.bin_layer(self.dropout(base_model_output.pooler_output)) ## pooled output for classification | |
tri_label_score = self.tri_layer(self.dropout(base_model_output.pooler_output)) | |
reg_label_score = self.reg_layer(base_model_output.pooler_output) | |
total_loss = None | |
if 'mlm_label' in batch.keys(): ### 'mlm_label' and 'align_label' when training | |
total_loss = [] | |
ce_loss_fct = nn.CrossEntropyLoss() | |
hallu_detect_loss = ce_loss_fct(hallu_detect_score.view(-1,2),replaced_token_label.view(-1)) | |
next_sentence_loss = ce_loss_fct(seq_relationship_score.view(-1, 2), batch['align_label'].view(-1)) | |
tri_label_loss = ce_loss_fct(tri_label_score.view(-1, 3), batch['tri_label'].view(-1)) | |
reg_label_loss = self.mse_loss(reg_label_score.view(-1), batch['reg_label'].view(-1)) | |
total_loss.append(10.0 * hallu_detect_loss if not torch.isnan(hallu_detect_loss).item() else 0.) | |
total_loss.append(0.2 * masked_lm_loss if (not torch.isnan(masked_lm_loss).item() and self.need_mlm) else 0.) | |
total_loss.append(next_sentence_loss if not torch.isnan(next_sentence_loss).item() else 0.) | |
total_loss.append(tri_label_loss if not torch.isnan(tri_label_loss).item() else 0.) | |
total_loss.append(reg_label_loss if not torch.isnan(reg_label_loss).item() else 0.) | |
total_loss = sum(total_loss) | |
return ModelOutput( | |
loss=total_loss, | |
all_loss=[masked_lm_loss, next_sentence_loss, tri_label_loss, reg_label_loss, hallu_detect_loss] if 'mlm_label' in batch.keys() else None, | |
prediction_logits=hallu_detect_score, | |
seq_relationship_logits=seq_relationship_score, | |
tri_label_logits=tri_label_score, | |
reg_label_logits=reg_label_score, | |
hidden_states=base_model_output.hidden_states, | |
attentions=base_model_output.attentions | |
) | |
def training_step(self, train_batch, batch_idx): | |
output = self(train_batch) | |
return {'losses': output.all_loss, 'loss_nums': output.loss_nums} | |
def training_step_end(self, step_output): | |
losses = step_output['losses'] | |
loss_nums = step_output['loss_nums'] | |
assert len(loss_nums) == len(losses), 'loss_num should be the same length as losses' | |
loss_mlm_num = torch.sum(loss_nums[0]) | |
loss_bin_num = torch.sum(loss_nums[1]) | |
loss_tri_num = torch.sum(loss_nums[2]) | |
loss_reg_num = torch.sum(loss_nums[3]) | |
loss_mlm = torch.sum(losses[0]) / loss_mlm_num if loss_mlm_num > 0 else 0. | |
loss_bin = torch.sum(losses[1]) / loss_bin_num if loss_bin_num > 0 else 0. | |
loss_tri = torch.sum(losses[2]) / loss_tri_num if loss_tri_num > 0 else 0. | |
loss_reg = torch.sum(losses[3]) / loss_reg_num if loss_reg_num > 0 else 0. | |
total_loss = self.mlm_loss_factor * loss_mlm + loss_bin + loss_tri + loss_reg | |
self.log('train_loss', total_loss)# , sync_dist=True | |
self.log('mlm_loss', loss_mlm) | |
self.log('bin_label_loss', loss_bin) | |
self.log('tri_label_loss', loss_tri) | |
self.log('reg_label_loss', loss_reg) | |
return total_loss | |
def validation_step(self, val_batch, batch_idx): | |
if not self.is_finetune: | |
with torch.no_grad(): | |
output = self(val_batch) | |
return {'losses': output.all_loss, 'loss_nums': output.loss_nums} | |
with torch.no_grad(): | |
output = self(val_batch)['seq_relationship_logits'] | |
output = self.softmax(output)[:, 1].tolist() | |
pred = [int(align_prob>0.5) for align_prob in output] | |
labels = val_batch['align_label'].tolist() | |
return {"pred": pred, 'labels': labels}#, "preds":preds, "labels":x['labels']} | |
def validation_step_end(self, step_output): | |
losses = step_output['losses'] | |
loss_nums = step_output['loss_nums'] | |
assert len(loss_nums) == len(losses), 'loss_num should be the same length as losses' | |
loss_mlm_num = torch.sum(loss_nums[0]) | |
loss_bin_num = torch.sum(loss_nums[1]) | |
loss_tri_num = torch.sum(loss_nums[2]) | |
loss_reg_num = torch.sum(loss_nums[3]) | |
loss_mlm = torch.sum(losses[0]) / loss_mlm_num if loss_mlm_num > 0 else 0. | |
loss_bin = torch.sum(losses[1]) / loss_bin_num if loss_bin_num > 0 else 0. | |
loss_tri = torch.sum(losses[2]) / loss_tri_num if loss_tri_num > 0 else 0. | |
loss_reg = torch.sum(losses[3]) / loss_reg_num if loss_reg_num > 0 else 0. | |
total_loss = self.mlm_loss_factor * loss_mlm + loss_bin + loss_tri + loss_reg | |
self.log('train_loss', total_loss)# , sync_dist=True | |
self.log('mlm_loss', loss_mlm) | |
self.log('bin_label_loss', loss_bin) | |
self.log('tri_label_loss', loss_tri) | |
self.log('reg_label_loss', loss_reg) | |
return total_loss | |
def validation_epoch_end(self, outputs): | |
if not self.is_finetune: | |
total_loss = torch.stack(outputs).mean() | |
self.log("val_loss", total_loss, prog_bar=True, sync_dist=True) | |
else: | |
all_predictions = [] | |
all_labels = [] | |
for each_output in outputs: | |
all_predictions.extend(each_output['pred']) | |
all_labels.extend(each_output['labels']) | |
self.log("f1", f1_score(all_labels, all_predictions), prog_bar=True, sync_dist=True) | |
def configure_optimizers(self): | |
"""Prepare optimizer and schedule (linear warmup and decay)""" | |
no_decay = ["bias", "LayerNorm.weight"] | |
optimizer_grouped_parameters = [ | |
{ | |
"params": [p for n, p in self.named_parameters() if not any(nd in n for nd in no_decay)], | |
"weight_decay": self.hparams.weight_decay, | |
}, | |
{ | |
"params": [p for n, p in self.named_parameters() if any(nd in n for nd in no_decay)], | |
"weight_decay": 0.0, | |
}, | |
] | |
optimizer = AdamW(optimizer_grouped_parameters, lr=self.hparams.learning_rate, eps=self.hparams.adam_epsilon) | |
scheduler = get_linear_schedule_with_warmup( | |
optimizer, | |
num_warmup_steps=int(self.hparams.warmup_steps_portion * self.trainer.estimated_stepping_batches), | |
num_training_steps=self.trainer.estimated_stepping_batches, | |
) | |
scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1} | |
return [optimizer], [scheduler] | |
def mse_loss(self, input, target, ignored_index=-100.0, reduction='mean'): | |
mask = (target == ignored_index) | |
out = (input[~mask]-target[~mask])**2 | |
if reduction == "mean": | |
return out.mean() | |
elif reduction == "sum": | |
return out.sum() | |
class ElectraDiscriminatorPredictions(nn.Module): | |
"""Prediction module for the discriminator, made up of two dense layers.""" | |
def __init__(self, config): | |
super().__init__() | |
self.dense = nn.Linear(config.hidden_size, config.hidden_size) | |
self.dense_prediction = nn.Linear(config.hidden_size, 2) | |
self.config = config | |
self.gelu = nn.GELU() | |
def forward(self, discriminator_hidden_states): | |
hidden_states = self.dense(discriminator_hidden_states) | |
hidden_states = self.gelu(hidden_states) | |
logits = self.dense_prediction(hidden_states).squeeze(-1) | |
return logits | |
class ModelOutput(): | |
loss: Optional[torch.FloatTensor] = None | |
all_loss: Optional[list] = None | |
loss_nums: Optional[list] = None | |
prediction_logits: torch.FloatTensor = None | |
seq_relationship_logits: torch.FloatTensor = None | |
tri_label_logits: torch.FloatTensor = None | |
reg_label_logits: torch.FloatTensor = None | |
hidden_states: Optional[Tuple[torch.FloatTensor]] = None | |
attentions: Optional[Tuple[torch.FloatTensor]] = None |