Spaces:
Sleeping
Sleeping
File size: 7,744 Bytes
54b712c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 |
import streamlit as st
from PyPDF2 import PdfReader
import docx2txt
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
from langchain_community.vectorstores import FAISS
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from dotenv import load_dotenv
import os
import google.generativeai as genai
import logging
import json
import base64
from datetime import datetime
import sqlite3
load_dotenv()
# Configure logging
logging.basicConfig(level=logging.DEBUG)
# Configure Generative AI API key
api_key = os.getenv("GOOGLE_API_KEY")
if not api_key:
logging.error("Google API key not found. Make sure .env file is set up correctly.")
genai.configure(api_key=api_key)
# Initialize a global list to store query history
query_history = []
# Connect to the SQLite database
conn = sqlite3.connect('documents.db')
c = conn.cursor()
# Create the documents table if it doesn't exist
c.execute('''CREATE TABLE IF NOT EXISTS documents
(id INTEGER PRIMARY KEY, document_type TEXT, document_content TEXT)''')
# Create the query_history table if it doesn't exist
c.execute('''CREATE TABLE IF NOT EXISTS query_history
(id INTEGER PRIMARY KEY, user_id TEXT, query TEXT, response TEXT, timestamp TEXT)''')
conn.commit()
def get_document_text(document, document_type):
"""Extract text from different document types."""
if document_type == 'pdf':
pdf_reader = PdfReader(document)
text = ""
for page in pdf_reader.pages:
text += page.extract_text()
return text
elif document_type == 'docx':
return docx2txt.process(document)
elif document_type == 'txt':
return document.read()
else:
return ""
def get_text_chunks(text):
"""Split text into manageable chunks."""
text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=1000)
chunks = text_splitter.split_text(text)
return chunks
def get_vector_store(text_chunks):
"""Generate embeddings and create FAISS index."""
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
vector_store = FAISS.from_texts(text_chunks, embedding=embeddings)
vector_store.save_local("faiss_index")
logging.info("FAISS index successfully created and saved.")
def get_conversational_chain():
"""Load conversational chain for question answering."""
prompt_template = """
Answer the following question based strictly on the provided context. If the context is similar, use it to answer as accurately as possible. If the context is significantly different or irrelevant, respond with "Sorry, the question is out of context." Ensure your answer is detailed, accurate, and only includes information from the context provided.
Context:
{context}
Question:
{question}
Answer:
"""
model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
chain = load_qa_chain(model, chain_type="stuff", prompt=prompt)
return chain
def user_input(user_question, user_id):
"""Process user input and generate response."""
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
# Check if the FAISS index file exists before attempting to load it
if not os.path.exists("faiss_index/index.faiss"):
logging.error("FAISS index file not found. Ensure that the index is created and saved properly.")
return "Error: FAISS index file not found."
# Load FAISS index with the necessary flag
new_db = FAISS.load_local("faiss_index", embeddings, allow_dangerous_deserialization=True)
docs = new_db.similarity_search(user_question)
# Load conversational chain
chain = get_conversational_chain()
# Generate response
response = chain({"input_documents": docs, "question": user_question}, return_only_outputs=True)
response_text = response["output_text"]
# Store query and response in the history
current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
query_history.append((user_id, user_question, response_text, current_time))
# Store query and response in the database
c.execute("INSERT INTO query_history (user_id, query, response, timestamp) VALUES (?, ?, ?, ?)",
(user_id, user_question, response_text, current_time))
conn.commit()
return response_text
def display_query_history(user_id):
"""Display the history of queries and responses for a specific user."""
st.sidebar.subheader("Query History")
c.execute("SELECT query, response, timestamp FROM query_history WHERE user_id = ?", (user_id,))
history = c.fetchall()
for query, response, timestamp in history:
st.sidebar.write(f"**Query:** {query}")
st.sidebar.write(f"**Response:** {response}")
st.sidebar.write(f"**Timestamp:** {timestamp}")
st.sidebar.write("---")
def download_query_history(user_id):
"""Allow users to download their query history as a JSON file."""
c.execute("SELECT query, response, timestamp FROM query_history WHERE user_id = ?", (user_id,))
history = c.fetchall()
history_json = json.dumps([{"query": query, "response": response, "timestamp": timestamp} for query, response, timestamp in history], indent=4)
b64 = base64.b64encode(history_json.encode()).decode() # Encode the history as base64
href = f'<a href="data:file/json;base64,{b64}" download="query_history.json">Download Query History</a>'
st.sidebar.markdown(href, unsafe_allow_html=True)
def main():
"""Main Streamlit application function."""
st.set_page_config("Chat with Documents")
st.header("ππ Chat with Documents ππ")
user_id = st.text_input("Enter your user ID:")
user_question = st.text_input("Ask a Question from the Documents")
if user_question and user_id:
response = user_input(user_question, user_id)
st.write("Reply: ", response)
with st.sidebar:
st.title("Menu:")
document_type = st.selectbox("Select Document Type", ["pdf", "docx", "txt"])
document = st.file_uploader(f"Upload your {document_type.upper()} Documents", accept_multiple_files=True)
if st.button("Submit & Process"):
with st.spinner("Processing..."):
try:
if document:
for doc in document:
doc_text = get_document_text(doc, document_type)
text_chunks = get_text_chunks(doc_text)
get_vector_store(text_chunks)
c.execute("INSERT INTO documents (document_type, document_content) VALUES (?, ?)",
(document_type, doc_text))
conn.commit()
st.success("Documents processed and stored in the database.")
else:
st.error("Please upload documents before processing.")
except Exception as e:
logging.error("Error processing documents: %s", e)
st.error(f"An error occurred: {e}")
# Display the query history in the sidebar
display_query_history(user_id)
# Add download button for query history
download_query_history(user_id)
if __name__ == "__main__":
main() |