Spaces:
Sleeping
Sleeping
# coding=utf-8 | |
# Copyright 2022 EleutherAI and the HuggingFace Inc. team. All rights reserved. | |
# | |
# This code is based on EleutherAI's GPT-NeoX library and the GPT-NeoX | |
# and OPT implementations in this library. It has been modified from its | |
# original forms to accommodate minor architectural differences compared | |
# to GPT-NeoX and OPT used by the Meta AI team that trained the model. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from typing import List, Optional, Tuple, Union | |
import torch | |
from torch import nn | |
from transformers.cache_utils import Cache | |
from transformers.modeling_outputs import CausalLMOutputWithPast | |
from transformers.models.llama import LlamaForCausalLM | |
class LlamaskForCausalLM(LlamaForCausalLM): | |
def __init__(self, config): | |
super().__init__(config) | |
self.special_tokens = nn.Embedding(2, config.hidden_size) # 0 -> mask encoding, 1 -> buffer token | |
self.post_init() | |
def generate( | |
self, | |
input_ids: torch.LongTensor = None, | |
attention_mask: Optional[torch.Tensor] = None, | |
max_tokens: int=32, | |
temperature: float=1.0, | |
): | |
eos_token_tensor = torch.tensor(self.config.eos_token_id, device=input_ids.device) | |
for _ in range(max_tokens): | |
outputs = self.forward(input_ids=input_ids, attention_mask=attention_mask) | |
logits = outputs['logits'][:,-1,:]/temperature | |
probs = torch.nn.functional.softmax(logits, dim=-1) | |
next_tokens = torch.multinomial(probs, num_samples=1).squeeze(1) | |
batch_size, seq_len, _ = attention_mask.shape | |
expanded_mask = torch.zeros(batch_size, seq_len + 1, seq_len + 1, dtype=attention_mask.dtype, device=attention_mask.device) | |
# Step 1: Copy the existing attention mask (top-left block of the expanded mask) | |
expanded_mask[:, :seq_len, :seq_len] = attention_mask | |
# Step 2: Copy the last row of the original attention mask into the new row (excluding the last position) | |
expanded_mask[:, seq_len, :seq_len] = attention_mask[:, -1, :] | |
# Step 3: Set the diagonal of the new token to attend to all previous tokens by setting the new last element to 1 | |
expanded_mask[:, seq_len, seq_len] = 1 | |
next_tokens = next_tokens[:, None] | |
input_ids = torch.cat([input_ids, next_tokens], dim=-1) | |
attention_mask = expanded_mask | |
if torch.all(torch.any(next_tokens==eos_token_tensor, dim=1)): | |
break | |
return input_ids | |
def forward( | |
self, | |
input_ids: torch.LongTensor = None, | |
num_buffer_token: Optional[int] = 0, | |
attention_mask: Optional[torch.Tensor] = None, | |
position_ids: Optional[torch.LongTensor] = None, | |
past_key_values: Optional[Union[Cache, List[torch.FloatTensor]]] = None, | |
inputs_embeds: Optional[torch.FloatTensor] = None, | |
labels: Optional[torch.LongTensor] = None, | |
use_cache: Optional[bool] = None, | |
output_attentions: Optional[bool] = None, | |
output_hidden_states: Optional[bool] = None, | |
return_dict: Optional[bool] = None, | |
cache_position: Optional[torch.LongTensor] = None | |
) -> Union[Tuple, CausalLMOutputWithPast]: | |
batch_size = input_ids.shape[0] | |
# print("BEWARE PRIVACY TAG DISABLE") | |
# privacy_tag = self.special_tokens(torch.tensor([0], device=input_ids.device)) | |
# buffer_token = self.special_tokens(torch.tensor([0], device=input_ids.device)).unsqueeze(0) | |
inputs_embeds = self.model.embed_tokens(input_ids) | |
# buffer_tokens = buffer_token.repeat(batch_size, num_buffer_token, 1) | |
# inputs_embeds = torch.cat([inputs_embeds, buffer_tokens], dim=1) | |
# inputs_embeds[attention_mask[:,-1,:]==0] = inputs_embeds[attention_mask[:,-1,:]==0] + privacy_tag | |
attention_mask = attention_mask.unsqueeze(1) | |
attention_mask = attention_mask.to(inputs_embeds.dtype) | |
attention_mask = attention_mask.masked_fill(attention_mask == 0, -1e9) | |
attention_mask = attention_mask.masked_fill(attention_mask == 1, float(0.0)) | |
outputs = super().forward( | |
input_ids=None, | |
attention_mask=attention_mask, | |
position_ids=position_ids, | |
past_key_values=past_key_values, | |
inputs_embeds=inputs_embeds, | |
labels=labels, | |
use_cache=use_cache, | |
output_attentions=output_attentions, | |
output_hidden_states=output_hidden_states, | |
return_dict=return_dict, | |
cache_position=cache_position, | |
) | |
return outputs | |