|
import os |
|
import time |
|
import pdfplumber |
|
import docx |
|
import nltk |
|
import gradio as gr |
|
from langchain_huggingface import HuggingFaceEmbeddings |
|
from langchain_community.embeddings import CohereEmbeddings |
|
from langchain_openai import OpenAIEmbeddings |
|
from langchain_community.vectorstores import FAISS, Chroma |
|
from langchain_text_splitters import RecursiveCharacterTextSplitter, TokenTextSplitter |
|
from typing import List, Dict, Any |
|
import pandas as pd |
|
import numpy as np |
|
import re |
|
from nltk.corpus import stopwords |
|
from nltk.tokenize import word_tokenize |
|
from nltk.stem import SnowballStemmer |
|
import jellyfish |
|
from gensim.models import Word2Vec |
|
from gensim.models.fasttext import FastText |
|
from collections import Counter |
|
from tokenizers import Tokenizer, models, trainers |
|
from tokenizers.models import WordLevel |
|
from tokenizers.trainers import WordLevelTrainer |
|
from tokenizers.pre_tokenizers import Whitespace |
|
import matplotlib.pyplot as plt |
|
import seaborn as sns |
|
from sklearn.manifold import TSNE |
|
from sklearn.metrics import silhouette_score |
|
from scipy.stats import spearmanr |
|
from functools import lru_cache |
|
from langchain.retrievers import MultiQueryRetriever |
|
from langchain_huggingface.llms import HuggingFacePipeline |
|
from transformers import pipeline |
|
from sentence_transformers import SentenceTransformer, util |
|
from sklearn.model_selection import ParameterGrid |
|
from tqdm import tqdm |
|
import random |
|
from huggingface_hub import login |
|
from typing import List, Tuple, Optional |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from transformers import pipeline |
|
import torch |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from huggingface_hub import InferenceClient |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_nltk_resources():
    """Download the NLTK data packages this module asks for.

    Each package is fetched independently so that one failure does not stop
    the remaining downloads. Failures are reported on stdout and are never
    raised to the caller.
    """
    resources = ['punkt', 'stopwords', 'snowball_data', 'wordnet']
    for resource in resources:
        try:
            # nltk.download() reports most failures by returning False
            # rather than raising, so the return value must be inspected.
            succeeded = nltk.download(resource, quiet=False)
        except Exception as e:
            print(f"Failed to download {resource}: {str(e)}")
        else:
            if not succeeded:
                print(f"Failed to download {resource}: downloader reported an error")
|
|
|
# Fetch the NLTK data as soon as the module is imported.
download_nltk_resources()

print("------ Strinage ------ ")

# Directory scanned by process_files() when no explicit file path is given.
FILES_DIR = "./files"
|
|
|
|
|
# Short model names offered by default, grouped by provider.
DEFAULT_MODELS = {
    "HuggingFace": ["paraphrase-miniLM", "paraphrase-mpnet", "all-MiniLM-L6-v2"],
    "OpenAI": ["text-embedding-ada-002"],
    "Cohere": ["embed-multilingual-v2.0"],
}
|
|
|
|
|
class ModelManager:
    """Registry of embedding models plus a running quality ranking.

    ``models`` maps a provider name to ``{short name: model identifier}``.
    ``rankings`` and ``model_stats`` are keyed by whatever ``model_id`` the
    caller passes to :meth:`update_model_ranking`.
    """

    def __init__(self):
        self.rankings: Dict[str, float] = {}
        self.model_stats: Dict[str, Dict[str, Any]] = {}
        self.models = {
            'HuggingFace': {
                'e5-base-de': "danielheinz/e5-base-sts-en-de",
                'paraphrase-miniLM': "paraphrase-multilingual-MiniLM-L12-v2",
                'paraphrase-mpnet': "paraphrase-multilingual-mpnet-base-v2",
                # Offered in DEFAULT_MODELS, so it has to resolve here too;
                # previously the lookup returned None for it.
                'all-MiniLM-L6-v2': "sentence-transformers/all-MiniLM-L6-v2",
                # NOTE(review): these two identifiers carry no organisation
                # prefix - confirm they resolve on the model hub.
                'gte-large': "gte-large",
                'gbert-base': "gbert-base",
            },
            'OpenAI': {
                'text-embedding-ada-002': "text-embedding-ada-002",
            },
            'Cohere': {
                'embed-multilingual-v2.0': "embed-multilingual-v2.0",
            },
        }

    def update_model_ranking(self, model_id: str, score: float, feedback: Optional[str] = None):
        """Blend ``score`` into the model's ranking and record feedback.

        The ranking is an exponential moving average: 70% previous value,
        30% new score. A model without a previous ranking starts from 0.0,
        so its first update stores ``0.3 * score``.

        Args:
            model_id: Key under which the ranking and feedback are stored.
            score: New performance score to blend in.
            feedback: Optional free-text note; empty or None is ignored.
        """
        current_score = self.rankings.get(model_id, 0.0)
        self.rankings[model_id] = 0.7 * current_score + 0.3 * score

        if feedback:
            entry = self.model_stats.setdefault(
                model_id, {"feedback_count": 0, "feedback": []}
            )
            entry["feedback_count"] += 1
            entry["feedback"].append(feedback)

    def get_top_models(self, n: int = 5) -> List[Tuple[str, float]]:
        """Return the ``n`` highest ranked ``(model_id, ranking)`` pairs."""
        return sorted(self.rankings.items(), key=lambda item: item[1], reverse=True)[:n]

    def get_model_stats(self, model_id: str) -> Dict[str, Any]:
        """Return the feedback record for ``model_id`` or an empty dict."""
        return self.model_stats.get(model_id, {})

    def add_model(self, provider, name, model_path):
        """Register ``model_path`` under ``provider``/``name``, replacing any existing entry."""
        self.models.setdefault(provider, {})[name] = model_path

    def remove_model(self, provider, name):
        """Remove ``provider``/``name`` if present; unknown entries are ignored."""
        if provider in self.models and name in self.models[provider]:
            del self.models[provider][name]

    def get_model(self, provider, name):
        """Return the identifier registered for ``provider``/``name`` or None."""
        return self.models.get(provider, {}).get(name)

    def list_models(self):
        """Return ``{provider: [short names]}`` for every registered provider."""
        return {provider: list(entries) for provider, entries in self.models.items()}
|
|
|
model_manager = ModelManager() |
|
|
|
|
|
import os |
|
import json |
|
import csv |
|
import xml.etree.ElementTree as ET |
|
import openpyxl |
|
import pdfplumber |
|
import docx |
|
|
|
|
|
class FileHandler:
    """Extract plain text from the file types supported by the tool."""

    @staticmethod
    def extract_text(file_path):
        """Return the text content of ``file_path``.

        The extractor is chosen from the lower-cased file extension.

        Raises:
            ValueError: If the extension is not one of .pdf, .docx, .txt,
                .xml, .json, .xlsx or .csv.
        """
        ext = os.path.splitext(file_path)[-1].lower()
        extractors = {
            '.pdf': FileHandler._extract_from_pdf,
            '.docx': FileHandler._extract_from_docx,
            '.txt': FileHandler._extract_from_txt,
            '.xml': FileHandler._extract_from_xml,
            '.json': FileHandler._extract_from_json,
            '.xlsx': FileHandler._extract_from_xlsx,
            '.csv': FileHandler._extract_from_csv,
        }
        if ext not in extractors:
            raise ValueError(f"Unsupported file type: {ext}")
        return extractors[ext](file_path)

    @staticmethod
    def _extract_from_pdf(file_path):
        """Join the text of all PDF pages with single spaces."""
        with pdfplumber.open(file_path) as pdf:
            # extract_text() yields None for pages without a text layer;
            # substitute '' so the join does not raise TypeError.
            return ' '.join(page.extract_text() or '' for page in pdf.pages)

    @staticmethod
    def _extract_from_docx(file_path):
        """Join the text of all paragraphs with single spaces."""
        doc = docx.Document(file_path)
        return ' '.join(para.text for para in doc.paragraphs)

    @staticmethod
    def _extract_from_txt(file_path):
        """Return the UTF-8 decoded file content."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def _extract_from_xml(file_path):
        """Return all character data found below the document root."""
        tree = ET.parse(file_path)
        return FileHandler._extract_xml_text(tree.getroot())

    @staticmethod
    def _extract_xml_text(element):
        """Return the text of ``element`` and its descendants, stripped.

        itertext() also yields the text that follows each child element
        (its ``tail``), which the previous hand-written recursion dropped.
        """
        return ''.join(element.itertext()).strip()

    @staticmethod
    def _extract_from_json(file_path):
        """Return the JSON document re-serialised with a 4-space indent."""
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return json.dumps(data, indent=4)

    @staticmethod
    def _extract_from_xlsx(file_path):
        """Return the active sheet as tab-separated rows, skipping empty cells."""
        workbook = openpyxl.load_workbook(file_path)
        sheet = workbook.active
        rows = (
            '\t'.join(str(cell) for cell in row if cell is not None)
            for row in sheet.iter_rows(values_only=True)
        )
        return '\n'.join(rows)

    @staticmethod
    def _extract_from_csv(file_path):
        """Return the CSV rows re-joined with commas, one row per line."""
        # newline='' lets the csv module handle line endings itself, which
        # is required for quoted fields that contain embedded newlines.
        with open(file_path, 'r', encoding='utf-8', newline='') as f:
            return '\n'.join(','.join(row) for row in csv.reader(f))
|
|
|
|
|
|
|
|
|
def simple_tokenize(text):
    """Split ``text`` on runs of whitespace and return the pieces as a list."""
    tokens = text.split()
    return tokens
|
|
|
def preprocess_text(text, lang='german', apply_preprocessing=False):
    """Lower-case, strip non-letters, remove stopwords and stem ``text``.

    Args:
        text: Input string.
        lang: NLTK language name used for tokenising, stopwords and stemming.
        apply_preprocessing: When false (the default) ``text`` is returned
            unchanged.

    Returns:
        The processed tokens joined by single spaces, or ``text`` itself when
        preprocessing is disabled.
    """
    if not apply_preprocessing:
        return text

    text = text.lower()
    # Drop everything that is not a letter or whitespace. ``\w`` is
    # Unicode-aware, so umlauts, ß and accented letters survive; the former
    # ASCII-only class ``[^a-zA-Z\s]`` deleted them.
    text = re.sub(r'[^\w\s]|[\d_]', '', text)

    try:
        tokens = word_tokenize(text, language=lang)
    except LookupError:
        print(f"Warning: NLTK punkt tokenizer for {lang} not found. Using simple tokenization.")
        tokens = simple_tokenize(text)

    try:
        stop_words = set(stopwords.words(lang))
    except LookupError:
        print(f"Warning: Stopwords for {lang} not found. Skipping stopword removal.")
        stop_words = set()
    tokens = [token for token in tokens if token not in stop_words]

    try:
        stemmer = SnowballStemmer(lang)
        tokens = [stemmer.stem(token) for token in tokens]
    except ValueError:
        print(f"Warning: SnowballStemmer for {lang} not available. Skipping stemming.")

    return ' '.join(tokens)
|
|
|
def phonetic_match(text, query, method='levenshtein_distance', apply_phonetic=False):
    """Return the edit distance between the Soundex codes of two strings.

    The result is 0 whenever ``apply_phonetic`` is false or ``method`` is
    anything other than ``'levenshtein_distance'``.
    """
    if apply_phonetic and method == 'levenshtein_distance':
        text_code = jellyfish.soundex(text)
        query_code = jellyfish.soundex(query)
        return jellyfish.levenshtein_distance(text_code, query_code)
    return 0
|
|
|
|
|
from typing import List, Union |
|
import torch |
|
from transformers import AutoTokenizer |
|
import numpy as np |
|
from nltk.tokenize import word_tokenize |
|
from nltk.corpus import wordnet |
|
import nltk |
|
|
|
# Checkpoint used for the LLM-based expansion.
# NOTE(review): optimize_query() accepts ``query_optimization_model`` but the
# pipeline has always been built from this fixed checkpoint - confirm whether
# the argument is meant to take over.
_QUERY_EXPANSION_MODEL = "meta-llama/Llama-3.2-1B-Instruct"

# Words that are never worth adding to a query.
_EXPANSION_STOPWORDS = frozenset(
    ['the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to']
)


@lru_cache(maxsize=1)
def _load_query_expansion_pipeline(model_name: str, device: str):
    """Build the generation pipeline once and reuse it on later calls."""
    return pipeline(model=model_name, device=device)


def _wordnet_expansions(tokens: List[str]) -> List[str]:
    """Return up to two lemma names from the first WordNet synset of each token."""
    terms = []
    for token in tokens:
        for synset in wordnet.synsets(token)[:1]:
            terms.extend(lemma.name() for lemma in synset.lemmas()[:2])
    return terms


def _build_expansion_prompt(query: str) -> str:
    """Wrap ``query`` in the chat template sent to the expansion model."""
    return (
        "<|start_header_id|>system<|end_header_id|>\n"
        "You are an expert in enhancing user input for vector store retrieval.\n"
        "Enhance the following search query with relevant terms.\n"
        "\n"
        "show me just the new term. You SHOULD NOT include any other text in the response.\n"
        "\n"
        "<|eot_id|><|start_header_id|>user<|end_header_id|>\n"
        f"{query}\n"
        "<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n"
    )


def optimize_query(
    query: str,
    query_optimization_model: str,
    chunks: List[str],
    embedding_model: str,
    vector_store_type: str,
    search_type: str,
    top_k: int = 3,
    use_gpu: bool = False
) -> str:
    """Expand ``query`` with WordNet lemmas and LLM-suggested terms.

    Candidate terms are collected in a fixed order (WordNet first, then the
    model output), de-duplicated, filtered against the words already in the
    query and a small stopword list, and the first ``top_k`` are appended to
    the original query.

    Args:
        query: Original search query.
        query_optimization_model: Accepted for interface compatibility; not
            used by the current implementation.
        chunks: Not used by the current implementation.
        embedding_model: Not used by the current implementation.
        vector_store_type: Not used by the current implementation.
        search_type: Not used by the current implementation.
        top_k: Maximum number of expansion terms to append.
        use_gpu: Run the language model on CUDA when it is available.

    Returns:
        The expanded query, or the unchanged ``query`` if expansion fails.
    """
    try:
        device = "cuda" if use_gpu and torch.cuda.is_available() else "cpu"
        tokens = word_tokenize(query.lower())
        wordnet_terms = _wordnet_expansions(tokens)

        # A failing model must not contribute anything: previously the
        # fallback message itself was tokenised and mixed into the query.
        llm_terms = []
        try:
            llm_pipeline = _load_query_expansion_pipeline(_QUERY_EXPANSION_MODEL, device)
            suggestions = llm_pipeline(
                _build_expansion_prompt(query),
                do_sample=True,
                top_k=10,
                num_return_sequences=1,
                return_full_text=False,
                max_new_tokens=1900,
                truncation=True
            )
            generated_text = suggestions[0].get('generated_text', '')
            print(generated_text)
            llm_terms = word_tokenize(generated_text.lower())
        except Exception as model_error:
            print(f"LLM-based expansion failed: {str(model_error)}")

        # Ordered de-duplication keeps the result deterministic; taking a
        # slice of a set returned arbitrary terms and could repeat words that
        # were already part of the query.
        seen = set(tokens) | _EXPANSION_STOPWORDS
        expansion_terms = []
        for term in wordnet_terms + llm_terms:
            is_word = any(ch.isalnum() for ch in term)  # skip bare punctuation
            if is_word and term not in seen:
                seen.add(term)
                expansion_terms.append(term)

        expanded_query = f"{query} {' '.join(expansion_terms[:top_k])}"
        print(expanded_query)
        return expanded_query.strip()

    except Exception as e:
        print(f"Query optimization failed: {str(e)}")
        return query
|
|
|
|
|
|
|
|
|
""" |
|
Example usage:

chunks = ["sample text chunk 1", "sample text chunk 2"]
query = "machine learning algorithms"
optimized_query = optimize_query(
    query=query,
    query_optimization_model="meta-llama/Llama-3.2-1B-Instruct",
    chunks=chunks,
    embedding_model="sentence-transformers/all-MiniLM-L6-v2",
    vector_store_type="FAISS",
    search_type="similarity",
    use_gpu=False  # Explicitly use CPU
)
|
""" |
|
|
|
|
|
def create_custom_embedding(texts, model_type='word2vec', vector_size=100, window=5, min_count=1):
    """Train a gensim embedding model on whitespace-tokenised ``texts``.

    ``model_type`` selects ``'word2vec'`` or ``'fasttext'``; any other value
    raises ``ValueError``. The trained model is returned.
    """
    sentences = [text.split() for text in texts]

    if model_type not in ('word2vec', 'fasttext'):
        raise ValueError("Unsupported model type")

    model_class = Word2Vec if model_type == 'word2vec' else FastText
    return model_class(
        sentences=sentences,
        vector_size=vector_size,
        window=window,
        min_count=min_count,
        workers=4,
    )
|
|
|
class CustomEmbeddings(HuggingFaceEmbeddings):
    """Embeddings backed by a locally trained gensim Word2Vec model.

    A text is embedded as the mean of the vectors of its whitespace tokens
    that exist in the model vocabulary; a text without any known token maps
    to the zero vector. Each embedding is a flat list of floats.
    """

    def __init__(self, model_path):
        # NOTE(review): the base class appears to be a pydantic model whose
        # __init__ is deliberately not run here; a plain ``self.model = ...``
        # is expected to be rejected in that case, hence the direct
        # object-level assignment - confirm against the installed version.
        object.__setattr__(self, "model", Word2Vec.load(model_path))

    def _embed(self, text):
        """Return the mean word vector of ``text`` as a list of floats."""
        word_vectors = self.model.wv
        known_tokens = [token for token in text.split() if token in word_vectors]
        if not known_tokens:
            return [0.0] * word_vectors.vector_size
        # Indexing with a list returns one row per token; averaging collapses
        # that matrix into the single vector an embedding is expected to be.
        return word_vectors[known_tokens].mean(axis=0).tolist()

    def embed_documents(self, texts):
        """Return one embedding per entry of ``texts``."""
        return [self._embed(text) for text in texts]

    def embed_query(self, text):
        """Return the embedding of a single query string."""
        return self._embed(text)
|
|
|
|
|
def create_custom_tokenizer(file_path, model_type='WordLevel', vocab_size=10000, special_tokens=None):
    """Train a tokenizer on the text stored in ``file_path``.

    Args:
        file_path: UTF-8 text file used as the training corpus.
        model_type: ``'WordLevel'``, ``'BPE'`` or ``'Unigram'``. ``None`` is
            treated as ``'WordLevel'``.
        vocab_size: Target vocabulary size passed to the trainer.
        special_tokens: Special tokens to reserve; defaults to
            ``[UNK] [CLS] [SEP] [PAD] [MASK]``.

    Returns:
        The trained ``Tokenizer``.

    Raises:
        ValueError: If ``model_type`` is not supported.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()

    if model_type is None:
        model_type = 'WordLevel'
    special_tokens = special_tokens or ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]

    # Every model needs its own trainer class; the WordLevel trainer that was
    # used unconditionally cannot train BPE or Unigram models.
    if model_type == 'WordLevel':
        tokenizer = Tokenizer(WordLevel(unk_token="[UNK]"))
        trainer = trainers.WordLevelTrainer(special_tokens=special_tokens, vocab_size=vocab_size)
    elif model_type == 'BPE':
        tokenizer = Tokenizer(models.BPE(unk_token="[UNK]"))
        trainer = trainers.BpeTrainer(special_tokens=special_tokens, vocab_size=vocab_size)
    elif model_type == 'Unigram':
        tokenizer = Tokenizer(models.Unigram())
        unigram_options = {"special_tokens": special_tokens, "vocab_size": vocab_size}
        if "[UNK]" in special_tokens:
            # Only name an unknown token that is actually being reserved.
            unigram_options["unk_token"] = "[UNK]"
        trainer = trainers.UnigramTrainer(**unigram_options)
    else:
        raise ValueError(f"Unsupported tokenizer model: {model_type}")

    tokenizer.pre_tokenizer = Whitespace()
    tokenizer.train_from_iterator([text], trainer)

    return tokenizer
|
|
|
def custom_tokenize(text, tokenizer):
    """Encode ``text`` with ``tokenizer`` and return the resulting token strings."""
    encoding = tokenizer.encode(text)
    return encoding.tokens
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators=None):
    """Create the text splitter for ``split_strategy``.

    ``'token'`` yields a ``TokenTextSplitter`` and ``'recursive'`` a
    ``RecursiveCharacterTextSplitter`` (using ``custom_separators`` when
    given); any other strategy raises ``ValueError``.
    """
    if split_strategy == 'recursive':
        separators = custom_separators or ["\n\n", "\n", " ", ""]
        return RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=overlap_size,
            add_start_index=True,
            strip_whitespace=True,
            separators=separators,
        )

    if split_strategy == 'token':
        return TokenTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap_size)

    raise ValueError(f"Unsupported split strategy: {split_strategy}")
|
|
|
def get_embedding_model(model_type, model_name):
    """Instantiate the embedding model for ``model_type``/``model_name``.

    ``model_name`` is first looked up in ``model_manager``. A name that is
    not registered is used verbatim as the model identifier, so callers may
    pass a full identifier directly instead of a registered short name.

    Raises:
        ValueError: If ``model_type`` is not HuggingFace, OpenAI or Cohere.
    """
    if model_type not in ('HuggingFace', 'OpenAI', 'Cohere'):
        raise ValueError(f"Unsupported model type: {model_type}")

    # The registry returns None for unknown names; passing None on used to
    # hand the embedding class no model at all.
    model_path = model_manager.get_model(model_type, model_name) or model_name

    if model_type == 'HuggingFace':
        return HuggingFaceEmbeddings(
            model_name=model_path,
            multi_process=True,
        )
    if model_type == 'OpenAI':
        return OpenAIEmbeddings(model=model_path)
    return CohereEmbeddings(model=model_path)
|
|
|
def get_vector_store(vector_store_type, chunks, embedding_model):
    """Build a FAISS or Chroma vector store from ``chunks``.

    Args:
        vector_store_type: ``'FAISS'`` or ``'Chroma'``.
        chunks: Text chunks to index.
        embedding_model: Embedding object handed to ``from_texts``.

    Raises:
        ValueError: If ``vector_store_type`` is not supported.
    """
    if vector_store_type == 'FAISS':
        return FAISS.from_texts(chunks, embedding_model)
    elif vector_store_type == 'Chroma':
        return Chroma.from_texts(chunks, embedding_model)
    else:
        raise ValueError(f"Unsupported vector store type: {vector_store_type}")
|
|
|
def get_retriever(vector_store, search_type, search_kwargs):
    """Return a retriever over ``vector_store`` for ``search_type``.

    ``'similarity'`` and ``'mmr'`` map to the retriever modes of the same
    name, ``'custom'`` currently uses plain similarity retrieval, and any
    other value raises ``ValueError``.
    """
    if search_type in ('similarity', 'custom'):
        retriever_mode = "similarity"
    elif search_type == 'mmr':
        retriever_mode = "mmr"
    else:
        raise ValueError(f"Unsupported search type: {search_type}")

    return vector_store.as_retriever(search_type=retriever_mode, search_kwargs=search_kwargs)
|
|
|
def custom_similarity(query_embedding, doc_embedding, query, doc_text, phonetic_weight=0.3):
    """Blend cosine similarity with a phonetic similarity.

    Args:
        query_embedding: Embedding vector of the query.
        doc_embedding: Embedding vector of the document.
        query: Query text, used for the phonetic comparison.
        doc_text: Document text, used for the phonetic comparison.
        phonetic_weight: Share of the phonetic term in the result; with 0 the
            plain cosine similarity is returned.

    Returns:
        ``(1 - phonetic_weight) * cosine + phonetic_weight * phonetic``.
    """
    query_vector = np.asarray(query_embedding, dtype=float)
    doc_vector = np.asarray(doc_embedding, dtype=float)
    norm_product = np.linalg.norm(query_vector) * np.linalg.norm(doc_vector)
    # A zero-length vector has no direction; treat it as "not similar"
    # instead of dividing by zero.
    embedding_sim = float(np.dot(query_vector, doc_vector) / norm_product) if norm_product else 0.0

    if not phonetic_weight:
        return embedding_sim

    # phonetic_match() returns 0 unless explicitly enabled, and what it
    # returns is a distance between two Soundex codes.
    # NOTE(review): Soundex codes are assumed to be 4 characters long, which
    # bounds the distance by 4 - confirm for the jellyfish version in use.
    distance = phonetic_match(doc_text, query, apply_phonetic=True)
    phonetic_sim = max(0.0, 1.0 - distance / 4)

    return (1 - phonetic_weight) * embedding_sim + phonetic_weight * phonetic_sim
|
|
|
def _create_vector_store(vector_store_type, chunks_tuple, embedding_model): |
|
chunks = list(chunks_tuple) |
|
|
|
if vector_store_type == 'FAISS': |
|
return FAISS.from_texts(chunks, embedding_model) |
|
elif vector_store_type == 'Chroma': |
|
return Chroma.from_texts(chunks, embedding_model) |
|
else: |
|
raise ValueError(f"Unsupported vector store type: {vector_store_type}") |
|
|
|
|
|
|
|
def process_files(file_path, model_type, model_name, split_strategy, chunk_size, overlap_size, custom_separators, lang='german', apply_preprocessing=False, custom_tokenizer_file=None, custom_tokenizer_model=None, custom_tokenizer_vocab_size=10000, custom_tokenizer_special_tokens=None):
    """Load text, optionally normalise it, split it and build the embedder.

    Args:
        file_path: File to read. When empty, every regular file in
            ``FILES_DIR`` is read instead (in sorted name order).
        model_type: Embedding provider passed to ``get_embedding_model``.
        model_name: Embedding model name passed to ``get_embedding_model``.
        split_strategy: Splitter strategy passed to ``get_text_splitter``.
        chunk_size: Chunk size for the splitter.
        overlap_size: Chunk overlap for the splitter.
        custom_separators: Optional separators for the recursive splitter.
        lang: Language used by ``preprocess_text``.
        apply_preprocessing: Run ``preprocess_text`` on the text. Ignored
            when a custom tokenizer file is given.
        custom_tokenizer_file: Corpus file for training a custom tokenizer;
            when set the text is re-written as its space-joined tokens.
        custom_tokenizer_model: Tokenizer model type; ``None`` falls back to
            ``'WordLevel'``.
        custom_tokenizer_vocab_size: Vocabulary size for the tokenizer.
        custom_tokenizer_special_tokens: Special tokens for the tokenizer.

    Returns:
        ``(chunks, embedding_model, number_of_whitespace_tokens)``.
    """
    if file_path:
        text = FileHandler.extract_text(file_path)
    else:
        extracted = []
        for entry in sorted(os.listdir(FILES_DIR)):
            candidate = os.path.join(FILES_DIR, entry)
            # Sub-directories have no extension and would be rejected as an
            # unsupported file type.
            if os.path.isfile(candidate):
                extracted.append(FileHandler.extract_text(candidate))
        # Separate the documents so the last word of one file does not fuse
        # with the first word of the next.
        text = "\n".join(extracted)

    if custom_tokenizer_file:
        tokenizer = create_custom_tokenizer(
            custom_tokenizer_file,
            custom_tokenizer_model or 'WordLevel',
            custom_tokenizer_vocab_size,
            custom_tokenizer_special_tokens,
        )
        text = ' '.join(custom_tokenize(text, tokenizer))
    elif apply_preprocessing:
        # preprocess_text() is a no-op unless the flag is passed explicitly.
        text = preprocess_text(text, lang, apply_preprocessing=True)

    text_splitter = get_text_splitter(split_strategy, chunk_size, overlap_size, custom_separators)
    chunks = text_splitter.split_text(text)

    embedding_model = get_embedding_model(model_type, model_name)

    return chunks, embedding_model, len(text.split())
|
|
|
def search_embeddings(chunks, embedding_model, vector_store_type, search_type, query, top_k, expected_result=None, lang='german', apply_phonetic=False, phonetic_weight=0.3):
    """Index ``chunks``, retrieve the ``top_k`` hits for ``query`` and re-rank them.

    The hits are ordered by cosine similarity between the query embedding and
    each hit's embedding, plus a bonus of 0.3 for hits containing
    ``expected_result``. With ``apply_phonetic`` the cosine term is blended
    with a Soundex-based similarity using ``phonetic_weight``.

    Args:
        chunks: Text chunks to index.
        embedding_model: Object providing ``embed_query``/``embed_documents``.
        vector_store_type: Store type passed to ``get_vector_store``.
        search_type: Search type passed to ``get_retriever``.
        query: Search query; it is sent to the retriever unchanged.
        top_k: Number of hits to retrieve.
        expected_result: Optional substring expected in a relevant hit.
        lang: Kept for interface compatibility; not used.
        apply_phonetic: Blend the phonetic similarity into the score.
        phonetic_weight: Share of the phonetic term when it is enabled.

    Returns:
        ``(results_df, elapsed_seconds, vector_store, ranked_documents)``.
        Index construction is not part of the measured time.
    """
    vector_store = get_vector_store(vector_store_type, chunks, embedding_model)
    retriever = get_retriever(vector_store, search_type, {"k": top_k})

    start_time = time.time()
    results = retriever.invoke(query)

    contents = [doc.page_content for doc in results]
    doc_vectors = embedding_model.embed_documents(contents) if contents else []
    query_vector = np.asarray(embedding_model.embed_query(query), dtype=float)
    query_norm = np.linalg.norm(query_vector)

    def score_result(content, vector):
        # Relevance is measured against the query. The former implementation
        # searched the store with the document's own text, which only finds
        # the document itself and therefore carries no ranking information.
        doc_vector = np.asarray(vector, dtype=float)
        norm_product = query_norm * np.linalg.norm(doc_vector)
        base_score = float(np.dot(query_vector, doc_vector) / norm_product) if norm_product else 0.0

        expected_bonus = 0.3 if expected_result and expected_result in content else 0

        if not apply_phonetic:
            return base_score + expected_bonus

        # phonetic_match() returns 0 unless enabled, and yields a distance.
        # NOTE(review): Soundex codes are assumed to be 4 characters long,
        # which bounds the distance by 4 - confirm.
        distance = phonetic_match(content, query, apply_phonetic=True)
        phonetic_score = max(0.0, 1.0 - distance / 4)
        return (1 - phonetic_weight) * base_score + phonetic_weight * phonetic_score + expected_bonus

    ranked = sorted(
        zip(results, doc_vectors),
        key=lambda pair: score_result(pair[0].page_content, pair[1]),
        reverse=True,
    )
    results = [doc for doc, _ in ranked]
    embeddings = [vector for _, vector in ranked]
    end_time = time.time()

    results_df = pd.DataFrame({
        'content': [doc.page_content for doc in results],
        'embedding': embeddings,
        'length': [len(doc.page_content) for doc in results],
        'contains_expected': [expected_result in doc.page_content if expected_result else None for doc in results]
    })

    return results_df, end_time - start_time, vector_store, results
|
|
|
|
|
class ResultAnalyzer:
    """Compute descriptive statistics for one retrieval run."""

    @staticmethod
    def calculate_statistics(results, search_time, vector_store, num_tokens, embedding_model, query,
                             top_k, expected_result=None, model_feedback=None):
        """Summarise a list of retrieved documents.

        Args:
            results: Retrieved documents exposing ``page_content``.
            search_time: Elapsed search time, stored as given.
            vector_store: Store the results came from; only inspected for
                its size.
            num_tokens: Token count of the corpus, stored as given.
            embedding_model: Object providing ``embed_query``.
            query: Query used for the embedding-based statistics.
            top_k: Requested number of results, stored as given.
            expected_result: Optional substring expected in a relevant hit.
            model_feedback: Optional feedback stored under ``model_feedback``.

        Returns:
            A dict of statistics. Values that cannot be computed for the
            given input are reported as the string ``"N/A"``.
        """
        content_lengths = [len(doc.page_content) for doc in results]

        # Embed the query and every result once and reuse the vectors for
        # all embedding-based statistics below.
        query_embedding = embedding_model.embed_query(query)
        result_embeddings = [embedding_model.embed_query(doc.page_content) for doc in results]

        stats = {
            "num_results": len(results),
            "avg_content_length": np.mean(content_lengths) if results else 0,
            "min_content_length": min(content_lengths) if results else 0,
            "max_content_length": max(content_lengths) if results else 0,
            "search_time": search_time,
            "num_tokens": num_tokens,
            "embedding_dimension": len(query_embedding),
            "top_k": top_k,
        }

        stats["vector_store_size"] = ResultAnalyzer._vector_store_size(vector_store)

        if expected_result:
            # 1-based rank of the first hit containing the expected text;
            # 0 means that no hit contains it.
            rank = next(
                (position for position, doc in enumerate(results, start=1)
                 if expected_result in doc.page_content),
                0,
            )
            stats["contains_expected"] = rank > 0
            stats["expected_result_rank"] = rank

        if len(results) > 3:
            stats["result_diversity"] = ResultAnalyzer._calculate_diversity(result_embeddings)
            stats["silhouette_score"] = ResultAnalyzer._calculate_silhouette(result_embeddings)
        else:
            stats["result_diversity"] = "N/A"
            stats["silhouette_score"] = "N/A"

        similarities = [np.inner(query_embedding, emb) for emb in result_embeddings]
        if len(similarities) > 1:
            rank_correlation, _ = spearmanr(similarities, range(len(similarities)))
            stats["rank_correlation"] = rank_correlation
        else:
            stats["rank_correlation"] = "N/A"

        if model_feedback:
            stats["model_feedback"] = model_feedback

        return stats

    @staticmethod
    def _vector_store_size(vector_store):
        """Return the number of stored vectors, or ``"N/A"`` if unknown.

        An ``index``/``_index`` attribute exposing ``ntotal`` is tried first,
        then a ``_collection`` attribute exposing ``count()``.
        """
        try:
            for attribute in ('index', '_index'):
                index = getattr(vector_store, attribute, None)
                if index is not None and hasattr(index, 'ntotal'):
                    return index.ntotal
            collection = getattr(vector_store, '_collection', None)
            if collection is not None:
                # count() is the number of records; len() of the mapping
                # returned by get() is only its number of keys.
                return collection.count()
        except Exception:
            # The size is purely informational; never fail the statistics.
            pass
        return "N/A"

    @staticmethod
    def _calculate_diversity(embeddings: List[np.ndarray]) -> float:
        """Return one minus the mean pairwise inner product of the embeddings.

        Only distinct pairs (upper triangle without the diagonal) are used.
        NOTE(review): the inner product equals cosine similarity only for
        unit-length embeddings - confirm the models normalise their output.
        """
        embeddings_array = np.array(embeddings)
        pairwise_similarities = np.inner(embeddings_array, embeddings_array)
        return 1 - np.mean(pairwise_similarities[np.triu_indices(len(embeddings), k=1)])

    @staticmethod
    def _calculate_silhouette(embeddings: List[np.ndarray]) -> float:
        """Return the silhouette score of the embeddings, or 0.0 on failure.

        Every embedding is given its own label.
        NOTE(review): scikit-learn is expected to reject a labelling with as
        many clusters as samples, in which case this always yields 0.0 -
        confirm and consider clustering the embeddings first.
        """
        if len(embeddings) < 3:
            return 0.0
        try:
            return silhouette_score(embeddings, range(len(embeddings)))
        except Exception:
            return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
def visualize_results(results_df, stats_df):
    """Draw a 2x2 overview figure for a comparison run.

    Panels: search time per model, result diversity against rank
    correlation, average content length per model, and a t-SNE projection of
    the result embeddings. Each panel is drawn independently; a failing
    panel is reported on stdout and left empty.

    Args:
        results_df: Per-result rows; the ``embedding`` and ``Model`` columns
            are used for the t-SNE panel.
        stats_df: Per-model statistics. A ``model`` column is added in place
            when it is missing.

    Returns:
        The matplotlib figure. For an empty ``stats_df`` the figure contains
        four empty axes.
    """
    fig, axs = plt.subplots(2, 2, figsize=(20, 20))

    # Check for emptiness first: an empty frame may not even have the
    # columns needed to derive the ``model`` label.
    if len(stats_df) == 0:
        return fig

    if 'model' not in stats_df.columns:
        stats_df['model'] = stats_df['model_type'] + ' - ' + stats_df['model_name']

    try:
        sns.barplot(data=stats_df, x='model', y='search_time', ax=axs[0, 0])
        axs[0, 0].set_title('Search Time by Model')
        axs[0, 0].tick_params(axis='x', rotation=45)
    except Exception as e:
        print(f"Error in search time plot: {e}")

    try:
        # These metrics may hold the placeholder "N/A"; plot numeric rows only.
        scatter_data = stats_df.assign(
            result_diversity=pd.to_numeric(stats_df['result_diversity'], errors='coerce'),
            rank_correlation=pd.to_numeric(stats_df['rank_correlation'], errors='coerce'),
        ).dropna(subset=['result_diversity', 'rank_correlation'])
        sns.scatterplot(data=scatter_data, x='result_diversity', y='rank_correlation',
                        hue='model', ax=axs[0, 1])
        axs[0, 1].set_title('Result Diversity vs. Rank Correlation')
    except Exception as e:
        print(f"Error in diversity plot: {e}")

    try:
        sns.boxplot(data=stats_df, x='model', y='avg_content_length', ax=axs[1, 0])
        axs[1, 0].set_title('Distribution of Result Content Lengths')
        axs[1, 0].tick_params(axis='x', rotation=45)
    except Exception as e:
        print(f"Error in content length plot: {e}")

    try:
        has_embedding = results_df['embedding'].notna()
        valid_embeddings = results_df.loc[has_embedding, 'embedding'].values
        if len(valid_embeddings) > 1:
            # t-SNE needs a perplexity below the number of samples.
            perplexity = min(30, len(valid_embeddings) - 1)
            tsne = TSNE(n_components=2, random_state=42, perplexity=perplexity)
            embeddings_2d = tsne.fit_transform(np.vstack(valid_embeddings))
            # Take the labels through the same mask as the embeddings so
            # every point is coloured by its own model.
            model_labels = results_df.loc[has_embedding, 'Model'].values
            sns.scatterplot(x=embeddings_2d[:, 0], y=embeddings_2d[:, 1],
                            hue=model_labels, ax=axs[1, 1])
            axs[1, 1].set_title('t-SNE Visualization of Result Embeddings')
        else:
            axs[1, 1].text(0.5, 0.5, "Not enough embeddings for visualization",
                           ha='center', va='center')
    except Exception as e:
        print(f"Error in embedding visualization: {e}")

    plt.tight_layout()
    return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def optimize_vocabulary(texts, vocab_size=10000, min_frequency=2):
    """Drop rare words from ``texts`` and train a BPE tokenizer on the rest.

    Args:
        texts: Input strings; words are separated by whitespace.
        vocab_size: Target vocabulary size for the BPE trainer.
        min_frequency: Minimum number of occurrences, counted over all
            texts, for a word to be kept.

    Returns:
        ``(tokenizer, optimized_texts)``. ``optimized_texts`` has one entry
        per input text, which may be empty if all of its words were rare.
    """
    tokenizer = Tokenizer(models.BPE(unk_token="[UNK]"))
    # Split on whitespace before learning merges so that tokens cannot span
    # several words; this matches create_custom_tokenizer().
    tokenizer.pre_tokenizer = Whitespace()

    word_freq = Counter(word for text in texts for word in text.split())

    optimized_texts = [
        ' '.join(word for word in text.split() if word_freq[word] >= min_frequency)
        for text in texts
    ]

    trainer = trainers.BpeTrainer(vocab_size=vocab_size, special_tokens=["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"])
    tokenizer.train_from_iterator(optimized_texts, trainer)

    return tokenizer, optimized_texts
|
|
|
import numpy as np |
|
from transformers import TextClassificationPipeline |
|
from typing import List, Union, Any |
|
|
|
|
|
|
|
# Sentence-transformer used by rerank_results(); loaded when the module is imported.
model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")
|
|
|
|
|
def rerank_results(
    results: List[Any],
    query: str,
    reranker: Union[TextClassificationPipeline, Any]
) -> List[Any]:
    """Order ``results`` by cosine similarity to ``query``, best first.

    Similarities are computed with the module-level sentence-transformer
    ``model``. ``reranker`` is accepted but not used. An empty ``results``
    is returned unchanged.
    """
    if not results:
        return results

    query_embedding = model.encode(query, convert_to_tensor=True)
    texts = [doc.page_content for doc in results]
    doc_embeddings = model.encode(texts, convert_to_tensor=True)

    scores = util.cos_sim(query_embedding, doc_embeddings)[0].cpu().numpy()
    descending_order = np.argsort(scores)[::-1]

    return [results[position] for position in descending_order]
|
|
|
|
|
|
|
def _parse_model_spec(spec):
    """Split a ``'provider:name'`` string on its first colon."""
    provider, separator, name = spec.strip().partition(':')
    provider, name = provider.strip(), name.strip()
    if not separator or not provider or not name:
        raise ValueError(f"Model must be given as 'provider:name', got: {spec!r}")
    return provider, name


def compare_embeddings(file, query, embedding_models, custom_embedding_model, split_strategy, chunk_size, overlap_size, custom_separators, vector_store_type, search_type, top_k, expected_result=None, lang='german', apply_preprocessing=True, optimize_vocab=False, apply_phonetic=True, phonetic_weight=0.3, custom_tokenizer_file=None, custom_tokenizer_model=None, custom_tokenizer_vocab_size=10000, custom_tokenizer_special_tokens=None, use_query_optimization=False, query_optimization_model="google/flan-t5-base", use_reranking=False):
    """Run the same query against several embedding models and compare them.

    Args:
        file: Uploaded file object exposing ``name``, or a falsy value to
            process the default files directory.
        query: Search query.
        embedding_models: Iterable of ``'provider:name'`` strings.
        custom_embedding_model: Optional extra ``'provider:name'`` string.
        split_strategy, chunk_size, overlap_size: Splitter configuration.
        custom_separators: Comma-separated separators, or empty.
        vector_store_type, search_type, top_k: Retrieval configuration.
        expected_result: Optional substring expected in a relevant hit.
        lang, apply_preprocessing: Text preprocessing configuration.
        optimize_vocab: Filter rare words from the chunks before indexing.
        apply_phonetic, phonetic_weight: Phonetic scoring configuration.
        custom_tokenizer_file, custom_tokenizer_model,
        custom_tokenizer_vocab_size, custom_tokenizer_special_tokens:
            Custom tokenizer configuration; special tokens are given as a
            comma-separated string.
        use_query_optimization, query_optimization_model: Query expansion
            configuration.
        use_reranking: Re-rank the hits with ``rerank_results``.

    Returns:
        ``(results_df, stats_df, figure, best_results)``.

    Raises:
        ValueError: If a model is not given as ``'provider:name'``.
    """
    all_results = []
    all_stats = []
    settings = {
        "split_strategy": split_strategy,
        "chunk_size": chunk_size,
        "overlap_size": overlap_size,
        "custom_separators": custom_separators,
        "vector_store_type": vector_store_type,
        "search_type": search_type,
        "top_k": top_k,
        "lang": lang,
        "apply_preprocessing": apply_preprocessing,
        "optimize_vocab": optimize_vocab,
        "apply_phonetic": apply_phonetic,
        "phonetic_weight": phonetic_weight,
        "use_query_optimization": use_query_optimization,
        "query_optimization_model": query_optimization_model,
        "use_reranking": use_reranking
    }

    # Split on the first colon only, so model names that contain a colon
    # themselves stay intact.
    model_specs = [_parse_model_spec(spec) for spec in (embedding_models or [])]
    if custom_embedding_model:
        model_specs.append(_parse_model_spec(custom_embedding_model))

    separators = custom_separators.split(',') if custom_separators else None
    special_tokens = custom_tokenizer_special_tokens.split(',') if custom_tokenizer_special_tokens else None
    # The value may arrive empty from the UI; fall back to the default size.
    vocab_size = int(custom_tokenizer_vocab_size) if custom_tokenizer_vocab_size else 10000

    # Built lazily and only once: loading the pipeline is expensive and it
    # does not depend on the model being compared.
    reranker = None

    for model_type, model_name in model_specs:
        chunks, embedding_model, num_tokens = process_files(
            file.name if file else None,
            model_type,
            model_name,
            split_strategy,
            chunk_size,
            overlap_size,
            separators,
            lang,
            apply_preprocessing,
            custom_tokenizer_file,
            custom_tokenizer_model,
            vocab_size,
            special_tokens
        )

        if optimize_vocab:
            tokenizer, optimized_chunks = optimize_vocabulary(chunks)
            chunks = optimized_chunks

        search_query = query
        if use_query_optimization:
            search_query = optimize_query(query, query_optimization_model, chunks, embedding_model, vector_store_type, search_type, top_k)

        results, search_time, vector_store, results_raw = search_embeddings(
            chunks,
            embedding_model,
            vector_store_type,
            search_type,
            search_query,
            top_k,
            expected_result,
            lang,
            apply_phonetic,
            phonetic_weight
        )

        if use_reranking:
            if reranker is None:
                reranker = pipeline("text-classification", model="cross-encoder/ms-marco-MiniLM-L-12-v2")
            results_raw = rerank_results(results_raw, query, reranker)

        result_embeddings = [doc.metadata.get('embedding', None) for doc in results_raw]

        # Statistics are computed against the original query, not the
        # expanded one.
        stats = ResultAnalyzer.calculate_statistics(results_raw, search_time, vector_store, num_tokens, embedding_model, query, top_k, expected_result)
        stats["model"] = f"{model_type} - {model_name}"
        stats["model_type"] = model_type
        stats["model_name"] = model_name
        stats.update(settings)

        formatted_results = format_results(results_raw, stats)
        for result, embedding in zip(formatted_results, result_embeddings):
            result['embedding'] = embedding
            result['length'] = len(result['Content'])
            result['contains_expected'] = expected_result in result['Content'] if expected_result else None

        all_results.extend(formatted_results)
        all_stats.append(stats)

    results_df = pd.DataFrame(all_results)
    stats_df = pd.DataFrame(all_stats)

    fig = visualize_results(results_df, stats_df)

    best_results = analyze_results(stats_df)

    return results_df, stats_df, fig, best_results
|
|
|
def format_results(results, stats):
    """Turn retrieved documents into flat result rows.

    Each row starts with ``Model``, ``Content`` and ``Embedding``, followed
    by the document metadata and then every entry of ``stats`` except
    ``"model"``. Later keys overwrite earlier ones with the same name.
    """
    shared_stats = {key: value for key, value in stats.items() if key not in ["model"]}
    return [
        {
            "Model": stats["model"],
            "Content": doc.page_content,
            "Embedding": getattr(doc, 'embedding', None),
            **doc.metadata,
            **shared_stats,
        }
        for doc in results
    ]
|
|
|
|
|
|
|
from sklearn.model_selection import ParameterGrid |
|
from tqdm import tqdm |
|
|
|
|
|
|
|
|
|
def automated_testing(file, query, test_params, expected_result=None):
    """Run the retrieval pipeline for every combination in ``test_params``.

    Args:
        file: Uploaded file object exposing ``name``, or a falsy value to
            process the default files directory.
        query: Search query; every combination starts from this same text.
        test_params: Mapping accepted by ``ParameterGrid`` describing the
            values to try for each setting.
        expected_result: Optional substring expected in a relevant hit.

    Returns:
        ``(results_df, stats_df)`` with one statistics row per combination.
    """
    all_results = []
    all_stats = []

    # Built lazily and only once: loading the pipeline is expensive and it
    # does not depend on the parameters under test.
    reranker = None

    param_grid = ParameterGrid(test_params)
    print(param_grid)
    for params in tqdm(param_grid, desc="Running tests"):
        chunks, embedding_model, num_tokens = process_files(
            file.name if file else None,
            params['model_type'],
            params['model_name'],
            params['split_strategy'],
            params['chunk_size'],
            params['overlap_size'],
            params.get('custom_separators', None),
            params['lang'],
            params['apply_preprocessing'],
            params.get('custom_tokenizer_file', None),
            params.get('custom_tokenizer_model', None),
            params.get('custom_tokenizer_vocab_size', 10000),
            params.get('custom_tokenizer_special_tokens', None)
        )

        if params['optimize_vocab']:
            tokenizer, optimized_chunks = optimize_vocabulary(chunks)
            chunks = optimized_chunks

        # optimize_query() returns a single string. Joining it with spaces
        # used to insert a blank between every character, and assigning the
        # result to ``query`` leaked it into all later combinations.
        search_query = query
        if params['use_query_optimization']:
            search_query = optimize_query(query, params['query_optimization_model'], chunks, embedding_model, params['vector_store_type'], params['search_type'], params['top_k'])

        results, search_time, vector_store, results_raw = search_embeddings(
            chunks,
            embedding_model,
            params['vector_store_type'],
            params['search_type'],
            search_query,
            params['top_k'],
            expected_result,
            params['lang'],
            params['apply_phonetic'],
            params['phonetic_weight']
        )

        if params['use_reranking']:
            if reranker is None:
                reranker = pipeline("text-classification", model="cross-encoder/ms-marco-MiniLM-L-12-v2")
            results_raw = rerank_results(results_raw, query, reranker)

        stats = ResultAnalyzer.calculate_statistics(results_raw, search_time, vector_store, num_tokens, embedding_model, query, params['top_k'], expected_result)
        stats["model"] = f"{params['model_type']} - {params['model_name']}"
        stats["model_type"] = params['model_type']
        stats["model_name"] = params['model_name']
        stats.update(params)

        all_results.extend(format_results(results_raw, stats))
        all_stats.append(stats)

    return pd.DataFrame(all_results), pd.DataFrame(all_stats)
|
|
|
|
|
|
|
def analyze_results(stats_df):
    """Pick the best-scoring configuration from a table of test statistics.

    Every metric column is coerced to numeric (a missing metric column is
    created and filled with 0), a weighted sum of the metrics is stored in a
    new ``weighted_score`` column, and the row with the highest score is
    summarised. ``stats_df`` is modified in place.

    Args:
        stats_df (pd.DataFrame): One row per tested configuration. It must
            contain the configuration columns read below (``model_type``,
            ``model_name``, ``split_strategy``, ``chunk_size``, ...).

    Returns:
        dict | None: A dict with ``best_model``, ``best_settings`` and
        ``performance_summary`` for the winning row, or ``None`` when
        ``stats_df`` is empty. ``expected_result_rank`` is ``None`` and
        ``contains_expected`` is ``False`` when the value is missing (NaN).
    """
    # Negative weights penalise a metric, positive weights reward it.
    metric_weights = {
        'search_time': -0.3,
        'result_diversity': 0.2,
        'rank_correlation': 0.3,
        'silhouette_score': 0.2,
        'contains_expected': 0.5,
        'expected_result_rank': -0.4
    }

    if stats_df.empty:
        print("stats_df is empty. Cannot compute best configuration.")
        return None

    for metric in metric_weights:
        if metric in stats_df.columns:
            # Unparseable entries become NaN and are scored as 0 below.
            stats_df[metric] = pd.to_numeric(stats_df[metric], errors='coerce')
        else:
            stats_df[metric] = 0
            # Report the metric that is actually missing.
            print(f"Column '{metric}' is missing in stats_df.")

    stats_df['weighted_score'] = sum(
        stats_df[metric].fillna(0) * weight
        for metric, weight in metric_weights.items()
    )

    best_config = stats_df.loc[stats_df['weighted_score'].idxmax()]

    def _flag_or_false(value):
        # NaN is truthy, so bool(NaN) would wrongly report True.
        return False if pd.isna(value) else bool(value)

    def _int_or_none(value):
        # int(NaN) raises ValueError; a missing rank is reported as None.
        return None if pd.isna(value) else int(value)

    recommendations = {
        'best_model': f"{best_config['model_type']} - {best_config['model_name']}",
        'best_settings': {
            'split_strategy': best_config['split_strategy'],
            'chunk_size': int(best_config['chunk_size']),
            'overlap_size': int(best_config['overlap_size']),
            'vector_store_type': best_config['vector_store_type'],
            'search_type': best_config['search_type'],
            'top_k': int(best_config['top_k']),
            'optimize_vocab': bool(best_config['optimize_vocab']),
            'use_query_optimization': bool(best_config['use_query_optimization']),
            'use_reranking': bool(best_config['use_reranking']),
            'lang': best_config['lang'],
            'apply_preprocessing': bool(best_config['apply_preprocessing']),
            'apply_phonetic': bool(best_config['apply_phonetic']),
            'phonetic_weight': float(best_config['phonetic_weight'])
        },
        'performance_summary': {
            'search_time': float(best_config['search_time']),
            'result_diversity': float(best_config['result_diversity']),
            'rank_correlation': float(best_config['rank_correlation']),
            'silhouette_score': float(best_config['silhouette_score']),
            'contains_expected': _flag_or_false(best_config['contains_expected']),
            'expected_result_rank': _int_or_none(best_config['expected_result_rank'])
        }
    }

    return recommendations
|
|
|
|
|
import ast |
|
|
|
def _parse_llm_settings(generated_text):
    """Extract and evaluate the dict literal embedded in LLM output.

    Small instruct models often wrap the requested dictionary in prose or a
    code fence, so only the span from the first ``{`` to the last ``}`` is
    evaluated. ``ast.literal_eval`` accepts literals only, so no code from
    the model output is executed.

    Raises:
        ValueError: If no dict literal can be found or parsed.
        SyntaxError: If the extracted span is not valid Python literal syntax.
    """
    start = generated_text.find('{')
    end = generated_text.rfind('}')
    if start == -1 or end < start:
        raise ValueError("no dictionary literal found in LLM output")

    settings = ast.literal_eval(generated_text[start:end + 1])
    if not isinstance(settings, dict):
        raise ValueError("LLM output is not a dictionary")
    return settings


def get_llm_suggested_settings(file, num_chunks=1):
    """Ask a local LLM to suggest search settings for an uploaded document.

    The file is chunked with fixed settings, a random sample of chunks is put
    into a prompt, and the model's answer is parsed as a Python dict.

    Args:
        file: Uploaded file object; only its ``name`` attribute is read.
        num_chunks: Maximum number of chunks to sample for the prompt.

    Returns:
        dict: The suggested settings, or ``{"error": ...}`` when no file was
        given or the model output could not be parsed.
    """
    if not file:
        return {"error": "No file uploaded"}

    chunks, _, _ = process_files(
        file.name,
        'HuggingFace',
        'paraphrase-miniLM',
        'recursive',
        250,
        50,
        custom_separators=None
    )

    # UI sliders may deliver a float; random.sample needs an int.
    sample_size = min(int(num_chunks), len(chunks))
    sample_chunks = random.sample(chunks, sample_size)

    # NOTE: the model is loaded on every call.
    llm_pipeline = pipeline(model="meta-llama/Llama-3.2-1B-Instruct", device='cpu')

    # Doubled braces are literal braces in an f-string. The use-case line is
    # escaped too; unescaped it was evaluated as a Python tuple and rendered
    # as "('small local', 'large total context', Ellipsis)".
    prompt=f'''
<|start_header_id|>system<|end_header_id|>
You are an expert in information retrieval.
You know about strenghs and weaknesses of all models.

Given the following text chunks from a document,
suggest optimal settings for an embedding-based search system. The settings should include:

1. Embedding model type and name
2. Split strategy (token or recursive)
3. Chunk size
4. Overlap size
5. Vector store type (FAISS or Chroma)
6. Search type (similarity, mmr, or custom)
7. Top K results to retrieve
8. Whether to apply preprocessing
9. Whether to optimize vocabulary
10. Whether to apply phonetic matching

Expected output format:
{{
"embedding_models": "embedding_model_type:embedding_model_name",
"split_strategy": "token or recursive",
"chunk_size": 250,
"overlap_size": 50,
"vector_store_type": "FAISS or Chroma",
"search_type": "similarity, mmr, or custom",
"top_k": 5,
"apply_preprocessing": True,
"optimize_vocab": True,
"apply_phonetic": False,
"phonetic_weight": 0.3 #
}}

Provide your suggestions in a Python dictionary format.

show me settings You SHOULD NOT include any other text in the response.
Fill out the seeting and chose usefull values.
Respect the users use cases and content snipet. Choose the setting based on the chunks

<|eot_id|><|start_header_id|>user<|end_header_id|>
User user case:
{{"small local", "large total context", ...}}

total content lenght:
{len(' '.join(chunks))}

Content snipet:
{' '.join(sample_chunks)}
<|eot_id|><|start_header_id|>assistant<|end_header_id|>
'''
    suggested_settings = llm_pipeline(
        prompt,
        do_sample=True,
        top_k=10,
        num_return_sequences=1,
        return_full_text=False,
        max_new_tokens=1900,
        truncation=True,
    )

    generated_text = suggested_settings[0]['generated_text']
    print(generated_text)

    # Broad catch on purpose: this is a UI boundary, and any malformed model
    # output must become an error payload rather than a crash.
    try:
        settings_dict = _parse_llm_settings(generated_text)

        return {
            "embedding_models": settings_dict["embedding_models"],
            "split_strategy": settings_dict["split_strategy"],
            "chunk_size": settings_dict["chunk_size"],
            "overlap_size": settings_dict["overlap_size"],
            "vector_store_type": settings_dict["vector_store_type"],
            "search_type": settings_dict["search_type"],
            "top_k": settings_dict["top_k"],
            "apply_preprocessing": settings_dict["apply_preprocessing"],
            "optimize_vocab": settings_dict["optimize_vocab"],
            "apply_phonetic": settings_dict["apply_phonetic"],
            "phonetic_weight": settings_dict.get("phonetic_weight", 0.3)
        }
    except Exception as e:
        print(f"Error parsing LLM suggestions: {e}")
        return {"error": "Failed to parse LLM suggestions"}
|
|
|
|
|
def update_inputs_with_llm_suggestions(suggestions):
    """Turn LLM-suggested settings into updates for the eleven input widgets.

    Args:
        suggestions: Settings dict, an error dict containing the key
            ``"error"``, or ``None``.

    Returns:
        list: Eleven ``gr.update`` objects. They are empty (no change) when
        no usable suggestions are available; otherwise each one carries the
        suggested value for its widget, in output order.
    """
    unusable = suggestions is None or "error" in suggestions
    if unusable:
        return [gr.update() for _ in range(11)]

    # The model selector is a multi-select, so its value is wrapped in a list.
    updates = [gr.update(value=[suggestions["embedding_models"]])]

    # The remaining widgets take the suggested value as-is, in output order.
    scalar_keys = (
        "split_strategy",
        "chunk_size",
        "overlap_size",
        "vector_store_type",
        "search_type",
        "top_k",
        "apply_preprocessing",
        "optimize_vocab",
        "apply_phonetic",
        "phonetic_weight",
    )
    for key in scalar_keys:
        updates.append(gr.update(value=suggestions[key]))

    return updates
|
|
|
def parse_model_selections(default_models, custom_models):
    """
    Parse selected default models and custom models into model configurations

    Entries are split on the first ':' only, so model names that themselves
    contain a colon are kept intact.

    Args:
        default_models (List[str]): Selected default models in format "type:name"
        custom_models (str): Custom models string with one model per line in format "type:name"

    Returns:
        List[Dict[str, str]]: List of model configurations with 'type' and 'name' keys

    Raises:
        ValueError: If a default model entry contains no ':'.
    """
    model_configs = []

    if default_models:
        for model in default_models:
            model_type, model_name = model.split(':', 1)
            model_configs.append({
                'type': model_type,
                'name': model_name
            })

    if custom_models:
        for line in custom_models.strip().split('\n'):
            entry = line.strip()
            # Blank lines and lines without a separator are ignored.
            if entry and ':' in entry:
                model_type, model_name = entry.split(':', 1)
                model_configs.append({
                    'type': model_type.strip(),
                    'name': model_name.strip()
                })

    return model_configs
|
|
|
def parse_comma_separated(text):
    """Split a comma-delimited string into its trimmed, non-empty items.

    Returns an empty list when ``text`` is empty or ``None``.
    """
    if text:
        trimmed = (piece.strip() for piece in text.split(','))
        return [piece for piece in trimmed if piece]
    return []
|
|
|
|
|
|
|
|
|
def launch_interface(debug=True):
    """Build the Gradio UI and launch it together with two guide tabs.

    The main ``gr.Blocks`` app holds the "Simple", "Advanced", "Expert",
    "Automation" and "LLM Suggestions" tabs plus the shared result widgets.
    It is then wrapped in a ``gr.TabbedInterface`` next to the tutorial and
    use-case markdown pages and launched with ``share=True``.

    Args:
        debug (bool): Forwarded to ``launch(debug=...)``.
    """
    with gr.Blocks() as iface:
        gr.Markdown("# Advanced Embedding Comparison Tool")

        with gr.Tab("Simple"):
            file_input = gr.File(label="Upload File (Optional)")
            query_input = gr.Textbox(label="Search Query")
            expected_result_input = gr.Textbox(label="Expected Result (Optional)")
            embedding_models_input = gr.CheckboxGroup(
                choices=[
                    "HuggingFace:paraphrase-miniLM",
                    "HuggingFace:paraphrase-mpnet",
                    "OpenAI:text-embedding-ada-002",
                    "Cohere:embed-multilingual-v2.0"
                ],
                label="Embedding Models"
            )
            top_k_input = gr.Slider(1, 10, step=1, value=5, label="Top K")

        with gr.Tab("Advanced"):
            custom_embedding_model_input = gr.Textbox(label="Custom Embedding Model (optional, format: type:name)")
            split_strategy_input = gr.Radio(choices=["token", "recursive"], label="Split Strategy", value="recursive")
            chunk_size_input = gr.Slider(100, 1000, step=100, value=500, label="Chunk Size")
            overlap_size_input = gr.Slider(0, 100, step=10, value=50, label="Overlap Size")
            custom_separators_input = gr.Textbox(label="Custom Split Separators (comma-separated, optional)")
            vector_store_type_input = gr.Radio(choices=["FAISS", "Chroma"], label="Vector Store Type", value="FAISS")
            search_type_input = gr.Radio(choices=["similarity", "mmr", "custom"], label="Search Type", value="similarity")
            lang_input = gr.Dropdown(choices=["german", "english", "french"], label="Language", value="german")

        with gr.Tab("Expert"):
            apply_preprocessing_input = gr.Checkbox(label="Apply Text Preprocessing", value=False)
            optimize_vocab_input = gr.Checkbox(label="Optimize Vocabulary", value=False)
            apply_phonetic_input = gr.Checkbox(label="Apply Phonetic Matching", value=False)
            phonetic_weight_input = gr.Slider(0, 1, step=0.1, value=0.3, label="Phonetic Matching Weight")
            custom_tokenizer_file_input = gr.File(label="Custom Tokenizer File (Optional)")
            custom_tokenizer_model_input = gr.Textbox(label="Custom Tokenizer Model (e.g., WordLevel, BPE, Unigram)")
            custom_tokenizer_vocab_size_input = gr.Textbox(label="Custom Tokenizer Vocab Size", value="10000")
            custom_tokenizer_special_tokens_input = gr.Textbox(label="Custom Tokenizer Special Tokens (comma-separated)")
            use_query_optimization_input = gr.Checkbox(label="Use Query Optimization", value=False)
            query_optimization_model_input = gr.Textbox(label="Query Optimization Model (google/flan-t5-base) ", value="")
            use_reranking_input = gr.Checkbox(label="Use Reranking", value=False)

        with gr.Tab("Automation"):
            with gr.Row():
                auto_file_input = gr.File(label="Upload File (Optional)")
                auto_query_input = gr.Textbox(label="Search Query")

            with gr.Row():
                auto_expected_result_input = gr.Textbox(
                    label="Expected Result (Optional)",
                    placeholder="Enter expected text if you want to evaluate accuracy"
                )
                model_feedback_input = gr.Textbox(
                    label="Model Feedback (Optional)",
                    placeholder="Enter any feedback about model performance"
                )

            with gr.Row():
                with gr.Column():
                    # One checkbox per "type:name" pair known to DEFAULT_MODELS.
                    default_models_input = gr.CheckboxGroup(
                        choices=[f"{model_type}:{name}"
                                 for model_type, names in DEFAULT_MODELS.items()
                                 for name in names],
                        label="Default Models",
                        value=[f"HuggingFace:{DEFAULT_MODELS['HuggingFace'][0]}"]
                    )

                with gr.Column():
                    custom_models_input = gr.TextArea(
                        label="Custom Models (Optional)",
                        placeholder="Enter one model per line in format: type:name",
                        lines=3
                    )

            auto_split_strategies = gr.CheckboxGroup(
                choices=["token", "recursive"],
                label="Split Strategies to Test"
            )
            auto_chunk_sizes = gr.TextArea(label="Chunk Sizes to Test (comma-separated)")
            auto_overlap_sizes = gr.TextArea(label="Overlap Sizes to Test (comma-separated)")
            auto_vector_store_types = gr.CheckboxGroup(
                choices=["FAISS", "Chroma"],
                label="Vector Store Types to Test"
            )
            auto_search_types = gr.CheckboxGroup(
                choices=["similarity", "mmr", "custom"],
                label="Search Types to Test"
            )
            auto_top_k = gr.TextArea(label="Top K Values to Test (comma-separated)")
            auto_optimize_vocab = gr.Checkbox(label="Test Vocabulary Optimization", value=True)
            auto_use_query_optimization = gr.Checkbox(label="Test Query Optimization", value=True)
            auto_use_reranking = gr.Checkbox(label="Test Reranking", value=True)

            auto_results_output = gr.Dataframe(label="Automated Test Results", interactive=False)
            auto_stats_output = gr.Dataframe(label="Automated Test Statistics", interactive=False)
            recommendations_output = gr.JSON(label="Recommendations")

            def run_automation(file_input, query_input, expected_result, default_models, custom_models,
                               split_strategies, chunk_sizes, overlap_sizes,
                               vector_store_types, search_types, top_k_values,
                               optimize_vocab, use_query_optimization, use_reranking,
                               model_feedback):
                """Wrapper function to handle Gradio inputs and run automated tests"""
                model_configs = parse_model_selections(default_models, custom_models)

                # Every value is a list: the dict is used as a parameter grid.
                # Sizes and top-k arrive as comma-separated text and are
                # converted to integers here (a non-numeric entry raises
                # ValueError).
                test_params = {
                    'split_strategy': split_strategies,
                    'chunk_size': [int(v) for v in parse_comma_separated(chunk_sizes)],
                    'overlap_size': [int(v) for v in parse_comma_separated(overlap_sizes)],
                    'vector_store_type': vector_store_types,
                    'search_type': search_types,
                    'top_k': [int(v) for v in parse_comma_separated(top_k_values)],
                    'optimize_vocab': [optimize_vocab],
                    'use_query_optimization': [use_query_optimization],
                    'use_reranking': [use_reranking],
                    # NOTE(review): the "Advanced" tab offers full language
                    # names ('german', 'english', 'french'); confirm that 'en'
                    # is accepted downstream.
                    'lang': ['en'],
                    'apply_preprocessing': [True],
                    'apply_phonetic': [False],
                    'phonetic_weight': [0.5],
                    'custom_separators': [None],
                    'query_optimization_model': ['google/flan-t5-base']
                }

                results_df, stats_df = run_automated_tests(
                    file_input.name if file_input else None,
                    query_input,
                    model_configs,
                    test_params,
                    expected_result if expected_result else None,
                    model_feedback if model_feedback else None
                )

                recommendations = analyze_results(stats_df)

                return results_df, stats_df, recommendations

            auto_submit_button = gr.Button("Run Automated Tests")
            auto_submit_button.click(
                fn=run_automation,
                inputs=[
                    auto_file_input, auto_query_input, auto_expected_result_input,
                    default_models_input, custom_models_input,
                    auto_split_strategies, auto_chunk_sizes, auto_overlap_sizes,
                    auto_vector_store_types, auto_search_types, auto_top_k,
                    auto_optimize_vocab, auto_use_query_optimization, auto_use_reranking,
                    model_feedback_input
                ],
                outputs=[auto_results_output, auto_stats_output, recommendations_output]
            )

        with gr.Tab("LLM Suggestions"):
            llm_file_input = gr.File(label="Upload File for LLM Suggestions")
            llm_num_chunks = gr.Slider(1, 10, step=1, value=5, label="Number of Sample Chunks")
            llm_suggest_button = gr.Button("Get LLM Suggestions")
            llm_suggestions_output = gr.JSON(label="LLM-suggested Settings")

            # First show the suggestions, then copy them into the input
            # widgets of the other tabs.
            llm_suggest_button.click(
                fn=get_llm_suggested_settings,
                inputs=[llm_file_input, llm_num_chunks],
                outputs=[llm_suggestions_output]
            ).then(
                fn=update_inputs_with_llm_suggestions,
                inputs=[llm_suggestions_output],
                outputs=[
                    embedding_models_input, split_strategy_input, chunk_size_input,
                    overlap_size_input, vector_store_type_input, search_type_input,
                    top_k_input, apply_preprocessing_input, optimize_vocab_input,
                    apply_phonetic_input, phonetic_weight_input
                ]
            )

        results_output = gr.Dataframe(label="Results", interactive=False)
        stats_output = gr.Dataframe(label="Statistics", interactive=False)
        plot_output = gr.Plot(label="Visualizations")
        best_settings_output = gr.JSON(label="Best Settings")

        submit_button = gr.Button("Compare Embeddings")
        submit_button.click(
            # The lambda defers the lookup of compare_embeddings to click time.
            fn=lambda *args: compare_embeddings(*args),
            inputs=[
                file_input, query_input, embedding_models_input, custom_embedding_model_input,
                split_strategy_input, chunk_size_input, overlap_size_input, custom_separators_input,
                vector_store_type_input, search_type_input, top_k_input, expected_result_input, lang_input,
                apply_preprocessing_input, optimize_vocab_input, apply_phonetic_input,
                phonetic_weight_input, custom_tokenizer_file_input, custom_tokenizer_model_input,
                custom_tokenizer_vocab_size_input, custom_tokenizer_special_tokens_input,
                use_query_optimization_input, query_optimization_model_input, use_reranking_input
            ],
            outputs=[results_output, stats_output, plot_output, best_settings_output]
        )

    use_case_md = """
# 🚀 AI Act Embedding Use Case Guide

## 📚 Use Case: Embedding the German AI Act for Local Chat Retrieval

In this guide, we'll walk through the process of embedding the German version of the AI Act using our advanced embedding tool and MTEB. We'll then use these embeddings in a local chat application as a retriever/context.

### Step 1: Prepare the Document 📄

1. Download the German version of the AI Act (let's call it `ai_act_de.txt`).
2. Place the file in your project directory.

### Step 2: Set Up the Embedding Tool 🛠️

1. Open the Embedding Comparison Tool.
2. Navigate to the new "Automation" tab.

### Step 3: Configure the Automated Test 🔧

In the "Use Case" tab, set up the following configuration:

```markdown
- File: ai_act_de.txt
- Query: "Wie definiert das Gesetz KI-Systeme?"
- Model Types: ✅ HuggingFace, ✅ Sentence Transformers
- Model Names: paraphrase-multilingual-MiniLM-L12-v2, distiluse-base-multilingual-cased-v2
- Split Strategies: ✅ recursive, ✅ token
- Chunk Sizes: 256, 512, 1024
- Overlap Sizes: 32, 64, 128
- Vector Store Types: ✅ FAISS
- Search Types: ✅ similarity, ✅ mmr
- Top K Values: 3, 5, 7
- Test Vocabulary Optimization: ✅
- Test Query Optimization: ✅
- Test Reranking: ✅
```

### Step 4: Run the Automated Test 🏃♂️

Click the "Run Automated Tests" button and wait for the results.

### Step 5: Analyze the Results 📊

Let's say we got the following simulated results:

```markdown
Best Model: Sentence Transformers - paraphrase-multilingual-MiniLM-L12-v2
Best Settings:
- Split Strategy: recursive
- Chunk Size: 512
- Overlap Size: 64
- Vector Store Type: FAISS
- Search Type: mmr
- Top K: 5
- Optimize Vocabulary: True
- Use Query Optimization: True
- Use Reranking: True

Performance Summary:
- Search Time: 0.15s
- Result Diversity: 0.82
- Rank Correlation: 0.91
- Silhouette Score: 0.76
```

### Step 6: Understand the Results 🧠

1. **Model**: The Sentence Transformers model performed better, likely due to its multilingual capabilities and fine-tuning for paraphrasing tasks.

2. **Split Strategy**: Recursive splitting worked best, probably because it respects the document's structure better than fixed-length token splitting.

3. **Chunk Size**: 512 tokens provide a good balance between context and specificity.

4. **Search Type**: MMR (Maximum Marginal Relevance) outperformed simple similarity search, likely due to its ability to balance relevance and diversity in results.

5. **Optimizations**: All optimizations (vocabulary, query, and reranking) proved beneficial, indicating that the extra processing time is worth the improved results.

### Step 7: Implement in Local Chat 💬

Now that we have the optimal settings, let's implement this in a local chat application:

1. Use the `paraphrase-multilingual-MiniLM-L12-v2` model for embeddings.
2. Set up a FAISS vector store with the embedded chunks.
3. Implement MMR search with a top-k of 5.
4. Include the optimization steps in your pipeline.

### Step 8: Test the Implementation 🧪

Create a simple chat interface and test with various queries about the AI Act. For example:

User: "Was sind die Hauptziele des KI-Gesetzes?"
"""


    # Raw string: the LaTeX formulas and the "\n" inside the code samples
    # must reach the markdown renderer as written. In a normal string
    # "\t", "\f" and "\a" would turn into control characters.
    tutorial_md = r"""
# Advanced Embedding Comparison Tool Tutorial

Welcome to the **Advanced Embedding Comparison Tool**! This comprehensive guide will help you understand and utilize the tool's features to optimize your **Retrieval-Augmented Generation (RAG)** systems.

## Table of Contents
1. [Introduction to RAG](#introduction-to-rag)
2. [Key Components of RAG](#key-components-of-rag)
3. [Impact of Parameter Changes](#impact-of-parameter-changes)
4. [Advanced Features](#advanced-features)
5. [Using the Embedding Comparison Tool](#using-the-embedding-comparison-tool)
6. [Automated Testing and Analysis](#automated-testing-and-analysis)
7. [Mathematical Concepts and Metrics](#mathematical-concepts-and-metrics)
8. [Code Examples](#code-examples)
9. [Best Practices and Tips](#best-practices-and-tips)
10. [Resources and Further Reading](#resources-and-further-reading)

---

## Introduction to RAG

**Retrieval-Augmented Generation (RAG)** is a powerful technique that combines the strengths of large language models (LLMs) with the ability to access and use external knowledge. RAG is particularly useful for:

- Providing up-to-date information
- Answering questions based on specific documents or data sources
- Reducing hallucinations in AI responses
- Customizing AI outputs for specific domains or use cases

RAG is ideal for applications requiring accurate, context-specific information retrieval combined with natural language generation, such as chatbots, question-answering systems, and document analysis tools.

---

## Key Components of RAG

### 1. Document Loading
Ingests documents from various sources (PDFs, web pages, databases, etc.) into a format that can be processed by the RAG system. The tool supports multiple file formats, including PDF, DOCX, and TXT.

### 2. Document Splitting
Splits large documents into smaller chunks for more efficient processing and retrieval. Available strategies include:
- **Token-based splitting**
- **Recursive splitting**

### 3. Vector Store and Embeddings
Embeddings are dense vector representations of text that capture semantic meaning. The tool supports multiple embedding models and vector stores:
- **Embedding models**: HuggingFace, OpenAI, Cohere, and custom models.
- **Vector stores**: FAISS and Chroma.

### 4. Retrieval
Finds the most relevant documents or chunks based on a query. Available retrieval methods include:
- **Similarity search**
- **Maximum Marginal Relevance (MMR)**
- **Custom search methods**

---

## Impact of Parameter Changes

Understanding how different parameters affect your RAG system is crucial for optimization:

- **Chunk Size**: Larger chunks provide more context but may reduce precision. Smaller chunks increase precision but may lose context.
- **Overlap**: More overlap helps maintain context between chunks but increases computational load.
- **Embedding Model**: Performance varies across languages and domains.
- **Vector Store**: Affects query speed and the types of searches.
- **Retrieval Method**: Influences the diversity and relevance of retrieved documents.

---

## Advanced Features

### 1. Custom Tokenization
Upload a custom tokenizer file and specify the tokenizer model, vocabulary size, and special tokens for domain or language-specific tokenization.

### 2. Query Optimization
Improve search results by generating multiple variations of the input query using a language model to capture different phrasings.

### 3. Reranking
Further refine search results by using a separate model to re-score and reorder the initial retrieval results.

### 4. Phonetic Matching
For languages like German, phonetic matching with adjustable weighting is available.

### 5. Vocabulary Optimization
Optimize vocabulary for domain-specific applications during the embedding process.

---

## Using the Embedding Comparison Tool

The tool is divided into several tabs for ease of use:

### Simple Tab
1. **File Upload**: Upload a file (PDF, DOCX, or TXT) or use files from the `./files` directory.
2. **Search Query**: Enter the search query.
3. **Embedding Models**: Select one or more embedding models to compare.
4. **Top K**: Set the number of top results to retrieve (1-10).

### Advanced Tab
5. **Custom Embedding Model**: Specify a custom embedding model.
6. **Split Strategy**: Choose between 'token' and 'recursive' splitting.
7. **Chunk Size**: Set chunk size (100-1000).
8. **Overlap Size**: Set overlap between chunks (0-100).
9. **Custom Split Separators**: Enter custom separators for text splitting.
10. **Vector Store Type**: Choose between FAISS and Chroma.
11. **Search Type**: Select 'similarity', 'mmr', or 'custom'.
12. **Language**: Specify the document's primary language.

### Optional Tab
13. **Text Preprocessing**: Toggle text preprocessing.
14. **Vocabulary Optimization**: Enable vocabulary optimization.
15. **Phonetic Matching**: Enable phonetic matching and set its weight.
16. **Custom Tokenizer**: Upload a custom tokenizer and specify parameters.
17. **Query Optimization**: Enable query optimization and specify the model.
18. **Reranking**: Enable result reranking.

---

## Automated Testing and Analysis

The **Automation tab** allows you to run comprehensive tests across multiple configurations:

1. Set up test parameters like model types, split strategies, chunk sizes, etc.
2. Click "Run Automated Tests."
3. View results, statistics, and recommendations to find optimal configurations for your use case.

---

## Mathematical Concepts and Metrics

### Cosine Similarity
Measures the cosine of the angle between two vectors, used in similarity search:
$$\text{cosine similarity} = \frac{\mathbf{A} \cdot \mathbf{B}}{\|\mathbf{A}\| \|\mathbf{B}\|}$$

### Maximum Marginal Relevance (MMR)
Balances relevance and diversity in search results:
$$\text{MMR} = \arg\max_{D_i \in R \setminus S} [\lambda \text{Sim}_1(D_i, Q) - (1-\lambda) \max_{D_j \in S} \text{Sim}_2(D_i, D_j)]$$

### Silhouette Score
Measures how well an object fits within its own cluster compared to others. Scores range from -1 to 1, where higher values indicate better-defined clusters.

---

## Code Examples

### Custom Tokenization
```python
def create_custom_tokenizer(file_path, model_type='WordLevel', vocab_size=10000, special_tokens=None):
    with open(file_path, 'r', encoding='utf-8') as f:
        text = f.read()

    tokenizer = Tokenizer(models.WordLevel(unk_token="[UNK]")) if model_type == 'WordLevel' else Tokenizer(models.BPE(unk_token="[UNK]"))
    tokenizer.pre_tokenizer = Whitespace()

    trainer = trainers.WordLevelTrainer(special_tokens=special_tokens or ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"], vocab_size=vocab_size)
    tokenizer.train_from_iterator([text], trainer)

    return tokenizer
````

### Query Optimization
```python
def optimize_query(query, llm):
    multi_query_retriever = MultiQueryRetriever.from_llm(
        retriever=get_retriever(vector_store, search_type, search_kwargs),
        llm=llm
    )
    optimized_queries = multi_query_retriever.invoke(query)
    return optimized_queries
````

### Reranking
```python
def rerank_results(results, query, reranker):
    reranked_results = reranker.rerank(query, [doc.page_content for doc in results])
    return reranked_results
````

### Best Practices and Tips

- Start Simple: Begin with basic configurations, then gradually add complexity.
- Benchmark: Use automated testing to benchmark different setups.
- Domain-Specific Tuning: Consider custom tokenizers and embeddings for specialized domains.
- Balance Performance and Cost: Use advanced features like query optimization and reranking judiciously.
- Iterate: Optimization is an iterative process—refine your approach based on tool insights.


## Useful Resources and Links

Here are some valuable resources to help you better understand and work with embeddings, retrieval systems, and natural language processing:

### Embeddings and Vector Databases
- [Understanding Embeddings](https://www.tensorflow.org/text/guide/word_embeddings): A guide by TensorFlow on word embeddings
- [FAISS: A Library for Efficient Similarity Search](https://github.com/facebookresearch/faiss): Facebook AI's vector similarity search library
- [Chroma: The AI-native open-source embedding database](https://www.trychroma.com/): An embedding database designed for AI applications

### Natural Language Processing
- [NLTK (Natural Language Toolkit)](https://www.nltk.org/): A leading platform for building Python programs to work with human language data
- [spaCy](https://spacy.io/): Industrial-strength Natural Language Processing in Python
- [Hugging Face Transformers](https://huggingface.co/transformers/): State-of-the-art Natural Language Processing for PyTorch and TensorFlow 2.0

### Retrieval-Augmented Generation (RAG)
- [LangChain](https://python.langchain.com/docs/get_started/introduction): A framework for developing applications powered by language models
- [OpenAI's RAG Tutorial](https://platform.openai.com/docs/tutorials/web-qa-embeddings): A guide on building a QA system with embeddings

### German Language Processing
- [Kölner Phonetik](https://en.wikipedia.org/wiki/Cologne_phonetics): Information about the Kölner Phonetik algorithm
- [German NLP Resources](https://github.com/adbar/German-NLP): A curated list of open-access resources for German NLP

### Benchmarks and Evaluation
- [MTEB Leaderboard](https://huggingface.co/spaces/mteb/leaderboard): Massive Text Embedding Benchmark leaderboard
- [GLUE Benchmark](https://gluebenchmark.com/): General Language Understanding Evaluation benchmark

### Tools and Libraries
- [Gensim](https://radimrehurek.com/gensim/): Topic modelling for humans
- [Sentence-Transformers](https://www.sbert.net/): A Python framework for state-of-the-art sentence, text and image embeddings

### Support me
- [Visual Crew Builder](https://visual-crew.builder.ai/): Tool for create AI systems, workflows and api. Or just a notebook.



This tool empowers you to fine-tune your RAG system for optimal performance. Experiment with different settings, run automated tests, and use insights to create an efficient information retrieval and generation system.

# Template

```python
# Chat App Template
def create_chat_app(settings):
    def chat(message, history):
        # Process the message using the configured embedding model and vector store
        chunks, embedding_model, _ = process_files(
            settings['file_path'],
            settings['model_type'],
            settings['model_name'],
            settings['split_strategy'],
            settings['chunk_size'],
            settings['overlap_size'],
            settings['custom_separators'],
            settings['lang'],
            settings['apply_preprocessing']
        )

        results, _, _, _ = search_embeddings(
            chunks,
            embedding_model,
            settings['vector_store_type'],
            settings['search_type'],
            message,
            settings['top_k'],
            lang=settings['lang'],
            apply_phonetic=settings['apply_phonetic'],
            phonetic_weight=settings['phonetic_weight']
        )

        # Generate a response based on the retrieved results
        response = f"Based on the query '{message}', here are the top {settings['top_k']} relevant results:\n\n"
        for i, result in enumerate(results[:settings['top_k']]):
            response += f"{i+1}. {result['content'][:100]}...\n\n"

        return response

    with gr.Blocks() as chat_interface:
        gr.Markdown(f"# Chat App using {settings['model_type']} - {settings['model_name']}")
        chatbot = gr.Chatbot()
        msg = gr.Textbox()
        clear = gr.Button("Clear")

        msg.submit(chat, [msg, chatbot], [msg, chatbot])
        clear.click(lambda: None, None, chatbot, queue=False)

    return chat_interface

# Sample usage of the chat app template
sample_settings = {
    'file_path': 'path/to/your/document.pdf',
    'model_type': 'HuggingFace',
    'model_name': 'paraphrase-miniLM',
    'split_strategy': 'recursive',
    'chunk_size': 500,
    'overlap_size': 50,
    'custom_separators': None,
    'vector_store_type': 'FAISS',
    'search_type': 'similarity',
    'top_k': 3,
    'lang': 'english',
    'apply_preprocessing': True,
    'apply_phonetic': True,
    'phonetic_weight': 0.3
}

sample_chat_app = create_chat_app(sample_settings)

if __name__ == "__main__":
    launch_interface()
    # Uncomment the following line to launch the sample chat app
```

"""


    # Wrap the main app and the two guide pages in one tabbed interface.
    iface = gr.TabbedInterface(
        [iface, gr.Markdown(tutorial_md), gr.Markdown( use_case_md )],
        ["Embedding Comparison", "Tutorial", "Use Case"]
    )

    # Honour the caller's debug flag; share=True is passed as before.
    iface.launch(debug=debug, share=True)
|
|
|
|
|
def run_automated_tests(file_path: str, query: str, model_configs: List[Dict[str, str]],
                        test_params: Dict[str, List[Any]], expected_result: Optional[str] = None,
                        model_feedback: Optional[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Run every model configuration against every parameter combination.

    For each model in ``model_configs`` and each combination produced by
    ``ParameterGrid`` over ``test_params``, the function processes the files,
    optionally optimizes the vocabulary and the query, searches the
    embeddings, optionally reranks the raw results, computes statistics and
    updates the model ranking kept by a ``ModelManager``.

    Args:
        file_path: Passed through to ``process_files``.
        query: The search query; replaced by the space-joined optimized
            queries when ``use_query_optimization`` is truthy.
        model_configs: One dict per model, read via the ``'type'`` and
            ``'name'`` keys.
        test_params: Mapping of parameter name to the list of values to try.
            ``'model_type'`` and ``'model_name'`` entries are ignored because
            the model identity comes from ``model_configs``. The body reads
            ``split_strategy``, ``chunk_size``, ``overlap_size``, ``lang``,
            ``apply_preprocessing``, ``optimize_vocab``,
            ``use_query_optimization``, ``vector_store_type``,
            ``search_type``, ``top_k``, ``apply_phonetic``,
            ``phonetic_weight`` and ``use_reranking``;
            ``query_optimization_model`` is read only when query optimization
            is enabled and ``custom_separators`` is optional.
        expected_result: Forwarded to ``search_embeddings`` and to the
            statistics calculation.
        model_feedback: Forwarded to the statistics calculation and to
            ``ModelManager.update_model_ranking``.

    Returns:
        A ``(results_df, stats_df)`` tuple built from the formatted results
        and the per-run statistics of every run that completed.

    Note:
        A failing run is reported with ``print`` and skipped, so one bad
        combination does not abort the whole sweep.
    """
    all_results = []
    all_stats = []
    model_manager = ModelManager()

    # The model identity is supplied by ``model_configs``; keep it out of the grid.
    base_params = {k: v for k, v in test_params.items() if k not in ['model_type', 'model_name']}
    param_grid = ParameterGrid(base_params)

    # The reranking pipeline does not depend on the loop variables, so it is
    # loaded lazily on first use and shared by all subsequent runs instead of
    # being rebuilt for every parameter combination.
    reranker = None

    for model_config in tqdm(model_configs, desc="Testing models"):
        model_type = model_config['type']
        model_name = model_config['name']

        for params in tqdm(param_grid, desc=f"Testing parameters for {model_type}:{model_name}"):
            try:
                chunks, embedding_model, num_tokens = process_files(
                    file_path,
                    model_type,
                    model_name,
                    params['split_strategy'],
                    params['chunk_size'],
                    params['overlap_size'],
                    params.get('custom_separators'),
                    params['lang'],
                    params['apply_preprocessing']
                )

                if params['optimize_vocab']:
                    # Only the rewritten chunks are used; the tokenizer is discarded.
                    _, chunks = optimize_vocabulary(chunks)

                current_query = query
                if params['use_query_optimization']:
                    optimized_queries = optimize_query(
                        query,
                        params['query_optimization_model'],
                        chunks,
                        embedding_model,
                        params['vector_store_type'],
                        params['search_type'],
                        params['top_k']
                    )
                    current_query = " ".join(optimized_queries)

                results, search_time, vector_store, raw_results = search_embeddings(
                    chunks,
                    embedding_model,
                    params['vector_store_type'],
                    params['search_type'],
                    current_query,
                    params['top_k'],
                    expected_result,
                    params['lang'],
                    params['apply_phonetic'],
                    params['phonetic_weight']
                )

                if params['use_reranking']:
                    if reranker is None:
                        # If loading fails the exception is handled below and
                        # ``reranker`` stays ``None``, so the next run retries.
                        reranker = pipeline("text-classification",
                                            model="cross-encoder/ms-marco-MiniLM-L-12-v2")
                    raw_results = rerank_results(raw_results, current_query, reranker)

                stats = ResultAnalyzer.calculate_statistics(
                    raw_results, search_time, vector_store, num_tokens,
                    embedding_model, current_query, params['top_k'],
                    expected_result, model_feedback
                )

                model_id = f"{model_type}:{model_name}"
                ranking_score = calculate_model_ranking_score(stats)
                model_manager.update_model_ranking(model_id, ranking_score, model_feedback)

                # Record which model and parameters produced these statistics.
                stats.update({
                    "model_type": model_type,
                    "model_name": model_name,
                    "model": f"{model_type} - {model_name}",
                    **params
                })

                all_results.extend(format_results(raw_results, stats))
                all_stats.append(stats)

            except Exception as e:
                # Deliberate best-effort: report the failing combination and move on.
                print(f"Error testing {model_type}:{model_name} with parameters {params}: {str(e)}")

    return pd.DataFrame(all_results), pd.DataFrame(all_stats)
|
|
|
|
|
def calculate_model_ranking_score(stats: Dict[str, Any]) -> float:
    """Calculate a composite score for model ranking.

    The score is a weighted sum of the metrics found in ``stats``; higher is
    better. ``search_time`` carries a negative weight so slower searches
    score lower. ``expected_result_rank`` is converted to a reciprocal rank
    (``1 / max(rank, 1)``) before weighting, so it carries a positive weight:
    the closer the expected result is to the top, the larger its contribution.

    Args:
        stats: Mapping of metric name to value. Metrics that are missing,
            ``None``, strings, or NaN are ignored.

    Returns:
        The weighted sum of the usable metrics, ``0.0`` if none are usable.
    """
    import math

    weights = {
        'search_time': -0.2,
        'result_diversity': 0.2,
        'rank_correlation': 0.3,
        'contains_expected': 0.3,
        # Positive: the value below is a reciprocal rank, where larger is
        # better. A negative weight here would penalize the best ranks most.
        'expected_result_rank': 0.2,
    }

    score = 0.0
    for metric, weight in weights.items():
        raw = stats.get(metric)
        # Skip absent values and string placeholders rather than failing.
        if raw is None or isinstance(raw, str):
            continue

        if metric == 'expected_result_rank':
            # Clamp to 1 so ranks <= 0 cannot divide by zero or flip the sign.
            value = 1.0 / max(raw, 1)
        else:
            # Booleans such as ``contains_expected`` become 0.0 / 1.0 here.
            value = float(raw)

        # A NaN metric would otherwise turn the whole score into NaN.
        if math.isnan(value):
            continue

        score += weight * value

    return score
|
|
|
# Script entry point: launch the interface only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    launch_interface()
|
|