import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import CrossEntropyLoss

from transformers.models.bert.modeling_bert import BertPreTrainedModel, BertModel
from transformers.models.roberta.modeling_roberta import RobertaPreTrainedModel, RobertaModel
from transformers.models.albert.modeling_albert import AlbertPreTrainedModel, AlbertModel
from transformers.models.megatron_bert.modeling_megatron_bert import MegatronBertPreTrainedModel, MegatronBertModel
from transformers.modeling_outputs import TokenClassifierOutput

from loss.focal_loss import FocalLoss
from loss.label_smoothing import LabelSmoothingCrossEntropy
from models.basic_modules.crf import CRF
from tools.model_utils.parameter_freeze import ParameterFreeze
from tools.runner_utils.log_util import logging

logger = logging.getLogger(__name__)

freezer = ParameterFreeze()


"""
BERT for token-level classification with softmax head.
"""
class BertSoftmaxForSequenceLabeling(BertPreTrainedModel):
    def __init__(self, config):
        super(BertSoftmaxForSequenceLabeling, self).__init__(config)
        self.num_labels = config.num_labels
        self.bert = BertModel(config)
        if self.config.use_freezing:
            self.bert = freezer.freeze_lm(self.bert)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        self.loss_type = config.loss_type
        self.init_weights()

    def forward(
        self,
        input_ids,
        attention_mask=None,
        token_type_ids=None,
        position_ids=None,
        head_mask=None,
        labels=None,
        return_dict=False,
    ):
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        sequence_output = outputs[0]
        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            assert self.loss_type in ["lsr", "focal", "ce"]
            if self.loss_type == "lsr":
                loss_fct = LabelSmoothingCrossEntropy(ignore_index=0)
            elif self.loss_type == "focal":
                loss_fct = FocalLoss(ignore_index=0)
            else:
                loss_fct = CrossEntropyLoss(ignore_index=0)
            # Only keep the active parts of the loss (positions where attention_mask == 1).
            if attention_mask is not None:
                active_loss = attention_mask.view(-1) == 1
                active_logits = logits.view(-1, self.num_labels)[active_loss]
                active_labels = labels.view(-1)[active_loss]
                loss = loss_fct(active_logits, active_labels)
            else:
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[2:]  # add hidden states and attentions if they are here
            return ((loss,) + output) if loss is not None else output  # (loss), scores, (hidden_states), (attentions)

        return TokenClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
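
"""
Usage sketch (illustrative, not part of the original module): how the softmax-head
labeler above is typically driven for NER-style tagging. The checkpoint name, tag
count, and the extra config fields (`loss_type`, `use_freezing`) are assumptions about
how this repository configures its models.
"""
def _example_bert_softmax_labeling():
    from transformers import BertConfig, BertTokenizerFast

    config = BertConfig.from_pretrained("bert-base-cased", num_labels=9)
    config.loss_type = "ce"        # one of "lsr" / "focal" / "ce"
    config.use_freezing = False    # set True to freeze the backbone via ParameterFreeze
    model = BertSoftmaxForSequenceLabeling.from_pretrained("bert-base-cased", config=config)
    tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")

    encoded = tokenizer("John lives in Berlin", return_tensors="pt")
    # Dummy tag ids, one per sub-token; real data would use BIO tag ids aligned to sub-tokens.
    # Label 0 is treated as ignore_index by the loss functions above, so use non-zero ids here.
    labels = torch.ones_like(encoded["input_ids"])
    loss, logits = model(
        input_ids=encoded["input_ids"],
        attention_mask=encoded["attention_mask"],
        token_type_ids=encoded.get("token_type_ids"),
        labels=labels,
    )[:2]
    predictions = logits.argmax(dim=-1)  # (batch, seq_len) predicted tag ids
    return loss, predictions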
""" class RobertaSoftmaxForSequenceLabeling(RobertaPreTrainedModel): def __init__(self, config): super(RobertaSoftmaxForSequenceLabeling, self).__init__(config) self.num_labels = config.num_labels self.roberta = RobertaModel(config) if self.config.use_freezing: self.roberta = freezer.freeze_lm(self.roberta) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.loss_type = config.loss_type self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs = self.roberta(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here if labels is not None: assert self.loss_type in ["lsr", "focal", "ce"] if self.loss_type == "lsr": loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) elif self.loss_type == "focal": loss_fct = FocalLoss(ignore_index=0) else: loss_fct = CrossEntropyLoss(ignore_index=0) # Only keep active parts of the loss if attention_mask is not None: active_loss = attention_mask.view(-1) == 1 active_logits = logits.view(-1, self.num_labels)[active_loss] active_labels = labels.view(-1)[active_loss] loss = loss_fct(active_logits, active_labels) else: loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) if not return_dict: outputs = (loss,) + outputs return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) """ ALBERT for token-level classification with softmax head. 
""" class AlbertSoftmaxForSequenceLabeling(AlbertPreTrainedModel): def __init__(self, config): super(AlbertSoftmaxForSequenceLabeling, self).__init__(config) self.num_labels = config.num_labels self.loss_type = config.loss_type self.bert = AlbertModel(config) if self.config.use_freezing: self.bert = freezer.freeze_lm(self.bert) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids, position_ids=position_ids,head_mask=head_mask) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here if labels is not None: assert self.loss_type in ["lsr", "focal", "ce"] if self.loss_type =="lsr": loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) elif self.loss_type == "focal": loss_fct = FocalLoss(ignore_index=0) else: loss_fct = CrossEntropyLoss(ignore_index=0) # Only keep active parts of the loss if attention_mask is not None: active_loss = attention_mask.view(-1) == 1 active_logits = logits.view(-1, self.num_labels)[active_loss] active_labels = labels.view(-1)[active_loss] loss = loss_fct(active_logits, active_labels) else: loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) if not return_dict: outputs = (loss,) + outputs return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) """ MegatronBERT for token-level classification with softmax head. 
""" class MegatronBertSoftmaxForSequenceLabeling(MegatronBertPreTrainedModel): def __init__(self, config): super(MegatronBertSoftmaxForSequenceLabeling, self).__init__(config) self.num_labels = config.num_labels self.bert = MegatronBertModel(config) if self.config.use_freezing: self.bert = freezer.freeze_lm(self.bert) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.loss_type = config.loss_type self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) + outputs[2:] # add hidden states and attention if they are here if labels is not None: assert self.loss_type in ["lsr", "focal", "ce"] if self.loss_type == "lsr": loss_fct = LabelSmoothingCrossEntropy(ignore_index=0) elif self.loss_type == "focal": loss_fct = FocalLoss(ignore_index=0) else: loss_fct = CrossEntropyLoss(ignore_index=0) # Only keep active parts of the loss if attention_mask is not None: active_loss = attention_mask.view(-1) == 1 active_logits = logits.view(-1, self.num_labels)[active_loss] active_labels = labels.view(-1)[active_loss] loss = loss_fct(active_logits, active_labels) else: loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1)) if not return_dict: outputs = (loss,) + outputs return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) """ BERT for token-level classification with CRF head. """ class BertCrfForSequenceLabeling(BertPreTrainedModel): def __init__(self, config): super(BertCrfForSequenceLabeling, self).__init__(config) self.bert = BertModel(config) if self.config.use_freezing: self.bert = freezer.freeze_lm(self.bert) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.crf = CRF(num_tags=config.num_labels, batch_first=True) self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs =self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) if labels is not None: loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) outputs =(-1*loss,)+outputs if not return_dict: return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) """ RoBERTa for token-level classification with CRF head. 
""" class RobertaCrfForSequenceLabeling(RobertaPreTrainedModel): def __init__(self, config): super(RobertaCrfForSequenceLabeling, self).__init__(config) self.roberta = RobertaModel(config) if self.config.use_freezing: self.roberta = freezer.freeze_lm(self.roberta) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.crf = CRF(num_tags=config.num_labels, batch_first=True) self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs =self.roberta(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) if labels is not None: loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) outputs =(-1*loss,)+outputs if not return_dict: return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) """ ALBERT for token-level classification with CRF head. """ class AlbertCrfForSequenceLabeling(AlbertPreTrainedModel): def __init__(self, config): super(AlbertCrfForSequenceLabeling, self).__init__(config) self.bert = AlbertModel(config) if self.config.use_freezing: self.bert = freezer.freeze_lm(self.bert) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.crf = CRF(num_tags=config.num_labels, batch_first=True) self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs = self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) if labels is not None: loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) outputs =(-1*loss,)+outputs if not return_dict: return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, ) """ MegatronBERT for token-level classification with CRF head. 
""" class MegatronBertCrfForSequenceLabeling(MegatronBertPreTrainedModel): def __init__(self, config): super(MegatronBertCrfForSequenceLabeling, self).__init__(config) self.bert = MegatronBertModel(config) if self.config.use_freezing: self.bert = freezer.freeze_lm(self.bert) self.dropout = nn.Dropout(config.hidden_dropout_prob) self.classifier = nn.Linear(config.hidden_size, config.num_labels) self.crf = CRF(num_tags=config.num_labels, batch_first=True) self.init_weights() def forward( self, input_ids, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, labels=None, return_dict=False, ): outputs =self.bert(input_ids = input_ids,attention_mask=attention_mask,token_type_ids=token_type_ids) sequence_output = outputs[0] sequence_output = self.dropout(sequence_output) logits = self.classifier(sequence_output) outputs = (logits,) if labels is not None: loss = self.crf(emissions = logits, tags=labels, mask=attention_mask) outputs =(-1*loss,)+outputs if not return_dict: return outputs # (loss), scores, (hidden_states), (attentions) return TokenClassifierOutput( loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions, )