mart9992's picture
m
06ba6ce
raw
history blame
26.2 kB
# coding=utf-8
# Copyright 2022 The Trajectory Transformers paper authors and The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" PyTorch TrajectoryTransformer model."""
import math
import os
from dataclasses import dataclass
from typing import Optional, Tuple, Union
import numpy as np
import torch
import torch.utils.checkpoint
from torch import nn
from torch.nn import functional as F
from ....modeling_utils import PreTrainedModel
from ....utils import (
ModelOutput,
add_start_docstrings,
add_start_docstrings_to_model_forward,
logging,
replace_return_docstrings,
)
from .configuration_trajectory_transformer import TrajectoryTransformerConfig
logger = logging.get_logger(__name__)
_CHECKPOINT_FOR_DOC = "CarlCochet/trajectory-transformer-halfcheetah-medium-v2"
_CONFIG_FOR_DOC = "TrajectoryTransformerConfig"
TRAJECTORY_TRANSFORMER_PRETRAINED_MODEL_ARCHIVE_LIST = [
"CarlCochet/trajectory-transformer-halfcheetah-medium-v2",
# See all TrajectoryTransformer models at https://huggingface.co/models?filter=trajectory_transformer
]
def load_tf_weights_in_trajectory_transformer(model, config, tf_checkpoint_path):
"""Load tf checkpoints in a pytorch model."""
try:
import re
import numpy as np
import tensorflow as tf
except ImportError:
logger.error(
"Loading a TensorFlow model in PyTorch, requires TensorFlow to be installed. Please see "
"https://www.tensorflow.org/install/ for installation instructions."
)
raise
tf_path = os.path.abspath(tf_checkpoint_path)
logger.info(f"Converting TensorFlow checkpoint from {tf_path}")
# Load weights from TF model
init_vars = tf.train.list_variables(tf_path)
names = []
arrays = []
for name, shape in init_vars:
logger.info(f"Loading TF weight {name} with shape {shape}")
array = tf.train.load_variable(tf_path, name)
names.append(name)
arrays.append(array)
for name, array in zip(names, arrays):
name = name.split("/")
# adam_v and adam_m are variables used in AdamWeightDecayOptimizer to calculated m and v
# which are not required for using pretrained model
if any(
n in ["adam_v", "adam_m", "AdamWeightDecayOptimizer", "AdamWeightDecayOptimizer_1", "global_step"]
for n in name
):
logger.info(f"Skipping {'/'.join(name)}")
continue
pointer = model
for m_name in name:
if re.fullmatch(r"[A-Za-z]+_\d+", m_name):
scope_names = re.split(r"_(\d+)", m_name)
else:
scope_names = [m_name]
if scope_names[0] == "kernel" or scope_names[0] == "gamma":
pointer = getattr(pointer, "weight")
elif scope_names[0] == "output_bias" or scope_names[0] == "beta":
pointer = getattr(pointer, "bias")
elif scope_names[0] == "output_weights":
pointer = getattr(pointer, "weight")
elif scope_names[0] == "squad":
pointer = getattr(pointer, "classifier")
else:
try:
pointer = getattr(pointer, scope_names[0])
except AttributeError:
logger.info(f"Skipping {'/'.join(name)}")
continue
if len(scope_names) >= 2:
num = int(scope_names[1])
pointer = pointer[num]
if m_name[-11:] == "_embeddings":
pointer = getattr(pointer, "weight")
elif m_name == "kernel":
array = np.transpose(array)
try:
if pointer.shape != array.shape:
raise ValueError(f"Pointer shape {pointer.shape} and array shape {array.shape} mismatched")
except AssertionError as e:
e.args += (pointer.shape, array.shape)
raise
logger.info(f"Initialize PyTorch weight {name}")
pointer.data = torch.from_numpy(array)
return model
@dataclass
class TrajectoryTransformerOutput(ModelOutput):
"""
Base class for model's outputs that also contains a pooling of the last hidden states.
Args:
loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided):
Language modeling loss.
logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`):
Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax).
past_key_values (`Tuple[Tuple[torch.Tensor]]`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
Tuple of length `config.n_layers`, containing tuples of tensors of shape `(batch_size, num_heads,
sequence_length, embed_size_per_head)`). Contains pre-computed hidden-states (key and values in the
attention blocks) that can be used (see `past_key_values` input) to speed up sequential decoding.
hidden_states (`tuple(torch.FloatTensor)`, *optional*, returned when `output_hidden_states=True` is passed or when `config.output_hidden_states=True`):
Tuple of `torch.FloatTensor` (one for the output of the embeddings + one for the output of each layer) of
shape `(batch_size, sequence_length, hidden_size)`. Hidden-states of the model at the output of each layer
plus the initial embedding outputs.
attentions (`tuple(torch.FloatTensor)`, *optional*, returned when `output_attentions=True` is passed or when `config.output_attentions=True`):
Tuple of `torch.FloatTensor` (one for each layer) of shape `(batch_size, num_heads, sequence_length,
sequence_length)`. GPT2Attentions weights after the attention softmax, used to compute the weighted average
in the self-attention heads.
"""
loss: Optional[torch.FloatTensor] = None
logits: torch.FloatTensor = None
past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None
hidden_states: Optional[Tuple[torch.FloatTensor]] = None
attentions: Optional[Tuple[torch.FloatTensor]] = None
class TrajectoryTransformerPreTrainedModel(PreTrainedModel):
"""
An abstract class to handle weights initialization and a simple interface for downloading and loading pretrained
models.
"""
config_class = TrajectoryTransformerConfig
load_tf_weights = load_tf_weights_in_trajectory_transformer
base_model_prefix = "trajectory_transformer"
main_input_name = "trajectories"
supports_gradient_checkpointing = True
def _set_gradient_checkpointing(self, module, value=False):
if isinstance(module, TrajectoryTransformerModel):
module.gradient_checkpointing = value
def _init_weights(self, module):
if isinstance(module, (nn.Linear, nn.Embedding)):
module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
if isinstance(module, nn.Linear) and module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.LayerNorm):
module.bias.data.zero_()
module.weight.data.fill_(1.0)
elif isinstance(module, EinLinear):
for i in range(module.n_models):
nn.init.kaiming_uniform_(module.weight[i], a=math.sqrt(5) / self.config.kaiming_initializer_range)
if module.bias is not None:
fan_in, _ = nn.init._calculate_fan_in_and_fan_out(module.weight[i])
bound = (1 / math.sqrt(fan_in)) * self.config.initializer_range
nn.init.uniform_(module.bias[i], -bound, bound)
TRAJECTORY_TRANSFORMER_START_DOCSTRING = r"""
This model is a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) sub-class. Use
it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage and
behavior.
Parameters:
config ([`TrajectoryTransformerConfig`]): Model configuration class with all the parameters of the model.
Initializing with a config file does not load the weights associated with the model, only the
configuration. Check out the [`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
TRAJECTORY_TRANSFORMER_INPUTS_DOCSTRING = r"""
Args:
trajectories (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
Batch of trajectories, where a trajectory is a sequence of states, actions and rewards.
past_key_values (`Tuple[Tuple[torch.Tensor]]` of length `config.n_layers`, *optional*):
Contains precomputed hidden-states (key and values in the attention blocks) as computed by the model (see
`past_key_values` output below). Can be used to speed up sequential decoding. The `input_ids` which have
their past given to this model should not be passed as `input_ids` as they have already been computed.
targets (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Desired targets used to compute the loss.
attention_mask (`torch.FloatTensor` of shape `(batch_size, sequence_length)`, *optional*):
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
- 1 for tokens that are **not masked**,
- 0 for tokens that are **masked**.
[What are attention masks?](../glossary#attention-mask)
use_cache (`bool`, *optional*):
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
`past_key_values`).
output_attentions (`bool`, *optional*):
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
class EinLinear(nn.Module):
def __init__(self, n_models, in_features, out_features, bias):
super().__init__()
self.n_models = n_models
self.out_features = out_features
self.in_features = in_features
self.weight = nn.Parameter(torch.Tensor(n_models, out_features, in_features))
if bias:
self.bias = nn.Parameter(torch.Tensor(n_models, out_features))
else:
self.register_parameter("bias", None)
def reset_parameters(self):
for i in range(self.n_models):
nn.init.kaiming_uniform_(self.weight[i], a=math.sqrt(5))
if self.bias is not None:
fan_in, _ = nn.init._calculate_fan_in_and_fan_out(self.weight[i])
bound = 1 / math.sqrt(fan_in)
nn.init.uniform_(self.bias[i], -bound, bound)
def forward(self, input):
"""
Args:
input (`torch.FloatTensor` of shape `(B, n_models, input_dim)`):
The input to the layer.
"""
# [ batch_size x n_models x output_dim ]
output = torch.einsum("eoi,bei->beo", self.weight, input)
if self.bias is not None:
raise RuntimeError()
return output
class CausalSelfAttention(nn.Module):
def __init__(self, config):
super().__init__()
if config.n_embd % config.n_head != 0:
raise ValueError(f"n_head ({config.n_head}) should be a divisor of n_embd ({config.n_embd})")
# key, query, value projections for all heads
self.key = nn.Linear(config.n_embd, config.n_embd)
self.query = nn.Linear(config.n_embd, config.n_embd)
self.value = nn.Linear(config.n_embd, config.n_embd)
# regularization
self.attn_drop = nn.Dropout(config.attn_pdrop)
self.resid_drop = nn.Dropout(config.resid_pdrop)
# output projection
self.proj = nn.Linear(config.n_embd, config.n_embd)
# causal mask to ensure that attention is only applied to the left in the input sequence
self.register_buffer(
"mask",
torch.tril(torch.ones(config.block_size, config.block_size)).view(
1, 1, config.block_size, config.block_size
),
persistent=False,
)
# mask previous value estimates
joined_dim = config.observation_dim + config.action_dim + 2
self.mask.squeeze()[:, joined_dim - 1 :: joined_dim] = 0
self.n_head = config.n_head
def forward(
self,
hidden_states: Optional[Tuple[torch.FloatTensor]],
layer_past: Optional[Tuple[torch.Tensor]] = None,
use_cache: Optional[bool] = False,
output_attentions: Optional[bool] = False,
):
batch_size, sequence_length, embedding_dim = hidden_states.size()
# calculate query, key, values for all heads in batch and move head forward to be the batch dim
# [ batch_size x n_heads x sequence_length x head_dim ]
key = (
self.key(hidden_states)
.view(batch_size, sequence_length, self.n_head, embedding_dim // self.n_head)
.transpose(1, 2)
)
query = (
self.query(hidden_states)
.view(batch_size, sequence_length, self.n_head, embedding_dim // self.n_head)
.transpose(1, 2)
)
value = (
self.value(hidden_states)
.view(batch_size, sequence_length, self.n_head, embedding_dim // self.n_head)
.transpose(1, 2)
)
if layer_past is not None:
past_key, past_value = layer_past
key = torch.cat((past_key, key), dim=-2)
value = torch.cat((past_value, value), dim=-2)
if use_cache is True:
present = (key, value)
else:
present = None
# causal self-attention
# [ batch_size x n_heads x sequence_length x sequence_length ]
attn_weights = (torch.matmul(query, key.transpose(-2, -1))) * (1.0 / math.sqrt(key.size(-1)))
attn_weights = attn_weights.masked_fill(
self.mask[:, :, :sequence_length, :sequence_length] == 0, torch.finfo(attn_weights.dtype).min
)
attn_weights = F.softmax(attn_weights, dim=-1)
self._attn_map = attn_weights.clone()
attn_weights = self.attn_drop(attn_weights)
output = torch.matmul(attn_weights, value)
# [ batch_size x sequence_length x embedding_dim ]
# re-assemble all head outputs side by side
output = output.transpose(1, 2).contiguous().view(batch_size, sequence_length, embedding_dim)
# output projection
output = self.resid_drop(self.proj(output))
outputs = (output, present)
if output_attentions:
outputs += (attn_weights,)
return outputs
class Block(nn.Module):
def __init__(self, config):
super().__init__()
self.ln1 = nn.LayerNorm(config.n_embd)
self.ln2 = nn.LayerNorm(config.n_embd)
self.attn = CausalSelfAttention(config)
# MLP
self.l1 = nn.Linear(config.n_embd, 4 * config.n_embd)
self.act = nn.GELU()
self.l2 = nn.Linear(4 * config.n_embd, config.n_embd)
self.drop = nn.Dropout(config.resid_pdrop)
def forward(
self,
hidden_states: Optional[Tuple[torch.FloatTensor]],
layer_past: Optional[Tuple[torch.Tensor]] = None,
use_cache: Optional[bool] = False,
output_attentions: Optional[bool] = False,
):
residual = hidden_states
hidden_states = self.ln1(hidden_states)
attn_outputs = self.attn(
hidden_states, layer_past=layer_past, use_cache=use_cache, output_attentions=output_attentions
)
attn_output = attn_outputs[0]
outputs = attn_outputs[1:]
hidden_states = attn_output + residual
residual = hidden_states
hidden_states = self.ln2(hidden_states)
hidden_states = self.l1(hidden_states)
hidden_states = self.act(hidden_states)
hidden_states = self.l2(hidden_states)
hidden_states = residual + self.drop(hidden_states)
if use_cache:
outputs = (hidden_states,) + outputs
else:
outputs = (hidden_states,) + outputs[1:]
return outputs
@add_start_docstrings(
"The bare TrajectoryTransformer Model transformer outputting raw hidden-states without any specific head on top.",
TRAJECTORY_TRANSFORMER_START_DOCSTRING,
)
class TrajectoryTransformerModel(TrajectoryTransformerPreTrainedModel):
"""the full GPT language model, with a context size of block_size"""
def __init__(self, config):
super().__init__(config)
# input embedding stem (+1 for stop token)
self.tok_emb = nn.Embedding(config.vocab_size * config.transition_dim + 1, config.n_embd)
self.pos_emb = nn.Parameter(torch.zeros(1, config.block_size, config.n_embd))
self.drop = nn.Dropout(config.embd_pdrop)
# transformer
self.blocks = nn.ModuleList([Block(config) for _ in range(config.n_layer)])
# decoder head
self.ln_f = nn.LayerNorm(config.n_embd)
self.head = EinLinear(config.transition_dim, config.n_embd, config.vocab_size + 1, bias=False)
self.vocab_size = config.vocab_size
self.stop_token = config.vocab_size * config.transition_dim
self.block_size = config.block_size
self.observation_dim = config.observation_dim
self.action_dim = config.action_dim
self.transition_dim = config.transition_dim
self.embedding_dim = config.n_embd
self.action_weight = config.action_weight
self.reward_weight = config.reward_weight
self.value_weight = config.value_weight
self.gradient_checkpointing = False
self.post_init()
def get_block_size(self):
return self.block_size
def offset_tokens(self, trajectories):
_, sequence_length = trajectories.shape
n_states = int(np.ceil(sequence_length / self.transition_dim))
offsets = torch.arange(self.transition_dim) * self.vocab_size
offsets = offsets.repeat(n_states).to(trajectories.device)
offset_trajectories = trajectories + offsets[:sequence_length]
offset_trajectories[trajectories == self.vocab_size] = self.stop_token
return offset_trajectories
def pad_to_full_observation(self, hidden_states):
batch_size, sequence_length, _ = hidden_states.shape
n_pad = (self.transition_dim - sequence_length % self.transition_dim) % self.transition_dim
padding = torch.zeros(batch_size, n_pad, self.embedding_dim, device=hidden_states.device)
# [ batch_size x padded_sequence_length' x embedding_dim ]
hidden_states_pad = torch.cat([hidden_states, padding], dim=1)
hidden_states_pad = hidden_states_pad.view(-1, self.transition_dim, self.embedding_dim)
return hidden_states_pad, n_pad
@add_start_docstrings_to_model_forward(
TRAJECTORY_TRANSFORMER_INPUTS_DOCSTRING.format("batch_size, sequence_length")
)
@replace_return_docstrings(output_type=TrajectoryTransformerOutput, config_class=_CONFIG_FOR_DOC)
def forward(
self,
trajectories: Optional[torch.LongTensor] = None,
past_key_values: Optional[Tuple[Tuple[torch.Tensor]]] = None,
targets: Optional[torch.FloatTensor] = None,
attention_mask: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple[torch.Tensor], TrajectoryTransformerOutput]:
r"""
Returns:
Examples:
```python
>>> from transformers import TrajectoryTransformerModel
>>> import torch
>>> model = TrajectoryTransformerModel.from_pretrained(
... "CarlCochet/trajectory-transformer-halfcheetah-medium-v2"
... )
>>> model.to(device)
>>> model.eval()
>>> observations_dim, action_dim, batch_size = 17, 6, 256
>>> seq_length = observations_dim + action_dim + 1
>>> trajectories = torch.LongTensor([np.random.permutation(self.seq_length) for _ in range(batch_size)]).to(
... device
... )
>>> targets = torch.LongTensor([np.random.permutation(self.seq_length) for _ in range(batch_size)]).to(device)
>>> outputs = model(
... trajectories,
... targets=targets,
... use_cache=True,
... output_attentions=True,
... output_hidden_states=True,
... return_dict=True,
... )
```
"""
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
if past_key_values is None:
past_key_values = tuple([None] * len(self.blocks))
batch_size, sequence_length = trajectories.size()
if sequence_length > self.block_size:
raise ValueError("Cannot forward, model block size is exhausted.")
offset_trajectories = self.offset_tokens(trajectories)
# [ batch_size x sequence_length x embedding_dim ]
# forward the GPT model
token_embeddings = self.tok_emb(offset_trajectories) # each index maps to a (learnable) vector
position_embeddings = self.pos_emb[:, :sequence_length, :] # each position maps to a (learnable) vector
hidden_states = self.drop(token_embeddings + position_embeddings)
if self.gradient_checkpointing and self.training:
if use_cache:
logger.warning_once(
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
presents = () if use_cache else None
all_self_attentions = () if output_attentions else None
all_hidden_states = () if output_hidden_states else None
for i, (block, layer_past) in enumerate(zip(self.blocks, past_key_values)):
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
if self.gradient_checkpointing and self.training:
def create_custom_forward(module):
def custom_forward(*inputs):
return module(*inputs)
return custom_forward
outputs = torch.utils.checkpoint.checkpoint(
create_custom_forward(block),
hidden_states,
layer_past,
use_cache,
output_attentions,
)
else:
outputs = block(hidden_states, layer_past, use_cache, output_attentions)
hidden_states = outputs[0]
if use_cache is True:
presents = presents + (outputs[1],)
if output_attentions:
all_self_attentions = all_self_attentions + (outputs[2 if use_cache else 1],)
# [ batch_size x sequence_length x embedding_dim ]
hidden_state = self.ln_f(hidden_states)
if output_hidden_states:
all_hidden_states = all_hidden_states + (hidden_states,)
hidden_states_pad, n_pad = self.pad_to_full_observation(hidden_state)
logits = self.head(hidden_states_pad)
logits = logits.reshape(batch_size, sequence_length + n_pad, self.vocab_size + 1)
logits = logits[:, :sequence_length]
# if we are given some desired targets also calculate the loss
if targets is not None:
loss = F.cross_entropy(logits.reshape(-1, logits.size(-1)), targets.view(-1), reduction="none")
if self.action_weight != 1 or self.reward_weight != 1 or self.value_weight != 1:
# make weights
n_states = int(np.ceil(sequence_length / self.transition_dim))
weights = torch.cat(
[
torch.ones(self.observation_dim, device=trajectories.device),
torch.ones(self.action_dim, device=trajectories.device) * self.action_weight,
torch.ones(1, device=trajectories.device) * self.reward_weight,
torch.ones(1, device=trajectories.device) * self.value_weight,
]
)
weights = weights.repeat(n_states)
weights = weights[1:].repeat(batch_size, 1)
loss = loss * weights.view(-1)
loss = (loss * attention_mask.view(-1)).mean()
else:
loss = None
if not return_dict:
return tuple(v for v in [loss, logits, presents, all_hidden_states, all_self_attentions] if v is not None)
return TrajectoryTransformerOutput(
loss=loss,
logits=logits,
past_key_values=presents,
hidden_states=all_hidden_states,
attentions=all_self_attentions,
)