multimodal / app.py
NEXAS's picture
Update app.py
cd4fff6 verified
import streamlit as st
import tempfile
import base64
import os
import nltk
nltk.download('punkt')
from src.utils.ingest_text import create_vector_database
from src.utils.ingest_image import extract_and_store_images
from src.utils.text_qa import qa_bot
from src.utils.image_qa import query_and_print_results
import nest_asyncio
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.chat_message_histories import StreamlitChatMessageHistory
from dotenv import load_dotenv
# Setup
nest_asyncio.apply()
load_dotenv()
st.set_page_config(layout='wide', page_title="InsightFusion Chat")
# ---- Session State Initialization ----
if "messages" not in st.session_state:
st.session_state.messages = []
if "chain" not in st.session_state:
st.session_state.chain = None
if "image_vdb" not in st.session_state:
st.session_state.image_vdb = None
# ---- Chat Memory Setup ----
memory_storage = StreamlitChatMessageHistory(key="chat_messages")
memory = ConversationBufferWindowMemory(
memory_key="chat_history",
human_prefix="User",
chat_memory=memory_storage,
k=3
)
# ---- Background Image Setup ----
image_bg = r"data/pexels-andreea-ch-371539-1166644.jpg"
def add_bg_from_local(image_file):
with open(image_file, "rb") as img:
encoded_string = base64.b64encode(img.read())
st.markdown(f"""<style>.stApp {{
background-image: url(data:image/png;base64,{encoded_string.decode()});
background-size: cover;
}}</style>""", unsafe_allow_html=True)
add_bg_from_local(image_bg)
# ---- Title ----
st.markdown("""
<svg width="600" height="100">
<text x="50%" y="50%" font-family="San serif" font-size="42px" fill="Black" text-anchor="middle" stroke="white"
stroke-width="0.3" stroke-linejoin="round">InsightFusion Chat
</text>
</svg>
""", unsafe_allow_html=True)
# ---- Utility ----
def get_answer(query, chain):
try:
response = chain.invoke(query)
return response['result']
except Exception as e:
st.error(f"Error in get_answer: {e}")
return None
# ---- File Upload ----
path = None
uploaded_file = st.file_uploader("Upload a PDF file", type="pdf")
if uploaded_file is not None:
temp_file_path = os.path.join("temp", uploaded_file.name)
os.makedirs("temp", exist_ok=True)
with open(temp_file_path, "wb") as f:
f.write(uploaded_file.getbuffer())
path = os.path.abspath(temp_file_path)
st.success("Document uploaded successfully!")
st.write(f"File saved to: {path}")
# ---- Demo PDF Option ----
st.markdown("### Or use a demo file:")
if st.button("Use Demo PDF"):
demo_file_path = os.path.join("pdf_resource", "sample.pdf") # Replace with actual demo name
if os.path.exists(demo_file_path):
path = os.path.abspath(demo_file_path)
st.success(f"Using demo file: {path}")
with st.spinner("Processing demo file..."):
try:
client = create_vector_database(path)
image_vdb = extract_and_store_images(path)
chain = qa_bot(client)
st.session_state.chain = chain
st.session_state.image_vdb = image_vdb
st.success("Demo file processed successfully.")
except Exception as e:
st.error(f"Error processing demo PDF: {e}")
else:
st.error("Demo PDF not found in pdf_resource/.")
# ---- Processing Button ----
if st.button("Start Processing"):
if path is not None:
with st.spinner("Processing file..."):
try:
client = create_vector_database(path)
image_vdb = extract_and_store_images(path)
chain = qa_bot(client)
st.session_state.chain = chain
st.session_state.image_vdb = image_vdb
st.success("File processed successfully.")
except Exception as e:
st.error(f"Error during processing: {e}")
else:
st.error("Please upload a file or select a demo.")
# ---- Style Customization ----
st.markdown("""
<style>
.stChatInputContainer > div {
background-color: #000000;
}
</style>
""", unsafe_allow_html=True)
# ---- Chat Interface ----
if user_input := st.chat_input("User Input"):
if st.session_state.chain and st.session_state.image_vdb:
with st.chat_message("user"):
st.markdown(user_input)
with st.spinner("Generating Response..."):
response = get_answer(user_input, st.session_state.chain)
if response:
with st.chat_message("assistant"):
st.markdown(response)
memory.save_context({"input": user_input}, {"output": response})
st.session_state.messages.append({"role": "user", "content": user_input})
st.session_state.messages.append({"role": "assistant", "content": response})
try:
query_and_print_results(st.session_state.image_vdb, user_input)
except Exception as e:
st.error(f"Error querying image database: {e}")
else:
st.error("Failed to generate a response.")
else:
st.error("Please process a file before chatting.")
# ---- Display Chat History ----
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.write(msg["content"])
# ---- Display Memory History (LangChain) ----
for i, msg in enumerate(memory_storage.messages):
role = "user" if i % 2 == 0 else "assistant"
st.chat_message(role).markdown(msg.content)