|
import streamlit as st |
|
import torch |
|
import bitsandbytes |
|
import accelerate |
|
import scipy |
|
import copy |
|
import time |
|
from typing import Tuple, Dict, List, Union |
|
from streamlit.delta_generator import DeltaGenerator |
|
from PIL import Image |
|
import torch.nn as nn |
|
import pandas as pd |
|
from my_model.detector.object_detection import detect_and_draw_objects |
|
from my_model.captioner.image_captioning import get_caption |
|
from my_model.utilities.gen_utilities import free_gpu_resources |
|
from my_model.state_manager import StateManager |
|
from my_model.config import inference_config as config |
|
|
|
|
|
class InferenceRunner(StateManager):
    """
    Manages the user interface and interactions for running inference using the Streamlit-based Knowledge-Based Visual
    Question Answering (KBVQA) application.

    This class handles image uploads, displays sample images, and facilitates the question-answering process using the
    KBVQA model.
    Inherits from the StateManager class, which is expected to provide the layout columns (`col1`, `col2`), the state
    flags (`settings_changed`, `confidance_change`, `is_model_loaded`, `is_widget_disabled`) and the helper methods
    used below; none of them are defined in this class.
    """

    def __init__(self) -> None:
        """
        Initializes the InferenceRunner instance, setting up the necessary state.
        """

        super().__init__()

    def answer_question(self, caption: str, detected_objects_str: str, question: str) -> Tuple[str, int]:
        """
        Generates an answer to a user's question based on the image's caption and detected objects.

        `free_gpu_resources()` is called before generation and again afterwards, including when generation raises.

        Args:
            caption (str): Caption generated for the image.
            detected_objects_str (str): String representation of detected objects in the image.
            question (str): User's question about the image.

        Returns:
            Tuple[str, int]: A tuple containing the answer to the question and the prompt length.
        """

        free_gpu_resources()
        try:
            answer = st.session_state.kbvqa.generate_answer(question, caption, detected_objects_str)
            prompt_length = st.session_state.kbvqa.current_prompt_length
        finally:
            # Run the post-generation cleanup even if the model call fails, so a failed request does not skip it.
            free_gpu_resources()
        return answer, prompt_length

    def display_sample_images(self) -> None:
        """
        Displays sample images as clickable thumbnails for the user to select.

        Nothing is rendered when `config.SAMPLE_IMAGES` is empty.

        Returns:
            None
        """

        sample_images = config.SAMPLE_IMAGES
        if not sample_images:
            # Streamlit rejects a column spec of 0, so there is nothing to lay out.
            return

        self.col1.write("Choose from sample images:")
        cols = self.col1.columns(len(sample_images))
        for idx, sample_image_path in enumerate(sample_images):
            with cols[idx]:
                image_for_display = self.resize_image(sample_image_path, 80, 80)
                st.image(image_for_display)
                if st.button(f'Select Sample Image {idx + 1}', key=f'sample_{idx + 1}'):
                    # Open the full image only for the sample that was actually selected, instead of opening every
                    # sample file on every rerun.
                    self.process_new_image(sample_image_path, Image.open(sample_image_path))

    def handle_image_upload(self) -> None:
        """
        Provides an image uploader widget for the user to upload their own images.

        An upload that cannot be opened as an image is reported in the first column instead of aborting the script.

        Returns:
            None
        """

        uploaded_image = self.col1.file_uploader("Or upload an Image", type=["png", "jpg", "jpeg"])
        if uploaded_image is None:
            return

        try:
            image = Image.open(uploaded_image)
        except OSError:
            # Pillow's UnidentifiedImageError is an OSError subclass, so this covers unreadable image data.
            self.col1.error("The uploaded file could not be read as an image. Please upload a valid PNG or JPEG.")
            return

        self.process_new_image(uploaded_image.name, image)

    def display_image_and_analysis(self, image_key: str, image_data: Dict, nested_col21: DeltaGenerator,
                                   nested_col22: DeltaGenerator) -> None:
        """
        Displays the uploaded or selected image and provides an option to analyze the image.

        Args:
            image_key (str): Unique key identifying the image.
            image_data (Dict): Data associated with the image.
            nested_col21 (DeltaGenerator): Column for displaying the image.
            nested_col22 (DeltaGenerator): Column for displaying the analysis button.

        Returns:
            None
        """

        image_for_display = self.resize_image(image_data['image'], 600)
        # Only the last 11 characters of the key are shown in the caption.
        nested_col21.image(image_for_display, caption=f'Uploaded Image: {image_key[-11:]}')
        self.handle_analysis_button(image_key, image_data, nested_col22)

    def handle_analysis_button(self, image_key: str, image_data: Dict, nested_col22: DeltaGenerator) -> None:
        """
        Provides an 'Analyze Image' button and processes the image analysis upon click.

        The button is offered when the image has not been analyzed yet, or when `settings_changed` or
        `confidance_change` is set.

        Args:
            image_key (str): Unique key identifying the image.
            image_data (Dict): Data associated with the image.
            nested_col22 (DeltaGenerator): Column for displaying the analysis button.

        Returns:
            None
        """

        if not image_data['analysis_done'] or self.settings_changed or self.confidance_change:
            nested_col22.text("Please click 'Analyze Image'..")
            # The key embeds the detection model and confidence level, so changing either yields a new widget.
            analyze_button_key = (f'analyze_{image_key}_{st.session_state.detection_model}_'
                                  f'{st.session_state.confidence_level}')
            with nested_col22:
                if st.button('Analyze Image', key=analyze_button_key, on_click=self.disable_widgets,
                             disabled=self.is_widget_disabled):
                    with st.spinner('Analyzing the image...'):
                        # The third value returned by analyze_image is not used here.
                        caption, detected_objects_str, _ = self.analyze_image(image_data['image'])
                        self.update_image_data(image_key, caption, detected_objects_str, True)
                    # NOTE(review): nesting reconstructed from a source with stripped indentation - confirm that
                    # the flag is reset right after the spinner block.
                    st.session_state['loading_in_progress'] = False

    def handle_question_answering(self, image_key: str, image_data: Dict, nested_col22: DeltaGenerator) -> None:
        """
        Manages the question-answering interface for each image.

        Args:
            image_key (str): Unique key identifying the image.
            image_data (Dict): Data associated with the image.
            nested_col22 (DeltaGenerator): Column for displaying the question-answering interface.

        Returns:
            None
        """

        if image_data['analysis_done']:
            self.display_question_answering_interface(image_key, image_data, nested_col22)

        # NOTE(review): the source indentation was lost; this warning is assumed to be independent of
        # 'analysis_done' - confirm against the original layout.
        if self.settings_changed or self.confidance_change:
            nested_col22.warning("Confidence level changed, please click 'Analyze Image' each time you change it.")

    def display_question_answering_interface(self, image_key: str, image_data: Dict,
                                             nested_col22: DeltaGenerator) -> None:
        """
        Displays the interface for question answering, including sample questions and a custom question input.

        Args:
            image_key (str): Unique key identifying the image.
            image_data (Dict): Data associated with the image.
            nested_col22 (DeltaGenerator): The column where the interface will be displayed.

        Returns:
            None
        """

        sample_questions = config.SAMPLE_QUESTIONS.get(image_key, [])
        # Unpacking accepts any sequence of sample questions, not only a list.
        selected_question = nested_col22.selectbox("Select a sample question or type your own:",
                                                   ["Custom question...", *sample_questions],
                                                   key=f'sample_question_{image_key}')

        question = selected_question
        if selected_question == "Custom question...":
            # The free-text box is only rendered while the placeholder entry is selected.
            question = nested_col22.text_input("Or ask your own question:", key=f'custom_question_{image_key}')

        self.process_question(image_key, question, image_data, nested_col22)

        qa_history = image_data.get('qa_history', [])
        for num, (q, a, p) in enumerate(qa_history, start=1):
            nested_col22.text(f"Q{num}: {q}\nA{num}: {a}\nPrompt Length: {p}\n")

    def process_question(self, image_key: str, question: str, image_data: Dict, nested_col22: DeltaGenerator) -> None:
        """
        Processes the user's question, generates an answer, and updates the question-answer history.
        This method checks if the question is new or if settings have changed, and if so, offers a 'Get Answer'
        button; when the button is clicked, an answer is generated using the KBVQA model.
        It then updates the question-answer history for the image.

        Args:
            image_key (str): Unique key identifying the image.
            question (str): The question asked by the user.
            image_data (Dict): Data associated with the image.
            nested_col22 (DeltaGenerator): The column where the answer will be displayed.

        Returns:
            None
        """

        if not question:
            return

        qa_history = image_data.get('qa_history', [])
        is_new_question = all(question != asked for asked, _, _ in qa_history)
        if is_new_question or self.settings_changed or self.confidance_change:
            if nested_col22.button('Get Answer', key=f'answer_{image_key}', disabled=self.is_widget_disabled):
                answer, prompt_length = self.answer_question(image_data['caption'], image_data['detected_objects_str'],
                                                             question)
                self.add_to_qa_history(image_key, question, answer, prompt_length)

    def image_qa_app(self) -> None:
        """
        Main application interface for image-based question answering.

        This method orchestrates the display of sample images, handles image uploads, and facilitates the
        question-answering process.
        It iterates through each image in the session state, displaying the image and providing interfaces for image
        analysis and question answering.

        Returns:
            None
        """

        self.display_sample_images()
        self.handle_image_upload()

        with self.col2:
            for image_key, image_data in self.get_images_data().items():
                with st.container():
                    # Left: the image itself; right: analysis button and the question-answering widgets.
                    nested_col21, nested_col22 = st.columns([0.65, 0.35])
                    self.display_image_and_analysis(image_key, image_data, nested_col21, nested_col22)
                    self.handle_question_answering(image_key, image_data, nested_col22)

    def run_inference(self) -> None:
        """
        Sets up widgets and manages the inference process, including model loading and reloading, based on user
        interactions.

        This method orchestrates the overall flow of the inference process: it decides whether the main button reads
        "Load Model" or "Reload Model", translates button clicks into one of the load / reload actions, performs that
        action, and finally shows the image question-answering interface once a model is loaded.

        Returns:
            None
        """

        self.set_up_widgets()

        # At most one of these actions is carried out per script run, in the order of the if/elif chain below.
        load_fine_tuned_model = False
        fine_tuned_model_already_loaded = False
        reload_detection_model = False
        force_reload_full_model = False

        # "Reload Model" is shown when the loaded detection model differs from the selected one, or when the
        # selected method differs from the previously recorded one.
        st.session_state.button_label = (
            "Reload Model" if (self.is_model_loaded and
                               st.session_state.kbvqa.detection_model != st.session_state['detection_model']) or
                              (st.session_state['previous_state']['method'] is not None and
                               st.session_state['method'] != st.session_state['previous_state']['method'])
            else "Load Model"
        )

        if st.session_state.button_label == "Reload Model":
            self.col1.warning("Model settings have changed, please reload the model.. ")

        # NOTE(review): the block nesting below was reconstructed from a source whose indentation was stripped;
        # the load/reload chain is assumed to sit inside the fine-tuned-method branch, after the button container.
        with self.col1:
            if st.session_state.method in ("7b-Fine-Tuned Model", "13b-Fine-Tuned Model"):
                with st.container():
                    nested_col11, nested_col12 = st.columns([0.5, 0.5])
                    if nested_col11.button(st.session_state.button_label, on_click=self.disable_widgets,
                                           disabled=self.is_widget_disabled):
                        if st.session_state.button_label == "Load Model":
                            if self.is_model_loaded:
                                free_gpu_resources()
                                fine_tuned_model_already_loaded = True
                            else:
                                load_fine_tuned_model = True
                        elif (st.session_state.button_label == "Reload Model" and
                              st.session_state['method'] != st.session_state['previous_state']['method']):
                            # A different method was selected, so the full model is reloaded.
                            force_reload_full_model = True
                        elif (self.is_model_loaded and st.session_state.kbvqa.detection_model !=
                              st.session_state['detection_model']):
                            # Only the detection model differs from the selected one.
                            reload_detection_model = True
                    if nested_col12.button("Force Reload", on_click=self.disable_widgets,
                                           disabled=self.is_widget_disabled):
                        force_reload_full_model = True

                if load_fine_tuned_model:
                    t1 = time.time()
                    free_gpu_resources()
                    self.load_model()
                    st.session_state['time_taken_to_load_model'] = int(time.time() - t1)
                    st.session_state['loading_in_progress'] = False
                elif fine_tuned_model_already_loaded:
                    free_gpu_resources()
                    self.col1.text("Model already loaded and no settings were changed:)")
                    st.session_state['loading_in_progress'] = False
                elif reload_detection_model:
                    free_gpu_resources()
                    self.reload_detection_model()
                    st.session_state['loading_in_progress'] = False
                elif force_reload_full_model:
                    free_gpu_resources()
                    t1 = time.time()
                    self.force_reload_model()
                    st.session_state['time_taken_to_load_model'] = int(time.time() - t1)
                    st.session_state['loading_in_progress'] = False
                    st.session_state['model_loaded'] = True

            elif st.session_state.method == "Vision-Language Embeddings Alignment":
                self.col1.warning(
                    f'Model using {st.session_state.method} is designed but requires large scale data and multiple '
                    f'high-end GPUs, implementation will be explored in the future.')

        # NOTE(review): assumed that all four statements below run only once a model is loaded - confirm.
        if self.is_model_loaded:
            free_gpu_resources()
            st.session_state['loading_in_progress'] = False
            self.update_prev_state()
            self.image_qa_app()
|
|
|
|