# Spaces: Sleeping -- appears to be page-status residue from the hosting page; not part of the module.
import collections.abc | |
import math | |
import pdb | |
import torch | |
import torch.utils.checkpoint | |
from torch import nn | |
from torch.nn import CrossEntropyLoss, MSELoss, MarginRankingLoss | |
import transformers | |
from transformers.activations import ACT2FN | |
from transformers.file_utils import add_start_docstrings, add_start_docstrings_to_model_forward, replace_return_docstrings | |
from transformers.modeling_outputs import BaseModelOutput, BaseModelOutputWithPooling, SequenceClassifierOutput | |
from transformers.modeling_utils import PreTrainedModel, find_pruneable_heads_and_indices, prune_linear_layer | |
from transformers.utils import logging | |
from transformers.modeling_utils import ( | |
PreTrainedModel, | |
apply_chunking_to_forward, | |
find_pruneable_heads_and_indices, | |
prune_linear_layer, | |
) | |
from transformers.modeling_outputs import ( | |
BaseModelOutputWithPastAndCrossAttentions, | |
BaseModelOutputWithPoolingAndCrossAttentions, | |
CausalLMOutputWithCrossAttentions, | |
MaskedLMOutput, | |
MultipleChoiceModelOutput, | |
NextSentencePredictorOutput, | |
QuestionAnsweringModelOutput, | |
SequenceClassifierOutput, | |
TokenClassifierOutput, | |
) | |
from transformers.models.bert.modeling_bert import BertPooler, BertEncoder, BertLayer, BertOnlyMLMHead | |
from transformers.models.bert.modeling_bert import load_tf_weights_in_bert | |
from transformers.models.bert.modeling_bert import BertModel, BertPreTrainedModel | |
from transformers.configuration_utils import PretrainedConfig | |
""" | |
Acknowledgement: the SpaBERT code is derived/adapted from the HuggingFace BERT model:
https://huggingface.co/docs/transformers/model_doc/bert
""" | |
class SpatialBertConfig(PretrainedConfig):
    r"""
    Configuration container for a :class:`SpatialBertModel`.

    The values stored here define the model architecture; they are read by the
    embedding, encoder and head modules of this file when they are constructed.

    Args:
        vocab_size (:obj:`int`, `optional`, defaults to 30522):
            Number of rows of the word-embedding table (and width of the MLM decoder output).
        num_semantic_types (:obj:`int`, `optional`, defaults to 97):
            Output width of the semantic-typing classification layers.
        hidden_size (:obj:`int`, `optional`, defaults to 768):
            Dimensionality of the encoder layers and the pooler layer.
        num_hidden_layers (:obj:`int`, `optional`, defaults to 12):
            Number of hidden layers in the Transformer encoder.
        num_attention_heads (:obj:`int`, `optional`, defaults to 12):
            Number of attention heads for each attention layer in the Transformer encoder.
        intermediate_size (:obj:`int`, `optional`, defaults to 3072):
            Dimensionality of the "intermediate" (feed-forward) layer in the Transformer encoder.
        hidden_act (:obj:`str` or :obj:`Callable`, `optional`, defaults to :obj:`"gelu"`):
            Non-linear activation (function or string) in the encoder and pooler. If a string,
            :obj:`"gelu"`, :obj:`"relu"`, :obj:`"silu"` and :obj:`"gelu_new"` are supported.
        hidden_dropout_prob (:obj:`float`, `optional`, defaults to 0.1):
            Dropout probability for all fully connected layers in the embeddings, encoder, and pooler.
        attention_probs_dropout_prob (:obj:`float`, `optional`, defaults to 0.1):
            Dropout ratio for the attention probabilities.
        max_position_embeddings (:obj:`int`, `optional`, defaults to 512):
            Maximum sequence length the model might ever be used with (e.g., 512, 1024 or 2048).
        type_vocab_size (:obj:`int`, `optional`, defaults to 2):
            Vocabulary size of the :obj:`token_type_ids` (kept for BERT compatibility).
        initializer_range (:obj:`float`, `optional`, defaults to 0.02):
            Standard deviation of the normal initializer used for all weight matrices.
        layer_norm_eps (:obj:`float`, `optional`, defaults to 1e-12):
            Epsilon used by the layer normalization layers.
        pad_token_id (:obj:`int`, `optional`, defaults to 0):
            Padding token id; forwarded to the :class:`PretrainedConfig` constructor.
        position_embedding_type (:obj:`str`, `optional`, defaults to :obj:`"absolute"`):
            Type of position embedding: :obj:`"absolute"`, :obj:`"relative_key"` or
            :obj:`"relative_key_query"`. For :obj:`"relative_key"` see `Self-Attention with Relative
            Position Representations (Shaw et al.) <https://arxiv.org/abs/1803.02155>`__; for
            :obj:`"relative_key_query"` see `Method 4` in `Improve Transformer Models with Better
            Relative Position Embeddings (Huang et al.) <https://arxiv.org/abs/2009.13658>`__.
        use_cache (:obj:`bool`, `optional`, defaults to :obj:`True`):
            Whether the model should return the last key/values attentions. Only relevant if
            ``config.is_decoder=True``.
        use_spatial_distance_embedding (:obj:`bool`, `optional`, defaults to :obj:`True`):
            Whether the spatial distance embedding is added to the token embeddings.
        classifier_dropout (:obj:`float`, `optional`):
            Dropout ratio for the classification head.
    """

    model_type = "bert"

    def __init__(
        self,
        vocab_size=30522,
        num_semantic_types=97,
        hidden_size=768,
        num_hidden_layers=12,
        num_attention_heads=12,
        intermediate_size=3072,
        hidden_act="gelu",
        hidden_dropout_prob=0.1,
        attention_probs_dropout_prob=0.1,
        max_position_embeddings=512,
        type_vocab_size=2,
        initializer_range=0.02,
        layer_norm_eps=1e-12,
        pad_token_id=0,
        position_embedding_type="absolute",
        use_cache=True,
        use_spatial_distance_embedding = True,
        classifier_dropout=None,
        **kwargs
    ):
        # The base class owns pad_token_id and any extra keyword arguments.
        super().__init__(pad_token_id=pad_token_id, **kwargs)

        # --- Transformer architecture ---
        self.vocab_size = vocab_size
        self.hidden_size = hidden_size
        self.num_hidden_layers = num_hidden_layers
        self.num_attention_heads = num_attention_heads
        self.hidden_act = hidden_act
        self.intermediate_size = intermediate_size

        # --- Regularisation ---
        self.hidden_dropout_prob = hidden_dropout_prob
        self.attention_probs_dropout_prob = attention_probs_dropout_prob

        # --- Embedding tables, initialisation, normalisation ---
        self.max_position_embeddings = max_position_embeddings
        self.type_vocab_size = type_vocab_size
        self.initializer_range = initializer_range
        self.layer_norm_eps = layer_norm_eps
        self.position_embedding_type = position_embedding_type
        self.use_cache = use_cache

        # --- SpaBERT-specific options ---
        self.use_spatial_distance_embedding = use_spatial_distance_embedding
        self.classifier_dropout = classifier_dropout
        self.num_semantic_types = num_semantic_types
class ContinuousSpatialPositionalEmbedding(nn.Module):
    """Sinusoidal embedding of continuous (real-valued) positions.

    Each scalar position ``p`` is mapped to
    ``[sin(p * f_0), ..., sin(p * f_{k-1}), cos(p * f_0), ..., cos(p * f_{k-1})]``
    where ``k = hidden_size // 2`` and ``f_i = 1 / 10000 ** (i / k)``.

    Args:
        hidden_size: Target embedding width. The produced width is
            ``2 * (hidden_size // 2)``, i.e. equal to ``hidden_size`` only when
            ``hidden_size`` is even.
    """

    def __init__(self, hidden_size):
        super().__init__()
        # Half of the channels carry sines, the other half cosines.
        self.emb_dim = hidden_size // 2
        inv_freq = 1 / (10000 ** (torch.arange(0.0, self.emb_dim) / self.emb_dim))  # (emb_dim,)
        # Buffer (not parameter): follows .to()/.cuda() and is stored in the state dict.
        self.register_buffer("inv_freq", inv_freq)

    def forward(self, x):
        """Embed a tensor of positions.

        Args:
            x: Tensor of positions of any shape ``(...)``; the model in this file
                passes ``(batch_size, seq_len)``.

        Returns:
            Tensor of shape ``(..., 2 * emb_dim)``.
        """
        # Broadcasting product replaces the deprecated torch.ger plus the
        # flatten/reshape round-trip: (..., 1) * (emb_dim,) -> (..., emb_dim).
        sinusoid_inp = x.unsqueeze(-1) * self.inv_freq
        return torch.cat([sinusoid_inp.sin(), sinusoid_inp.cos()], dim=-1)
class SpatialBertPreTrainedModel(PreTrainedModel):
    """
    Abstract base for the SpatialBERT models: provides weight initialization and
    the class attributes used by the pretrained-model loading machinery.
    """

    config_class = SpatialBertConfig
    load_tf_weights = load_tf_weights_in_bert
    base_model_prefix = 'spatialbert'
    supports_gradient_checkpointing = True
    _keys_to_ignore_on_load_missing = [r"position_ids"]

    def _init_weights(self, module):
        """Initialize the weights of a single sub-module in place."""
        if isinstance(module, nn.Linear):
            # Plain normal init (the TF version uses truncated_normal),
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
            return

        if isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            pad_row = module.padding_idx
            if pad_row is not None:
                # The padding row is reset to zeros after the random init.
                module.weight.data[pad_row].zero_()
            return

        if isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def _set_gradient_checkpointing(self, module, value=False):
        """Switch gradient checkpointing on/off; only ``BertEncoder`` modules are affected."""
        if not isinstance(module, BertEncoder):
            return
        module.gradient_checkpointing = value
class SpatialEmbedding(nn.Module):
    """Input embedding layer: word + sentence-position (+ optional spatial) embeddings.

    ``config.position_embedding_type`` controls the type of both the
    sentence-position embedding and the spatial-position embedding; only
    ``"absolute"`` is implemented.
    """

    def __init__(self, config):
        super().__init__()
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        # Same module under a second name: a trick to simplify the weight loading from Bert.
        self.sent_position_embedding = self.position_embeddings
        self.sent_position_embedding_type = getattr(config, "position_embedding_type", "absolute")

        self.spatial_position_embedding = ContinuousSpatialPositionalEmbedding(hidden_size = config.hidden_size)
        self.spatial_position_embedding_type = getattr(config, "position_embedding_type", "absolute")

        # self.LayerNorm is not snake-cased to stick with TensorFlow model variable name and be able to load
        # any TensorFlow checkpoint file
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

        self.use_spatial_distance_embedding = config.use_spatial_distance_embedding

    def forward(
        self,
        input_ids=None,
        sent_position_ids = None,
        position_list_x=None,
        position_list_y = None,
    ):
        """Compute the input embeddings.

        Args:
            input_ids: Token ids, looked up in the word-embedding table. Required.
            sent_position_ids: Position ids, looked up in the position-embedding table. Required.
            position_list_x: Optional positions fed to the spatial embedding (x coordinate).
            position_list_y: Optional positions fed to the spatial embedding (y coordinate).

        Returns:
            The summed embeddings after LayerNorm and dropout.

        Raises:
            ValueError: If ``input_ids`` or ``sent_position_ids`` is ``None``.
            NotImplementedError: If the configured position embedding type is not ``"absolute"``.
        """
        if input_ids is None:
            raise ValueError("`input_ids` must be provided")
        if sent_position_ids is None:
            raise ValueError("`sent_position_ids` must be provided")

        embeddings = self.word_embeddings(input_ids)

        # The coordinate arguments default to None, so check for None before
        # calling len(). The spatial term is applied only when BOTH coordinate
        # lists are present and non-empty; otherwise it is skipped.
        has_x = position_list_x is not None and len(position_list_x) != 0
        has_y = position_list_y is not None and len(position_list_y) != 0

        if self.use_spatial_distance_embedding and has_x and has_y:
            if self.spatial_position_embedding_type != "absolute":
                # TODO: if relative, need to look at BertSelfAttention module as well
                raise NotImplementedError("Invalid spatial position embedding type")
            pos_emb_x = self.spatial_position_embedding(position_list_x)
            pos_emb_y = self.spatial_position_embedding(position_list_y)
            # Spatial terms are added with a fixed 0.01 scale.
            embeddings += 0.01* pos_emb_x
            embeddings += 0.01* pos_emb_y

        if self.sent_position_embedding_type != "absolute":
            # TODO: if relative, need to look at BertSelfAttention module as well
            raise NotImplementedError("Invalid sentence position embedding type")
        embeddings += self.sent_position_embedding(sent_position_ids)

        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings
class PivotEntityPooler(nn.Module):
    """Pools a sequence by averaging the hidden states of the pivot-entity tokens."""

    def __init__(self):
        super().__init__()

    def forward(self, hidden_states, pivot_len_list):
        """Average, per example, the hidden states at positions ``1 .. pivot_len``.

        Args:
            hidden_states: Tensor indexed as ``[batch, position, ...]``.
            pivot_len_list: Per-example pivot lengths, indexable by batch position.

        Returns:
            Tensor with one pooled row per example, concatenated along dim 0.
        """
        pooled_rows = []
        for row in range(hidden_states.shape[0]):
            # Position 0 is skipped; the pivot tokens occupy 1 .. pivot_len.
            stop = pivot_len_list[row] + 1
            pivot_tokens = hidden_states[row, 1:stop]
            pooled_rows.append(pivot_tokens.mean(dim=0, keepdim=True))
        return torch.cat(pooled_rows, dim=0)
class SpatialBertModel(SpatialBertPreTrainedModel):
    """Bare SpatialBERT encoder: :class:`SpatialEmbedding` -> ``BertEncoder`` -> optional ``BertPooler``.

    Args:
        config: Model configuration.
        add_pooling_layer: When ``False`` no pooler is created and ``pooler_output`` is ``None``.
    """

    def __init__(self, config, add_pooling_layer=True):
        super().__init__(config)
        self.config = config

        self.embeddings = SpatialEmbedding(config)
        self.encoder = BertEncoder(config)
        self.pooler = BertPooler(config) if add_pooling_layer else None

        self.init_weights()

    def get_input_embeddings(self):
        """Return the word-embedding module."""
        return self.embeddings.word_embeddings

    def set_input_embeddings(self, value):
        """Replace the word-embedding module."""
        self.embeddings.word_embeddings = value

    def _prune_heads(self, heads_to_prune):
        """
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer} See base
        class PreTrainedModel
        """
        for layer, heads in heads_to_prune.items():
            self.encoder.layer[layer].attention.prune_heads(heads)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        sent_position_ids = None,
        position_list_x = None,
        position_list_y = None,
        head_mask=None,
        encoder_hidden_states=None,
        encoder_attention_mask=None,
        past_key_values=None,
        use_cache=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """Encode a batch.

        Args:
            input_ids: Required 2-D tensor ``(batch_size, seq_length)`` of token ids.
            attention_mask: Optional mask; defaults to all ones over
                ``seq_length + past_key_values_length`` positions.
            sent_position_ids, position_list_x, position_list_y: Forwarded unchanged to the
                embedding layer.
            head_mask: Optional head mask, expanded with ``get_head_mask``.
            encoder_hidden_states, encoder_attention_mask: Cross-attention inputs; only
                used when ``config.is_decoder`` is set.
            past_key_values: Optional cached key/values; its length is read from
                ``past_key_values[0][0].shape[2]``.
            use_cache: Honoured only when ``config.is_decoder``; otherwise forced to ``False``.
            output_attentions, output_hidden_states, return_dict: Default to the
                corresponding config values when ``None``.

        Returns:
            ``BaseModelOutputWithPoolingAndCrossAttentions`` when ``return_dict`` is true,
            otherwise the tuple ``(sequence_output, pooled_output) + encoder_outputs[1:]``.

        Raises:
            ValueError: If ``input_ids`` is ``None``.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        if self.config.is_decoder:
            use_cache = use_cache if use_cache is not None else self.config.use_cache
        else:
            use_cache = False

        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if input_ids is None:
            raise ValueError("`input_ids` must be provided")

        input_shape = input_ids.size()
        batch_size, seq_length = input_shape
        # This model has no `inputs_embeds` path, so the device always comes from input_ids.
        device = input_ids.device

        # past_key_values_length
        past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0

        if attention_mask is None:
            attention_mask = torch.ones(((batch_size, seq_length + past_key_values_length)), device=device)

        # We can provide a self-attention mask of dimensions [batch_size, from_seq_length, to_seq_length]
        # ourselves in which case we just need to make it broadcastable to all heads.
        extended_attention_mask: torch.Tensor = self.get_extended_attention_mask(attention_mask, input_shape, device)

        # If a 2D or 3D attention mask is provided for the cross-attention
        # we need to make broadcastable to [batch_size, num_heads, seq_length, seq_length]
        if self.config.is_decoder and encoder_hidden_states is not None:
            encoder_batch_size, encoder_sequence_length, _ = encoder_hidden_states.size()
            encoder_hidden_shape = (encoder_batch_size, encoder_sequence_length)
            if encoder_attention_mask is None:
                encoder_attention_mask = torch.ones(encoder_hidden_shape, device=device)
            encoder_extended_attention_mask = self.invert_attention_mask(encoder_attention_mask)
        else:
            encoder_extended_attention_mask = None

        # Prepare head mask if needed
        # 1.0 in head_mask indicate we keep the head
        # attention_probs has shape bsz x n_heads x N x N
        # input head_mask has shape [num_heads] or [num_hidden_layers x num_heads]
        # and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length]
        head_mask = self.get_head_mask(head_mask, self.config.num_hidden_layers)

        embedding_output = self.embeddings(
            input_ids=input_ids,
            sent_position_ids = sent_position_ids,
            position_list_x= position_list_x,
            position_list_y = position_list_y,
        )

        encoder_outputs = self.encoder(
            embedding_output,
            attention_mask=extended_attention_mask,
            head_mask=head_mask,
            encoder_hidden_states=encoder_hidden_states,
            encoder_attention_mask=encoder_extended_attention_mask,
            past_key_values=past_key_values,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        sequence_output = encoder_outputs[0]
        pooled_output = self.pooler(sequence_output) if self.pooler is not None else None

        if not return_dict:
            return (sequence_output, pooled_output) + encoder_outputs[1:]

        return BaseModelOutputWithPoolingAndCrossAttentions(
            last_hidden_state=sequence_output,
            pooler_output=pooled_output,
            past_key_values=encoder_outputs.past_key_values,
            hidden_states=encoder_outputs.hidden_states,
            attentions=encoder_outputs.attentions,
            cross_attentions=encoder_outputs.cross_attentions,
        )
class SpatialBertPredictionHeadTransform(nn.Module):
    """Dense -> activation -> LayerNorm transform applied before the LM decoder."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        # A string names an entry of ACT2FN; anything else is used as the callable itself.
        activation = config.hidden_act
        self.transform_act_fn = ACT2FN[activation] if isinstance(activation, str) else activation
        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

    def forward(self, hidden_states):
        """Return ``LayerNorm(act(dense(hidden_states)))``."""
        projected = self.dense(hidden_states)
        activated = self.transform_act_fn(projected)
        return self.LayerNorm(activated)
class SpatialBertLMPredictionHead(nn.Module):
    """Language-model head: hidden-state transform followed by a projection to the vocabulary."""

    def __init__(self, config):
        super().__init__()
        self.transform = SpatialBertPredictionHeadTransform(config)

        # The output weights are the same as the input embeddings, but there is
        # an output-only bias for each token.
        self.decoder = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
        self.bias = nn.Parameter(torch.zeros(config.vocab_size))

        # Link the two so that the bias is correctly resized with `resize_token_embeddings`.
        self.decoder.bias = self.bias

    def forward(self, hidden_states):
        """Return vocabulary scores for ``hidden_states``."""
        return self.decoder(self.transform(hidden_states))
class SpatialBertOnlyMLMHead(nn.Module):
    """Wrapper exposing the LM prediction head under the attribute name ``predictions``."""

    def __init__(self, config):
        super().__init__()
        self.predictions = SpatialBertLMPredictionHead(config)

    def forward(self, sequence_output):
        """Return the masked-LM prediction scores for ``sequence_output``."""
        return self.predictions(sequence_output)
class SpatialBertOnlyTypingHead(nn.Module):
    """Classification head: dense -> tanh -> linear over ``config.num_semantic_types`` outputs."""

    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()
        self.seq_relationship = nn.Linear(config.hidden_size, config.num_semantic_types)

    def forward(self, pivot_pooled_output):
        """Return the type scores for the pooled pivot representation."""
        features = self.activation(self.dense(pivot_pooled_output))
        return self.seq_relationship(features)
class SpatialBertPreTrainingHeads(nn.Module):
    """Pair of pre-training heads: LM prediction scores and a linear semantic-type classifier."""

    def __init__(self, config):
        super().__init__()
        self.predictions = SpatialBertLMPredictionHead(config)
        self.seq_relationship = nn.Linear(config.hidden_size, config.num_semantic_types)

    def forward(self, sequence_output, pooled_output):
        """Return ``(prediction_scores, seq_relationship_score)``."""
        lm_scores = self.predictions(sequence_output)
        type_scores = self.seq_relationship(pooled_output)
        return lm_scores, type_scores
class SpatialBertForMaskedLM(SpatialBertPreTrainedModel):
    """SpatialBERT encoder (without pooler) topped with a masked-language-modeling head."""

    _keys_to_ignore_on_load_unexpected = [r"pooler"]
    _keys_to_ignore_on_load_missing = [r"position_ids", r"predictions.decoder.bias"]

    def __init__(self, config):
        super().__init__(config)

        if config.is_decoder:
            # No module-level `logger` exists in this file, so obtain one here
            # from the already-imported transformers logging utilities.
            logging.get_logger(__name__).warning(
                "If you want to use `SpatialBertForMaskedLM` make sure `config.is_decoder=False` for "
                "bi-directional self-attention."
            )

        self.bert = SpatialBertModel(config, add_pooling_layer=False)
        self.cls = SpatialBertOnlyMLMHead(config)

        self.init_weights()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        sent_position_ids = None,
        position_list_x = None,
        position_list_y = None,
        head_mask=None,
        encoder_attention_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        r"""
        labels (:obj:`torch.LongTensor` of shape :obj:`(batch_size, sequence_length)`, `optional`):
            Labels for computing the masked language modeling loss. Indices should be in ``[-100, 0, ...,
            config.vocab_size]`` (see ``input_ids`` docstring) Tokens with indices set to ``-100`` are ignored
            (masked), the loss is only computed for the tokens with labels in ``[0, ..., config.vocab_size]``

        All other arguments are forwarded to the wrapped :class:`SpatialBertModel`.

        Returns:
            ``MaskedLMOutput`` when ``return_dict`` is true; otherwise a tuple
            ``(prediction_scores, *extra_outputs)`` prefixed with the loss when
            ``labels`` is given.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            sent_position_ids = sent_position_ids,
            position_list_x = position_list_x,
            position_list_y = position_list_y,
            head_mask=head_mask,
            encoder_attention_mask=encoder_attention_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]
        prediction_scores = self.cls(sequence_output)

        masked_lm_loss = None
        if labels is not None:
            loss_fct = CrossEntropyLoss()  # -100 index = padding token
            masked_lm_loss = loss_fct(prediction_scores.view(-1, self.config.vocab_size), labels.view(-1))

        if not return_dict:
            # Debugger breakpoint and `print(output.shape)` removed: `output` is
            # a tuple, so the print raised AttributeError on this path.
            output = (prediction_scores,) + outputs[2:]
            return ((masked_lm_loss,) + output) if masked_lm_loss is not None else output

        return MaskedLMOutput(
            loss=masked_lm_loss,
            logits=prediction_scores,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def prepare_inputs_for_generation(self, input_ids, attention_mask=None, **model_kwargs):
        """Append one PAD "dummy" token (with attention 0) to every sequence.

        Returns:
            Dict with the extended ``input_ids`` and ``attention_mask``.

        Raises:
            ValueError: If ``config.pad_token_id`` is not defined.
        """
        input_shape = input_ids.shape
        effective_batch_size = input_shape[0]

        # add a dummy token
        if self.config.pad_token_id is None:
            raise ValueError("The PAD token should be defined for generation")

        attention_mask = torch.cat([attention_mask, attention_mask.new_zeros((attention_mask.shape[0], 1))], dim=-1)
        dummy_token = torch.full(
            (effective_batch_size, 1), self.config.pad_token_id, dtype=torch.long, device=input_ids.device
        )
        input_ids = torch.cat([input_ids, dummy_token], dim=1)

        return {"input_ids": input_ids, "attention_mask": attention_mask}
class SpatialBertForSemanticTyping(SpatialBertPreTrainedModel):
    """SpatialBERT encoder with pivot-entity pooling and a semantic-type classification head."""

    def __init__(self, config):
        super().__init__(config)

        self.bert = SpatialBertModel(config)
        self.pivot_pooler = PivotEntityPooler()
        self.num_semantic_types = config.num_semantic_types
        self.cls = SpatialBertOnlyTypingHead(config)

        self.init_weights()

    def forward(
        self,
        input_ids=None,
        sent_position_ids = None,
        position_list_x = None,
        position_list_y = None,
        pivot_len_list = None,
        attention_mask=None,
        head_mask=None,
        labels=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """Classify the pivot entity of each example.

        The encoder output is pooled over the pivot tokens (``pivot_len_list``) and
        scored by the typing head. When ``labels`` is given a cross-entropy loss over
        ``num_semantic_types`` classes is computed.

        Returns:
            ``SequenceClassifierOutput`` when ``return_dict`` is true; note that its
            ``hidden_states`` field carries the pooled pivot representation. Otherwise a
            tuple ``(type_prediction_score, *extra_outputs)``, prefixed with the loss
            when it was computed.
        """
        if return_dict is None:
            return_dict = self.config.use_return_dict

        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            sent_position_ids = sent_position_ids,
            position_list_x = position_list_x,
            position_list_y = position_list_y,
            head_mask=head_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        # Pool the pivot tokens of the last hidden state, then score the types.
        pooled_output = self.pivot_pooler(outputs[0], pivot_len_list)
        type_prediction_score = self.cls(pooled_output)

        typing_loss = None
        if labels is not None:
            flat_scores = type_prediction_score.view(-1, self.num_semantic_types)
            typing_loss = CrossEntropyLoss()(flat_scores, labels.view(-1))

        if return_dict:
            return SequenceClassifierOutput(
                loss=typing_loss,
                logits=type_prediction_score,
                hidden_states = pooled_output,
                attentions=outputs.attentions,
            )

        output = (type_prediction_score,) + outputs[2:]
        if typing_loss is None:
            return output
        return (typing_loss,) + output
class SpatialBertForMarginRanking(SpatialBertPreTrainedModel):
    """SpatialBERT encoder trained with a margin-ranking (triplet-style) objective.

    NOTE(review): the previous ``forward`` could not run -- it referenced the
    undefined names ``device``, ``batch``, ``model``, ``pivot_len_list``,
    ``labels``, ``type_prediction_score`` and ``self.num_semantic_types``.
    This implementation reconstructs the apparent intent (encode the geo-entity,
    a positive type and a negative type, then rank them). The similarity measure
    (cosine), the margin default and the pooling choice are assumptions --
    confirm against the training script before relying on them.
    """

    def __init__(self, config):
        super().__init__(config)

        self.bert = SpatialBertModel(config)
        self.pivot_pooler = PivotEntityPooler()
        # NOTE(review): no margin was specified originally; 0.0 mirrors the
        # default of torch.nn.MarginRankingLoss. Override via `config.ranking_margin`.
        self.ranking_margin = getattr(config, "ranking_margin", 0.0)

        self.init_weights()

    def _encode(self, data, head_mask=None, output_attentions=None, output_hidden_states=None):
        """Run the encoder on one input dict and return ``(embedding, encoder_outputs)``.

        ``data`` must provide the tensors ``'pseudo_sentence'``, ``'attention_mask'``,
        ``'norm_lng_list'``, ``'norm_lat_list'`` and ``'sent_position_ids'``. When it
        also provides ``'pivot_token_len'`` the pivot tokens are mean-pooled;
        otherwise the encoder's ``pooler_output`` is used.
        """
        device = self.device
        outputs = self.bert(
            data['pseudo_sentence'].to(device),
            attention_mask=data['attention_mask'].to(device),
            sent_position_ids=data['sent_position_ids'].to(device),
            position_list_x=data['norm_lng_list'].to(device),
            position_list_y=data['norm_lat_list'].to(device),
            head_mask=head_mask,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=True,  # attribute access below needs the dataclass output
        )
        if 'pivot_token_len' in data:
            pivot_lens = data['pivot_token_len'].to(device)
            embedding = self.pivot_pooler(outputs.last_hidden_state, pivot_lens)
        else:
            embedding = outputs.pooler_output
        return embedding, outputs

    def forward(
        self,
        geo_entity_data,
        positive_type_data,
        negative_type_data,
        head_mask=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
    ):
        """Compute the ranking loss for (entity, positive type, negative type) triplets.

        Args:
            geo_entity_data, positive_type_data, negative_type_data: Input dicts, see
                :meth:`_encode` for the expected keys.
            head_mask, output_attentions, output_hidden_states: Forwarded to the encoder.
            return_dict: Defaults to ``config.use_return_dict``.

        Returns:
            ``SequenceClassifierOutput`` with ``loss`` (margin-ranking loss), ``logits``
            (``[..., 0]`` positive score, ``[..., 1]`` negative score), ``hidden_states``
            (the entity embedding, mirroring ``SpatialBertForSemanticTyping``) and the
            entity encoder ``attentions``; or the tuple ``(loss, logits, entity_embedding)``
            when ``return_dict`` is false.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        entity_embedding, entity_outputs = self._encode(
            geo_entity_data, head_mask, output_attentions, output_hidden_states
        )
        positive_embedding, _ = self._encode(
            positive_type_data, head_mask, output_attentions, output_hidden_states
        )
        negative_embedding, _ = self._encode(
            negative_type_data, head_mask, output_attentions, output_hidden_states
        )

        # NOTE(review): cosine similarity is an assumed scoring function.
        positive_score = nn.functional.cosine_similarity(entity_embedding, positive_embedding, dim=-1)
        negative_score = nn.functional.cosine_similarity(entity_embedding, negative_embedding, dim=-1)

        # target = +1 asks for positive_score to rank above negative_score.
        loss_fct = MarginRankingLoss(margin=self.ranking_margin)
        ranking_loss = loss_fct(positive_score, negative_score, torch.ones_like(positive_score))

        scores = torch.stack([positive_score, negative_score], dim=-1)

        if not return_dict:
            return (ranking_loss, scores, entity_embedding)

        return SequenceClassifierOutput(
            loss=ranking_loss,
            logits=scores,
            hidden_states = entity_embedding,
            attentions=entity_outputs.attentions,
        )