|
import streamlit as st |
|
import torch |
|
import copy |
|
import os |
|
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig |
|
from typing import Optional |
|
from my_model.gen_utilities import free_gpu_resources |
|
from my_model.captioner.image_captioning import ImageCaptioningModel |
|
from my_model.object_detection import ObjectDetector |
|
|
|
|
|
class KBVQA:
    """Knowledge-based visual question answering pipeline.

    Holds three components behind one object: an image captioner
    (``ImageCaptioningModel``), an object detector (``ObjectDetector``) and a
    quantized causal language model that answers a question from a prompt
    built out of the image caption and the detected objects.
    """

    def __init__(self):
        """Set the default configuration. No model is loaded here."""
        # Hugging Face Hub id of the language-model checkpoint.
        self.kbvqa_model_name = "m7mdal7aj/fine_tunned_llama_2_merged"

        # `quantization` must be set before create_bnb_config() reads it.
        self.quantization = '4bit'
        self.bnb_config = self.create_bnb_config()

        # Prompts with more tokens than this are rejected by generate_answer().
        self.max_context_window = 4000

        # Options forwarded to AutoTokenizer.from_pretrained().
        self.add_eos_token = False
        self.trust_remote = False
        self.use_fast = True

        # Populated by load_fine_tuned_model(), load_caption_model() and
        # load_detector() respectively.
        self.kbvqa_tokenizer = None
        self.kbvqa_model = None
        self.captioner = None
        self.detector = None

        # Not set by this class: the caller assigns the detector model name
        # and the confidence threshold that detect_objects() passes on.
        self.detection_model = None
        self.detection_confidence = None

        # None when the HUGGINGFACE_TOKEN environment variable is not set.
        self.access_token = os.getenv("HUGGINGFACE_TOKEN")

    def create_bnb_config(self) -> "Optional[BitsAndBytesConfig]":
        """Create a BitsAndBytes configuration based on the quantization setting.

        Returns:
            BitsAndBytesConfig for ``self.quantization`` equal to ``'4bit'`` or
            ``'8bit'``; ``None`` for any other value.
        """
        if self.quantization == '4bit':
            return BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_use_double_quant=True,
                bnb_4bit_quant_type="nf4",
                bnb_4bit_compute_dtype=torch.bfloat16,
            )

        if self.quantization == '8bit':
            # The double-quant / quant-type / compute-dtype options only exist
            # with the ``bnb_4bit_`` prefix, so 8-bit loading takes just the flag.
            return BitsAndBytesConfig(load_in_8bit=True)

        return None

    def load_caption_model(self):
        """Instantiate the image captioner and load its model."""
        self.captioner = ImageCaptioningModel()
        self.captioner.load_model()

    def get_caption(self, img):
        """Return the captioner's caption for ``img``.

        Requires load_caption_model() to have been called first.
        """
        return self.captioner.generate_caption(img)

    def load_detector(self, model):
        """Instantiate the object detector and load the model named ``model``."""
        self.detector = ObjectDetector()
        self.detector.load_model(model)

    def detect_objects(self, img):
        """Run object detection on ``img``.

        Detection runs on the detector's processed copy of the image, using
        ``self.detection_confidence`` as the threshold; the boxes are drawn on
        the original ``img``.

        Returns:
            Tuple ``(image_with_boxes, detected_objects_string)``.
        """
        image = self.detector.process_image(img)
        detected_objects_string, detected_objects_list = self.detector.detect_objects(
            image, threshold=self.detection_confidence
        )
        image_with_boxes = self.detector.draw_boxes(img, detected_objects_list)
        return image_with_boxes, detected_objects_string

    def load_fine_tuned_model(self):
        """Load the language model and its tokenizer from ``kbvqa_model_name``.

        The model is loaded with ``self.bnb_config`` as quantization config and
        ``device_map="auto"``; both downloads use ``self.access_token``.
        """
        self.kbvqa_model = AutoModelForCausalLM.from_pretrained(
            self.kbvqa_model_name,
            device_map="auto",
            low_cpu_mem_usage=True,
            quantization_config=self.bnb_config,
            token=self.access_token,
        )

        self.kbvqa_tokenizer = AutoTokenizer.from_pretrained(
            self.kbvqa_model_name,
            use_fast=self.use_fast,
            trust_remote_code=self.trust_remote,
            add_eos_token=self.add_eos_token,
            token=self.access_token,
        )

    @property
    def all_models_loaded(self) -> bool:
        """True when the language model, captioner and detector are all set.

        The tokenizer is not part of this check.
        """
        return (
            self.kbvqa_model is not None
            and self.captioner is not None
            and self.detector is not None
        )

    def force_reload_model(self):
        """Drop the loaded models and free GPU resources.

        Nothing is reloaded here: the three model attributes are reset to
        ``None`` so the ``load_*`` methods can be called again afterwards.
        """
        free_gpu_resources()

        # Reset to None instead of `del self.<attr>`: deleting the attribute
        # would make any later read (e.g. `all_models_loaded`) raise
        # AttributeError.
        self.kbvqa_model = None
        self.captioner = None
        self.detector = None

        free_gpu_resources()

    def format_prompt(
        self,
        current_query: str,
        history: Optional[str] = None,
        sys_prompt: Optional[str] = None,
        caption: Optional[str] = None,
        objects: Optional[str] = None,
    ) -> str:
        """Build the text prompt for the language model.

        Args:
            current_query: The question; surrounding whitespace is stripped.
            history: Earlier conversation text. When given, only the new
                question is appended to it and the system prompt, caption and
                objects are not added again.
            sys_prompt: System prompt; a built-in default is used when None.
            caption: Image caption, placed between [CAP] tags.
            objects: Detected-objects text, placed between [OBJ] tags. When
                None the [OBJ] section is left out.

        Returns:
            The formatted prompt string.
        """
        if sys_prompt is None:
            # NOTE(review): wording and typos are kept exactly as-is; this text
            # is model input and presumably matches what the checkpoint was
            # fine-tuned on - confirm before editing.
            sys_prompt = "You are a helpful, respectful and honest assistant for visual question answering. you are provided with a caption of an image and a list of objects detected in the image along with their bounding boxes and level of certainty, you will output an answer to the given questions in no more than one sentence. Use logical reasoning to reach to the answer, but do not output your reasoning process unless asked for it. If provided, you will use the [CAP] and [/CAP] tags to indicate the begining and end of the caption respectively. If provided you will use the [OBJ] and [/OBJ] tags to indicate the begining and end of the list of detected objects in the image along with their bounding boxes respectively.if provided, you will use [QES] and [/QES] tags to indicate the begining and end of the question respectively."

        # Delimiters used to assemble the prompt.
        B_SENT = '<s>'
        B_INST = '[INST]'
        E_INST = '[/INST]'
        B_SYS = '<<SYS>>\n'
        E_SYS = '\n<</SYS>>\n\n'
        B_CAP = '[CAP]'
        E_CAP = '[/CAP]'
        B_QES = '[QES]'
        E_QES = '[/QES]'
        B_OBJ = '[OBJ]'
        E_OBJ = '[/OBJ]'

        current_query = current_query.strip()
        sys_prompt = sys_prompt.strip()

        if history is not None:
            # Follow-up turn: append only the new question to the history.
            return f"""{history}\n{B_SENT}{B_INST} {B_QES}{current_query}{E_QES}{E_INST}"""

        if objects is None:
            return f"""{B_SENT}{B_INST} {B_SYS}{sys_prompt}{E_SYS}{B_CAP}{caption}{E_CAP}{B_QES}{current_query}{E_QES}{E_INST}"""

        return f"""{B_SENT}{B_INST} {B_SYS}{sys_prompt}{E_SYS}{B_CAP}{caption}{E_CAP}{B_OBJ}{objects}{E_OBJ}{B_QES}taking into consideration the objects with high certainty, {current_query}{E_QES}{E_INST}"""

    def generate_answer(self, question, caption, detected_objects_str,) -> Optional[str]:
        """Answer ``question`` about an image described by caption and objects.

        Args:
            question: The question text.
            caption: Caption of the image.
            detected_objects_str: Detected-objects text for the prompt.

        Returns:
            The generated answer, or ``None`` (after writing a message to the
            Streamlit page) when the prompt exceeds ``max_context_window``
            tokens.
        """
        prompt = self.format_prompt(question, caption=caption, objects=detected_objects_str)

        num_tokens = len(self.kbvqa_tokenizer.tokenize(prompt))
        if num_tokens > self.max_context_window:
            st.write(f"Prompt too long with {num_tokens} tokens, consider increasing the confidence threshold for the object detector")
            return None

        # add_special_tokens=False: format_prompt() already writes the '<s>'
        # marker into the prompt text.
        model_inputs = self.kbvqa_tokenizer(prompt, add_special_tokens=False, return_tensors="pt").to('cuda')
        input_ids = model_inputs["input_ids"]
        output_ids = self.kbvqa_model.generate(input_ids)

        # Decode only what follows the first `prompt_length` positions, i.e.
        # skip the part of the output that corresponds to the prompt.
        prompt_length = input_ids.shape[1]
        output_text = self.kbvqa_tokenizer.decode(output_ids[0][prompt_length:], skip_special_tokens=True)

        # NOTE(review): str.capitalize() upper-cases the first character and
        # lower-cases the rest, so proper nouns in the answer lose their
        # capitals - confirm this is intended.
        return output_text.capitalize()
|
|
|
def prepare_kbvqa_model(only_reload_detection_model: bool = False) -> "KBVQA":
    """Build a ``KBVQA`` instance and load its models with Streamlit feedback.

    The detector model name is read from ``st.session_state.detection_model``.
    A spinner and a progress bar are shown while loading.

    Args:
        only_reload_detection_model: When True only the object detector is
            loaded; otherwise the detector, the captioner and the language
            model are loaded in that order.

    Returns:
        The ``KBVQA`` instance that was created.
    """
    free_gpu_resources()
    kbvqa = KBVQA()
    kbvqa.detection_model = st.session_state.detection_model

    with st.spinner('Loading model... this should take no more than a few minutes.'):
        # The detector is loaded first in both modes.
        progress_bar = st.progress(0)
        kbvqa.load_detector(kbvqa.detection_model)

        if only_reload_detection_model:
            progress_bar.progress(100)
        else:
            progress_bar.progress(33)
            kbvqa.load_caption_model()
            free_gpu_resources()
            progress_bar.progress(66)
            kbvqa.load_fine_tuned_model()
            free_gpu_resources()
            progress_bar.progress(100)

    if kbvqa.all_models_loaded:
        st.success('Model loaded successfully and ready for inference!')
        kbvqa.kbvqa_model.eval()
        free_gpu_resources()

    # NOTE(review): the instance is returned unconditionally. In detector-only
    # mode `all_models_loaded` is always False for this fresh instance, so a
    # return guarded by that check would always yield None - confirm this
    # matches what callers expect.
    return kbvqa
|
|
|
|
|
|