|
import os |
|
import time |
|
import pdfplumber |
|
import docx |
|
import nltk |
|
import gradio as gr |
|
from langchain_community.embeddings import ( |
|
HuggingFaceEmbeddings, |
|
OpenAIEmbeddings, |
|
CohereEmbeddings, |
|
) |
|
from langchain_openai import OpenAIEmbeddings |
|
|
|
from langchain_community.vectorstores import FAISS, Chroma |
|
from langchain_text_splitters import ( |
|
RecursiveCharacterTextSplitter, |
|
TokenTextSplitter, |
|
) |
|
from langchain.retrievers import ( |
|
VectorStoreRetriever, |
|
ContextualCompressionRetriever, |
|
) |
|
from langchain.retrievers.document_compressors import LLMChainExtractor |
|
from langchain.llms import OpenAI |
|
from typing import List, Dict, Any |
|
import pandas as pd |
|
|
|
|
|
# Runs at import time: fetch the NLTK "punkt" tokenizer data; quiet=True keeps
# the downloader from printing progress to the console.
nltk.download("punkt", quiet=True)
|
|
|
# Directory scanned for documents when no file is uploaded through the UI.
FILES_DIR = './files'

# Catalogue of selectable embedding models: provider -> {UI label -> model id}.
# The UI label is what the user ticks; the model id is what is handed to the
# provider's embedding class.
#
# NOTE(review): HuggingFace ids without an owner prefix rely on the loader
# resolving them under the `sentence-transformers` organisation. Models that
# are published by other owners therefore carry their explicit owner prefix.
MODELS = {
    'HuggingFace': {
        'e5-base': "danielheinz/e5-base-sts-en-de",
        'multilingual-e5-base': "intfloat/multilingual-e5-base",
        'paraphrase-miniLM': "paraphrase-multilingual-MiniLM-L12-v2",
        'paraphrase-mpnet': "paraphrase-multilingual-mpnet-base-v2",
        'gte-large': "thenlper/gte-large",
        'gbert-base': "deepset/gbert-base",
    },
    'OpenAI': {
        'text-embedding-ada-002': "text-embedding-ada-002",
    },
    'Cohere': {
        'embed-multilingual-v2.0': "embed-multilingual-v2.0",
    },
}
|
|
|
class FileHandler:
    """Extract plain text from PDF, DOCX and TXT files."""

    @staticmethod
    def extract_text(file_path):
        """Return the text content of ``file_path``.

        The extractor is chosen from the file extension, compared
        case-insensitively.

        Raises:
            ValueError: if the extension is not ``.pdf``, ``.docx`` or ``.txt``.
        """
        ext = os.path.splitext(file_path)[-1].lower()
        if ext == '.pdf':
            return FileHandler._extract_from_pdf(file_path)
        if ext == '.docx':
            return FileHandler._extract_from_docx(file_path)
        if ext == '.txt':
            return FileHandler._extract_from_txt(file_path)
        raise ValueError(f"Unsupported file type: {ext}")

    @staticmethod
    def _extract_from_pdf(file_path):
        """Return the text of every page of a PDF, joined with single spaces."""
        with pdfplumber.open(file_path) as pdf:
            # A page without extractable text (e.g. a scanned image) may yield
            # None; substitute '' so the join cannot fail with a TypeError.
            return ' '.join(page.extract_text() or '' for page in pdf.pages)

    @staticmethod
    def _extract_from_docx(file_path):
        """Return the text of every paragraph of a DOCX, joined with single spaces."""
        doc = docx.Document(file_path)
        return ' '.join(para.text for para in doc.paragraphs)

    @staticmethod
    def _extract_from_txt(file_path):
        """Return the full content of a UTF-8 encoded text file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
|
|
|
def get_embedding_model(model_type, model_name):
    """Instantiate the embedding class for a provider / model label pair.

    ``model_name`` is the label used as key in ``MODELS[model_type]``; the
    mapped model id is what gets passed to the embedding class.

    Raises:
        ValueError: if ``model_type`` is not 'HuggingFace', 'OpenAI' or 'Cohere'.
        KeyError: if ``model_name`` is not listed under ``model_type``.
    """
    # Reject unknown providers up front, before touching the catalogue.
    if model_type not in ('HuggingFace', 'OpenAI', 'Cohere'):
        raise ValueError(f"Unsupported model type: {model_type}")

    model_id = MODELS[model_type][model_name]
    if model_type == 'HuggingFace':
        return HuggingFaceEmbeddings(model_name=model_id)
    if model_type == 'OpenAI':
        return OpenAIEmbeddings(model=model_id)
    return CohereEmbeddings(model=model_id)
|
|
|
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
    """Build the text splitter selected by ``split_strategy``.

    'token' yields a ``TokenTextSplitter``; 'recursive' yields a
    ``RecursiveCharacterTextSplitter`` that uses ``custom_separators`` when
    given (and non-empty) and a paragraph/line/space/character fallback
    otherwise.

    Raises:
        ValueError: for any other ``split_strategy``.
    """
    if split_strategy == 'recursive':
        if custom_separators:
            separators = custom_separators
        else:
            separators = ["\n\n", "\n", " ", ""]
        return RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap_size,
            separators=separators,
        )

    if split_strategy == 'token':
        return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)

    raise ValueError(f"Unsupported split strategy: {split_strategy}")
|
|
|
def get_vector_store(store_type, texts, embedding_model):
    """Index ``texts`` with ``embedding_model`` in the requested vector store.

    ``store_type`` is 'FAISS' or 'Chroma'; the store is built through the
    class's ``from_texts`` constructor.

    Raises:
        ValueError: for any other ``store_type``.
    """
    if store_type == 'FAISS':
        store_cls = FAISS
    elif store_type == 'Chroma':
        store_cls = Chroma
    else:
        raise ValueError(f"Unsupported vector store type: {store_type}")

    return store_cls.from_texts(texts, embedding_model)
|
|
|
def get_retriever(vector_store, search_type, search_kwargs=None):
    """Return a retriever over ``vector_store`` for the given search type.

    Args:
        vector_store: object exposing ``as_retriever(search_type=..., search_kwargs=...)``.
        search_type: 'similarity' or 'mmr'.
        search_kwargs: optional dict of search options (e.g. ``{"k": 5}``).
            When omitted an empty dict is forwarded, because the retriever
            expects a mapping rather than ``None``.

    Raises:
        ValueError: for any other ``search_type``.
    """
    if search_type not in ('similarity', 'mmr'):
        raise ValueError(f"Unsupported search type: {search_type}")

    return vector_store.as_retriever(
        search_type=search_type,
        search_kwargs=search_kwargs if search_kwargs is not None else {},
    )
|
|
|
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators):
    """Extract text, split it into chunks and build the embedding model.

    Args:
        file_path: path of a single document, or a falsy value to process
            every regular file found directly inside ``FILES_DIR``.
        model_type: provider key passed to ``get_embedding_model``.
        model_name: model label passed to ``get_embedding_model``.
        split_strategy, chunk_size, overlap_size, custom_separators:
            forwarded to ``get_text_splitter``.

    Returns:
        A ``(chunks, embedding_model)`` tuple, where ``chunks`` is the list
        produced by the splitter's ``split_text``.

    Raises:
        ValueError: propagated from ``FileHandler.extract_text`` for an
            unsupported file type, or from the splitter / model factories.
    """
    if file_path:
        text = FileHandler.extract_text(file_path)
    else:
        # Sorted so the chunk order does not depend on the arbitrary order in
        # which the OS lists the directory.
        documents = []
        for entry in sorted(os.listdir(FILES_DIR)):
            candidate = os.path.join(FILES_DIR, entry)
            # Sub-directories are not documents; only regular files are read.
            if os.path.isfile(candidate):
                documents.append(FileHandler.extract_text(candidate))
        # A blank line between documents keeps the last word of one file from
        # fusing with the first word of the next.
        text = "\n\n".join(documents)

    text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
    chunks = text_splitter.split_text(text)

    embedding_model = get_embedding_model(model_type, model_name)

    return chunks, embedding_model
|
|
|
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k):
    """Index ``chunks`` and retrieve the ``top_k`` documents matching ``query``.

    Returns:
        A ``(results, elapsed_seconds)`` tuple. The elapsed time covers only
        the retrieval call; building the vector store and the retriever is
        done before the clock starts.
    """
    vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
    retriever = get_retriever(vector_store, search_type, {"k": top_k})

    # perf_counter is monotonic and high-resolution, so the measured interval
    # cannot be distorted by system clock adjustments the way time.time() can.
    started = time.perf_counter()
    results = retriever.get_relevant_documents(query)
    elapsed = time.perf_counter() - started

    return results, elapsed
|
|
|
def calculate_statistics(results, search_time):
    """Summarise a retrieval run.

    Args:
        results: sequence of documents exposing a ``page_content`` string.
        search_time: elapsed retrieval time, stored unchanged.

    Returns:
        A dict with ``num_results``, ``avg_content_length`` (mean length of
        ``page_content``; ``0.0`` when there are no results) and
        ``search_time``.
    """
    num_results = len(results)
    total_length = sum(len(doc.page_content) for doc in results)
    return {
        "num_results": num_results,
        # An empty result set is a legitimate outcome; avoid dividing by zero.
        "avg_content_length": total_length / num_results if num_results else 0.0,
        "search_time": search_time,
    }
|
|
|
def format_results(results, stats):
    """Wrap retrieval results and their statistics in Gradio tables.

    Args:
        results: documents exposing ``page_content`` and a ``metadata`` dict.
        stats: flat dict of statistics, rendered as a one-row table.

    Returns:
        A ``(results_table, stats_table)`` tuple of ``gr.DataFrame`` components.
    """
    # Columns are pinned explicitly so an empty result list still produces a
    # table with the expected headers instead of a column-less frame.
    columns = ["Content", "Source", "Relevance Score"]
    rows = [
        {
            "Content": doc.page_content,
            "Source": doc.metadata.get("source", "Unknown"),
            "Relevance Score": doc.metadata.get("score", "N/A"),
        }
        for doc in results
    ]
    results_frame = pd.DataFrame(rows, columns=columns)
    stats_frame = pd.DataFrame([stats])

    return gr.DataFrame(results_frame), gr.DataFrame(stats_frame)
|
|
|
def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k):
    """Run the same query against each selected embedding model.

    Args:
        file: uploaded file (an object with a ``name`` path attribute, or a
            plain path string), or a falsy value to use the documents
            directory instead.
        query: search query text.
        model_types: selected provider keys of ``MODELS``.
        model_names: selected model labels.
        split_strategy, chunk_size, overlap_size: chunking options.
        custom_separators: comma-separated separators, or empty for defaults.
        vector_store_type, search_type, top_k: retrieval options.

    Returns:
        A flat list ``[results_1, stats_1, results_2, stats_2, ...]`` with
        exactly ``2 * len(MODELS)`` entries, matching the output components
        the interface declares. Unused slots are ``None``. At most
        ``len(MODELS)`` models are evaluated; further selections are ignored
        because there is no output slot to show them in.
    """
    file_path = getattr(file, "name", file) if file else None
    separators = custom_separators.split(',') if custom_separators else None

    # Resolve every selected model label to the provider that actually offers
    # it. Pairing the two selections by position would combine unrelated
    # entries (e.g. an OpenAI provider with a HuggingFace label) and fail on
    # the catalogue lookup.
    selected_types = set(model_types or [])
    pairs = [
        (model_type, model_name)
        for model_name in (model_names or [])
        for model_type, catalogue in MODELS.items()
        if model_type in selected_types and model_name in catalogue
    ]

    slots = len(MODELS)
    outputs = []
    for model_type, model_name in pairs[:slots]:
        chunks, embedding_model = process_files(
            file_path,
            model_type,
            model_name,
            split_strategy,
            chunk_size,
            overlap_size,
            separators,
        )
        results, search_time = search_embeddings(
            chunks,
            embedding_model,
            vector_store_type,
            search_type,
            query,
            top_k,
        )

        stats = calculate_statistics(results, search_time)
        stats["model"] = f"{model_type} - {model_name}"

        # format_results yields a (results, stats) pair; the interface expects
        # the pairs flattened into one sequence.
        outputs.extend(format_results(results, stats))

    # Pad so the number of returned values always equals the number of
    # declared output components.
    outputs.extend([None] * (2 * slots - len(outputs)))
    return outputs
|
|
|
|
|
# Gradio UI: one input per parameter of compare_embeddings (same order), and
# one (results, statistics) pair of tables per provider listed in MODELS.
iface = gr.Interface(
    fn=compare_embeddings,
    inputs=[
        gr.File(label="Upload File (Optional)"),
        gr.Textbox(label="Search Query"),
        gr.CheckboxGroup(choices=list(MODELS.keys()), label="Embedding Model Types", value=["HuggingFace"]),
        gr.CheckboxGroup(choices=[model for models in MODELS.values() for model in models], label="Embedding Models", value=["e5-base"]),
        gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"),
        gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"),
        gr.Slider(0, 100, step=10, value=50, label="Overlap Size"),
        gr.Textbox(label="Custom Split Separators (comma-separated, optional)"),
        gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"),
        gr.Radio(choices=["similarity", "mmr"], label="Search Type", value="similarity"),
        gr.Slider(1, 10, step=1, value=5, label="Top K"),
    ],
    # Each slot needs its own component instances: multiplying a list would
    # repeat the same two objects, and a component can only be rendered once.
    outputs=[
        component
        for _ in MODELS
        for component in (
            gr.DataFrame(label="Results"),
            gr.DataFrame(label="Statistics"),
        )
    ],
    title="Embedding Comparison Tool",
    description="Compare different embedding models and retrieval strategies",
)

iface.launch()