Spaces:
Runtime error
Runtime error
# Copyright (c) OpenMMLab. All rights reserved. | |
# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc. | |
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved. | |
# flake8: noqa | |
import math | |
import torch | |
from torch import nn | |
from torch.utils.checkpoint import checkpoint | |
try: | |
from transformers.models.bert.configuration_bert import BertConfig | |
except: | |
BertConfig = None | |
from mmpretrain.registry import MODELS | |
from ..blip.language_model import BertAttention, BertIntermediate, BertOutput | |
def gelu(x): | |
"""Original Implementation of the gelu activation function in Google Bert | |
repo when initially created. | |
For information: OpenAI GPT's gelu is slightly different (and gives | |
slightly different results): | |
0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3)))) | |
Also see https://arxiv.org/abs/1606.08415 | |
""" # noqa | |
return x * 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0))) | |
def gelu_new(x): | |
"""Implementation of the gelu activation function currently in Google Bert | |
repo (identical to OpenAI GPT) https://arxiv.org/abs/1606.08415.""" | |
return 0.5 * x * (1 + torch.tanh( | |
math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3)))) | |
def swish(x): | |
return x * torch.sigmoid(x) | |
ACT2FN = { | |
'gelu': gelu, | |
'relu': torch.nn.functional.relu, | |
'swish': swish, | |
'gelu_new': gelu_new | |
} | |
class BertEmbeddings(nn.Module): | |
"""Construct the embeddings from word, position and token_type | |
embeddings.""" | |
def __init__(self, config): | |
super(BertEmbeddings, self).__init__() | |
self.word_embeddings = nn.Embedding( | |
config.vocab_size, config.hidden_size, padding_idx=0) | |
self.position_embeddings = nn.Embedding(config.max_position_embeddings, | |
config.hidden_size) | |
self.token_type_embeddings = nn.Embedding(config.type_vocab_size, | |
config.hidden_size) | |
# self.LayerNorm is not snake-cased to stick with TensorFlow model | |
# variable name and be able to load any TensorFlow checkpoint file | |
self.LayerNorm = nn.LayerNorm( | |
config.hidden_size, eps=config.layer_norm_eps) | |
self.dropout = nn.Dropout(config.hidden_dropout_prob) | |
def forward(self, input_ids, token_type_ids=None, position_ids=None): | |
seq_length = input_ids.size(1) | |
if position_ids is None: | |
position_ids = torch.arange( | |
seq_length, dtype=torch.long, device=input_ids.device) | |
position_ids = position_ids.unsqueeze(0).expand_as(input_ids) | |
if token_type_ids is None: | |
token_type_ids = torch.zeros_like(input_ids) | |
words_embeddings = self.word_embeddings(input_ids) | |
position_embeddings = self.position_embeddings(position_ids) | |
token_type_embeddings = self.token_type_embeddings(token_type_ids) | |
embeddings = words_embeddings + position_embeddings \ | |
+ token_type_embeddings | |
embeddings = self.LayerNorm(embeddings) | |
embeddings = self.dropout(embeddings) | |
return embeddings | |
class BertLayer(nn.Module): | |
def __init__(self, config): | |
super(BertLayer, self).__init__() | |
self.attention = BertAttention(config) | |
self.intermediate = BertIntermediate(config) | |
self.output = BertOutput(config) | |
def forward(self, hidden_states, attention_mask=None, head_mask=None): | |
attention_outputs = self.attention(hidden_states, attention_mask, | |
head_mask) | |
attention_output = attention_outputs[0] | |
intermediate_output = self.intermediate(attention_output) | |
layer_output = self.output(intermediate_output, attention_output) | |
outputs = (layer_output, ) + attention_outputs[ | |
1:] # add attentions if we output them | |
if len(outputs) == 1: | |
return outputs[0] | |
return outputs | |
class BertEncoder(nn.Module): | |
def __init__(self, config): | |
super(BertEncoder, self).__init__() | |
self.output_attentions = config.output_attentions | |
self.output_hidden_states = config.output_hidden_states | |
self.grad_checkpointing = False | |
self.layer = nn.ModuleList( | |
[BertLayer(config) for _ in range(config.num_hidden_layers)]) | |
def forward(self, hidden_states, attention_mask=None, head_mask=None): | |
all_hidden_states = () | |
all_attentions = () | |
for i, layer_module in enumerate(self.layer): | |
if self.output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_states, ) | |
if self.grad_checkpointing and not torch.jit.is_scripting(): | |
layer_outputs = checkpoint(layer_module, hidden_states, | |
attention_mask, head_mask[i]) | |
else: | |
layer_outputs = layer_module(hidden_states, attention_mask, | |
head_mask[i]) | |
if not isinstance(layer_outputs, tuple): | |
layer_outputs = (layer_outputs, ) | |
hidden_states = layer_outputs[0] | |
if self.output_attentions: | |
all_attentions = all_attentions + (layer_outputs[1], ) | |
# Add last layer | |
if self.output_hidden_states: | |
all_hidden_states = all_hidden_states + (hidden_states, ) | |
outputs = (hidden_states, ) | |
if self.output_hidden_states: | |
outputs = outputs + (all_hidden_states, ) | |
if self.output_attentions: | |
outputs = outputs + (all_attentions, ) | |
# last-layer hidden state, (all hidden states), (all attentions) | |
return outputs | |
class BertPreTrainedModel(nn.Module): | |
base_model_prefix = 'bert' | |
def __init__(self, config): | |
super(BertPreTrainedModel, self).__init__() | |
self.config = config | |
def _init_weights(self, module): | |
"""Initialize the weights.""" | |
if isinstance(module, (nn.Linear, nn.Embedding)): | |
# Slightly different from the TF version | |
# which uses truncated_normal for initialization | |
# cf https://github.com/pytorch/pytorch/pull/5617 | |
module.weight.data.normal_( | |
mean=0.0, std=self.config.initializer_range) | |
elif isinstance(module, nn.LayerNorm): | |
module.bias.data.zero_() | |
module.weight.data.fill_(1.0) | |
if isinstance(module, nn.Linear) and module.bias is not None: | |
module.bias.data.zero_() | |
class BertModelCN(BertPreTrainedModel): | |
"""The BERT model implementation for Chinese CLIP.""" | |
def __init__(self, config): | |
config = BertConfig.from_dict(config) | |
super(BertModelCN, self).__init__(config) | |
self.embeddings = BertEmbeddings(config) | |
self.encoder = BertEncoder(config) | |
self.apply(self._init_weights) | |
def set_grad_checkpointing(self, enable=True): | |
if enable: | |
assert not self.config.output_attentions, \ | |
'Grad checkpointing is currently conflict with ' \ | |
'output_attentions for BertEncoder, ' \ | |
'please set it to False in BertConfig' | |
self.encoder.grad_checkpointing = enable | |
def forward(self, | |
input_ids, | |
attention_mask=None, | |
token_type_ids=None, | |
position_ids=None, | |
head_mask=None): | |
if attention_mask is None: | |
attention_mask = torch.ones_like(input_ids) | |
if token_type_ids is None: | |
token_type_ids = torch.zeros_like(input_ids) | |
# We create a 3D attention mask from a 2D tensor mask. | |
# Sizes are [batch_size, 1, 1, to_seq_length] | |
# So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length] | |
# this attention mask is more simple than the triangular masking of causal attention | |
# used in OpenAI GPT, we just need to prepare the broadcast dimension here. | |
extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2) | |
# Since attention_mask is 1.0 for positions we want to attend and 0.0 for | |
# masked positions, this operation will create a tensor which is 0.0 for | |
# positions we want to attend and -10000.0 for masked positions. | |
# Since we are adding it to the raw scores before the softmax, this is | |
# effectively the same as removing these entirely. | |
extended_attention_mask = extended_attention_mask.to( | |
dtype=next(self.parameters()).dtype) # fp16 compatibility | |
extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0 | |
# Prepare head mask if needed | |
# 1.0 in head_mask indicate we keep the head | |
# attention_probs has shape bsz x n_heads x N x N | |
# input head_mask has shape [num_heads] or [num_hidden_layers x num_heads] | |
# and head_mask is converted to shape [num_hidden_layers x batch x num_heads x seq_length x seq_length] | |
if head_mask is not None: | |
if head_mask.dim() == 1: | |
head_mask = head_mask.unsqueeze(0).unsqueeze(0).unsqueeze( | |
-1).unsqueeze(-1) | |
head_mask = head_mask.expand(self.config.num_hidden_layers, -1, | |
-1, -1, -1) | |
elif head_mask.dim() == 2: | |
head_mask = head_mask.unsqueeze(1).unsqueeze(-1).unsqueeze( | |
-1) # We can specify head_mask for each layer | |
head_mask = head_mask.to(dtype=next(self.parameters( | |
)).dtype) # switch to fload if need + fp16 compatibility | |
else: | |
head_mask = [None] * self.config.num_hidden_layers | |
embedding_output = self.embeddings( | |
input_ids, | |
position_ids=position_ids, | |
token_type_ids=token_type_ids) | |
encoder_outputs = self.encoder( | |
embedding_output, extended_attention_mask, head_mask=head_mask) | |
sequence_output = encoder_outputs[0] | |
# pooled_output = self.pooler(sequence_output) | |
pooled_output = None | |
# add hidden_states and attentions if they are here | |
outputs = ( | |
sequence_output, | |
pooled_output, | |
) + encoder_outputs[1:] | |
# sequence_output, pooled_output, (hidden_states), (attentions) | |
return outputs | |