from typing import List, Optional, Tuple, Union

import torch
import torch.nn as nn
from torch.nn import CrossEntropyLoss

from transformers import OPTConfig
from transformers.modeling_outputs import CausalLMOutputWithPast
from transformers.models.opt.modeling_opt import OPTModel, OPTPreTrainedModel
from transformers.utils import replace_return_docstrings


class OPT_PromptTuned_For_SentimentAnalysis(OPTPreTrainedModel):
    """OPT causal-LM head with 8 trainable soft-prompt embeddings that are prepended at generation time."""

    _tied_weights_keys = ["lm_head.weight"]
    _CONFIG_FOR_DOC = "OPTConfig"
    config_class = OPTConfig

    def __init__(self, config):
        super().__init__(config)
        self.model = OPTModel(config)
        self.lm_head = nn.Linear(config.word_embed_proj_dim, config.vocab_size, bias=False)
        # 8 learned soft-prompt vectors, prepended to the input embeddings in
        # `prepare_inputs_for_generation`.
        self.embedding = nn.Embedding(8, config.word_embed_proj_dim)
        # Initialize weights and apply final processing.
        self.post_init()

    def get_input_embeddings(self):
        return self.model.decoder.embed_tokens

    def set_input_embeddings(self, value):
        self.model.decoder.embed_tokens = value

    def get_output_embeddings(self):
        return self.lm_head

    def set_output_embeddings(self, new_embeddings):
        self.lm_head = new_embeddings

    def set_decoder(self, decoder):
        self.model.decoder = decoder

    def get_decoder(self):
        return self.model.decoder

    def load_prompts(self):
        # Load the trained soft-prompt embeddings from the path stored in the config.
        self.embedding.load_state_dict(torch.load(self.config.prompt_dict_path))
        return self

    @replace_return_docstrings(output_type=CausalLMOutputWithPast, config_class=_CONFIG_FOR_DOC)
    def forward(
        self,
        input_ids: torch.LongTensor = None,
        attention_mask: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        labels: Optional[torch.LongTensor] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, CausalLMOutputWithPast]:
        r"""
        Args:
            input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
                Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you
                provide it.

                Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
                [`PreTrainedTokenizer.__call__`] for details.

                [What are input IDs?](../glossary#input-ids)
            attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
                Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:

                - 1 for tokens that are **not masked**,
                - 0 for tokens that are **masked**.

                [What are attention masks?](../glossary#attention-mask)
            head_mask (`torch.Tensor` of shape `(num_hidden_layers, num_attention_heads)`, *optional*):
                Mask to nullify selected heads of the attention modules. Mask values selected in `[0, 1]`:

                - 1 indicates the head is **not masked**,
                - 0 indicates the head is **masked**.

            past_key_values (`tuple(tuple(torch.FloatTensor))`, *optional*, returned when `use_cache=True` is passed or when `config.use_cache=True`):
                Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors
                of shape `(batch_size, num_heads, sequence_length, embed_size_per_head)` and 2 additional tensors
                of shape `(batch_size, num_heads, encoder_sequence_length, embed_size_per_head)`. The two
                additional tensors are only required when the model is used as a decoder in a Sequence to Sequence
                model.

                Contains pre-computed hidden-states (key and values in the self-attention blocks and in the
                cross-attention blocks) that can be used (see `past_key_values` input) to speed up sequential
                decoding.

                If `past_key_values` are used, the user can optionally input only the last `decoder_input_ids`
                (those that don't have their past key value states given to this model) of shape `(batch_size, 1)`
                instead of all `decoder_input_ids` of shape `(batch_size, sequence_length)`.
            inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
                Optionally, instead of passing `input_ids` you can choose to directly pass an embedded
                representation. This is useful if you want more control over how to convert `input_ids` indices
                into associated vectors than the model's internal embedding lookup matrix.
            labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
                Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
                config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are
                ignored (masked), the loss is only computed for the tokens with labels in `[0, ...,
                config.vocab_size]`.
            use_cache (`bool`, *optional*):
                If set to `True`, `past_key_values` key value states are returned and can be used to speed up
                decoding (see `past_key_values`).
            output_attentions (`bool`, *optional*):
                Whether or not to return the attentions tensors of all attention layers. See `attentions` under
                returned tensors for more detail.
            output_hidden_states (`bool`, *optional*):
                Whether or not to return the hidden states of all layers. See `hidden_states` under returned
                tensors for more detail.
            return_dict (`bool`, *optional*):
                Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.

        Returns:

        Example:

        ```python
        >>> from transformers import AutoTokenizer, OPTForCausalLM

        >>> model = OPTForCausalLM.from_pretrained("facebook/opt-350m")
        >>> tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")

        >>> prompt = "Hey, are you conscious? Can you talk to me?"
        >>> inputs = tokenizer(prompt, return_tensors="pt")

        >>> # Generate
        >>> generate_ids = model.generate(inputs.input_ids, max_length=30)
        >>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
        "Hey, are you conscious? Can you talk to me?\nI'm not conscious. I'm just a little bit of a weirdo."
        ```"""

        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # Caching is forced on so that the soft prompt only needs to be prepended on the first
        # generation step (see `prepare_inputs_for_generation`); the `use_cache` argument is ignored.
        outputs = self.model.decoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            past_key_values=past_key_values,
            inputs_embeds=inputs_embeds,
            use_cache=True,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        logits = self.lm_head(outputs[0]).contiguous()

        loss = None
        if labels is not None:
            # Move labels to the same device as the logits to support model parallelism.
            labels = labels.to(logits.device)
            # Shift so that tokens < n predict token n.
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            loss_fct = CrossEntropyLoss()
            loss = loss_fct(shift_logits.view(-1, self.config.vocab_size), shift_labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[1:]
            return (loss,) + output if loss is not None else output

        return CausalLMOutputWithPast(
            loss=loss,
            logits=logits,
            past_key_values=outputs.past_key_values,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

    def prepare_inputs_for_generation(
        self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs
    ):
        if past_key_values:
            # With a cache, only the last token needs to be fed to the model.
            input_ids = input_ids[:, -1:]

        if inputs_embeds is not None and past_key_values is None:
            # First generation step: prepend the 8 soft-prompt embeddings to the token embeddings
            # and extend the attention mask to cover the prompt positions.
            prompt_ids = torch.arange(8, device=inputs_embeds.device)
            prompt_embeds = self.embedding(prompt_ids).unsqueeze(0).expand(inputs_embeds.shape[0], -1, -1)
            inputs_embeds = torch.cat([prompt_embeds, inputs_embeds], dim=1)
            attention_mask = torch.cat(
                [
                    torch.ones(
                        (attention_mask.shape[0], 8), dtype=attention_mask.dtype, device=attention_mask.device
                    ),
                    attention_mask,
                ],
                dim=1,
            )
            model_inputs = {"inputs_embeds": inputs_embeds}
        else:
            # Later steps reuse the cached prompt positions, so only the attention mask is extended.
            model_inputs = {"input_ids": input_ids}
            attention_mask = torch.cat(
                [
                    torch.ones(
                        (attention_mask.shape[0], 8), dtype=attention_mask.dtype, device=attention_mask.device
                    ),
                    attention_mask,
                ],
                dim=1,
            )

        model_inputs.update(
            {
                "past_key_values": past_key_values,
                "use_cache": kwargs.get("use_cache"),
                "attention_mask": attention_mask,
            }
        )
        return model_inputs

    def generate(self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs):
        # Sentiment targets are short, so only a handful of new tokens are generated.
        max_new_tokens = 3
        # Pre-compute the token embeddings so that `prepare_inputs_for_generation` receives
        # `inputs_embeds` and can prepend the soft prompt on the first step.
        input_embeddings = self.get_input_embeddings()(input_ids).to(input_ids.device)

        return super().generate(
            input_ids=input_ids,
            inputs_embeds=input_embeddings,
            max_new_tokens=max_new_tokens,
            attention_mask=attention_mask,
            **kwargs,
        )

    @staticmethod
    def _reorder_cache(past_key_values, beam_idx):
        # Reorder the cached key/value states to follow the selected beams during beam search.
        reordered_past = ()
        for layer_past in past_key_values:
            reordered_past += (
                tuple(past_state.index_select(0, beam_idx.to(past_state.device)) for past_state in layer_past),
            )
        return reordered_past
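

# A minimal usage sketch (not part of the original class): it assumes the standard
# "facebook/opt-350m" checkpoint, that `config.prompt_dict_path` points to a state dict
# saved from `self.embedding.state_dict()` after prompt tuning, and that the file name
# below is a placeholder. The soft prompt is prepended automatically inside
# `prepare_inputs_for_generation`, so the caller only tokenizes the review text.
if __name__ == "__main__":
    from transformers import AutoTokenizer

    config = OPTConfig.from_pretrained("facebook/opt-350m")
    config.prompt_dict_path = "prompt_embeddings.pt"  # placeholder path to the trained soft prompt

    tokenizer = AutoTokenizer.from_pretrained("facebook/opt-350m")
    model = OPT_PromptTuned_For_SentimentAnalysis.from_pretrained("facebook/opt-350m", config=config)
    model = model.load_prompts().eval()

    inputs = tokenizer("The movie was surprisingly good.", return_tensors="pt")
    with torch.no_grad():
        generated = model.generate(inputs.input_ids, attention_mask=inputs.attention_mask)
    print(tokenizer.batch_decode(generated, skip_special_tokens=True)[0])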