# References:
# https://medium.com/@csakash03/hybrid-search-is-a-method-to-optimize-rag-implementation-98d9d0911341
# https://medium.com/etoai/hybrid-search-combining-bm25-and-semantic-search-for-better-results-with-lan-1358038fe7e6

import hashlib
import os
import re
import zipfile
from pathlib import Path

import chromadb
import gradio as gr
import nltk
import numpy as np
from dotenv import load_dotenv
from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint
from langchain_text_splitters import RecursiveCharacterTextSplitter
from rank_bm25 import BM25Okapi

# Download the required NLTK data
nltk.download('punkt')

# Define embeddings using a Hugging Face model
# (HuggingFaceEmbeddings defaults to sentence-transformers/all-mpnet-base-v2)
embeddings = HuggingFaceEmbeddings()

load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN", "").strip()

# Initialize a persistent Chroma client so the index survives restarts.
# The raw client and the LangChain wrapper must share the same store,
# so the wrapper is built on top of this client.
persist_directory = "./chroma_langchain_db"
client = chromadb.PersistentClient(path=persist_directory)
collection = client.get_or_create_collection("whatsapp_collection")

vector_store = Chroma(
    client=client,
    collection_name="whatsapp_collection",
    embedding_function=embeddings,
)

# Global state shared between upload and search
bm25 = None
all_texts = []
processed_files = {}  # Maps hashes of already-processed uploads to filenames

llm = HuggingFaceEndpoint(
    repo_id="mistralai/Mistral-7B-Instruct-v0.3",
    huggingfacehub_api_token=HF_TOKEN,
    temperature=0.1,
    max_new_tokens=200,
)


# Clean the chat text before indexing
def clean_text(text):
    # Strip non-ASCII characters (removes emojis, but also any non-Latin text)
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    # Collapse runs of whitespace into single spaces
    text = re.sub(r'\s+', ' ', text).strip()
    return text


# Compute an MD5 hash of a file to identify duplicate uploads
def compute_file_hash(file_path):
    hasher = hashlib.md5()
    with open(file_path, 'rb') as f:
        hasher.update(f.read())
    return hasher.hexdigest()


# Process the uploaded zip file and index its chats in Chroma
def process_and_upload_zip(zip_file):
    global bm25, all_texts, processed_files

    temp_dir = Path("temp")
    temp_dir.mkdir(exist_ok=True)

    # Skip re-uploading if this exact file has already been processed
    zip_file_hash = compute_file_hash(zip_file.name)
    if zip_file_hash in processed_files:
        return f"File '{zip_file.name}' already processed. Using existing Chroma storage."

    # Extract the zip file
    with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
        zip_ref.extractall(temp_dir)

    # Load, clean, and chunk the chat transcripts
    chat_files = list(temp_dir.glob("*.txt"))
    metadata = []
    all_texts = []
    chunk_splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=200)

    for chat_file in chat_files:
        with open(chat_file, 'r', encoding='utf-8') as file:
            page_content = file.read()

        clean_content = clean_text(page_content)

        # Split into chunks of 2500 characters with 200 characters of overlap
        chunks = chunk_splitter.split_text(clean_content)
        for chunk_index, chunk in enumerate(chunks):
            metadata.append({
                "context": chunk,
                "document_id": chat_file.stem,
                "chunk_index": chunk_index,
            })
            all_texts.append(chunk)

    # Initialize BM25 for sparse retrieval
    bm25 = BM25Okapi([doc.split() for doc in all_texts])

    # Store the chunks in Chroma; add_documents embeds them with the
    # configured embedding function, so no separate embedding call is needed
    ids = [f"{m['document_id']}_chunk_{m['chunk_index']}" for m in metadata]
    documents = [Document(page_content=m["context"], metadata=m) for m in metadata]
    vector_store.add_documents(documents=documents, ids=ids)

    # Remember the hash of the processed file to avoid reprocessing
    processed_files[zip_file_hash] = zip_file.name

    return "Data uploaded and stored in Chroma successfully."


def hybrid_search(query):
    global bm25, all_texts

    # Sparse retrieval: rank all chunks by BM25 score and keep the top 5
    query_terms = query.split()
    bm25_scores = bm25.get_scores(query_terms)
    bm25_top_n_indices = np.argsort(bm25_scores)[::-1][:5]
    sparse_results = [all_texts[i] for i in bm25_top_n_indices]

    # Dense retrieval: top 5 chunks by embedding similarity from Chroma
    dense_results = vector_store.similarity_search(query, k=5)

    # Combine the results by simple concatenation
    # (see the optional rank-fusion sketch below for one way to enhance this)
    combined_results = sparse_results + [result.page_content for result in dense_results]

    response = ""
    for result in combined_results:
        response += f"{result}\n\n"

    return f"Hybrid Search Results:\n\n{response}"
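
# --- Optional: rank-fusion sketch ---
# Plain concatenation can surface near-duplicate chunks found by both
# retrievers and imposes no principled ordering across the two lists. A common
# alternative is Reciprocal Rank Fusion (RRF), which scores each chunk by its
# rank position in each list. This is a minimal, illustrative sketch, not part
# of the original app: `hybrid_search_rrf` and its parameters are hypothetical
# names, and rrf_k=60 is the smoothing constant conventionally used with RRF.
def hybrid_search_rrf(query, top_k=5, rrf_k=60):
    # Rank chunks by BM25 score, best first, and keep the top_k
    bm25_scores = bm25.get_scores(query.split())
    sparse_ranked = [all_texts[i] for i in np.argsort(bm25_scores)[::-1][:top_k]]

    # Rank chunks by dense similarity via Chroma
    dense_ranked = [doc.page_content for doc in vector_store.similarity_search(query, k=top_k)]

    # RRF: each chunk scores sum(1 / (rrf_k + rank)) over the lists it appears
    # in, which also deduplicates chunks that both retrievers return
    fused = {}
    for ranked_list in (sparse_ranked, dense_ranked):
        for rank, text in enumerate(ranked_list):
            fused[text] = fused.get(text, 0.0) + 1.0 / (rrf_k + rank)

    # Highest fused score first
    return sorted(fused, key=fused.get, reverse=True)[:top_k]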
# Gradio interface for uploading and querying
def query_interface(zip_file, query):
    upload_status = process_and_upload_zip(zip_file)
    search_results = hybrid_search(query)

    prompt = (f"Here is a summary of WhatsApp chat contents based on the search for the query: '{query}'. "
              f"The chat content includes important messages:\n\n"
              f"{search_results}\n\n"
              f"Now, based on this chat content, answer the following question as an expert. "
              f"Please provide a complete and precise answer in **100 words**.\n\n"
              f"Question: {query}")

    # Generate the answer with the LLM
    response = llm.invoke(prompt)
    return f"{upload_status}\n\n{search_results}", response


interface = gr.Interface(
    fn=query_interface,
    inputs=[
        gr.File(label="Upload WhatsApp Chat Zip File"),
        gr.Textbox(label="Enter your query"),
    ],
    outputs=[
        gr.Textbox(label="Chat Content"),      # Retrieved chat content
        gr.Textbox(label="Generated Answer"),  # LLM-generated answer
    ],
    title="WhatsApp Chat Upload and Hybrid Search",
    description="Upload a zip file containing WhatsApp chat data. "
                "This app processes the data and performs hybrid search with BM25 + Chroma.",
)

if __name__ == "__main__":
    interface.launch()