File size: 6,101 Bytes
e7cf3ff 7842e72 e7cf3ff 93949bf 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 e7cf3ff 7842e72 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
import gradio as gr
import zipfile
import os
import re
from pathlib import Path
import chromadb
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFaceEndpoint
from langchain_chroma import Chroma
# from langchain.textsplitters import RecursiveCharacterTextSplitter
from langchain_text_splitters import RecursiveCharacterTextSplitter
import hashlib
import nltk
from rank_bm25 import BM25Okapi
import numpy as np
from langchain.schema import Document
from dotenv import load_dotenv
# Download the required NLTK data'punkt')
# Define embeddings using Hugging Face models
embeddings = HuggingFaceEmbeddings()
HF_TOKEN = os.getenv("HF_TOKEN")
# Initialize Chroma vector store
persist_directory = "./chroma_langchain_db"
client = chromadb.PersistentClient()
collection = client.get_or_create_collection("whatsapp_collection")
vector_store = Chroma(
# Define global variables
bm25 = None
all_texts = []
processed_files = {} # Dictionary to store hashes of processed files
llm = HuggingFaceEndpoint(
# Function to remove emojis and clean the text
def clean_text(text):
# Remove emojis
text = re.sub(r'[^\x00-\x7F]+', '', text)
# Additional cleaning if necessary
text = re.sub(r'\s+', ' ', text).strip()
return text
# Function to compute a file hash for identifying duplicates
def compute_file_hash(file_path):
hasher = hashlib.md5()
with open(file_path, 'rb') as f:
buf =
return hasher.hexdigest()
# Function to process and upload the zip file to Chroma
def process_and_upload_zip(zip_file):
global bm25, all_texts, processed_files
temp_dir = Path("temp")
# Compute hash to check if file has been processed
zip_file_hash = compute_file_hash(
# If the file has been processed before, skip re-uploading
if zip_file_hash in processed_files:
return f"File '{}' already processed. Using existing Chroma storage."
# Extract the zip file
with zipfile.ZipFile(, 'r') as zip_ref:
# Load and clean the chat text
chat_files = list(temp_dir.glob("*.txt"))
metadata = []
all_texts = []
for chat_file in chat_files:
with open(chat_file, 'r', encoding='utf-8') as file:
page_content =
# Clean the text
clean_content = clean_text(page_content)
# Split the clean_content into chunks of 2500 characters with 200 overlap
chunk_splitter = RecursiveCharacterTextSplitter(chunk_size=2500, chunk_overlap=200)
chunks = chunk_splitter.split_text(clean_content)
for chunk_index, chunk in enumerate(chunks):
"context": chunk,
"document_id": chat_file.stem,
"chunk_index": chunk_index
# Initialize BM25 for sparse retrieval
bm25 = BM25Okapi([doc.split() for doc in all_texts])
# Create dense embeddings and store in Chroma
chunk_embeddings = embeddings.embed_documents(all_texts)
ids = [f"{m['document_id']}_chunk_{m['chunk_index']}" for m in metadata]
documents = [Document(page_content=m["context"], metadata=m) for m in metadata]
vector_store.add_documents(documents=documents, ids=ids)
# Store the hash of the processed file to avoid reprocessing
processed_files[zip_file_hash] =
return "Data uploaded and stored in Chroma successfully."
def hybrid_search(query):
global bm25, all_texts
# BM25 Sparse Retrieval
query_terms = query.split()
bm25_scores = bm25.get_scores(query_terms)
bm25_top_n_indices = np.argsort(bm25_scores)[::-1][:5] # Top 5 results
sparse_results = [all_texts[i] for i in bm25_top_n_indices]
# Dense Retrieval using Chroma
dense_results = vector_store.similarity_search(query, k=5)
# Combine the results (you can enhance the combination logic here)
combined_results = sparse_results + [result.page_content for result in dense_results]
response = ""
for result in combined_results:
response += f"{result}\n\n"
return f"Hybrid Search Results:\n\n{response}"
# Gradio Interface for uploading and querying
def query_interface(zip_file, query):
upload_status = process_and_upload_zip(zip_file)
search_results = hybrid_search(query)
prompt = (f"Here is a summary of WhatsApp chat contents based on the search for the query: '{query}'. "
f"The chat content includes important messages:\n\n"
f"Now, based on this chat content, answer the following question as an expert. "
f"Please provide a complete and precise answer in **100 words**.\n\n"
f"Question: {query}")
response = llm.invoke(prompt)
# Generate answer using the LLM
return f"{upload_status}\n\n{search_results}", response
interface = gr.Interface(
inputs=[gr.File(label="Upload WhatsApp Chat Zip File"), gr.Textbox(label="Enter your query")],
gr.Textbox(label="Chat Content"), # To display the chat content
gr.Textbox(label="Generated Answer") # To display the generated answer
title="WhatsApp Chat Upload and Hybrid Search",
description="Upload a zip file containing WhatsApp chat data. This app processes the data and performs hybrid search with BM25 + Chroma."
if __name__ == "__main__":
interface.launch() |