| | """PyTorch BERT model.""" |
| |
|
| | from __future__ import absolute_import |
| | from __future__ import division |
| | from __future__ import print_function |
| |
|
| | import os |
| | import copy |
| | import json |
| | import math |
| | import logging |
| | import tarfile |
| | import tempfile |
| | import shutil |
| |
|
| | import torch |
| | from torch import nn |
| | import torch.nn.functional as F |
| | from .file_utils import cached_path |
| | from .until_config import PretrainedConfig |
| | from .until_module import PreTrainedModel, LayerNorm, ACT2FN |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | PRETRAINED_MODEL_ARCHIVE_MAP = { |
| | 'bert-base-uncased': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-base-uncased.tar.gz", |
| | 'bert-large-uncased': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-large-uncased.tar.gz", |
| | 'bert-base-cased': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-base-cased.tar.gz", |
| | 'bert-large-cased': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-large-cased.tar.gz", |
| | 'bert-base-multilingual-uncased': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-base-multilingual-uncased.tar.gz", |
| | 'bert-base-multilingual-cased': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-base-multilingual-cased.tar.gz", |
| | 'bert-base-chinese': "https://s3.amazonaws.com/models.huggingface.co/bert/bert-base-chinese.tar.gz", |
| | } |
| |
|
| | CONFIG_NAME = 'bert_config.json' |
| | WEIGHTS_NAME = 'pytorch_model.bin' |
| |
|
| |
|
| | class BertConfig(PretrainedConfig): |
| | """Configuration class to store the configuration of a `BertModel`. |
| | """ |
| | pretrained_model_archive_map = PRETRAINED_MODEL_ARCHIVE_MAP |
| | config_name = CONFIG_NAME |
| | weights_name = WEIGHTS_NAME |

    def __init__(self,
                 vocab_size_or_config_json_file,
                 hidden_size=768,
                 num_hidden_layers=12,
                 num_attention_heads=12,
                 intermediate_size=3072,
                 hidden_act="gelu",
                 hidden_dropout_prob=0.1,
                 attention_probs_dropout_prob=0.1,
                 max_position_embeddings=512,
                 type_vocab_size=2,
                 initializer_range=0.02):
        """Constructs BertConfig.

        Args:
            vocab_size_or_config_json_file: Vocabulary size of `input_ids` in `BertModel`.
            hidden_size: Size of the encoder layers and the pooler layer.
            num_hidden_layers: Number of hidden layers in the Transformer encoder.
            num_attention_heads: Number of attention heads for each attention layer in
                the Transformer encoder.
            intermediate_size: The size of the "intermediate" (i.e., feed-forward)
                layer in the Transformer encoder.
            hidden_act: The non-linear activation function (function or string) in the
                encoder and pooler. If a string, "gelu", "relu" and "swish" are supported.
            hidden_dropout_prob: The dropout probability for all fully connected
                layers in the embeddings, encoder, and pooler.
            attention_probs_dropout_prob: The dropout ratio for the attention
                probabilities.
            max_position_embeddings: The maximum sequence length that this model might
                ever be used with. Typically set this to something large just in case
                (e.g., 512 or 1024 or 2048).
            type_vocab_size: The vocabulary size of the `token_type_ids` passed into
                `BertModel`.
            initializer_range: The standard deviation of the truncated_normal_initializer
                for initializing all weight matrices.
        """
        if isinstance(vocab_size_or_config_json_file, str):
            with open(vocab_size_or_config_json_file, "r", encoding='utf-8') as reader:
                json_config = json.loads(reader.read())
            for key, value in json_config.items():
                self.__dict__[key] = value
        elif isinstance(vocab_size_or_config_json_file, int):
            self.vocab_size = vocab_size_or_config_json_file
            self.hidden_size = hidden_size
            self.num_hidden_layers = num_hidden_layers
            self.num_attention_heads = num_attention_heads
            self.hidden_act = hidden_act
            self.intermediate_size = intermediate_size
            self.hidden_dropout_prob = hidden_dropout_prob
            self.attention_probs_dropout_prob = attention_probs_dropout_prob
            self.max_position_embeddings = max_position_embeddings
            self.type_vocab_size = type_vocab_size
            self.initializer_range = initializer_range
        else:
            raise ValueError("First argument must be either a vocabulary size (int) "
                             "or the path to a pretrained model config file (str)")


class BertEmbeddings(nn.Module):
    """Construct the embeddings from word, position and token_type embeddings.
    """
    def __init__(self, config):
        super(BertEmbeddings, self).__init__()
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)

        # self.LayerNorm is not snake-cased to stick with the TensorFlow variable name
        # and be able to load TensorFlow checkpoint files.
        self.LayerNorm = LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids, token_type_ids=None):
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long, device=input_ids.device)
        position_ids = position_ids.unsqueeze(0).expand_as(input_ids)
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        words_embeddings = self.word_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        token_type_embeddings = self.token_type_embeddings(token_type_ids)

        embeddings = words_embeddings + position_embeddings + token_type_embeddings
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings
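# Shape note for BertEmbeddings above: `input_ids` and `token_type_ids` are
# [batch_size, seq_length] LongTensors; the word, position and token-type embeddings
# are summed element-wise, then LayerNorm and dropout are applied, giving an output
# of shape [batch_size, seq_length, hidden_size].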


class BertSelfAttention(nn.Module):
    def __init__(self, config):
        super(BertSelfAttention, self).__init__()
        if config.hidden_size % config.num_attention_heads != 0:
            raise ValueError(
                "The hidden size (%d) is not a multiple of the number of attention "
                "heads (%d)" % (config.hidden_size, config.num_attention_heads))
        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

    def transpose_for_scores(self, x):
        # Reshape [batch_size, seq_length, all_head_size] into
        # [batch_size, num_attention_heads, seq_length, attention_head_size].
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(*new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(self, hidden_states, attention_mask):
        mixed_query_layer = self.query(hidden_states)
        mixed_key_layer = self.key(hidden_states)
        mixed_value_layer = self.value(hidden_states)

        query_layer = self.transpose_for_scores(mixed_query_layer)
        key_layer = self.transpose_for_scores(mixed_key_layer)
        value_layer = self.transpose_for_scores(mixed_value_layer)

        # Take the dot product between "query" and "key" to get the raw attention scores.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        # Apply the additive attention mask (precomputed for all layers in BertModel.forward()).
        attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities.
        attention_probs = nn.Softmax(dim=-1)(attention_scores)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)

        context_layer = torch.matmul(attention_probs, value_layer)
        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(*new_context_layer_shape)
        return context_layer
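# Summary of BertSelfAttention above: per head it computes
# softmax(Q @ K^T / sqrt(attention_head_size) + attention_mask) @ V, with Q, K, V of
# shape [batch_size, num_attention_heads, seq_length, attention_head_size]; the heads
# are then concatenated back into [batch_size, seq_length, hidden_size].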


class BertSelfOutput(nn.Module):
    def __init__(self, config):
        super(BertSelfOutput, self).__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.LayerNorm = LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class BertAttention(nn.Module):
    def __init__(self, config):
        super(BertAttention, self).__init__()
        self.self = BertSelfAttention(config)
        self.output = BertSelfOutput(config)

    def forward(self, input_tensor, attention_mask):
        self_output = self.self(input_tensor, attention_mask)
        attention_output = self.output(self_output, input_tensor)
        return attention_output


class BertIntermediate(nn.Module):
    def __init__(self, config):
        super(BertIntermediate, self).__init__()
        self.dense = nn.Linear(config.hidden_size, config.intermediate_size)
        self.intermediate_act_fn = ACT2FN[config.hidden_act] \
            if isinstance(config.hidden_act, str) else config.hidden_act

    def forward(self, hidden_states):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.intermediate_act_fn(hidden_states)
        return hidden_states


class BertOutput(nn.Module):
    def __init__(self, config):
        super(BertOutput, self).__init__()
        self.dense = nn.Linear(config.intermediate_size, config.hidden_size)
        self.LayerNorm = LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states, input_tensor):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.dropout(hidden_states)
        hidden_states = self.LayerNorm(hidden_states + input_tensor)
        return hidden_states


class BertLayer(nn.Module):
    def __init__(self, config):
        super(BertLayer, self).__init__()
        self.attention = BertAttention(config)
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(self, hidden_states, attention_mask):
        attention_output = self.attention(hidden_states, attention_mask)
        intermediate_output = self.intermediate(attention_output)
        layer_output = self.output(intermediate_output, attention_output)
        return layer_output
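# BertLayer above is one Transformer encoder block: a self-attention sub-layer
# (BertAttention) followed by a position-wise feed-forward sub-layer (BertIntermediate
# + BertOutput), each wrapped with dropout, a residual connection and LayerNorm.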


class BertEncoder(nn.Module):
    def __init__(self, config):
        super(BertEncoder, self).__init__()
        layer = BertLayer(config)
        self.layer = nn.ModuleList([copy.deepcopy(layer) for _ in range(config.num_hidden_layers)])

    def forward(self, hidden_states, attention_mask, output_all_encoded_layers=True):
        all_encoder_layers = []
        for layer_module in self.layer:
            hidden_states = layer_module(hidden_states, attention_mask)
            if output_all_encoded_layers:
                all_encoder_layers.append(hidden_states)
        if not output_all_encoded_layers:
            all_encoder_layers.append(hidden_states)
        return all_encoder_layers
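# BertEncoder above stacks `num_hidden_layers` copies of BertLayer (each with its own
# parameters). With `output_all_encoded_layers=True` it returns one hidden-state tensor
# per layer; with `False` it returns a single-element list holding only the last layer.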


class BertPooler(nn.Module):
    def __init__(self, config):
        super(BertPooler, self).__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states):
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output


class BertPredictionHeadTransform(nn.Module):
    def __init__(self, config):
        super(BertPredictionHeadTransform, self).__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.transform_act_fn = ACT2FN[config.hidden_act] \
            if isinstance(config.hidden_act, str) else config.hidden_act
        self.LayerNorm = LayerNorm(config.hidden_size, eps=1e-12)

    def forward(self, hidden_states):
        hidden_states = self.dense(hidden_states)
        hidden_states = self.transform_act_fn(hidden_states)
        hidden_states = self.LayerNorm(hidden_states)
        return hidden_states


class BertLMPredictionHead(nn.Module):
    def __init__(self, config, bert_model_embedding_weights):
        super(BertLMPredictionHead, self).__init__()
        self.transform = BertPredictionHeadTransform(config)

        # The output weights are the same as the input embeddings, but there is
        # an output-only bias for each token.
        self.decoder = nn.Linear(bert_model_embedding_weights.size(1),
                                 bert_model_embedding_weights.size(0),
                                 bias=False)
        self.decoder.weight = bert_model_embedding_weights
        self.bias = nn.Parameter(torch.zeros(bert_model_embedding_weights.size(0)))

    def forward(self, hidden_states):
        hidden_states = self.transform(hidden_states)
        hidden_states = self.decoder(hidden_states) + self.bias
        return hidden_states
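# BertLMPredictionHead above maps hidden states to vocabulary logits of shape
# [batch_size, seq_length, vocab_size]; the projection reuses (ties) the word embedding
# matrix passed in as `bert_model_embedding_weights` and adds a separate learned bias.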


class BertOnlyMLMHead(nn.Module):
    def __init__(self, config, bert_model_embedding_weights):
        super(BertOnlyMLMHead, self).__init__()
        self.predictions = BertLMPredictionHead(config, bert_model_embedding_weights)

    def forward(self, sequence_output):
        prediction_scores = self.predictions(sequence_output)
        return prediction_scores


class BertOnlyNSPHead(nn.Module):
    def __init__(self, config):
        super(BertOnlyNSPHead, self).__init__()
        self.seq_relationship = nn.Linear(config.hidden_size, 2)

    def forward(self, pooled_output):
        seq_relationship_score = self.seq_relationship(pooled_output)
        return seq_relationship_score


class BertPreTrainingHeads(nn.Module):
    def __init__(self, config, bert_model_embedding_weights):
        super(BertPreTrainingHeads, self).__init__()
        self.predictions = BertLMPredictionHead(config, bert_model_embedding_weights)
        self.seq_relationship = nn.Linear(config.hidden_size, 2)

    def forward(self, sequence_output, pooled_output):
        prediction_scores = self.predictions(sequence_output)
        seq_relationship_score = self.seq_relationship(pooled_output)
        return prediction_scores, seq_relationship_score
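# BertPreTrainingHeads above bundles the two BERT pre-training objectives:
# masked-LM logits of shape [batch_size, seq_length, vocab_size] from `sequence_output`,
# and next-sentence logits of shape [batch_size, 2] from `pooled_output`.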


class BertModel(PreTrainedModel):
    """BERT model ("Bidirectional Encoder Representations from Transformers").

    Params:
        config: a BertConfig class instance with the configuration to build a new model

    Inputs:
        `input_ids`: a torch.LongTensor of shape [batch_size, sequence_length]
            with the word token indices in the vocabulary (see the tokens preprocessing logic in the scripts
            `extract_features.py`, `run_classifier.py` and `run_squad.py`)
        `token_type_ids`: an optional torch.LongTensor of shape [batch_size, sequence_length] with the token
            type indices selected in [0, 1]. Type 0 corresponds to a `sentence A` and type 1 corresponds to
            a `sentence B` token (see the BERT paper for more details).
        `attention_mask`: an optional torch.LongTensor of shape [batch_size, sequence_length] with indices
            selected in [0, 1]. It's a mask to be used if the input sequence length is smaller than the max
            input sequence length in the current batch. It's the mask that we typically use for attention when
            a batch has varying length sentences.
        `output_all_encoded_layers`: boolean which controls the content of the `encoded_layers` output as described below. Default: `True`.

    Outputs: Tuple of (encoded_layers, pooled_output)
        `encoded_layers`: controlled by the `output_all_encoded_layers` argument:
            - `output_all_encoded_layers=True`: outputs a list of the full sequences of encoded hidden states at the end
                of each attention block (i.e. 12 full sequences for BERT-base, 24 for BERT-large), each
                encoded hidden state being a torch.FloatTensor of size [batch_size, sequence_length, hidden_size],
            - `output_all_encoded_layers=False`: outputs only the full sequence of hidden states corresponding
                to the last attention block, of shape [batch_size, sequence_length, hidden_size],
        `pooled_output`: a torch.FloatTensor of size [batch_size, hidden_size] which is the output of a
            classifier pretrained on top of the hidden state associated with the first token of the
            input (`[CLS]`) to train on the Next-Sentence task (see the BERT paper).

    Example usage:
    ```python
    # Already been converted into WordPiece token ids
    input_ids = torch.LongTensor([[31, 51, 99], [15, 5, 0]])
    input_mask = torch.LongTensor([[1, 1, 1], [1, 1, 0]])
    token_type_ids = torch.LongTensor([[0, 0, 1], [0, 1, 0]])

    config = modeling.BertConfig(vocab_size_or_config_json_file=32000, hidden_size=768,
        num_hidden_layers=12, num_attention_heads=12, intermediate_size=3072)

    model = modeling.BertModel(config=config)
    all_encoder_layers, pooled_output = model(input_ids, token_type_ids, input_mask)
    ```
    """
    def __init__(self, config):
        super(BertModel, self).__init__(config)
        self.embeddings = BertEmbeddings(config)
        self.encoder = BertEncoder(config)
        self.pooler = BertPooler(config)
        self.apply(self.init_weights)

    def forward(self, input_ids, token_type_ids=None, attention_mask=None, output_all_encoded_layers=True):
        if attention_mask is None:
            attention_mask = torch.ones_like(input_ids)
        if token_type_ids is None:
            token_type_ids = torch.zeros_like(input_ids)

        # We create a 3D attention mask from a 2D tensor mask.
        # Sizes are [batch_size, 1, 1, to_seq_length]
        # so we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length].
        # This attention mask is simpler than the triangular masking of causal attention
        # used in OpenAI GPT; we just need to prepare the broadcast dimension here.
        extended_attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)

        # Since attention_mask is 1.0 for positions we want to attend and 0.0 for
        # masked positions, this operation will create a tensor which is 0.0 for
        # positions we want to attend and -10000.0 for masked positions.
        # Since we are adding it to the raw scores before the softmax, this is
        # effectively the same as removing these entirely.
        extended_attention_mask = extended_attention_mask.to(dtype=self.dtype)  # fp16 compatibility
        extended_attention_mask = (1.0 - extended_attention_mask) * -10000.0

        embedding_output = self.embeddings(input_ids, token_type_ids)
        encoded_layers = self.encoder(embedding_output,
                                      extended_attention_mask,
                                      output_all_encoded_layers=output_all_encoded_layers)
        sequence_output = encoded_layers[-1]
        pooled_output = self.pooler(sequence_output)
        if not output_all_encoded_layers:
            encoded_layers = encoded_layers[-1]
        return encoded_layers, pooled_output
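# Illustrative call of BertModel.forward above (not executed here), mirroring the
# docstring example but requesting only the last layer:
#
#     sequence_output, pooled_output = model(input_ids, token_type_ids, input_mask,
#                                            output_all_encoded_layers=False)
#     # sequence_output: [batch_size, sequence_length, hidden_size]
#     # pooled_output:   [batch_size, hidden_size]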