Spaces:
Sleeping
Sleeping
import streamlit as st | |
import torch | |
import bitsandbytes | |
import accelerate | |
import scipy | |
import copy | |
import time | |
from PIL import Image | |
import torch.nn as nn | |
import pandas as pd | |
from my_model.object_detection import detect_and_draw_objects | |
from my_model.captioner.image_captioning import get_caption | |
from my_model.utilities.gen_utilities import free_gpu_resources | |
from my_model.state_manager import StateManager | |
from my_model.config import inference_config as config | |
class InferenceRunner(StateManager): | |
def __init__(self): | |
super().__init__() | |
self.initialize_state() | |
self.sample_images = config.SAMPLE_IMAGES | |
def answer_question(self, caption, detected_objects_str, question, model): | |
free_gpu_resources() | |
answer = model.generate_answer(question, caption, detected_objects_str) | |
free_gpu_resources() | |
return answer | |
def image_qa_app(self, kbvqa): | |
# Display sample images as clickable thumbnails | |
self.col1.write("Choose from sample images:") | |
cols = self.col1.columns(len(self.sample_images)) | |
st.write('loading in prog?', st.session_state['loading_in_progress']) | |
for idx, sample_image_path in enumerate(self.sample_images): | |
with cols[idx]: | |
image = Image.open(sample_image_path) | |
image_for_display = self.resize_image(sample_image_path, 80, 80) | |
st.image(image_for_display) | |
if st.button(f'Select Sample Image {idx + 1}', key=f'sample_{idx}'): | |
self.process_new_image(sample_image_path, image, kbvqa) | |
st.write('loading in prog?',st.session_state['loading_in_progress']) | |
# Image uploader | |
uploaded_image = self.col1.file_uploader("Or upload an Image", type=["png", "jpg", "jpeg"]) | |
st.write(st.session_state['loading_in_progress']) | |
if uploaded_image is not None: | |
self.process_new_image(uploaded_image.name, Image.open(uploaded_image), kbvqa) | |
# Display and interact with each uploaded/selected image | |
with self.col2: | |
for image_key, image_data in self.get_images_data().items(): | |
with st.container(): | |
nested_col21, nested_col22 = st.columns([0.65, 0.35]) | |
image_for_display = self.resize_image(image_data['image'], 600) | |
nested_col21.image(image_for_display, caption=f'Uploaded Image: {image_key[-11:]}') | |
if not image_data['analysis_done']: | |
nested_col22.text("Please click 'Analyze Image'..") | |
with nested_col22: | |
if st.button('Analyze Image', key=f'analyze_{image_key}', on_click=self.disable_widgets, disabled=self.is_widget_disabled): | |
caption, detected_objects_str, image_with_boxes = self.analyze_image(image_data['image'], kbvqa) | |
self.update_image_data(image_key, caption, detected_objects_str, True) | |
st.session_state['loading_in_progress'] = False | |
# Initialize qa_history for each image | |
qa_history = image_data.get('qa_history', []) | |
if image_data['analysis_done']: | |
st.session_state['loading_in_progress'] = False | |
sample_questions = config.SAMPLE_QUESTIONS.get(image_key, []) | |
selected_question = nested_col22.selectbox( | |
"Select a sample question or type your own:", | |
["Custom question..."] + sample_questions, | |
key=f'sample_question_{image_key}') | |
# Text input for custom question | |
custom_question = nested_col22.text_input( | |
"Or ask your own question:", | |
key=f'custom_question_{image_key}') | |
# Use the selected sample question or the custom question | |
question = custom_question if selected_question == "Custom question..." else selected_question | |
if not question: | |
nested_col22.warning("Please select or enter a question.") | |
else: | |
if question in [q for q, _ in qa_history]: | |
nested_col22.warning("This question has already been answered.") | |
else: | |
if nested_col22.button('Get Answer', key=f'answer_{image_key}', disabled=self.is_widget_disabled): | |
answer = self.answer_question(image_data['caption'], image_data['detected_objects_str'], question, kbvqa) | |
st.session_state['loading_in_progress'] = False | |
self.add_to_qa_history(image_key, question, answer) | |
# Display Q&A history for each image | |
for num, (q, a) in enumerate(qa_history): | |
nested_col22.text(f"Q{num+1}: {q}\nA{num+1}: {a}\n") | |
def display_message(self, message, warning=False, write=False, text=False): | |
pass | |
def run_inference(self): | |
self.set_up_widgets() | |
load_fine_tuned_model = False | |
fine_tuned_model_already_loaded = False | |
reload_detection_model = False | |
force_reload_full_model = False | |
st.session_state['settings_changed'] = self.has_state_changed() | |
if st.session_state['settings_changed']: | |
self.col1.warning("Model settings have changed, please reload the model, this will take a second .. ") | |
st.session_state.button_label = "Reload Model" if self.is_model_loaded() and self.settings_changed else "Load Model" | |
with self.col1: | |
if st.session_state.method == "Fine-Tuned Model": | |
with st.container(): | |
nested_col11, nested_col12 = st.columns([0.5, 0.5]) | |
if nested_col11.button(st.session_state.button_label, on_click=self.disable_widgets, disabled=self.is_widget_disabled): | |
if st.session_state.button_label == "Load Model": | |
if self.is_model_loaded(): | |
free_gpu_resources() | |
fine_tuned_model_already_loaded = True | |
else: | |
load_fine_tuned_model = True | |
else: | |
reload_detection_model = True | |
if nested_col12.button("Force Reload", on_click=self.disable_widgets, disabled=self.is_widget_disabled): | |
force_reload_full_model = True | |
if load_fine_tuned_model: | |
t1=time.time() | |
free_gpu_resources() | |
self.load_model() | |
st.session_state['time_taken_to_load_model'] = int(time.time()-t1) | |
st.session_state['loading_in_progress'] = False | |
elif fine_tuned_model_already_loaded: | |
free_gpu_resources() | |
self.col1.text("Model already loaded and no settings were changed:)") | |
st.session_state['loading_in_progress'] = False | |
elif reload_detection_model: | |
free_gpu_resources() | |
self.reload_detection_model() | |
st.session_state['loading_in_progress'] = False | |
elif force_reload_full_model: | |
free_gpu_resources() | |
t1=time.time() | |
self.force_reload_model() | |
st.session_state['time_taken_to_load_model'] = int(time.time()-t1) | |
st.session_state['loading_in_progress'] = False | |
st.session_state['model_loaded'] = True | |
elif st.session_state.method == "In-Context Learning (n-shots)": | |
self.col1.warning(f'Model using {st.session_state.method} is not deployed yet, will be ready later.') | |
st.session_state['loading_in_progress'] = False | |
if self.is_model_loaded(): | |
free_gpu_resources() | |
st.session_state['loading_in_progress'] = False | |
self.image_qa_app(self.get_model()) | |
st.write('load success', 'loading in prog?', st.session_state['loading_in_progress']) | |