import os
import zipfile

# Requires: langchain, chromadb, faiss-cpu, openai, pypdf, pymupdf, gdown, gradio
# (e.g. pip install langchain chromadb faiss-cpu openai pypdf pymupdf gdown gradio)
import langchain
import chromadb
import faiss
import openai
import pypdf

import logging
import fitz
import re
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document

print("All packages imported successfully!")


import gdown

# Clear the working directory before downloading the document archive.
os.system('rm -rf ./*')

url = 'https://drive.google.com/file/d/19JKWygyiD2IC_1xdDn1u3vxGZ7aT43d1/view?usp=sharing'
output = 'files.zip'

gdown.download(url, output, quiet=False, fuzzy=True)


def extract_files_in_same_directory(zip_file_path):
    """
    Extracts all files from a ZIP archive into the same directory as the ZIP file.

    Args:
        zip_file_path (str): Path to the ZIP file.
    """
    if not os.path.exists(zip_file_path):
        print(f"Error: The file {zip_file_path} does not exist.")
        return

    if not zip_file_path.endswith('.zip'):
        print(f"Error: {zip_file_path} is not a ZIP file.")
        return

    output_dir = os.path.dirname(zip_file_path)

    try:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            zip_ref.extractall(output_dir)
        print(f"Extracted files from {zip_file_path} to {output_dir}")
    except Exception as e:
        print(f"Error extracting {zip_file_path}: {e}")


zip_file = "./files.zip"
extract_files_in_same_directory(zip_file)

# Remove the archive once its contents have been extracted.
os.system('rm -rf ./files.zip')


import os
import logging
import fitz
import re
from getpass import getpass
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    openai_api_key = getpass("Enter your OpenAI API key: ")
os.environ["OPENAI_API_KEY"] = openai_api_key


def custom_load_pdfs(directory):
    """Walks `directory` and loads every PDF into one Document per page."""
    all_documents = []
    for root, dirs, files in os.walk(directory):
        for filename in files:
            if filename.endswith('.pdf'):
                file_path = os.path.join(root, filename)
                try:
                    doc = fitz.open(file_path)
                    for page_num in range(len(doc)):
                        page = doc.load_page(page_num)
                        text = page.get_text()

                        # Prefer a printed "Page N" number found in the page text;
                        # fall back to the physical page index.
                        footer_text = page.get_text("text", flags=fitz.TEXT_PRESERVE_LIGATURES)
                        match = re.search(r'Page\s+(\d+)', footer_text, re.IGNORECASE)
                        extracted_page_number = match.group(1) if match else f"{page_num + 1}"

                        document = Document(
                            page_content=text,
                            metadata={
                                "source": file_path,
                                "page_number": extracted_page_number,
                            }
                        )
                        all_documents.append(document)
                    print(f"Loaded {len(doc)} pages from '{file_path}'.")
                    doc.close()
                except Exception as e:
                    print(f"Failed to load '{file_path}': {e}")
    return all_documents
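
# For reference, each returned Document carries the source file and page number,
# e.g. (illustrative values only):
#   Document(page_content="...", metadata={"source": "./Loan Docs/agreement.pdf", "page_number": "12"})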


splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=300,
    separators=["\n\n", "\n", " ", ""]
)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

top_level_directory = "./Content_files/Loan_docs/Loan Docs"


for folder_name in os.listdir(top_level_directory):
    folder_path = os.path.join(top_level_directory, folder_name)

    if os.path.isdir(folder_path):
        logger.info(f"Processing folder: {folder_path}")

        all_documents = custom_load_pdfs(folder_path)
        logger.info(f"Total documents loaded from {folder_name}: {len(all_documents)}.")

        split_documents = splitter.split_documents(all_documents)
        logger.info(f"Split into {len(split_documents)} chunks for {folder_name}.")

        # Drop chunks with identical content before embedding.
        unique_chunks = []
        seen_contents = set()
        for chunk in split_documents:
            content_hash = hash(chunk.page_content)
            if content_hash not in seen_contents:
                unique_chunks.append(chunk)
                seen_contents.add(content_hash)

        logger.info(f"After removing duplicates, {len(unique_chunks)} unique chunks remain for {folder_name}.")

        try:
            persist_directory = os.path.join("Vectors", folder_name)
            os.makedirs(persist_directory, exist_ok=True)
            vectorstore = Chroma.from_documents(
                documents=unique_chunks,
                embedding=embeddings,
                persist_directory=persist_directory
            )
            logger.info(f"Chroma vector store created successfully for {folder_name}.")
        except Exception as e:
            logger.error(f"Error creating Chroma vector store for {folder_name}: {e}")
            continue

        try:
            vectorstore.persist()
            logger.info(f"Chroma index persisted to {persist_directory}.")
        except Exception as e:
            logger.error(f"Error persisting Chroma index for {folder_name}: {e}")


import os
import zipfile
import glob


def create_zip_from_folders(zip_file_path, folders_to_zip):
    """
    Creates a ZIP file containing the contents of specified folders.

    Args:
        zip_file_path (str): The full path of the ZIP file to create.
        folders_to_zip (list): List of folder paths to include in the ZIP file.
    """
    try:
        with zipfile.ZipFile(zip_file_path, 'w') as zipf:
            for folder_path in folders_to_zip:
                if os.path.exists(folder_path) and os.path.isdir(folder_path):
                    for root, _, files in os.walk(folder_path):
                        for file in files:
                            file_path = os.path.join(root, file)
                            # Store each file under the folder's basename inside the archive.
                            arcname = os.path.relpath(file_path, start=folder_path)
                            zipf.write(file_path, os.path.join(os.path.basename(folder_path), arcname))
                            print(f"Added {file_path} as {os.path.join(os.path.basename(folder_path), arcname)}")
                else:
                    print(f"Folder not found or is not a directory: {folder_path}")
        print(f"ZIP file created at: {zip_file_path}")
    except Exception as e:
        print(f"Error creating ZIP file: {e}")


# Expand the wildcard explicitly: the function expects concrete directory paths,
# not a glob pattern.
folders = glob.glob("./Vectors/*")
zip_output_path = "./vectors(2).zip"

create_zip_from_folders(zip_output_path, folders)


import os
import sys
import logging
from getpass import getpass
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import ChatPromptTemplate
import gradio as gr

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def get_absolute_path(relative_path):
    if getattr(sys, 'frozen', False):
        # Running from a PyInstaller bundle: bundled resources live in the
        # temporary extraction directory.
        base_path = sys._MEIPASS
    else:
        base_path = os.path.abspath(".")
    return os.path.join(base_path, relative_path)


openai_api_key = os.getenv("OPENAI_API_KEY")
if not openai_api_key:
    openai_api_key = getpass("Enter your OpenAI API key: ")
    os.environ["OPENAI_API_KEY"] = openai_api_key

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")


def list_vectorstore_directories(base_path='vectorstores'):
    """
    Lists all subdirectories in the base_path which are potential vector store directories.
    """
    directories = []
    try:
        for entry in os.listdir(base_path):
            full_path = os.path.join(base_path, entry)
            if os.path.isdir(full_path):
                required_files = ['chroma.sqlite3']
                if all(os.path.exists(os.path.join(full_path, file)) for file in required_files):
                    print(full_path)
                    directories.append(full_path)
    except Exception as e:
        logger.error(f"Error listing directories in '{base_path}': {e}")
    return directories


def load_selected_vectorstores(selected_dirs):
    """
    Loads Chroma vector stores from the selected directories.
    """
    vectorstores = []
    for directory in selected_dirs:
        try:
            vectorstore = Chroma(
                persist_directory=directory,
                embedding_function=embeddings
            )
            vectorstores.append(vectorstore)
            logger.info(f"Loaded vectorstore from '{directory}'.")
        except Exception as e:
            logger.error(f"Error loading vectorstore from '{directory}': {e}")
    return vectorstores


def create_combined_retriever(vectorstores, search_kwargs={"k": 20}):
    retrievers = [vs.as_retriever(search_kwargs=search_kwargs) for vs in vectorstores]

    class CombinedRetriever:
        def __init__(self, retrievers):
            self.retrievers = retrievers

        def get_relevant_documents(self, query):
            docs = []
            for retriever in self.retrievers:
                try:
                    docs.extend(retriever.get_relevant_documents(query))
                except Exception as e:
                    logger.error(f"Error retrieving documents: {e}")

            # De-duplicate by (content, source) so the same chunk is not sent to the LLM twice.
            unique_docs = {(doc.page_content, doc.metadata.get('source', '')): doc for doc in docs}
            return list(unique_docs.values())

    return CombinedRetriever(retrievers)
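
# Illustrative call pattern for the combined retriever (the directory name and
# query below are placeholders, not paths from this project):
#
#   stores = load_selected_vectorstores(["Vectors/Facility_A"])
#   retriever = create_combined_retriever(stores, search_kwargs={"k": 5})
#   docs = retriever.get_relevant_documents("What is the interest rate?")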


def answer_question(selected_dirs, question):
    if not selected_dirs:
        return "Please select at least one vector store directory."

    vectorstores = load_selected_vectorstores(selected_dirs)
    if not vectorstores:
        return "No vector stores loaded. Please check the selected directories."

    combined_retriever = create_combined_retriever(vectorstores, search_kwargs={"k": 20})

    try:
        llm = ChatOpenAI(model_name="gpt-4o")
    except Exception as e:
        logger.error(f"Error loading LLM: {e}")
        return "Error loading the language model. Please check your OpenAI API key and access."

    template = """
    You are an AI assistant specialized in extracting precise information from legal documents.
    Place special emphasis on the provided documents, but refer to outside knowledge if necessary.
    Always include the source filename and page number in your response.
    If multiple documents are relevant, always prefer the ones with the latest date.
    If amendment documents are present, always prefer the amendments.

    Context:
    {context}

    Question: {input}

    Answer:
    """

    prompt = ChatPromptTemplate.from_template(template)

    try:
        qa_chain = load_qa_chain(llm, chain_type="stuff", prompt=prompt)
    except Exception as e:
        logger.error(f"Error creating QA chain: {e}")
        return "Error initializing the QA system."

    try:
        retrieved_docs = combined_retriever.get_relevant_documents(question)
    except Exception as e:
        logger.error(f"Error retrieving documents: {e}")
        return "Error retrieving documents."

    if not retrieved_docs:
        return "No relevant documents found for the question."

    # Prepend source and page metadata to each chunk so the model can cite them.
    for doc in retrieved_docs:
        source = doc.metadata.get("source", "Unknown Source")
        page_number = doc.metadata.get("page_number", "Unknown Page")
        doc.page_content = f"Source: {source}\nPage: {page_number}\nContent: {doc.page_content}"

    try:
        response = qa_chain.run(input_documents=retrieved_docs, input=question)
    except Exception as e:
        logger.error(f"Error generating response: {e}")
        return "Error generating the response."

    return response


vectorstores_path = get_absolute_path('./Vectors')

available_dirs = list_vectorstore_directories(vectorstores_path)

iface = gr.Interface(
    fn=answer_question,
    inputs=[
        gr.CheckboxGroup(
            choices=available_dirs,
            label="Select Vector Store Directories"
        ),
        gr.Textbox(
            lines=2,
            placeholder="Enter your question here...",
            label="Your Question"
        )
    ],
    outputs=gr.Textbox(label="Response"),
    title="Vector Store QA Assistant",
    description="Select one or more vector store directories and ask your question. The assistant will retrieve relevant documents and provide an answer.",
    allow_flagging="never"
)

# Remove the raw source documents; only the persisted vector stores are needed from here on.
os.system('rm -rf ./Content_files')

iface.launch(debug=True)