# Source: Hugging Face Space (status at capture time: Running)
import datetime
import logging
import os
from io import BytesIO

import extra_streamlit_components as stx
import requests
import streamlit as st
import torch
from llama_index import ServiceContext, VectorStoreIndex, Document, StorageContext, load_index_from_storage
from llama_index.llms.palm import PaLM
from llama_index.memory import ChatMemoryBuffer
from PIL import Image
# Imports for ResNet
from transformers import AutoFeatureExtractor, ResNetForImageClassification
# ---------------------------------------------------------------------------
# Static page chrome: title, intro line, sidebar sections, cookie manager.
# ---------------------------------------------------------------------------
st.title("AInimal Go!")
#st.set_page_config(layout="wide")
st.write("My Pokemon Go inspired 'AInimal Go!' app. You can upload an image or snap a picture of an animal and start chatting with it")

# Each sidebar section is a (heading, markdown body) pair, rendered in order.
_SIDEBAR_SECTIONS = (
    (
        '## Created By',
        """
Harshad Suryawanshi
- [Linkedin](https://www.linkedin.com/in/harshadsuryawanshi/)
- [Medium](https://harshadsuryawanshi.medium.com/)
""",
    ),
    (
        '## Other Projects',
        """
- [Building My Own GPT4-V with PaLM and Kosmos](https://lnkd.in/dawgKZBP)
- [AI Equity Research Analyst](https://ai-eqty-rsrch-anlyst.streamlit.app/)
- [Recasting "The Office" Scene](https://blackmirroroffice.streamlit.app/)
- [Story Generator](https://appstorycombined-agaf9j4ceit.streamlit.app/)
""",
    ),
    (
        '## Disclaimer',
        """
This application, titled 'AInimal Go!', is a conceptual prototype designed to demonstrate the innovative use of Large Language Models (LLMs) in enabling interactive conversations with animals through images. While the concept is vaguely inspired by the interactive and augmented reality elements popularized by games like Pokemon Go, it does not use any assets, characters, or intellectual property from the Pokemon franchise. The interactions and conversations generated by this application are entirely fictional and created for entertainment and educational purposes. They should not be regarded as factual or accurate representations of animal behavior or communication. The author and the application do not hold any affiliation with the Pokemon brand or its creators, and no endorsement from them is implied. Users are encouraged to use this application responsibly and with an understanding of its purely illustrative nature.
""",
    ),
)

for _section_heading, _section_body in _SIDEBAR_SECTIONS:
    st.sidebar.markdown(_section_heading)
    st.sidebar.markdown(_section_body)

# Cookie manager used further down to persist the per-browser message count.
cookie_manager = stx.CookieManager()
# Function to init resnet
@st.cache_resource
def load_model_and_labels():
    """Load the ResNet-18 classifier, its feature extractor and the label subset.

    The labels file ``imagenet_animal_labels_subset.txt`` is read line by line;
    each non-blank line is expected to look like ``<class_id>: '<label>'``.

    The result is wrapped in ``st.cache_resource`` so the checkpoint is loaded
    once per server process instead of on every Streamlit rerun. The cached
    objects are shared between sessions, so callers must treat them as
    read-only.

    Returns:
        A ``(feature_extractor, model, animal_labels_dict)`` tuple, where
        ``animal_labels_dict`` maps the integer class id to its label string.

    Raises:
        OSError: if the labels file cannot be opened.
        ValueError: if a non-blank line has no ``:`` separator or its id part
            is not an integer.
    """
    # Load animal labels as a dictionary
    animal_labels_dict = {}
    with open('imagenet_animal_labels_subset.txt', 'r', encoding='utf-8') as file:
        for line in file:
            entry = line.strip()
            if not entry:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            # Split on the first colon only so a label containing ':' is kept whole.
            class_id, label_name = entry.split(':', 1)
            animal_labels_dict[int(class_id.strip())] = label_name.strip().strip("'")

    # Initialize feature extractor and model
    feature_extractor = AutoFeatureExtractor.from_pretrained("microsoft/resnet-18")
    model = ResNetForImageClassification.from_pretrained("microsoft/resnet-18")
    return feature_extractor, model, animal_labels_dict


feature_extractor, model, animal_labels_dict = load_model_and_labels()
# Function to predict image label
def get_image_caption(image_data):
    """Classify an image with the module-level ResNet model.

    Args:
        image_data: anything ``PIL.Image.open`` accepts; the caller in this
            file passes a ``BytesIO`` holding the uploaded or captured image.

    Returns:
        A ``(predicted_label_name, predicted_label_id)`` tuple: the label
        string from ``model.config.id2label`` and the integer arg-max class id.

    Side effects:
        Writes the predicted label name to the Streamlit page.
    """
    # The uploader accepts PNG, which may carry an alpha channel, a palette or a
    # single grey channel. Normalise to 3-channel RGB before preprocessing.
    # NOTE(review): assumes the ResNet preprocessing expects RGB input — confirm.
    image = Image.open(image_data).convert("RGB")
    inputs = feature_extractor(images=image, return_tensors="pt")

    # Inference only: no gradients needed.
    with torch.no_grad():
        logits = model(**inputs).logits

    predicted_label_id = logits.argmax(-1).item()
    predicted_label_name = model.config.id2label[predicted_label_id]
    st.write(predicted_label_name)

    # Return the predicted animal name
    return predicted_label_name, predicted_label_id
def init_llm(api_key):
    """Build the LLM plus the llama_index objects the chat engine relies on.

    Args:
        api_key: key handed to the ``PaLM`` constructor.

    Returns:
        A 5-tuple ``(llm, service_context, storage_context, index, chatmemory)``:
        the PaLM LLM, a service context using that LLM with the ``"local"``
        embed model, a storage context over the ``storage`` directory, the
        index with id ``"index"`` loaded from that storage, and a chat memory
        buffer with a 1500-token limit.
    """
    palm_llm = PaLM(api_key=api_key)
    svc_ctx = ServiceContext.from_defaults(llm=palm_llm, embed_model="local")
    store_ctx = StorageContext.from_defaults(persist_dir="storage")
    loaded_index = load_index_from_storage(
        store_ctx,
        index_id="index",
        service_context=svc_ctx,
    )
    memory_buffer = ChatMemoryBuffer.from_defaults(token_limit=1500)
    return palm_llm, svc_ctx, store_ctx, loaded_index, memory_buffer


llm, service_context, storage_context, index, chatmemory = init_llm(st.secrets['GOOGLE_API_KEY'])
def is_animal(predicted_label_id):
    """Return True when *predicted_label_id* is an ImageNet-1k animal class.

    Args:
        predicted_label_id: integer class index produced by the classifier.

    Returns:
        ``True`` for ids in the inclusive range 0-397, ``False`` otherwise.
    """
    # In the ImageNet-1k index the animal classes run from 0 (tench) to
    # 397 (puffer); index 398 is "abacus", so it must be excluded.
    last_animal_class_id = 397
    return 0 <= predicted_label_id <= last_animal_class_id
# Function to create the chat engine.
def create_chat_engine(img_desc, api_key):
    """Create a ReAct-mode chat engine over the module-level index.

    Args:
        img_desc: image description. Not used by the engine itself; the caller
            injects the description into the prompt it sends. Kept so existing
            call sites keep working.
        api_key: unused; kept for backward compatibility with existing callers.

    Returns:
        The object returned by ``index.as_chat_engine`` configured with
        ``chat_mode="react"``, verbose output and the module-level
        ``chatmemory`` buffer.
    """
    # The original built a Document from img_desc and never used it; that dead
    # allocation has been removed.
    return index.as_chat_engine(
        chat_mode="react",
        verbose=True,
        memory=chatmemory,
    )
# Clear chat function
def clear_chat():
    """Drop the chat transcript and stored image from the session state.

    Removes the ``messages`` and ``image_file`` entries from
    ``st.session_state`` when they are present; missing entries are ignored.
    """
    for state_key in ("messages", "image_file"):
        if state_key in st.session_state:
            del st.session_state[state_key]
# Callback function to clear the chat when a new image is uploaded
def on_image_upload():
    """Reset the conversation; passed as ``on_change`` to the file uploader."""
    clear_chat()
# ---------------------------------------------------------------------------
# Main page flow: enforce the per-browser message limit, collect an image,
# classify it, then run the chat loop.
# ---------------------------------------------------------------------------

# Maximum number of chat messages allowed per browser (tracked in a cookie).
MESSAGE_LIMIT = 20

# Retrieve the message count from cookies
raw_message_count = cookie_manager.get(cookie='message_count')
try:
    message_count = 0 if raw_message_count is None else int(raw_message_count)
except (TypeError, ValueError):
    # A malformed cookie value should not crash the page; start from zero.
    message_count = 0

# If the message limit has been reached, disable the inputs.
# The limit is reached once the count is AT OR ABOVE the threshold; the
# original compared with `<=`, which blocked every new visitor.
if message_count >= MESSAGE_LIMIT:
    st.error("Notice: The maximum message limit for this demo version has been reached.")
    # Disabling the uploader and input by not displaying them
    image_uploader_placeholder = st.empty()  # Placeholder for the uploader
    chat_input_placeholder = st.empty()  # Placeholder for the chat input
    st.stop()

# Add a clear chat button
if st.button("Clear Chat"):
    clear_chat()

# Image upload section.
image_file = st.file_uploader("Upload an image", type=["jpg", "jpeg", "png"], key="uploaded_image", on_change=on_image_upload)
#col1, col2, col3 = st.columns([1, 2, 1])
#with col2: # Camera input will be in the middle column
camera_image = st.camera_input("Take a picture")

# Determine the source of the image (upload or camera); the upload wins when
# both are present.
if image_file is not None:
    image_data = BytesIO(image_file.getvalue())
elif camera_image is not None:
    image_data = BytesIO(camera_image.getvalue())
else:
    image_data = None

if image_data is not None:
    # Display the uploaded image at a standard width.
    st.image(image_data, caption='Uploaded Image.', width=200)

    # Process the uploaded image to get a caption.
    img_desc, label_id = get_image_caption(image_data)
    if not is_animal(label_id):
        st.error("Please upload image of an animal!")
        st.stop()

    # Initialize the chat engine with the image description.
    chat_engine = create_chat_engine(img_desc, st.secrets['GOOGLE_API_KEY'])
    st.write("Image Uploaded Successfully. Ask me anything about it.")

    # Initialize session state for messages if it doesn't exist
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display previous messages
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Handle new user input
    user_input = st.chat_input("Ask me about the image:", key="chat_input")
    if user_input:
        # Append user message to the session state
        st.session_state.messages.append({"role": "user", "content": user_input})
        # Display user message immediately
        with st.chat_message("user"):
            st.markdown(user_input)

        # Call the chat engine. We are already inside the image branch, so this
        # works for camera photos as well; the original gated on `image_file`
        # and therefore never answered camera images.
        try:
            with st.spinner('Waiting for the chat engine to respond...'):
                # Get the response from your chat engine
                response = chat_engine.chat(f"""You are a chatbot that roleplays as an animal and also makes animal sounds when chatting.
You always answer in great detail and are polite. Your responses always descriptive.
Your job is to rolelpay as the animal that is mentioned in the image the user has uploaded. Image description: {img_desc}. User question
{user_input}""")
            # Keep plain text in the history rather than the library's response object.
            reply_text = str(response)
            # Append assistant message to the session state
            st.session_state.messages.append({"role": "assistant", "content": reply_text})
            # Display the assistant message
            with st.chat_message("assistant"):
                st.markdown(reply_text)
        except Exception:
            # Top-level UI boundary: keep the page alive, but record the cause
            # instead of discarding it.
            logging.getLogger(__name__).exception("Chat engine request failed")
            st.error('An error occurred.')

        # Increment the message count and update the cookie. Only an actual
        # user message counts towards the limit, not every script rerun.
        message_count += 1
        cookie_manager.set('message_count', str(message_count), expires_at=datetime.datetime.now() + datetime.timedelta(days=30))