Create app.py
Browse files
app.py
ADDED
@@ -0,0 +1,114 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import os
|
2 |
+
import fitz # PyMuPDF
|
3 |
+
from sentence_transformers import SentenceTransformer
|
4 |
+
import numpy as np
|
5 |
+
import faiss
|
6 |
+
from typing import List, Tuple, Dict
|
7 |
+
import gradio as gr
|
8 |
+
from google.generativeai import GenerativeModel, configure, types
|
9 |
+
|
10 |
+
# Configure the Google Generative AI API
|
11 |
+
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
|
12 |
+
configure(api_key=GOOGLE_API_KEY)
|
13 |
+
|
14 |
+
# MyApp class to handle PDFs and vector search
|
15 |
+
class MyApp:
|
16 |
+
def __init__(self) -> None:
|
17 |
+
self.documents = []
|
18 |
+
self.embeddings = None
|
19 |
+
self.index = None
|
20 |
+
self.model = SentenceTransformer('all-MiniLM-L6-v2')
|
21 |
+
|
22 |
+
def load_pdfs(self, file_paths: List[str]) -> None:
|
23 |
+
"""Extracts text from multiple PDF files and stores them."""
|
24 |
+
self.documents = []
|
25 |
+
for file_path in file_paths:
|
26 |
+
doc = fitz.open(file_path)
|
27 |
+
for page_num in range(len(doc)):
|
28 |
+
page = doc[page_num]
|
29 |
+
text = page.get_text()
|
30 |
+
self.documents.append({"file": file_path, "page": page_num + 1, "content": text})
|
31 |
+
print("PDFs processed successfully!")
|
32 |
+
|
33 |
+
def build_vector_db(self) -> None:
|
34 |
+
"""Builds a vector database using the content of the PDFs."""
|
35 |
+
if not self.documents:
|
36 |
+
print("No documents to process.")
|
37 |
+
return
|
38 |
+
contents = [doc["content"] for doc in self.documents]
|
39 |
+
self.embeddings = self.model.encode(contents, show_progress_bar=True)
|
40 |
+
self.index = faiss.IndexFlatL2(self.embeddings.shape[1])
|
41 |
+
self.index.add(np.array(self.embeddings))
|
42 |
+
print("Vector database built successfully!")
|
43 |
+
|
44 |
+
def search_documents(self, query: str, k: int = 3) -> List[Dict]:
|
45 |
+
"""Searches for relevant document snippets using vector similarity."""
|
46 |
+
if not self.index:
|
47 |
+
print("Vector database is not built.")
|
48 |
+
return []
|
49 |
+
query_embedding = self.model.encode([query], show_progress_bar=False)
|
50 |
+
D, I = self.index.search(np.array(query_embedding), k)
|
51 |
+
results = [self.documents[i] for i in I[0]]
|
52 |
+
return results if results else [{"content": "No relevant documents found."}]
|
53 |
+
|
54 |
+
# Create an instance of MyApp
|
55 |
+
app = MyApp()
|
56 |
+
|
57 |
+
# Gradio functions
|
58 |
+
def upload_files(files) -> str:
|
59 |
+
file_paths = [file.name for file in files]
|
60 |
+
app.load_pdfs(file_paths)
|
61 |
+
return f"Uploaded {len(files)} files successfully."
|
62 |
+
|
63 |
+
def build_vector_db() -> str:
|
64 |
+
app.build_vector_db()
|
65 |
+
return "Vector database built successfully!"
|
66 |
+
|
67 |
+
def respond(message: str, history: List[Tuple[str, str]]) -> Tuple[List[Tuple[str, str]], str]:
|
68 |
+
# Retrieve relevant documents
|
69 |
+
retrieved_docs = app.search_documents(message)
|
70 |
+
context = "\n".join(
|
71 |
+
[f"File: {doc['file']}, Page: {doc['page']}\n{doc['content'][:300]}..." for doc in retrieved_docs]
|
72 |
+
) # Trimming content for brevity
|
73 |
+
|
74 |
+
# Generate response using the generative model
|
75 |
+
model = GenerativeModel("gemini-1.5-pro-latest")
|
76 |
+
generation_config = types.GenerationConfig(
|
77 |
+
temperature=0.7,
|
78 |
+
max_output_tokens=1024,
|
79 |
+
)
|
80 |
+
|
81 |
+
try:
|
82 |
+
# The context is used as part of the prompt for the generative model
|
83 |
+
response = model.generate_content([f"Context:\n{context}\n\nQuestion:\n{message}"], generation_config=generation_config)
|
84 |
+
response_content = response.text if hasattr(response, "text") else "No response generated."
|
85 |
+
except Exception as e:
|
86 |
+
response_content = f"An error occurred while generating the response: {str(e)}"
|
87 |
+
|
88 |
+
# Append the message and generated response to the chat history
|
89 |
+
history.append((message, response_content))
|
90 |
+
return history, ""
|
91 |
+
|
92 |
+
# Gradio Interface
|
93 |
+
with gr.Blocks() as demo:
|
94 |
+
gr.Markdown("# PDF Chatbot")
|
95 |
+
gr.Markdown("Upload your PDFs, build a vector database, and start querying your documents.")
|
96 |
+
|
97 |
+
with gr.Row():
|
98 |
+
with gr.Column():
|
99 |
+
upload_btn = gr.File(label="Upload PDFs", file_types=[".pdf"], file_count="multiple")
|
100 |
+
upload_message = gr.Textbox(label="Upload Status", lines=2)
|
101 |
+
build_db_btn = gr.Button("Build Vector Database")
|
102 |
+
db_message = gr.Textbox(label="DB Build Status", lines=2)
|
103 |
+
|
104 |
+
upload_btn.change(upload_files, inputs=[upload_btn], outputs=[upload_message])
|
105 |
+
build_db_btn.click(build_vector_db, inputs=[], outputs=[db_message])
|
106 |
+
|
107 |
+
with gr.Column():
|
108 |
+
chatbot = gr.Chatbot(label="Chat Responses")
|
109 |
+
query_input = gr.Textbox(label="Enter your query here")
|
110 |
+
submit_btn = gr.Button("Submit")
|
111 |
+
submit_btn.click(respond, inputs=[query_input, chatbot], outputs=[chatbot, query_input])
|
112 |
+
|
113 |
+
# Launch the Gradio app
|
114 |
+
demo.launch()
|