# Copyright (2024) Bytedance Ltd. and/or its affiliates
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# copy and modify from: https://github.com/OpenGVLab/Ask-Anything/blob/main/video_chat2/conversation.py

import os
import re
from enum import Enum, auto

import torch
from PIL import Image
from transformers import StoppingCriteria, StoppingCriteriaList

from dataset.processor import Processor

IMAGE_TOKEN = "<image>"
VIDEO_TOKEN = "<video>"

class SeparatorStyle(Enum):
    """Different separator styles."""
    SINGLE = auto()
    TWO = auto()


def get_prompt(conv):
ret = ""
if conv.system:
ret = conv.system + conv.sep1
for i, (role, message) in enumerate(conv.messages):
if message:
            # In the current version, image/video tokens must appear only in the
            # first conversation round, so strip them from any later user input.
if i > 0:
message = re.sub(f"({IMAGE_TOKEN}|{VIDEO_TOKEN})\n*", "", message)
ret += role + ": " + message
if i % 2:
ret += conv.sep2
else:
ret += conv.sep1
else:
ret += role + ":"
return ret
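

# Illustrative example, not part of the original file: for the conv_tarsier
# template defined at the bottom of this module, a single-round conversation
# renders as "USER: <image>\nDescribe the video. ASSISTANT:".
def _example_get_prompt():
    conv = EasyDict({
        "system": "",
        "roles": ("USER", "ASSISTANT"),
        "messages": [["USER", f"{IMAGE_TOKEN}\nDescribe the video."],
                     ["ASSISTANT", None]],
        "sep1": " ",
        "sep2": "</s>",
    })
    assert get_prompt(conv) == f"USER: {IMAGE_TOKEN}\nDescribe the video. ASSISTANT:"
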
class StoppingCriteriaSub(StoppingCriteria):
    def __init__(self, stops=None):
        super().__init__()
        # Avoid a mutable default argument; default to no stop sequences.
        self.stops = stops if stops is not None else []

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs):
        # Stop as soon as the most recent tokens match any stop sequence.
        for stop in self.stops:
            if torch.all(stop == input_ids[0][-len(stop):]).item():
                return True
        return False
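

# Illustrative example, not part of the original file: the criterion fires once
# the most recent tokens equal a stop sequence. The token ids here are made up;
# real code derives them from the tokenizer, as Chat.__init__ does below.
def _example_stopping_criteria():
    crit = StoppingCriteriaSub(stops=[torch.tensor([42, 7])])
    assert crit(torch.tensor([[1, 2, 42, 7]]), None) is True
    assert crit(torch.tensor([[1, 2, 42, 8]]), None) is False
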
class Chat:
def __init__(self, model, processor: Processor, device='cuda', debug=False):
self.model = model
self.processor = processor
self.device = device
self.debug = debug
stop_words_ids = [torch.tensor([self.processor.tokenizer.eos_token_id]).to(device)]
self.stopping_criteria = StoppingCriteriaList([StoppingCriteriaSub(stops=stop_words_ids)])

    def ask(self, text, conv):
        conv.messages.append([conv.roles[0], text])
        return conv

    def prepare_model_inputs(self, conv, visual_data_file=None, images=None, n_frames=None):
        conv.messages.append([conv.roles[1], None])
        # Strip any user-supplied media tokens from the first message; the right
        # number of image tokens is re-inserted below once the frames are loaded.
        conv.messages[0][1] = re.sub(f"({IMAGE_TOKEN}|{VIDEO_TOKEN})\n*", "", conv.messages[0][1])
        if images is None or (isinstance(images, list) and len(images) == 0):
            if isinstance(visual_data_file, str) and os.path.exists(visual_data_file):
                images = self.processor.load_images(visual_data_file, n_frames)
            elif isinstance(visual_data_file, Image.Image):
                images = [visual_data_file]
            elif visual_data_file is None or visual_data_file == "":
                images = None
            else:
                raise NotImplementedError(f"Unsupported visual input: {visual_data_file}")
        if isinstance(images, list) and len(images) > 0:
            # One IMAGE_TOKEN per frame, prepended to the first user message.
            conv.messages[0][1] = IMAGE_TOKEN * len(images) + '\n' + conv.messages[0][1]
prompt = get_prompt(conv)
if self.debug:
print(f"visual_data_file: {visual_data_file}")
print(f"Prompt: {prompt}", flush=True)
inputs = self.processor(prompt, images=images, edit_prompt=False, return_prompt=False)
inputs = {k:v.to(self.device) for k,v in inputs.items() if v is not None}
return inputs, conv, images

    def answer(self, conv, visual_data_file=None, images=None, n_frames=None, max_new_tokens=512,
               num_beams=1, min_length=1, top_p=1.0, repetition_penalty=1.0, length_penalty=1.0,
               temperature=0):
inputs, conv, images = self.prepare_model_inputs(conv, visual_data_file, images, n_frames)
if self.model is not None:
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
stopping_criteria=self.stopping_criteria,
num_beams=num_beams,
                do_sample=temperature > 0,
min_length=min_length,
top_p=top_p,
repetition_penalty=repetition_penalty,
length_penalty=length_penalty,
temperature=temperature,
)
            # Decode only the newly generated tokens, skipping the prompt.
            output_text = self.processor.tokenizer.decode(outputs[0][inputs['input_ids'][0].shape[0]:], skip_special_tokens=True)
        else:
            output_text = "Fake response: model not loaded (debug mode)."
conv.messages[-1][1] = output_text
return output_text, conv, images
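

# Illustrative usage sketch, not part of the original file. The loader below is
# hypothetical; substitute the project's actual model/processor initialization.
def _example_chat_round():
    import copy
    model, processor = load_model_and_processor("omni-research/Tarsier2-7b")  # hypothetical helper
    chat = Chat(model, processor, device='cuda')
    conv = copy.deepcopy(conv_templates["tarsier2-7b"])
    conv = chat.ask(f"{VIDEO_TOKEN}\nDescribe the video in detail.", conv)
    output_text, conv, images = chat.answer(conv, visual_data_file="demo.mp4", n_frames=8)
    return output_text
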
class EasyDict(dict):
    """
    Get attributes

    >>> d = EasyDict({'foo':3})
    >>> d['foo']
    3
    >>> d.foo
    3
    >>> d.bar
    Traceback (most recent call last):
    ...
    AttributeError: 'EasyDict' object has no attribute 'bar'

    Works recursively

    >>> d = EasyDict({'foo':3, 'bar':{'x':1, 'y':2}})
    >>> isinstance(d.bar, dict)
    True
    >>> d.bar.x
    1
    """
def __init__(self, d=None, **kwargs):
if d is None:
d = {}
if kwargs:
d.update(**kwargs)
for k, v in d.items():
setattr(self, k, v)
        # Mirror non-dunder class attributes into the instance so they are
        # routed through __setattr__ as well.
        for k in self.__class__.__dict__.keys():
            if not (k.startswith("__") and k.endswith("__")) and k not in ("update", "pop"):
                setattr(self, k, getattr(self, k))

def __setattr__(self, name, value):
if isinstance(value, (list, tuple)):
value = [self.__class__(x) if isinstance(x, dict) else x for x in value]
elif isinstance(value, dict) and not isinstance(value, self.__class__):
value = self.__class__(value)
super(EasyDict, self).__setattr__(name, value)
super(EasyDict, self).__setitem__(name, value)
    __setitem__ = __setattr__

    def update(self, e=None, **f):
d = e or dict()
d.update(f)
for k in d:
setattr(self, k, d[k])

    def pop(self, k, d=None):
if hasattr(self, k):
delattr(self, k)
return super(EasyDict, self).pop(k, d)
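

# Illustrative example, not part of the original file: attribute and item access
# stay in sync because __setitem__ is aliased to __setattr__ above.
def _example_easydict_sync():
    d = EasyDict()
    d.foo = 3
    assert d['foo'] == 3
    d['bar'] = {'x': 1}
    assert d.bar.x == 1
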
conv_tarsier = EasyDict({
    "system": "",
    "roles": ("USER", "ASSISTANT"),
    "messages": [],
    "sep1": " ",
    "sep2": "</s>",
})

conv_tarsier_yi = EasyDict({
    "system": "",
    "roles": ("USER", "ASSISTANT"),
    "messages": [],
    "sep1": " ",
    "sep2": "<|endoftext|>",
})

conv_tarsier_qwen2 = EasyDict({
    "system": "",
    "roles": ("USER", "ASSISTANT"),
    "messages": [],
    "sep1": " ",
    "sep2": "<|endoftext|>",
})

conv_templates = {
    "tarsier-7b": conv_tarsier,
    "tarsier-13b": conv_tarsier,
    "tarsier-34b": conv_tarsier_yi,
    "tarsier2-7b": conv_tarsier_qwen2,
}
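

# Illustrative note, not part of the original file: the templates above hold
# mutable `messages` lists shared across callers, so copy a template per chat
# session rather than mutating it in place.
def _example_fresh_conv(model_name="tarsier2-7b"):
    import copy
    conv = copy.deepcopy(conv_templates[model_name])
    conv.messages = []  # start each session with an empty history
    return conv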