# -*- coding: utf-8 -*-
# @Time    : 2023/5/6 3:53 p.m.
# @Author  : JianingWang
# @File    : actor.py
from typing import Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoConfig

from models.basic_modules.generation import generate

"""
Actor model.
"""

class Actor(nn.Module):
    """
    Actor model base class.

    Args:
        model (nn.Module): Actor model.
    """

    def __init__(self, model: nn.Module) -> None:
        super().__init__()  # required so nn.Module registers submodules and parameters
        self.model = model
    @staticmethod
    def log_probs_from_logits(logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
        log_probs = F.log_softmax(logits, dim=-1)
        log_probs_labels = log_probs.gather(dim=-1, index=labels.unsqueeze(-1))
        return log_probs_labels.squeeze(-1)
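
    # Worked example (shapes are an assumption for illustration, not from the
    # original file): for logits of shape (B, T, V) and labels of shape (B, T),
    # the gather above selects log p(label_t | context) at every position t,
    # so the result is a (B, T) tensor of per-token log probabilities.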
""" | |
For generative model, needs generate function. | |
""" | |
    def generate(
        self,
        input_ids: torch.Tensor,
        return_action_mask: bool = True,
        **kwargs
    ) -> Tuple[torch.LongTensor, Optional[torch.LongTensor], Optional[torch.BoolTensor]]:
        sequences = generate(self.model, input_ids, **kwargs)
        attention_mask = None
        pad_token_id = kwargs.get("pad_token_id", None)
        if pad_token_id is not None:
            attention_mask = sequences.not_equal(pad_token_id).to(dtype=torch.long, device=sequences.device)
        if not return_action_mask:
            return sequences, attention_mask, None
        input_len = input_ids.size(1)
        eos_token_id = kwargs.get("eos_token_id", None)
        if eos_token_id is None:
            action_mask = torch.ones_like(sequences, dtype=torch.bool)
        else:
            # left padding may be applied, only mask actions after the first EOS
            action_mask = (sequences[:, input_len:] == eos_token_id).cumsum(dim=-1) == 0
            action_mask = F.pad(action_mask, (1 + input_len, -1), value=True)  # include eos token and input
        action_mask[:, :input_len] = False
        action_mask = action_mask[:, 1:]
        return sequences, attention_mask, action_mask[:, -(sequences.size(1) - input_len):]
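
    # Shape sketch (an illustrative assumption, not stated in the original:
    # batch size B, prompt length P, A generated tokens, so `sequences` is
    # (B, P + A)). `attention_mask` is then (B, P + A) and the returned
    # `action_mask` is (B, A), True for generated tokens up to and including
    # the first EOS.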
    def forward(self,
                sequences: torch.LongTensor,
                num_actions: int,
                attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
        """Returns log probabilities of the last `num_actions` tokens."""
        output = self.model(sequences, attention_mask=attention_mask)
        logits = output["logits"]
        # logits at position t predict token t + 1, so shift logits and labels by one
        log_probs = self.log_probs_from_logits(logits[:, :-1, :], sequences[:, 1:])
        return log_probs[:, -num_actions:]
    def get_base_model(self):
        return self.model
""" | |
Causal LM as a actor, e.g., GPT-2, OPT, BLOOM, etc. | |
""" | |
class CausalActor(Actor):
    """
    Causal LM Actor model.

    Args:
        pretrained (str): Pretrained model name or path.
        config (AutoConfig): Model config.
        checkpoint (bool): Enable gradient checkpointing.
    """

    def __init__(self,
                 pretrained: Optional[str] = None,
                 config: Optional[AutoConfig] = None,
                 checkpoint: bool = False) -> None:
        if pretrained is not None:
            model = AutoModelForCausalLM.from_pretrained(pretrained)
        elif config is not None:
            # Auto classes cannot be called directly; build the model from the config
            model = AutoModelForCausalLM.from_config(config)
        else:
            # AutoConfig has no default instantiation, so one of the two is required
            raise ValueError("Either `pretrained` or `config` must be provided.")
        if checkpoint:
            model.gradient_checkpointing_enable()
        super().__init__(model)
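

# A minimal usage sketch, not part of the original module. It assumes "gpt2"
# is reachable via from_pretrained and that the custom `generate` in
# models.basic_modules.generation forwards standard HF generation kwargs such
# as `max_length`; adjust both to your setup.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    actor = CausalActor(pretrained="gpt2")
    input_ids = tokenizer("Hello, RLHF!", return_tensors="pt").input_ids
    sequences, attention_mask, action_mask = actor.generate(
        input_ids,
        max_length=32,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    # action_mask covers only the generated tokens; score them with forward()
    num_actions = action_mask.size(1)
    log_probs = actor(sequences, num_actions, attention_mask=attention_mask)
    print(log_probs.shape)  # expected: (1, num_actions)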