import streamlit as st
import tempfile
import base64
import os
import nltk
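# 'punkt' is NLTK's sentence tokenizer model; the ingestion utilities are
# assumed to rely on it for sentence-level chunking.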
nltk.download('punkt')
from src.utils.ingest_text import create_vector_database
from src.utils.ingest_image import extract_and_store_images
from src.utils.text_qa import qa_bot
from src.utils.image_qa import query_and_print_results
import nest_asyncio
from langchain.memory import ConversationBufferWindowMemory
from langchain_community.chat_message_histories import StreamlitChatMessageHistory
from dotenv import load_dotenv
# Setup
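# nest_asyncio allows re-entrant event loops, which async parsing/ingestion
# libraries need when called from inside Streamlit's own loop; load_dotenv
# reads API keys from a local .env file.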
nest_asyncio.apply()
load_dotenv()
st.set_page_config(layout='wide', page_title="InsightFusion Chat")
# ---- Session State Initialization ----
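# st.session_state survives Streamlit's top-to-bottom reruns, so the QA
# chain and image store only have to be built once per session.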
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
if "chain" not in st.session_state: | |
st.session_state.chain = None | |
if "image_vdb" not in st.session_state: | |
st.session_state.image_vdb = None | |
# ---- Chat Memory Setup ----
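# StreamlitChatMessageHistory persists messages in st.session_state under
# the "chat_messages" key; ConversationBufferWindowMemory exposes only the
# last k=3 exchanges to the chain as "chat_history".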
memory_storage = StreamlitChatMessageHistory(key="chat_messages")
memory = ConversationBufferWindowMemory(
    memory_key="chat_history",
    human_prefix="User",
    chat_memory=memory_storage,
    k=3,
)
# ---- Background Image Setup ----
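# CSS in Streamlit cannot reference local files directly, so the image is
# inlined into the stylesheet as a base64 data URI.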
image_bg = r"data/pexels-andreea-ch-371539-1166644.jpg"

def add_bg_from_local(image_file):
    with open(image_file, "rb") as img:
        encoded_string = base64.b64encode(img.read())
    st.markdown(f"""<style>.stApp {{
        background-image: url(data:image/jpeg;base64,{encoded_string.decode()});
        background-size: cover;
    }}</style>""", unsafe_allow_html=True)

add_bg_from_local(image_bg)
# ---- Title ----
st.markdown("""
<svg width="600" height="100">
    <text x="50%" y="50%" font-family="sans-serif" font-size="42px" fill="black" text-anchor="middle"
          stroke="white" stroke-width="0.3" stroke-linejoin="round">InsightFusion Chat
    </text>
</svg>
""", unsafe_allow_html=True)
# ---- Utility ----
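# qa_bot is assumed to return a RetrievalQA-style chain whose invoke()
# output is a dict with the answer text under the "result" key.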
def get_answer(query, chain):
    try:
        response = chain.invoke(query)
        return response['result']
    except Exception as e:
        st.error(f"Error in get_answer: {e}")
        return None
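# Shared ingestion pipeline for the demo and upload paths below: build the
# text vector store and image vector store for a PDF, wire up the QA chain,
# and cache everything in session state. (process_document is a local
# helper name, not part of the src.utils API.)
def process_document(pdf_path):
    client = create_vector_database(pdf_path)
    image_vdb = extract_and_store_images(pdf_path)
    st.session_state.chain = qa_bot(client)
    st.session_state.image_vdb = image_vdb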
# ---- File Upload ----
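# The upload is written to a temp file first because the ingestion
# utilities presumably take a filesystem path rather than a byte stream.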
path = None
uploaded_file = st.file_uploader("Upload a PDF file", type="pdf")
if uploaded_file is not None:
    temp_file_path = os.path.join("temp", uploaded_file.name)
    os.makedirs("temp", exist_ok=True)
    with open(temp_file_path, "wb") as f:
        f.write(uploaded_file.getbuffer())
    path = os.path.abspath(temp_file_path)
    st.success("Document uploaded successfully!")
    st.write(f"File saved to: {path}")
# ---- Demo PDF Option ----
st.markdown("### Or use a demo file:")
if st.button("Use Demo PDF"):
    demo_file_path = os.path.join("pdf_resource", "sample.pdf")  # Replace with actual demo name
    if os.path.exists(demo_file_path):
        path = os.path.abspath(demo_file_path)
        st.success(f"Using demo file: {path}")
        with st.spinner("Processing demo file..."):
            try:
                process_document(path)
                st.success("Demo file processed successfully.")
            except Exception as e:
                st.error(f"Error processing demo PDF: {e}")
    else:
        st.error("Demo PDF not found in pdf_resource/.")
# ---- Processing Button ----
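# Processing builds two stores per document: a text vector database for
# retrieval QA and an image vector database for multimodal lookups.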
if st.button("Start Processing"): | |
if path is not None: | |
with st.spinner("Processing file..."): | |
try: | |
client = create_vector_database(path) | |
image_vdb = extract_and_store_images(path) | |
chain = qa_bot(client) | |
st.session_state.chain = chain | |
st.session_state.image_vdb = image_vdb | |
st.success("File processed successfully.") | |
except Exception as e: | |
st.error(f"Error during processing: {e}") | |
else: | |
st.error("Please upload a file or select a demo.") | |
# ---- Style Customization ----
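# Darken the chat input container so it stays legible over the background image.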
st.markdown(""" | |
<style> | |
.stChatInputContainer > div { | |
background-color: #000000; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# ---- Display Chat History ----
# Render earlier turns before handling new input so each message is drawn
# exactly once per rerun.
for msg in st.session_state.messages:
    with st.chat_message(msg["role"]):
        st.write(msg["content"])

# ---- Chat Interface ----
if user_input := st.chat_input("User Input"):
    if st.session_state.chain and st.session_state.image_vdb:
        with st.chat_message("user"):
            st.markdown(user_input)
        with st.spinner("Generating Response..."):
            response = get_answer(user_input, st.session_state.chain)
        if response:
            with st.chat_message("assistant"):
                st.markdown(response)
            memory.save_context({"input": user_input}, {"output": response})
            st.session_state.messages.append({"role": "user", "content": user_input})
            st.session_state.messages.append({"role": "assistant", "content": response})
            try:
                query_and_print_results(st.session_state.image_vdb, user_input)
            except Exception as e:
                st.error(f"Error querying image database: {e}")
        else:
            st.error("Failed to generate a response.")
    else:
        st.error("Please process a file before chatting.")
# ---- LangChain Memory Transcript ----
# memory_storage records every exchange saved via save_context; the k=3
# window only limits what the chain sees, not what is stored. Kept in an
# expander so it does not duplicate the transcript rendered above.
with st.expander("LangChain memory transcript"):
    for i, msg in enumerate(memory_storage.messages):
        role = "user" if i % 2 == 0 else "assistant"
        st.chat_message(role).markdown(msg.content)