import os |
import time |
import pdfplumber |
import docx |
import nltk |
import gradio as gr |
from langchain_huggingface import HuggingFaceEmbeddings |
from langchain_community.embeddings import ( |
OpenAIEmbeddings, |
CohereEmbeddings, |
) |
from langchain_openai import OpenAIEmbeddings |
from langchain_community.vectorstores import FAISS, Chroma |
from langchain_text_splitters import ( |
RecursiveCharacterTextSplitter, |
TokenTextSplitter, |
) |
from typing import List, Dict, Any |
import pandas as pd |
import re |
from nltk.corpus import stopwords |
from nltk.tokenize import word_tokenize |
from nltk.stem import SnowballStemmer |
import jellyfish |
from gensim.models import Word2Vec |
from gensim.models.fasttext import FastText |
from collections import Counter |
from tokenizers import Tokenizer |
from tokenizers.models import BPE |
from tokenizers.trainers import BpeTrainer |
def download_nltk_resources():
    """Fetch the NLTK data packages used by the preprocessing helpers.

    Downloads are best-effort: a package that cannot be fetched is reported
    on stdout and the remaining packages are still attempted.
    """
    resources = [
        'punkt',
        # Recent NLTK releases load the tokenizer tables from 'punkt_tab'.
        'punkt_tab',
        'stopwords',
        'snowball_data',
    ]
    for resource in resources:
        try:
            # nltk.download() signals most failures by returning False
            # instead of raising, so the return value is checked as well.
            downloaded = nltk.download(resource, quiet=True)
        except Exception as e:
            print(f"Failed to download {resource}: {str(e)}")
        else:
            if not downloaded:
                print(f"Failed to download {resource}")
# Fetch the required NLTK data once at import time. download_nltk_resources()
# already covers 'punkt' and 'stopwords', so no further downloads are needed.
download_nltk_resources()

# Directory scanned for input documents when no file is passed explicitly.
FILES_DIR = './files'
# Supported embedding back-ends: provider -> {UI label -> model identifier}.
# Hugging Face models hosted outside the "sentence-transformers" organisation
# must be given as a full "<org>/<name>" hub id; a bare name is resolved under
# "sentence-transformers/" and would not be found there.
MODELS = {
    'HuggingFace': {
        'e5-base-de': "danielheinz/e5-base-sts-en-de",
        'paraphrase-miniLM': "paraphrase-multilingual-MiniLM-L12-v2",
        'paraphrase-mpnet': "paraphrase-multilingual-mpnet-base-v2",
        'gte-large': "thenlper/gte-large",
        'gbert-base': "deepset/gbert-base",
    },
    'OpenAI': {
        'text-embedding-ada-002': "text-embedding-ada-002",
    },
    'Cohere': {
        'embed-multilingual-v2.0': "embed-multilingual-v2.0",
    },
}
def simple_tokenize(text):
    """Split text on runs of whitespace; used when NLTK's tokenizer is unavailable."""
    tokens = text.split()
    return tokens
def preprocess_text(text, lang='german'):
    """Normalize text: lowercase, keep letters only, drop stopwords, stem.

    Args:
        text: Raw input text.
        lang: NLTK language name used for tokenization, stopword removal
            and stemming.

    Returns:
        The processed tokens joined by single spaces.

    Every NLTK-backed step degrades gracefully: when its resource is missing
    a warning is printed and the step falls back to a simpler method or is
    skipped.
    """
    text = text.lower()
    # Keep letters of any alphabet plus whitespace. An ASCII-only filter
    # would delete ä, ö, ü, ß, é, ... and mangle German and French words.
    text = ''.join(ch for ch in text if ch.isalpha() or ch.isspace())

    try:
        tokens = word_tokenize(text, language=lang)
    except LookupError:
        print(f"Warning: NLTK punkt tokenizer for {lang} not found. Using simple tokenization.")
        tokens = simple_tokenize(text)

    try:
        stop_words = set(stopwords.words(lang))
    except LookupError:
        print(f"Warning: Stopwords for {lang} not found. Skipping stopword removal.")
        stop_words = set()
    tokens = [token for token in tokens if token not in stop_words]

    try:
        stemmer = SnowballStemmer(lang)
        tokens = [stemmer.stem(token) for token in tokens]
    except ValueError:
        print(f"Warning: SnowballStemmer for {lang} not available. Skipping stemming.")

    return ' '.join(tokens)
# Kölner Phonetik codes for letters whose value does not depend on context.
_COLOGNE_FIXED_CODES = {
    **dict.fromkeys("AEIJOUY", "0"),
    **dict.fromkeys("B", "1"),
    **dict.fromkeys("FVW", "3"),
    **dict.fromkeys("GKQ", "4"),
    **dict.fromkeys("L", "5"),
    **dict.fromkeys("MN", "6"),
    **dict.fromkeys("R", "7"),
    **dict.fromkeys("SZ", "8"),
}
# Letters that make a following/preceding 'C' hard (code 4).
_COLOGNE_C_INITIAL_HARD = frozenset("AHKLOQRUX")
_COLOGNE_C_INNER_HARD = frozenset("AHKOQUX")
# Umlauts are treated like their base vowels ('ß' upper-cases to 'SS').
_COLOGNE_UMLAUTS = str.maketrans({"Ä": "A", "Ö": "O", "Ü": "U"})


def _cologne_code_word(word):
    """Encode a single upper-case A-Z word with the Kölner Phonetik rules."""
    digits = []
    for pos, letter in enumerate(word):
        before = word[pos - 1] if pos > 0 else ""
        after = word[pos + 1:pos + 2]
        if letter in _COLOGNE_FIXED_CODES:
            code = _COLOGNE_FIXED_CODES[letter]
        elif letter == "P":
            code = "3" if after == "H" else "1"
        elif letter in ("D", "T"):
            code = "8" if after in ("C", "S", "Z") else "2"
        elif letter == "C":
            if pos == 0:
                code = "4" if after in _COLOGNE_C_INITIAL_HARD else "8"
            elif before in ("S", "Z"):
                code = "8"
            else:
                code = "4" if after in _COLOGNE_C_INNER_HARD else "8"
        elif letter == "X":
            code = "8" if before in ("C", "K", "Q") else "48"
        else:
            # 'H' has no code of its own.
            continue
        digits.append(code)

    encoded = "".join(digits)
    # Collapse runs of identical digits ...
    collapsed = "".join(
        digit for idx, digit in enumerate(encoded)
        if idx == 0 or digit != encoded[idx - 1]
    )
    # ... then drop every '0' except a leading one.
    return collapsed[:1] + collapsed[1:].replace("0", "")


def _cologne_phonetic(text):
    """Return the space-separated Kölner Phonetik codes of the words in text."""
    words = re.findall(r"[A-Z]+", text.upper().translate(_COLOGNE_UMLAUTS))
    return " ".join(_cologne_code_word(word) for word in words)


def phonetic_match(text, query, method='koelner_phonetik'):
    """Score how similar two strings sound.

    Args:
        text: Candidate text.
        query: Text to compare against.
        method: Phonetic algorithm; only 'koelner_phonetik' is supported.

    Returns:
        The Jaro-Winkler similarity of the two phonetic codes for the
        supported method, otherwise 0.
    """
    if method == 'koelner_phonetik':
        # jellyfish provides no Kölner Phonetik encoder, so the codes are
        # computed locally; only the string similarity comes from jellyfish.
        similarity = getattr(jellyfish, 'jaro_winkler_similarity', None)
        if similarity is None:
            # Older jellyfish releases expose the metric under this name.
            similarity = jellyfish.jaro_winkler
        return similarity(_cologne_phonetic(text), _cologne_phonetic(query))
    return 0
class FileHandler:
    """Extract plain text from PDF, DOCX and TXT files."""

    @staticmethod
    def extract_text(file_path):
        """Return the text content of file_path, chosen by file extension.

        Raises:
            ValueError: If the extension is not .pdf, .docx or .txt
                (compared case-insensitively).
        """
        ext = os.path.splitext(file_path)[-1].lower()
        if ext == '.pdf':
            return FileHandler._extract_from_pdf(file_path)
        elif ext == '.docx':
            return FileHandler._extract_from_docx(file_path)
        elif ext == '.txt':
            return FileHandler._extract_from_txt(file_path)
        else:
            raise ValueError(f"Unsupported file type: {ext}")

    @staticmethod
    def _extract_from_pdf(file_path):
        """Return the text of all PDF pages joined by single spaces."""
        with pdfplumber.open(file_path) as pdf:
            # extract_text() may yield None for pages without a text layer
            # (e.g. scanned images); treat those pages as empty.
            return ' '.join(page.extract_text() or '' for page in pdf.pages)

    @staticmethod
    def _extract_from_docx(file_path):
        """Return the text of all DOCX paragraphs joined by single spaces."""
        doc = docx.Document(file_path)
        return ' '.join(para.text for para in doc.paragraphs)

    @staticmethod
    def _extract_from_txt(file_path):
        """Return the content of a UTF-8 encoded text file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
def get_embedding_model(model_type, model_name):
    """Instantiate the embedding backend registered for a provider/model pair.

    Args:
        model_type: Provider key in MODELS ('HuggingFace', 'OpenAI', 'Cohere').
        model_name: Model label registered under that provider in MODELS.

    Raises:
        ValueError: If the provider is not one of the supported ones.
    """
    if model_type not in ('HuggingFace', 'OpenAI', 'Cohere'):
        raise ValueError(f"Unsupported model type: {model_type}")

    model_id = MODELS[model_type][model_name]
    if model_type == 'HuggingFace':
        return HuggingFaceEmbeddings(model_name=model_id)
    if model_type == 'OpenAI':
        return OpenAIEmbeddings(model=model_id)
    return CohereEmbeddings(model=model_id)
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
    """Build the text splitter for the requested strategy.

    Args:
        split_strategy: 'token' or 'recursive'.
        chunk_size: Maximum chunk size handed to the splitter.
        overlap_size: Overlap between consecutive chunks.
        custom_separators: Separators for the recursive strategy; a default
            list is used when this is empty or None.

    Raises:
        ValueError: If the strategy is not supported.
    """
    if split_strategy == 'recursive':
        if custom_separators:
            separators = custom_separators
        else:
            separators = ["\n\n", "\n", " ", ""]
        return RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap_size,
            separators=separators,
        )
    if split_strategy == 'token':
        return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)
    raise ValueError(f"Unsupported split strategy: {split_strategy}")
def get_vector_store(vector_store_type, chunks, embedding_model):
    """Index the chunks in a vector store of the requested type.

    Args:
        vector_store_type: 'FAISS' or 'Chroma'.
        chunks: Texts to index.
        embedding_model: Embedding backend used to vectorize the chunks.

    Raises:
        ValueError: If the store type is not supported.
    """
    if vector_store_type == 'FAISS':
        store_cls = FAISS
    elif vector_store_type == 'Chroma':
        store_cls = Chroma
    else:
        raise ValueError(f"Unsupported vector store type: {vector_store_type}")
    return store_cls.from_texts(chunks, embedding_model)
def get_retriever(vector_store, search_type, search_kwargs):
    """Create a retriever over vector_store for the given search type.

    Args:
        vector_store: Store exposing ``as_retriever``.
        search_type: 'similarity' or 'mmr'.
        search_kwargs: Keyword arguments forwarded to the retriever
            (e.g. ``{"k": 5}``).

    Raises:
        NotImplementedError: For 'custom', which has no implementation.
            Failing here gives a clear message instead of handing ``None``
            to the caller, which would only break later on ``.invoke``.
        ValueError: For any other unknown search type.
    """
    if search_type in ('similarity', 'mmr'):
        return vector_store.as_retriever(search_type=search_type, search_kwargs=search_kwargs)
    if search_type == 'custom':
        raise NotImplementedError("Search type 'custom' is not implemented")
    raise ValueError(f"Unsupported search type: {search_type}")
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators, lang='german'):
    """Load, preprocess and chunk the input text and build the embedding model.

    Args:
        file_path: File to read; when falsy, every readable file directly
            inside FILES_DIR is used instead.
        model_type: Provider key passed to get_embedding_model.
        model_name: Model label passed to get_embedding_model.
        split_strategy: Strategy passed to get_text_splitter.
        chunk_size: Chunk size passed to get_text_splitter.
        overlap_size: Chunk overlap passed to get_text_splitter.
        custom_separators: Optional separator list for the recursive splitter.
        lang: Language used for preprocessing.

    Returns:
        Tuple ``(chunks, embedding_model, num_tokens)`` where num_tokens is
        the whitespace token count of the preprocessed text.
    """
    if file_path:
        text = FileHandler.extract_text(file_path)
    else:
        documents = []
        # Sorted so the corpus order is the same on every run.
        for entry in sorted(os.listdir(FILES_DIR)):
            candidate = os.path.join(FILES_DIR, entry)
            if not os.path.isfile(candidate):
                continue
            try:
                documents.append(FileHandler.extract_text(candidate))
            except ValueError as exc:
                # One unsupported file must not abort the whole run.
                print(f"Warning: skipping {candidate}: {exc}")
        # Separate documents so the last word of one file does not fuse
        # with the first word of the next.
        text = "\n".join(documents)

    text = preprocess_text(text, lang)
    text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
    chunks = text_splitter.split_text(text)
    embedding_model = get_embedding_model(model_type, model_name)
    return chunks, embedding_model, len(text.split())
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k, lang='german', phonetic_weight=0.3):
    """Index the chunks, retrieve matches for query and re-rank them.

    The final score of a document is
    ``(1 - phonetic_weight) * cosine(query, document)
    + phonetic_weight * phonetic_match(document, query)``.

    Args:
        chunks: Texts to index.
        embedding_model: Embedding backend for indexing and scoring.
        vector_store_type: Store type passed to get_vector_store.
        search_type: Search type passed to get_retriever.
        query: Raw user query.
        top_k: Number of documents to retrieve and return.
        lang: Language used to preprocess the query.
        phonetic_weight: Share of the phonetic score in the final ranking.

    Returns:
        Tuple ``(results, elapsed_seconds, vector_store)``; the elapsed time
        covers retrieval and re-ranking, not index construction.
    """
    import numpy as np

    preprocessed_query = preprocess_text(query, lang)
    vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
    retriever = get_retriever(vector_store, search_type, {"k": top_k})

    start_time = time.time()
    results = retriever.invoke(preprocessed_query)

    if results:
        # Score each hit against the query itself. The store's
        # similarity_search() returns bare documents without scores, so the
        # cosine similarity is computed from the embeddings directly.
        query_vector = np.asarray(embedding_model.embed_query(preprocessed_query), dtype=float)
        doc_vectors = np.asarray(
            embedding_model.embed_documents([doc.page_content for doc in results]),
            dtype=float,
        )
        dots = doc_vectors @ query_vector
        norms = np.linalg.norm(doc_vectors, axis=1) * np.linalg.norm(query_vector)
        # Zero-length vectors score 0 instead of dividing by zero.
        semantic = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)

        scores = []
        for doc, semantic_score in zip(results, semantic):
            # Skip the phonetic pass entirely when it cannot affect the score.
            phonetic = phonetic_match(doc.page_content, query) if phonetic_weight else 0.0
            scores.append((1 - phonetic_weight) * float(semantic_score) + phonetic_weight * phonetic)

        order = sorted(range(len(results)), key=scores.__getitem__, reverse=True)
        results = [results[idx] for idx in order]

    end_time = time.time()
    return results[:top_k], end_time - start_time, vector_store
def _cosine_matrix(left, right):
    """Return the matrix of cosine similarities between two vector lists."""
    import numpy as np

    left = np.asarray(left, dtype=float)
    right = np.asarray(right, dtype=float)
    left_norms = np.linalg.norm(left, axis=1, keepdims=True)
    right_norms = np.linalg.norm(right, axis=1, keepdims=True)
    # Zero vectors get similarity 0 instead of triggering a division by zero.
    left_norms[left_norms == 0] = 1.0
    right_norms[right_norms == 0] = 1.0
    return (left / left_norms) @ (right / right_norms).T


def calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k):
    """Collect descriptive statistics about one search run.

    Args:
        results: Retrieved documents (objects with ``page_content``).
        search_time: Search duration in seconds.
        vector_store: Store the search ran against; size figures are
            reported as "N/A" when the store does not expose them.
        num_tokens: Token count of the indexed text.
        embedding_model: Backend exposing ``embed_query``.
        query: Query string used for the search.
        top_k: Requested number of results.

    Returns:
        Dict of statistics. ``result_diversity`` is "N/A" and
        ``rank_correlation`` is NaN when fewer than two results exist.
    """
    import numpy as np
    from scipy.stats import spearmanr

    # Embed everything once and reuse the vectors for all statistics.
    query_embedding = embedding_model.embed_query(query)
    result_embeddings = [embedding_model.embed_query(doc.page_content) for doc in results]

    # FAISS-style stores keep the raw index on `index`; `_index` is kept as
    # a fallback name.
    index = getattr(vector_store, 'index', None)
    if index is None:
        index = getattr(vector_store, '_index', None)

    docstore_entries = getattr(getattr(vector_store, 'docstore', None), '_dict', None)
    if docstore_entries is not None:
        num_documents = len(docstore_entries)
    else:
        # Stores without an in-memory docstore may expose a countable
        # collection instead (NOTE(review): assumed for Chroma - confirm).
        collection = getattr(vector_store, '_collection', None)
        num_documents = collection.count() if hasattr(collection, 'count') else "N/A"

    client = getattr(embedding_model, 'client', None)

    stats = {
        "num_results": len(results),
        "avg_content_length": float(np.mean([len(doc.page_content) for doc in results])) if results else 0,
        "search_time": search_time,
        "vector_store_size": getattr(index, 'ntotal', "N/A"),
        "num_documents": num_documents,
        "num_tokens": num_tokens,
        "embedding_vocab_size": client.get_vocab_size() if hasattr(client, 'get_vocab_size') else "N/A",
        "embedding_dimension": len(query_embedding),
        "top_k": top_k,
    }

    if len(results) > 1:
        pairwise = _cosine_matrix(result_embeddings, result_embeddings)
        upper = pairwise[np.triu_indices(len(result_embeddings), k=1)]
        stats["result_diversity"] = float(1 - np.mean(upper))

        # Correlate query similarity with result position.
        similarities = _cosine_matrix([query_embedding], result_embeddings)[0]
        rank_correlation, _ = spearmanr(similarities, range(len(similarities)))
        stats["rank_correlation"] = float(rank_correlation)
    else:
        stats["result_diversity"] = "N/A"
        # A rank correlation needs at least two observations.
        stats["rank_correlation"] = float("nan")

    return stats
def create_custom_embedding(texts, model_type='word2vec', vector_size=100, window=5, min_count=1):
    """Train a gensim word-embedding model on whitespace-tokenized texts.

    Args:
        texts: Iterable of strings used as training sentences.
        model_type: 'word2vec' or 'fasttext'.
        vector_size: Dimensionality of the word vectors.
        window: Context window size.
        min_count: Minimum word frequency to keep a word.

    Returns:
        The trained model.

    Raises:
        ValueError: If model_type is not supported.
    """
    sentences = []
    for text in texts:
        sentences.append(text.split())

    training_options = dict(
        sentences=sentences,
        vector_size=vector_size,
        window=window,
        min_count=min_count,
        workers=4,
    )
    if model_type == 'fasttext':
        return FastText(**training_options)
    if model_type == 'word2vec':
        return Word2Vec(**training_options)
    raise ValueError("Unsupported model type")
class CustomEmbeddings(HuggingFaceEmbeddings):
    """Embeddings adapter around a gensim Word2Vec/FastText model.

    Every text is embedded as the mean of the vectors of its whitespace
    tokens that the model knows; a text without any known token maps to the
    zero vector. Both embed methods therefore return one fixed-size vector
    per text.
    """

    def __init__(self, model_path):
        """Wrap a word-embedding model.

        Args:
            model_path: Either a trained gensim model (any object exposing
                ``.wv``) or a path readable by ``Word2Vec.load``.
        """
        model = model_path if hasattr(model_path, 'wv') else Word2Vec.load(model_path)
        # The parent initializer is deliberately not called: it would load a
        # sentence-transformers model that this adapter never uses. Without
        # it the parent's attribute validation is not set up, so the model
        # is stored directly on the instance.
        # NOTE(review): relies on the parent being a pydantic model whose
        # instances carry a __dict__ - confirm with the installed versions.
        object.__setattr__(self, 'model', model)

    def _embed(self, text):
        """Return the mean word vector of text as a list of floats."""
        import numpy as np

        keyed_vectors = self.model.wv
        known_tokens = [token for token in text.split() if token in keyed_vectors]
        if not known_tokens:
            return [0.0] * keyed_vectors.vector_size
        return np.mean([keyed_vectors[token] for token in known_tokens], axis=0).tolist()

    def embed_documents(self, texts):
        """Embed each text in texts; returns a list of vectors."""
        return [self._embed(text) for text in texts]

    def embed_query(self, text):
        """Embed a single query text; returns one vector."""
        return self._embed(text)
def optimize_vocabulary(texts, vocab_size=10000, min_frequency=2):
    """Drop rare words from texts and train a BPE tokenizer on the remainder.

    Args:
        texts: Sequence of strings (iterated more than once).
        vocab_size: Target vocabulary size for the BPE trainer.
        min_frequency: Minimum corpus-wide count a word needs to be kept.

    Returns:
        Tuple ``(tokenizer, filtered_texts)``.
    """
    frequencies = Counter()
    for text in texts:
        frequencies.update(text.split())

    def keep_frequent(text):
        # Counter lookups return 0 for unseen words without inserting them.
        return ' '.join(word for word in text.split() if frequencies[word] >= min_frequency)

    filtered_texts = list(map(keep_frequent, texts))

    special_tokens = ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]
    bpe_tokenizer = Tokenizer(BPE(unk_token="[UNK]"))
    bpe_trainer = BpeTrainer(vocab_size=vocab_size, special_tokens=special_tokens)
    bpe_tokenizer.train_from_iterator(filtered_texts, bpe_trainer)
    return bpe_tokenizer, filtered_texts
def compare_embeddings(file, query, model_types, model_names, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k, lang='german', use_custom_embedding=False, optimize_vocab=False, phonetic_weight=0.3):
    """Run the same search with several embedding models and tabulate the outcome.

    Model types and names are paired positionally. For every pair the input
    is processed, indexed and searched, and the results and statistics are
    collected.

    Args:
        file: Uploaded file (path string or object with ``.name``), or a
            falsy value to fall back to the default input directory.
        query: Search query.
        model_types: Provider keys, one per model.
        model_names: Model labels, one per model.
        split_strategy: Text splitting strategy.
        chunk_size: Chunk size for splitting.
        overlap_size: Chunk overlap for splitting.
        custom_separators: Comma-separated separators, or empty.
        vector_store_type: Vector store to use.
        search_type: Retriever search type.
        top_k: Number of results per model.
        lang: Preprocessing language.
        use_custom_embedding: Train and use a custom word-embedding model.
        optimize_vocab: Remove rare words from the chunks before indexing.
        phonetic_weight: Weight of the phonetic score during re-ranking.

    Returns:
        Tuple ``(results_df, stats_df, figure)`` matching the three UI
        outputs; ``figure`` is None when the plots cannot be rendered.
    """
    # UI sliders may deliver floats; splitting, retrieval and slicing need ints.
    chunk_size, overlap_size, top_k = int(chunk_size), int(overlap_size), int(top_k)
    # Depending on the Gradio version an upload arrives as a path string or
    # as an object carrying the path in `.name`.
    file_path = getattr(file, 'name', file) if file else None
    separators = custom_separators.split(',') if custom_separators else None

    all_results = []
    all_stats = []
    settings = {
        "split_strategy": split_strategy,
        "chunk_size": chunk_size,
        "overlap_size": overlap_size,
        "custom_separators": custom_separators,
        "vector_store_type": vector_store_type,
        "search_type": search_type,
        "top_k": top_k,
        "lang": lang,
        "use_custom_embedding": use_custom_embedding,
        "optimize_vocab": optimize_vocab,
        "phonetic_weight": phonetic_weight
    }

    for model_type, model_name in zip(model_types or [], model_names or []):
        chunks, embedding_model, num_tokens = process_files(
            file_path,
            model_type,
            model_name,
            split_strategy,
            chunk_size,
            overlap_size,
            separators,
            lang
        )

        if use_custom_embedding:
            custom_model = create_custom_embedding(chunks)
            embedding_model = CustomEmbeddings(custom_model)

        if optimize_vocab:
            tokenizer, optimized_chunks = optimize_vocabulary(chunks)
            # Chunks that lost all their words carry nothing to index.
            chunks = [chunk for chunk in optimized_chunks if chunk]

        results, search_time, vector_store = search_embeddings(
            chunks,
            embedding_model,
            vector_store_type,
            search_type,
            query,
            top_k,
            lang,
            phonetic_weight
        )

        stats = calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query, top_k)
        stats["model"] = f"{model_type} - {model_name}"
        stats.update(settings)

        formatted_results = format_results(results, stats)
        all_results.extend(formatted_results)
        all_stats.append(stats)

    results_df = pd.DataFrame(all_results)
    stats_df = pd.DataFrame(all_stats)

    # The UI declares three outputs, so a third value must always be
    # returned. Plotting is best-effort: a rendering problem must not
    # discard the tables.
    try:
        figure = visualize_results(results_df, stats_df)
    except Exception as exc:
        print(f"Warning: could not render visualizations: {exc}")
        figure = None

    return results_df, stats_df, figure
def format_results(results, stats):
    """Flatten retrieved documents into row dicts for tabular display.

    Each row starts with the model label and the document content, followed
    by the document's metadata and every statistic except "model". Later
    entries overwrite earlier ones that share a key.
    """
    rows = []
    for document in results:
        row = {"Model": stats["model"], "Content": document.page_content}
        row.update(document.metadata)
        row.update({key: value for key, value in stats.items() if key != "model"})
        rows.append(row)
    return rows
import matplotlib.pyplot as plt |
import seaborn as sns |
from sklearn.manifold import TSNE |
def _show_placeholder(ax, message):
    """Replace an axes' content with a centred message."""
    ax.text(0.5, 0.5, message, ha='center', va='center', transform=ax.transAxes)
    ax.set_axis_off()


def visualize_results(results_df, stats_df):
    """Draw a 2x2 overview figure of the comparison results.

    Panels: search time per model, result diversity vs. rank correlation,
    distribution of result content lengths, and a t-SNE projection of
    result embeddings. A panel whose input columns are missing or unusable
    shows a placeholder message instead of failing.

    Args:
        results_df: One row per retrieved document. The model label is read
            from a 'model' or 'Model' column and the content length from
            'content_length' or, failing that, the length of 'Content'.
        stats_df: One row per model run.

    Returns:
        The matplotlib figure.
    """
    import numpy as np

    fig, axs = plt.subplots(2, 2, figsize=(20, 20))

    # Panel 1: search time per model.
    ax = axs[0, 0]
    if not stats_df.empty and {'model', 'search_time'} <= set(stats_df.columns):
        sns.barplot(x='model', y='search_time', data=stats_df, ax=ax)
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    else:
        _show_placeholder(ax, 'No statistics available')
    ax.set_title('Search Time by Model')

    # Panel 2: diversity vs. rank correlation. Both columns may contain
    # "N/A" or NaN markers, which are dropped before plotting.
    ax = axs[0, 1]
    metric_columns = ['result_diversity', 'rank_correlation']
    if {'model', *metric_columns} <= set(stats_df.columns):
        numeric_stats = stats_df.assign(
            **{column: pd.to_numeric(stats_df[column], errors='coerce') for column in metric_columns}
        ).dropna(subset=metric_columns)
    else:
        numeric_stats = stats_df.iloc[0:0]
    if not numeric_stats.empty:
        sns.scatterplot(x='result_diversity', y='rank_correlation', hue='model', data=numeric_stats, ax=ax)
    else:
        _show_placeholder(ax, 'No diversity/correlation data available')
    ax.set_title('Result Diversity vs. Rank Correlation')

    # Panel 3: distribution of content lengths per model.
    ax = axs[1, 0]
    model_column = 'model' if 'model' in results_df.columns else 'Model'
    if 'content_length' in results_df.columns:
        lengths = results_df['content_length']
    elif 'Content' in results_df.columns:
        lengths = results_df['Content'].astype(str).str.len()
    else:
        lengths = None
    if lengths is not None and model_column in results_df.columns and len(results_df) > 0:
        length_frame = pd.DataFrame({
            'model': results_df[model_column].to_numpy(),
            'content_length': lengths.to_numpy(),
        })
        sns.boxplot(x='model', y='content_length', data=length_frame, ax=ax)
        plt.setp(ax.get_xticklabels(), rotation=45, ha='right')
    else:
        _show_placeholder(ax, 'No results available')
    ax.set_title('Distribution of Result Content Lengths')

    # Panel 4: t-SNE of result embeddings. This needs an 'embedding' column
    # with equally sized vectors and a perplexity below the sample count.
    ax = axs[1, 1]
    vectors = results_df['embedding'].tolist() if 'embedding' in results_df.columns else []
    same_dimension = len({len(vector) for vector in vectors}) == 1
    if len(vectors) >= 3 and same_dimension and model_column in results_df.columns:
        embeddings = np.array(vectors)
        tsne = TSNE(n_components=2, random_state=42, perplexity=min(30, len(vectors) - 1))
        embeddings_2d = tsne.fit_transform(embeddings)
        sns.scatterplot(
            x=embeddings_2d[:, 0],
            y=embeddings_2d[:, 1],
            hue=results_df[model_column].to_numpy(),
            ax=ax,
        )
    else:
        _show_placeholder(ax, 'Not enough embeddings for t-SNE')
    ax.set_title('t-SNE Visualization of Result Embeddings')

    plt.tight_layout()
    return fig
def launch_interface(share=True):
    """Build the two-tab Gradio app (comparison form and tutorial) and launch it.

    Args:
        share: Forwarded to ``launch``.
    """
    provider_choices = list(MODELS.keys()) + ["Custom"]
    model_choices = [model for models in MODELS.values() for model in models] + ["custom_model"]

    # Component order must match the positional parameters of compare_embeddings.
    input_components = [
        gr.File(label="Upload File (Optional)"),
        gr.Textbox(label="Search Query"),
        gr.CheckboxGroup(choices=provider_choices, label="Embedding Model Types"),
        gr.CheckboxGroup(choices=model_choices, label="Embedding Models"),
        gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive"),
        gr.Slider(100, 1000, step=100, value=500, label="Chunk Size"),
        gr.Slider(0, 100, step=10, value=50, label="Overlap Size"),
        gr.Textbox(label="Custom Split Separators (comma-separated, optional)"),
        gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS"),
        gr.Radio(choices=["similarity", "mmr", "custom"], label="Search Type", value="similarity"),
        gr.Slider(1, 10, step=1, value=5, label="Top K"),
        gr.Dropdown(choices=["german", "english", "french"], label="Language", value="german"),
        gr.Checkbox(label="Use Custom Embedding", value=False),
        gr.Checkbox(label="Optimize Vocabulary", value=False),
        gr.Slider(0, 1, step=0.1, value=0.3, label="Phonetic Matching Weight"),
    ]
    output_components = [
        gr.Dataframe(label="Results", interactive=False),
        gr.Dataframe(label="Statistics", interactive=False),
        gr.Plot(label="Visualizations"),
    ]

    comparison_tab = gr.Interface(
        fn=compare_embeddings,
        inputs=input_components,
        outputs=output_components,
        title="Advanced Embedding Comparison Tool",
        description="Compare different embedding models and retrieval strategies with advanced preprocessing and phonetic matching",
    )

    tutorial_md = """
# Advanced Embedding Comparison Tool Tutorial
... (update the tutorial to include information about the new features) ...
"""

    app = gr.TabbedInterface(
        [comparison_tab, gr.Markdown(tutorial_md)],
        ["Embedding Comparison", "Tutorial"],
    )
    app.launch(share=share)
# Start the UI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    launch_interface()