|
from typing import List, Optional, Tuple, Union |
|
|
|
import torch |
|
import torch.nn as nn |
|
import math |
|
import pdb |
|
from typing import Dict, Any |
|
from PIL import Image |
|
|
|
from transformers import AutoConfig, AutoModelForCausalLM, PretrainedConfig, PreTrainedModel |
|
|
|
|
|
from transformers.modeling_outputs import CausalLMOutputWithPast |
|
|
|
from .llava_arch import LlavaMetaModel, LlavaMetaForCausalLM |
|
|
|
from transformers.cache_utils import Cache, DynamicCache |
|
|
|
from transformers.generation.utils import GenerationConfig |
|
|
|
import sys |
|
from .modeling_phi import PhiForCausalLM, PhiModel, PhiConfig |
|
from .generation_utils import build_allava_input |
|
|
|
|
|
|
|
|
|
|
|
|
|
class LlavaPhiConfig(PhiConfig): |
|
model_type = "llava_phi" |
|
|
|
class LlavaPhiModel(LlavaMetaModel, PhiModel): |
|
config_class = LlavaPhiConfig |
|
|
|
def __init__(self, config: PhiConfig): |
|
super(LlavaPhiModel, self).__init__(config) |
|
|
|
|
|
|
|
class LlavaPhiForCausalLM(PhiForCausalLM, LlavaMetaForCausalLM): |
|
config_class = LlavaPhiConfig |
|
|
|
def __init__(self, config, init_vision_encoder_from_ckpt=True): |
|
|
|
config._attn_implementation = "flash_attention_2" |
|
|
|
super(PhiForCausalLM, self).__init__(config) |
|
|
|
self.model = LlavaPhiModel(config) |
|
if hasattr(self.model, '_use_flash_attention_2'): |
|
assert self.model._use_flash_attention_2, 'flash attn is not enabled. check it out!' |
|
self.vocab_size = config.vocab_size |
|
self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) |
|
|
|
if init_vision_encoder_from_ckpt: |
|
vision_tower = self.get_vision_tower() |
|
print(f'loading from CLIP first. This should only be used at inference!!!') |
|
vision_tower.load_model() |
|
|
|
|
|
self.post_init() |
|
|
|
def get_model(self): |
|
return self.model |
|
|
|
def get_tokenizer(self): |
|
return self.tokenizer |
|
|
|
def get_processor(self): |
|
return self.model.vision_tower.image_processor |
|
|
|
|
|
def forward( |
|
self, |
|
input_ids: torch.LongTensor = None, |
|
attention_mask: Optional[torch.Tensor] = None, |
|
position_ids: Optional[torch.LongTensor] = None, |
|
past_key_values: Optional[List[torch.FloatTensor]] = None, |
|
inputs_embeds: Optional[torch.FloatTensor] = None, |
|
labels: Optional[torch.LongTensor] = None, |
|
use_cache: Optional[bool] = None, |
|
output_attentions: Optional[bool] = None, |
|
output_hidden_states: Optional[bool] = None, |
|
images: Optional[torch.FloatTensor] = None, |
|
return_dict: Optional[bool] = None, |
|
) -> Union[Tuple, CausalLMOutputWithPast]: |
|
|
|
|
|
if inputs_embeds is None: |
|
( |
|
input_ids, |
|
position_ids, |
|
attention_mask, |
|
past_key_values, |
|
inputs_embeds, |
|
labels |
|
|
|
) = self.prepare_inputs_labels_for_multimodal_new( |
|
input_ids, |
|
position_ids, |
|
attention_mask, |
|
past_key_values, |
|
labels, |
|
images |
|
) |
|
|
|
|
|
return super().forward( |
|
input_ids=input_ids, |
|
attention_mask=attention_mask, |
|
position_ids=position_ids, |
|
past_key_values=past_key_values, |
|
inputs_embeds=inputs_embeds, |
|
labels=labels, |
|
use_cache=use_cache, |
|
output_attentions=output_attentions, |
|
output_hidden_states=output_hidden_states, |
|
return_dict=return_dict |
|
) |
|
|
|
def prepare_inputs_for_generation(self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs): |
|
''' |
|
This function is called for each token at inference |
|
''' |
|
|
|
images = kwargs.pop("images", None) |
|
|
|
|
|
|
|
|
|
|
|
if past_key_values is not None: |
|
if isinstance(past_key_values, Cache): |
|
cache_length = past_key_values.get_seq_length() |
|
past_length = past_key_values.seen_tokens |
|
max_cache_length = past_key_values.get_max_length() |
|
else: |
|
cache_length = past_length = past_key_values[0][0].shape[2] |
|
max_cache_length = None |
|
|
|
|
|
|
|
|
|
|
|
if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]: |
|
input_ids = input_ids[:, -(attention_mask.shape[1] - past_length) :] |
|
|
|
|
|
elif past_length < input_ids.shape[1]: |
|
input_ids = input_ids[:, past_length:] |
|
|
|
elif past_length >= input_ids.shape[1]: |
|
input_ids = input_ids[:, [-1]] |
|
|
|
|
|
if ( |
|
max_cache_length is not None |
|
and attention_mask is not None |
|
and cache_length + input_ids.shape[1] > max_cache_length |
|
): |
|
attention_mask = attention_mask[:, -max_cache_length:] |
|
|
|
position_ids = kwargs.get("position_ids", None) |
|
if attention_mask is not None and position_ids is None: |
|
|
|
position_ids = attention_mask.long().cumsum(-1) - 1 |
|
position_ids.masked_fill_(attention_mask == 0, 1) |
|
if past_key_values: |
|
position_ids = position_ids[:, -input_ids.shape[1] :] |
|
|
|
|
|
if inputs_embeds is not None and past_key_values is None: |
|
model_inputs = {"inputs_embeds": inputs_embeds} |
|
else: |
|
model_inputs = {"input_ids": input_ids} |
|
|
|
model_inputs.update( |
|
{ |
|
"position_ids": position_ids, |
|
"past_key_values": past_key_values, |
|
"use_cache": kwargs.get("use_cache"), |
|
"attention_mask": attention_mask, |
|
} |
|
) |
|
|
|
|
|
|
|
|
|
|
|
if images is not None: |
|
model_inputs['images'] = images |
|
return model_inputs |
|
|
|
|
|
|
|
|
|
def chat( |
|
self, |
|
texts: Optional[str | list[list[str, str]]], |
|
images: Optional[str | list[str]] = None, |
|
history: Optional[list[str]] = None, |
|
stream = False, |
|
return_history = False, |
|
**kwargs |
|
): |
|
''' |
|
texts: if `str`, then generate for a single round; if list[dict], |
|
images: str (optional), local path to an image. |
|
''' |
|
use_cache = kwargs.pop('use_cache', True) |
|
|
|
|
|
|
|
|
|
|
|
input_ids, image_tensors, history = build_allava_input( |
|
tokenizer = self.get_tokenizer(), |
|
processor = self.get_processor(), |
|
texts = texts, |
|
images = images, |
|
history=history, |
|
return_history=return_history, |
|
device = self.device |
|
) |
|
|
|
|
|
|
|
|
|
|
|
if 'cuda' in str(self.device): |
|
device_type = 'cuda' |
|
else: |
|
device_type = 'cpu' |
|
|
|
with torch.autocast(device_type=device_type, dtype=self.dtype): |
|
output_ids = self.generate( |
|
inputs=input_ids, |
|
images=image_tensors, |
|
use_cache=use_cache, |
|
**kwargs) |
|
|
|
answer = self.get_tokenizer().decode(output_ids[0, input_ids.shape[1]:], skip_special_tokens=True).strip() |
|
|
|
if return_history: |
|
history[-1][-1] = answer |
|
return answer, history |
|
return answer |
|
|
|
|
|
AutoConfig.register("llava_phi", LlavaPhiConfig) |
|
AutoModelForCausalLM.register(LlavaPhiConfig, LlavaPhiForCausalLM) |