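"""Gradio app for comparing embedding models and retrieval strategies.

An uploaded file (or every file in ./files) is chunked, embedded with the
selected models, indexed in FAISS or Chroma, and queried; results and
timing statistics are shown side by side.
"""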
import os
import time
import pdfplumber
import docx
import nltk
import gradio as gr
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain_community.embeddings import CohereEmbeddings
from langchain_community.vectorstores import FAISS, Chroma
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    TokenTextSplitter,
)
import pandas as pd
# Ensure nltk sentence tokenizer is downloaded
nltk.download('punkt', quiet=True)
FILES_DIR = './files'
# Supported embedding models
MODELS = {
    'HuggingFace': {
        'e5-base': "danielheinz/e5-base-sts-en-de",
        'multilingual-e5-base': "intfloat/multilingual-e5-base",
        'paraphrase-miniLM': "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2",
        'paraphrase-mpnet': "sentence-transformers/paraphrase-multilingual-mpnet-base-v2",
        'gte-large': "thenlper/gte-large",
        'gbert-base': "deepset/gbert-base"
    },
    'OpenAI': {
        'text-embedding-ada-002': "text-embedding-ada-002"
    },
    'Cohere': {
        'embed-multilingual-v2.0': "embed-multilingual-v2.0"
    }
}
class FileHandler:
@staticmethod
def extract_text(file_path):
ext = os.path.splitext(file_path)[-1].lower()
if ext == '.pdf':
return FileHandler._extract_from_pdf(file_path)
elif ext == '.docx':
return FileHandler._extract_from_docx(file_path)
elif ext == '.txt':
return FileHandler._extract_from_txt(file_path)
else:
raise ValueError(f"Unsupported file type: {ext}")
@staticmethod
    def _extract_from_pdf(file_path):
        with pdfplumber.open(file_path) as pdf:
            # extract_text() returns None for pages without extractable text
            return ' '.join(page.extract_text() or '' for page in pdf.pages)
@staticmethod
def _extract_from_docx(file_path):
doc = docx.Document(file_path)
return ' '.join([para.text for para in doc.paragraphs])
@staticmethod
def _extract_from_txt(file_path):
with open(file_path, 'r', encoding='utf-8') as f:
return f.read()
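# NOTE: the OpenAI and Cohere wrappers read their API keys from the
# environment (OPENAI_API_KEY and COHERE_API_KEY by default).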
def get_embedding_model(model_type, model_name):
if model_type == 'HuggingFace':
return HuggingFaceEmbeddings(model_name=MODELS[model_type][model_name])
elif model_type == 'OpenAI':
return OpenAIEmbeddings(model=MODELS[model_type][model_name])
elif model_type == 'Cohere':
return CohereEmbeddings(model=MODELS[model_type][model_name])
else:
raise ValueError(f"Unsupported model type: {model_type}")
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
if split_strategy == 'token':
return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)
elif split_strategy == 'recursive':
return RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=overlap_size,
separators=custom_separators or ["\n\n", "\n", " ", ""]
)
else:
raise ValueError(f"Unsupported split strategy: {split_strategy}")
def get_vector_store(store_type, texts, embedding_model):
if store_type == 'FAISS':
return FAISS.from_texts(texts, embedding_model)
elif store_type == 'Chroma':
return Chroma.from_texts(texts, embedding_model)
else:
raise ValueError(f"Unsupported vector store type: {store_type}")
def get_retriever(vector_store, search_type, search_kwargs=None):
    if search_type not in ('similarity', 'mmr'):
        raise ValueError(f"Unsupported search type: {search_type}")
    return vector_store.as_retriever(search_type=search_type, search_kwargs=search_kwargs)
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators):
    # File processing
    if file_path:
        text = FileHandler.extract_text(file_path)
    else:
        text = ""
        if os.path.isdir(FILES_DIR):
            for file in os.listdir(FILES_DIR):
                try:
                    text += FileHandler.extract_text(os.path.join(FILES_DIR, file))
                except ValueError:
                    continue  # skip unsupported file types
# Split text into chunks
text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
chunks = text_splitter.split_text(text)
# Get embedding model
embedding_model = get_embedding_model(model_type, model_name)
return chunks, embedding_model
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k):
# Create vector store
vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
# Get retriever
retriever = get_retriever(vector_store, search_type, {"k": top_k})
# Perform search
start_time = time.time()
    results = retriever.invoke(query)
end_time = time.time()
return results, end_time - start_time
def calculate_statistics(results, search_time):
    return {
        "num_results": len(results),
        # Guard against division by zero when no documents are returned
        "avg_content_length": (sum(len(doc.page_content) for doc in results) / len(results)) if results else 0,
        "search_time": search_time
    }
def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k):
    all_results = []
    all_stats = []
    # Model types and names are paired positionally, so they must be
    # selected in matching order in the UI
    for model_type, model_name in zip(model_types, model_names):
        chunks, embedding_model = process_files(
            file.name if file else None,
            model_type,
            model_name,
            split_strategy,
            chunk_size,
            overlap_size,
            custom_separators.split(',') if custom_separators else None
        )
        results, search_time = search_embeddings(
            chunks,
            embedding_model,
            vector_store_type,
            search_type,
            query,
            top_k
        )
        stats = calculate_statistics(results, search_time)
        stats["model"] = f"{model_type} - {model_name}"
        formatted_results, formatted_stats = format_results(results, stats)
        formatted_results["model"] = stats["model"]
        all_results.append(formatted_results)
        all_stats.append(formatted_stats)
    # Concatenate the per-model frames so the return value matches the two
    # Dataframe components declared in the interface below
    results_df = pd.concat(all_results, ignore_index=True) if all_results else pd.DataFrame()
    stats_df = pd.concat(all_stats, ignore_index=True) if all_stats else pd.DataFrame()
    return results_df, stats_df
def format_results(results, stats):
    # Build one row per document: its content plus all metadata fields
    data = []
    for doc in results:
        doc_data = {"Content": doc.page_content}
        doc_data.update(dict(doc.metadata))
        data.append(doc_data)
    df = pd.DataFrame(data)
    # The stats dict becomes a single-row DataFrame
    formatted_stats = pd.DataFrame([stats])
    return df, formatted_stats
# Gradio interface
iface = gr.Interface(
fn=compare_embeddings,
inputs=[
gr.File(label="Upload File (Optional)"),
gr.Textbox(label="Search Query"),
gr.CheckboxGroup(choices=list(MODELS.keys()), label="Embedding Model Types", value=["HuggingFace"]),
gr.CheckboxGroup(choices=[model for models in MODELS.values() for model in models], label="Embedding Models", value=["e5-base"]),
gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"),
gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"),
gr.Slider(0, 100, step=10, value=50, label="Overlap Size"),
gr.Textbox(label="Custom Split Separators (comma-separated, optional)"),
gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"),
gr.Radio(choices=["similarity", "mmr"], label="Search Type", value="similarity"),
gr.Slider(1, 10, step=1, value=5, label="Top K")
],
outputs=[
gr.Dataframe(label="Results"),
gr.Dataframe(label="Statistics")
],
title="Embedding Comparison Tool",
description="Compare different embedding models and retrieval strategies"
)
iface.launch()