# external files
from openai_interface import GPT_Turbo
from weaviate_interface import WeaviateClient
from llama_index.finetuning import EmbeddingQAFinetuneDataset
from prompt_templates import qa_generation_prompt
from reranker import ReRanker

# standard library imports
import json
import time
import uuid
import os
import re
import random
from datetime import datetime
from typing import List, Dict, Tuple, Union, Literal

# misc
from tqdm import tqdm


class QueryContextGenerator:
    '''
    Class designed for the generation of query/context pairs using a
    Generative LLM. The LLM is used to generate questions from a given
    corpus of text. The query/context pairs can be used to fine-tune an
    embedding model using a MultipleNegativesRankingLoss loss function or
    to create evaluation datasets for retrieval models.
    '''
    def __init__(self, openai_key: str, model_id: str='gpt-3.5-turbo-0613'):
        self.llm = GPT_Turbo(model=model_id, api_key=openai_key)

    def clean_validate_data(self,
                            data: List[dict],
                            valid_fields: List[str]=['content', 'summary', 'guest', 'doc_id'],
                            total_chars: int=950
                            ) -> List[dict]:
        '''
        Strip original data chunks so they only contain valid_fields.
        Remove any chunks of total_chars or fewer characters. Prevents
        LLM from asking questions from sparse content.
        '''
        clean_docs = [{k: v for k, v in d.items() if k in valid_fields} for d in data]
        valid_docs = [d for d in clean_docs if len(d['content']) > total_chars]
        return valid_docs

    def train_val_split(self,
                        data: List[dict],
                        n_train_questions: int,
                        n_val_questions: int,
                        n_questions_per_chunk: int=2,
                        total_chars: int=950):
        '''
        Splits corpus into training and validation sets. Training and
        validation samples are randomly selected from the corpus.
        total_chars parameter is set based on pre-analysis of average doc
        length in the training corpus.
        '''
        clean_data = self.clean_validate_data(data, total_chars=total_chars)
        random.shuffle(clean_data)
        train_index = n_train_questions//n_questions_per_chunk
        valid_index = n_val_questions//n_questions_per_chunk
        end_index = valid_index + train_index
        if end_index > len(clean_data):
            raise ValueError('Cannot create dataset with desired number of questions, try using a larger dataset')
        train_data = clean_data[:train_index]
        valid_data = clean_data[train_index:end_index]
        print(f'Length Training Data: {len(train_data)}')
        print(f'Length Validation Data: {len(valid_data)}')
        return train_data, valid_data
    def generate_qa_embedding_pairs(self,
                                    data: List[dict],
                                    generate_prompt_tmpl: str=None,
                                    num_questions_per_chunk: int=2,
                                    ) -> EmbeddingQAFinetuneDataset:
        """
        Generate query/context pairs from a list of documents. The query/context
        pairs can be used for fine-tuning an embedding model using a
        MultipleNegativesRankingLoss or can be used to create an evaluation
        dataset for retrieval models.

        This function was adapted for this course from the llama_index.finetuning.common module:
        https://github.com/run-llama/llama_index/blob/main/llama_index/finetuning/embeddings/common.py
        """
        generate_prompt_tmpl = qa_generation_prompt if not generate_prompt_tmpl else generate_prompt_tmpl
        queries = {}
        relevant_docs = {}
        corpus = {chunk['doc_id']: chunk['content'] for chunk in data}
        for chunk in tqdm(data):
            summary = chunk['summary']
            guest = chunk['guest']
            transcript = chunk['content']
            node_id = chunk['doc_id']
            query = generate_prompt_tmpl.format(summary=summary,
                                                guest=guest,
                                                transcript=transcript,
                                                num_questions_per_chunk=num_questions_per_chunk)
            try:
                response = self.llm.get_chat_completion(prompt=query, temperature=0.1, max_tokens=100)
            except Exception as e:
                print(e)
                continue
            result = str(response).strip().split("\n")
            questions = [
                re.sub(r"^\d+[\).\s]", "", question).strip() for question in result
            ]
            questions = [question for question in questions if len(question) > 0]

            for question in questions:
                question_id = str(uuid.uuid4())
                queries[question_id] = question
                relevant_docs[question_id] = [node_id]

        # construct dataset
        return EmbeddingQAFinetuneDataset(
            queries=queries, corpus=corpus, relevant_docs=relevant_docs
        )
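# --------------------------------------------------------------------------
# Example usage (sketch): one way to build train/validation QA datasets with
# QueryContextGenerator. The function below is illustrative only and is not
# called anywhere in this module. `docs` (a list of pre-chunked dicts with
# 'content', 'summary', 'guest', and 'doc_id' keys), the question counts, and
# the OPENAI_API_KEY environment variable are all assumptions.
# --------------------------------------------------------------------------
def example_build_qa_datasets(docs: List[dict]
                              ) -> Tuple[EmbeddingQAFinetuneDataset, EmbeddingQAFinetuneDataset]:
    generator = QueryContextGenerator(openai_key=os.environ['OPENAI_API_KEY'])
    # split chunks so each side yields roughly the requested number of questions
    train_chunks, valid_chunks = generator.train_val_split(docs,
                                                           n_train_questions=100,
                                                           n_val_questions=25,
                                                           n_questions_per_chunk=2)
    train_dataset = generator.generate_qa_embedding_pairs(train_chunks, num_questions_per_chunk=2)
    valid_dataset = generator.generate_qa_embedding_pairs(valid_chunks, num_questions_per_chunk=2)
    return train_dataset, valid_dataset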
def execute_evaluation(dataset: EmbeddingQAFinetuneDataset,
                       class_name: str,
                       retriever: WeaviateClient,
                       reranker: ReRanker=None,
                       alpha: float=0.5,
                       retrieve_limit: int=100,
                       top_k: int=5,
                       chunk_size: int=256,
                       hnsw_config_keys: List[str]=['maxConnections', 'efConstruction', 'ef'],
                       search_type: Literal['kw', 'vector', 'hybrid', 'all']='all',
                       display_properties: List[str]=['doc_id', 'content'],
                       dir_outpath: str='./eval_results',
                       include_miss_info: bool=False,
                       user_def_params: dict=None
                       ) -> Union[dict, Tuple[dict, List[dict]]]:
    '''
    Given a dataset, a retriever, and a reranker, evaluate the performance of the
    retriever and reranker. Returns a dict of kw, vector, and hybrid hit rates and
    mrr scores. If include_miss_info is True, will also return a list of kw, vector,
    and hybrid responses and their associated queries that did not return a hit.

    Args:
    -----
    dataset: EmbeddingQAFinetuneDataset
        Dataset to be used for evaluation
    class_name: str
        Name of Class on Weaviate host to be used for retrieval
    retriever: WeaviateClient
        WeaviateClient object to be used for retrieval
    reranker: ReRanker
        ReRanker model to be used for results reranking
    alpha: float=0.5
        Weighting factor for BM25 and Vector search.
        alpha can be any number from 0 to 1, defaulting to 0.5:
            alpha = 0 executes a pure keyword search method (BM25)
            alpha = 0.5 weighs the BM25 and vector methods evenly
            alpha = 1 executes a pure vector search method
    retrieve_limit: int=100
        Number of documents to retrieve from Weaviate host
    top_k: int=5
        Number of top results to evaluate
    chunk_size: int=256
        Number of tokens used to chunk text
    hnsw_config_keys: List[str]=['maxConnections', 'efConstruction', 'ef']
        List of keys to be used for retrieving HNSW Index parameters from Weaviate host
    search_type: Literal['kw', 'vector', 'hybrid', 'all']='all'
        Type of search to be evaluated. Options are 'kw', 'vector', 'hybrid', or 'all'
    display_properties: List[str]=['doc_id', 'content']
        List of properties to be returned from Weaviate host for display in response
    dir_outpath: str='./eval_results'
        Directory path for saving results. Directory will be created if it does
        not already exist.
    include_miss_info: bool=False
        Option to include queries and their associated search response values
        for queries that are "total misses"
    user_def_params: dict=None
        Option for user to pass in a dictionary of user-defined parameters and their
        values. Will be automatically added to the results_dict if correct type is passed.
    '''
    reranker_name = reranker.model_name if reranker else "None"

    results_dict = {'n': retrieve_limit,
                    'top_k': top_k,
                    'alpha': alpha,
                    'Retriever': retriever.model_name_or_path,
                    'Ranker': reranker_name,
                    'chunk_size': chunk_size,
                    'kw_hit_rate': 0,
                    'kw_mrr': 0,
                    'vector_hit_rate': 0,
                    'vector_mrr': 0,
                    'hybrid_hit_rate': 0,
                    'hybrid_mrr': 0,
                    'total_misses': 0,
                    'total_questions': 0
                    }
    # add extra params to results_dict
    results_dict = add_params(retriever, class_name, results_dict, user_def_params, hnsw_config_keys)

    start = time.perf_counter()
    miss_info = []
    for query_id, q in tqdm(dataset.queries.items(), 'Queries'):
        results_dict['total_questions'] += 1
        hit = False
        # make Keyword, Vector, and Hybrid calls to Weaviate host
        try:
            kw_response = retriever.keyword_search(request=q, class_name=class_name, limit=retrieve_limit, display_properties=display_properties)
            vector_response = retriever.vector_search(request=q, class_name=class_name, limit=retrieve_limit, display_properties=display_properties)
            hybrid_response = retriever.hybrid_search(request=q, class_name=class_name, alpha=alpha, limit=retrieve_limit, display_properties=display_properties)

            # rerank returned responses if reranker is provided
            if reranker:
                kw_response = reranker.rerank(kw_response, q, top_k=top_k)
                vector_response = reranker.rerank(vector_response, q, top_k=top_k)
                hybrid_response = reranker.rerank(hybrid_response, q, top_k=top_k)

            # collect doc_ids to check for document matches (include only the top_k results)
            kw_doc_ids = {result['doc_id']: i for i, result in enumerate(kw_response[:top_k], 1)}
            vector_doc_ids = {result['doc_id']: i for i, result in enumerate(vector_response[:top_k], 1)}
            hybrid_doc_ids = {result['doc_id']: i for i, result in enumerate(hybrid_response[:top_k], 1)}

            # extract doc_id for scoring purposes
            doc_id = dataset.relevant_docs[query_id][0]

            # increment hit_rate counters and mrr scores
            if doc_id in kw_doc_ids:
                results_dict['kw_hit_rate'] += 1
                results_dict['kw_mrr'] += 1/kw_doc_ids[doc_id]
                hit = True
            if doc_id in vector_doc_ids:
                results_dict['vector_hit_rate'] += 1
                results_dict['vector_mrr'] += 1/vector_doc_ids[doc_id]
                hit = True
            if doc_id in hybrid_doc_ids:
                results_dict['hybrid_hit_rate'] += 1
                results_dict['hybrid_mrr'] += 1/hybrid_doc_ids[doc_id]
                hit = True

            # if no hits, let's capture that
            if not hit:
                results_dict['total_misses'] += 1
                miss_info.append({'query': q,
                                  'answer': dataset.corpus[doc_id],
                                  'doc_id': doc_id,
                                  'kw_response': kw_response,
                                  'vector_response': vector_response,
                                  'hybrid_response': hybrid_response})
        except Exception as e:
            print(e)
            continue

    # use raw counts to calculate final scores
    calc_hit_rate_scores(results_dict, search_type=search_type)
    calc_mrr_scores(results_dict, search_type=search_type)

    end = time.perf_counter() - start
    print(f'Total Processing Time: {round(end/60, 2)} minutes')
    record_results(results_dict, chunk_size, dir_outpath=dir_outpath, as_text=True)

    if include_miss_info:
        return results_dict, miss_info
    return results_dict


def calc_hit_rate_scores(results_dict: Dict[str, Union[str, int]],
                         search_type: Union[List[str], Literal['kw', 'vector', 'hybrid', 'all']]=['kw', 'vector']
                         ) -> None:
    '''
    Convert raw hit counts in results_dict into hit rates by dividing by total_questions.
    '''
    if search_type == 'all':
        search_type = ['kw', 'vector', 'hybrid']
    elif isinstance(search_type, str):
        # wrap a single search type (e.g. 'kw') in a list so it is not iterated character by character
        search_type = [search_type]
    for prefix in search_type:
        results_dict[f'{prefix}_hit_rate'] = round(results_dict[f'{prefix}_hit_rate']/results_dict['total_questions'], 2)


def calc_mrr_scores(results_dict: Dict[str, Union[str, int]],
                    search_type: Union[List[str], Literal['kw', 'vector', 'hybrid', 'all']]=['kw', 'vector']
                    ) -> None:
    '''
    Convert summed reciprocal ranks in results_dict into MRR scores by dividing by total_questions.
    '''
    if search_type == 'all':
        search_type = ['kw', 'vector', 'hybrid']
    elif isinstance(search_type, str):
        # wrap a single search type (e.g. 'kw') in a list so it is not iterated character by character
        search_type = [search_type]
    for prefix in search_type:
        results_dict[f'{prefix}_mrr'] = round(results_dict[f'{prefix}_mrr']/results_dict['total_questions'], 2)
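# Worked example (sketch, hypothetical numbers) for the two normalizers above:
# suppose 2 questions were evaluated and the relevant doc appeared at rank 1 for
# the first query and rank 2 for the second. execute_evaluation would accumulate
# raw counts kw_hit_rate = 2 and kw_mrr = 1/1 + 1/2 = 1.5, and the functions
# above divide by total_questions to give the final scores:
#
#   scores = {'kw_hit_rate': 2, 'kw_mrr': 1.5, 'total_questions': 2}
#   calc_hit_rate_scores(scores, search_type=['kw'])   # scores['kw_hit_rate'] -> 1.0
#   calc_mrr_scores(scores, search_type=['kw'])        # scores['kw_mrr'] -> 0.75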
def create_dir(dir_path: str) -> None:
    '''
    Checks if directory exists, and creates new directory
    if it does not exist
    '''
    if not os.path.exists(dir_path):
        os.makedirs(dir_path)


def record_results(results_dict: Dict[str, Union[str, int]],
                   chunk_size: int,
                   dir_outpath: str='./eval_results',
                   as_text: bool=False
                   ) -> None:
    '''
    Write results to output file in either txt or json format

    Args:
    -----
    results_dict: Dict[str, Union[str, int]]
        Dictionary containing results of evaluation
    chunk_size: int
        Size of text chunks in tokens
    dir_outpath: str
        Path to output directory. Directory only, filename is hardcoded
        as part of this function.
    as_text: bool
        If True, write results as text file. If False, write as json file.
    '''
    create_dir(dir_outpath)
    time_marker = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    ext = 'txt' if as_text else 'json'
    path = os.path.join(dir_outpath, f'retrieval_eval_{chunk_size}_{time_marker}.{ext}')
    if as_text:
        with open(path, 'a') as f:
            f.write(f"{results_dict}\n")
    else:
        with open(path, 'w') as f:
            json.dump(results_dict, f, indent=4)


def add_params(client: WeaviateClient,
               class_name: str,
               results_dict: dict,
               param_options: dict,
               hnsw_config_keys: List[str]
               ) -> dict:
    '''
    Add HNSW index parameters from the Weaviate host, plus any user-defined
    parameters, to the results_dict.
    '''
    hnsw_params = {k: v for k, v in client.show_class_config(class_name)['vectorIndexConfig'].items() if k in hnsw_config_keys}
    if hnsw_params:
        results_dict = {**results_dict, **hnsw_params}
    if param_options and isinstance(param_options, dict):
        results_dict = {**results_dict, **param_options}
    return results_dict
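# --------------------------------------------------------------------------
# Example usage (sketch): one way to run execute_evaluation end to end. All of
# the concrete values below are assumptions -- the dataset path, the Weaviate
# class name, the environment variables, and the WeaviateClient/ReRanker
# constructor arguments (check weaviate_interface and reranker for their real
# signatures). EmbeddingQAFinetuneDataset.from_json is provided by llama_index.
# --------------------------------------------------------------------------
if __name__ == '__main__':
    eval_dataset = EmbeddingQAFinetuneDataset.from_json('./eval_data/golden_dataset.json')  # hypothetical path
    client = WeaviateClient(api_key=os.environ['WEAVIATE_API_KEY'],       # hypothetical constructor args
                            endpoint=os.environ['WEAVIATE_ENDPOINT'])
    reranker = ReRanker()                                                 # hypothetical default reranker model

    results = execute_evaluation(dataset=eval_dataset,
                                 class_name='PodcastChunks',              # placeholder: class on your Weaviate host
                                 retriever=client,
                                 reranker=reranker,
                                 alpha=0.5,
                                 retrieve_limit=100,
                                 top_k=5,
                                 chunk_size=256)
    print(results)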