import os
import json
import logging
from typing import List, Dict, Any

# Haystack imports
from haystack.utils import Secret
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.embedders import OpenAITextEmbedder

# Ragas imports
from ragas import EvaluationDataset, SingleTurnSample, evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    # context_relevancy
)
from ragas.llms.haystack_wrapper import HaystackLLMWrapper
from ragas.embeddings.haystack_wrapper import HaystackEmbeddingsWrapper

import pandas as pd

# Import the existing RAG pipeline
from rag_pipeline import RAGPipeline

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
class RAGEvaluator:
    def __init__(
        self,
        embedding_model_name: str = "BAAI/bge-en-icl",
        llm_model_name: str = "meta-llama/Llama-3.3-70B-Instruct",
        qdrant_path: str = "./qdrant_data",
        api_base_url: str = "https://api.studio.nebius.com/v1/",
        collection_name: str = "ltu_documents"
    ):
        self.embedding_model_name = embedding_model_name
        self.llm_model_name = llm_model_name
        self.qdrant_path = qdrant_path
        self.api_base_url = api_base_url
        self.collection_name = collection_name
        # Load the API key from the environment (NEBIUS_API_KEY must be set)
        self.api_key = Secret.from_token(os.getenv("NEBIUS_API_KEY"))
        # Initialize the RAG pipeline and the Ragas wrappers
        self.init_components()
    def init_components(self):
        """Initialize the existing RAG pipeline and Ragas components"""
        logger.info("Initializing components...")
        # Initialize the existing RAG pipeline
        self.rag_pipeline = RAGPipeline(
            embedding_model_name=self.embedding_model_name,
            llm_model_name=self.llm_model_name,
            qdrant_path=self.qdrant_path
        )
        # Initialize Ragas wrappers
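        # Ragas needs its own "judge" LLM and embedder to score the metrics;
        # the Haystack wrappers below let it drive the same Nebius-hosted,
        # OpenAI-compatible components used elsewhere in this project.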
        self.llm_wrapper = HaystackLLMWrapper(
            OpenAIGenerator(
                api_base_url=self.api_base_url,
                model=self.llm_model_name,
                api_key=self.api_key,
                generation_kwargs={
                    "max_tokens": 1024,
                    "temperature": 0.1,
                    "top_p": 0.95,
                }
            )
        )
        self.embedding_wrapper = HaystackEmbeddingsWrapper(
            OpenAITextEmbedder(
                api_base_url=self.api_base_url,
                model=self.embedding_model_name,
                api_key=self.api_key,
            )
        )
        logger.info("Components initialized successfully")
    def load_testset(self, testset_path: str) -> List[Dict[str, Any]]:
        """Load test set from a JSONL file"""
logger.info(f"Loading test set from {testset_path}...") | |
test_data = [] | |
with open(testset_path, 'r', encoding='utf-8') as f: | |
for line in f: | |
test_data.append(json.loads(line)) | |
logger.info(f"Loaded {len(test_data)} test samples") | |
return test_data | |
    def prepare_ragas_dataset(self, test_data: List[Dict[str, Any]], results: List[Dict[str, Any]]) -> EvaluationDataset:
        """Build a Ragas EvaluationDataset from test samples and pipeline results"""
        logger.info("Preparing data for Ragas evaluation...")
        eval_data = []
        for test_sample, result in zip(test_data, results):
            question = test_sample["user_input"]
            reference_answer = test_sample["reference"]
            # Get generated answer and contexts from pipeline result
            generated_answer = result["answer"]
            contexts = [doc.content for doc in result["documents"]]
            # Get reference contexts
            reference_contexts = test_sample.get("reference_contexts", [])
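            # SingleTurnSample is Ragas' per-question record: the generated
            # response plus the retrieved and reference contexts that the
            # metrics compare against.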
            eval_data.append(SingleTurnSample(
                user_input=question,
                response=generated_answer,
                retrieved_contexts=contexts,
                reference=reference_answer,
                reference_contexts=reference_contexts
            ))
        # print(eval_data[0])
        return EvaluationDataset(eval_data)
    def run_evaluation(self, testset_path: str = "testset.jsonl"):
        """Run the full evaluation process and return the Ragas evaluation result"""
        logger.info("Starting RAG pipeline evaluation...")
        # Load test set
        test_data = self.load_testset(testset_path)
        # Run pipeline for each test sample
        results = []
        for i, test_sample in enumerate(test_data):
            logger.info(f"Processing test sample {i+1}/{len(test_data)}")
            question = test_sample["user_input"]
            # Run the existing RAG pipeline
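            # query() is assumed to return a dict with an "answer" string and
            # the retrieved "documents", which prepare_ragas_dataset consumes.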
            result = self.rag_pipeline.query(question)
            results.append(result)
        # Prepare data for Ragas
        eval_ds = self.prepare_ragas_dataset(test_data, results)
        # Run Ragas evaluation
        logger.info("Running Ragas evaluation...")
        evaluation_result = evaluate(
            eval_ds,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_precision,
                context_recall,
                # context_relevancy
            ],
            llm=self.llm_wrapper,
            embeddings=self.embedding_wrapper,
        )
        # Print and return results
        logger.info("Evaluation complete!")
        logger.info(f"Results: {evaluation_result}")
        return evaluation_result
if __name__ == "__main__":
    # Create and run evaluator
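    # Assumes NEBIUS_API_KEY is exported and that ./qdrant_data already holds
    # an indexed collection (the defaults above point at "ltu_documents").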
    evaluator = RAGEvaluator()
    results = evaluator.run_evaluation()
    print(repr(results))
    # Save results to file
    # with open("ragas_evaluation_results.json", "w") as f:
    #     json.dump(results.to_dict(), f, indent=2)
    # print("\nEvaluation results saved to ragas_evaluation_results.json")
# Example output from a previous run:
# INFO:__main__:Results: {
#     'answer_relevancy': 0.8558,
#     'context_precision': 0.9033,
#     'faithfulness': 0.8000,
#     'context_recall': 0.9417
# }
# {'answer_relevancy': 0.8558, 'context_precision': 0.9033, 'faithfulness': 0.8000, 'context_recall': 0.9417}