KyanChen's picture
Upload 303 files
4d0eb62
raw
history blame
10.5 kB
# Copyright (c) OpenMMLab. All rights reserved.
# Copyright 2018 The Google AI Language Team Authors and The HuggingFace Inc.
# Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
# flake8: noqa
import math
import torch
from torch import nn
from torch.utils.checkpoint import checkpoint
try:
from transformers.models.bert.configuration_bert import BertConfig
except:
BertConfig = None
from mmpretrain.registry import MODELS
from ..blip.language_model import BertAttention, BertIntermediate, BertOutput
def gelu(x):
    """Exact, erf-based GELU activation (the original Google BERT variant).

    Evaluates ``x * 0.5 * (1 + erf(x / sqrt(2)))`` element-wise.

    OpenAI GPT uses a tanh approximation instead, which yields slightly
    different values:
    0.5 * x * (1 + torch.tanh(math.sqrt(2 / math.pi) * (x + 0.044715 * torch.pow(x, 3))))

    Reference: https://arxiv.org/abs/1606.08415

    Args:
        x (torch.Tensor): Input tensor.

    Returns:
        torch.Tensor: Activated tensor with the same shape as ``x``.
    """  # noqa
    half_x = x * 0.5
    erf_term = torch.erf(x / math.sqrt(2.0))
    return half_x * (1.0 + erf_term)
def gelu_new(x):
    """Tanh-approximated GELU activation.

    This is the form currently used in the Google BERT repo and is identical
    to the OpenAI GPT one. See https://arxiv.org/abs/1606.08415.

    Args:
        x (torch.Tensor): Input tensor.

    Returns:
        torch.Tensor: Activated tensor with the same shape as ``x``.
    """
    cubic = x + 0.044715 * torch.pow(x, 3)
    tanh_arg = math.sqrt(2 / math.pi) * cubic
    return 0.5 * x * (1 + torch.tanh(tanh_arg))
def swish(x):
    """Swish activation: the input multiplied by its own sigmoid.

    Args:
        x (torch.Tensor): Input tensor.

    Returns:
        torch.Tensor: ``x * sigmoid(x)``, same shape as ``x``.
    """
    gate = torch.sigmoid(x)
    return x * gate
# Lookup table from an activation name to the callable implementing it.
ACT2FN = dict((
    ('gelu', gelu),
    ('relu', torch.nn.functional.relu),
    ('swish', swish),
    ('gelu_new', gelu_new),
))
class BertEmbeddings(nn.Module):
    """Input embedding layer: word + position + token-type embeddings.

    The three embeddings are summed, then passed through LayerNorm and
    dropout.

    Args:
        config: Configuration object. The attributes read here are
            ``vocab_size``, ``hidden_size``, ``max_position_embeddings``,
            ``type_vocab_size``, ``layer_norm_eps`` and
            ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        # Index 0 of the vocabulary is treated as the padding token.
        self.word_embeddings = nn.Embedding(
            config.vocab_size, config.hidden_size, padding_idx=0)
        self.position_embeddings = nn.Embedding(
            config.max_position_embeddings, config.hidden_size)
        self.token_type_embeddings = nn.Embedding(
            config.type_vocab_size, config.hidden_size)

        # The attribute keeps the TensorFlow-style name ``LayerNorm`` (not
        # snake_case) so TensorFlow checkpoint variables can be loaded as-is.
        self.LayerNorm = nn.LayerNorm(
            config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids, token_type_ids=None, position_ids=None):
        """Embed a batch of token ids.

        Args:
            input_ids (torch.Tensor): Token ids; dimension 1 is used as the
                sequence length.
            token_type_ids (torch.Tensor, optional): Segment ids. Defaults
                to all zeros with the shape of ``input_ids``.
            position_ids (torch.Tensor, optional): Position ids. Defaults to
                ``0 .. seq_length - 1`` broadcast to the shape of
                ``input_ids``.

        Returns:
            torch.Tensor: The normalised embeddings after dropout.
        """
        seq_length = input_ids.size(1)

        if position_ids is None:
            arange = torch.arange(
                seq_length, dtype=torch.long, device=input_ids.device)
            position_ids = arange.unsqueeze(0).expand_as(input_ids)

        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        summed = self.word_embeddings(input_ids) \
            + self.position_embeddings(position_ids) \
            + self.token_type_embeddings(token_type_ids)
        return self.dropout(self.LayerNorm(summed))
class BertLayer(nn.Module):
    """One encoder layer: attention, then intermediate and output blocks.

    Args:
        config: Configuration object forwarded unchanged to
            ``BertAttention``, ``BertIntermediate`` and ``BertOutput``.
    """

    def __init__(self, config):
        super().__init__()
        self.attention = BertAttention(config)
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(self, hidden_states, attention_mask=None, head_mask=None):
        """Run the layer on ``hidden_states``.

        Args:
            hidden_states: Input passed to the attention block.
            attention_mask: Forwarded to the attention block.
            head_mask: Forwarded to the attention block.

        Returns:
            The layer output alone when the attention block returns nothing
            beyond its first element; otherwise a tuple of the layer output
            followed by the attention block's extra outputs.
        """
        attn_result = self.attention(hidden_states, attention_mask, head_mask)
        attn_hidden = attn_result[0]

        layer_hidden = self.output(self.intermediate(attn_hidden), attn_hidden)

        # Keep any extra attention outputs (e.g. attention probabilities).
        combined = (layer_hidden, ) + attn_result[1:]
        return combined[0] if len(combined) == 1 else combined
class BertEncoder(nn.Module):
    """Stack of ``config.num_hidden_layers`` :class:`BertLayer` modules.

    Args:
        config: Configuration object. The attributes read here are
            ``output_attentions``, ``output_hidden_states`` and
            ``num_hidden_layers``; the object is also forwarded to every
            ``BertLayer``.
    """

    def __init__(self, config):
        super().__init__()
        self.output_attentions = config.output_attentions
        self.output_hidden_states = config.output_hidden_states
        # Toggled externally to enable activation checkpointing.
        self.grad_checkpointing = False
        self.layer = nn.ModuleList(
            [BertLayer(config) for _ in range(config.num_hidden_layers)])

    def forward(self, hidden_states, attention_mask=None, head_mask=None):
        """Run every layer in sequence.

        Args:
            hidden_states: Input to the first layer.
            attention_mask: Forwarded unchanged to every layer.
            head_mask: Optional indexable holding one entry per layer; entry
                ``i`` is forwarded to layer ``i``. When ``None``, every
                layer receives ``None`` as its head mask.

        Returns:
            tuple: ``(last_hidden_state,)`` followed by a tuple of all
            hidden states (if ``output_hidden_states``) and a tuple of all
            attentions (if ``output_attentions``).
        """
        all_hidden_states = ()
        all_attentions = ()
        for i, layer_module in enumerate(self.layer):
            if self.output_hidden_states:
                all_hidden_states = all_hidden_states + (hidden_states, )

            # ``head_mask`` defaults to None; indexing it unconditionally
            # would raise a TypeError, so fall back to "no mask" per layer.
            layer_head_mask = head_mask[i] if head_mask is not None else None

            if self.grad_checkpointing and not torch.jit.is_scripting():
                layer_outputs = checkpoint(layer_module, hidden_states,
                                           attention_mask, layer_head_mask)
            else:
                layer_outputs = layer_module(hidden_states, attention_mask,
                                             layer_head_mask)

            # A layer may return a bare value instead of a tuple.
            if not isinstance(layer_outputs, tuple):
                layer_outputs = (layer_outputs, )
            hidden_states = layer_outputs[0]

            if self.output_attentions:
                all_attentions = all_attentions + (layer_outputs[1], )

        # Add the output of the last layer.
        if self.output_hidden_states:
            all_hidden_states = all_hidden_states + (hidden_states, )

        outputs = (hidden_states, )
        if self.output_hidden_states:
            outputs = outputs + (all_hidden_states, )
        if self.output_attentions:
            outputs = outputs + (all_attentions, )
        # last-layer hidden state, (all hidden states), (all attentions)
        return outputs
class BertPreTrainedModel(nn.Module):
    """Base class that stores the config and provides weight initialisation.

    Args:
        config: Configuration object; ``initializer_range`` is read by
            :meth:`_init_weights`.
    """

    base_model_prefix = 'bert'

    def __init__(self, config):
        super().__init__()
        self.config = config

    def _init_weights(self, module):
        """Initialise the weights of a single sub-module in place.

        Linear and embedding weights are drawn from a normal distribution
        with standard deviation ``config.initializer_range``; LayerNorm is
        reset to weight 1 and bias 0; linear biases are zeroed.

        Args:
            module (nn.Module): The sub-module to initialise.
        """
        is_linear = isinstance(module, nn.Linear)
        if is_linear or isinstance(module, nn.Embedding):
            # Differs slightly from the TF version, which uses
            # truncated_normal for initialization;
            # cf https://github.com/pytorch/pytorch/pull/5617
            module.weight.data.normal_(
                mean=0.0, std=self.config.initializer_range)
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

        if is_linear and module.bias is not None:
            module.bias.data.zero_()
@MODELS.register_module()
class BertModelCN(BertPreTrainedModel):
    """The BERT model implementation for Chinese CLIP.

    Args:
        config (dict): Settings passed to ``BertConfig.from_dict`` to build
            the model configuration.

    Raises:
        ImportError: If ``BertConfig`` could not be imported from
            ``transformers`` at module load time.
    """

    def __init__(self, config):
        # The module-level import falls back to ``BertConfig = None`` when
        # ``transformers`` is unavailable; fail with a clear message rather
        # than an AttributeError on NoneType.
        if BertConfig is None:
            raise ImportError(
                'BertModelCN requires `BertConfig` from the `transformers` '
                'package, which could not be imported. Please install '
                '`transformers`.')
        config = BertConfig.from_dict(config)
        super(BertModelCN, self).__init__(config)

        self.embeddings = BertEmbeddings(config)
        self.encoder = BertEncoder(config)

        self.apply(self._init_weights)

    @torch.jit.ignore
    def set_grad_checkpointing(self, enable=True):
        """Enable or disable gradient checkpointing in the encoder.

        Args:
            enable (bool): Whether to turn checkpointing on. Enabling it
                requires ``config.output_attentions`` to be falsy.
        """
        if enable:
            assert not self.config.output_attentions, \
                'Grad checkpointing is currently conflict with ' \
                'output_attentions for BertEncoder, ' \
                'please set it to False in BertConfig'

        self.encoder.grad_checkpointing = enable

    def forward(self,
                input_ids,
                attention_mask=None,
                token_type_ids=None,
                position_ids=None,
                head_mask=None):
        """Encode a batch of token ids.

        Args:
            input_ids (torch.Tensor): Token ids.
            attention_mask (torch.Tensor, optional): 1 for positions to
                attend to and 0 for masked positions. Defaults to all ones
                with the shape of ``input_ids``.
            token_type_ids (torch.Tensor, optional): Segment ids. Defaults
                to all zeros with the shape of ``input_ids``.
            position_ids (torch.Tensor, optional): Position ids, forwarded
                to the embedding layer.
            head_mask (torch.Tensor, optional): Mask over attention heads,
                of shape ``[num_heads]`` or
                ``[num_hidden_layers, num_heads]``; 1.0 keeps a head.

        Returns:
            tuple: ``(sequence_output, pooled_output)`` followed by any
            extra encoder outputs (hidden states, attentions).
            ``pooled_output`` is always ``None`` because this model has no
            pooler.
        """
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        # Parameter dtype, used to keep the masks fp16-compatible.
        param_dtype = next(self.parameters()).dtype

        # We create a 3D attention mask from a 2D tensor mask.
        # Sizes are [batch_size, 1, 1, to_seq_length]
        # So we can broadcast to
        # [batch_size, num_heads, from_seq_length, to_seq_length].
        # This attention mask is simpler than the triangular masking of
        # causal attention used in OpenAI GPT; we just need to prepare the
        # broadcast dimension here.
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)

        # Since attention_mask is 1.0 for positions we want to attend and
        # 0.0 for masked positions, this operation will create a tensor
        # which is 0.0 for positions we want to attend and -10000.0 for
        # masked positions. Since we are adding it to the raw scores before
        # the softmax, this is effectively the same as removing these
        # entirely.
        extended_attention_mask = extended_attention_mask.to(
            dtype=param_dtype)  # fp16 compatibility
        extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

        # Prepare head mask if needed.
        # 1.0 in head_mask indicates we keep the head.
        # attention_probs has shape bsz x n_heads x N x N.
        # Input head_mask has shape [num_heads] or
        # [num_hidden_layers x num_heads] and is converted to shape
        # [num_hidden_layers x batch x num_heads x seq_length x seq_length].
        if head_mask is not None:
            if head_mask.dim() == 1:
                head_mask = head_mask.unsqueeze(0).unsqueeze(0).unsqueeze(
                    -1).unsqueeze(-1)
                head_mask = head_mask.expand(self.config.num_hidden_layers,
                                             -1, -1, -1, -1)
            elif head_mask.dim() == 2:
                # We can specify head_mask for each layer.
                head_mask = head_mask.unsqueeze(1).unsqueeze(-1).unsqueeze(
                    -1)
            # Switch to float if needed + fp16 compatibility.
            head_mask = head_mask.to(dtype=param_dtype)
        else:
            head_mask = [None] * self.config.num_hidden_layers

        embedding_output = self.embeddings(
            input_ids,
            position_ids=position_ids,
            token_type_ids=token_type_ids)
        encoder_outputs = self.encoder(
            embedding_output, extended_attention_mask, head_mask=head_mask)
        sequence_output = encoder_outputs[0]
        # No pooler is applied in this model.
        pooled_output = None

        # Add hidden_states and attentions if they are here.
        outputs = (
            sequence_output,
            pooled_output,
        ) + encoder_outputs[1:]
        # sequence_output, pooled_output, (hidden_states), (attentions)
        return outputs