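"""Evaluate the existing RAG pipeline with Ragas.

Runs every question from a JSONL test set through RAGPipeline, collects the
generated answers and retrieved contexts, and scores them with Ragas metrics
(faithfulness, answer relevancy, context precision, context recall).
Requires the NEBIUS_API_KEY environment variable to be set.
"""
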
import os
import json
import logging
from typing import List, Dict, Any

# Haystack imports
from haystack.utils import Secret
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.embedders import OpenAITextEmbedder

# Ragas imports
from ragas import EvaluationDataset, SingleTurnSample, evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
)
from ragas.llms.haystack_wrapper import HaystackLLMWrapper
from ragas.embeddings.haystack_wrapper import HaystackEmbeddingsWrapper

# Import the existing RAG pipeline
from rag_pipeline import RAGPipeline

# Configure logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

class RAGEvaluator:
    def __init__(
        self,
        embedding_model_name: str = "BAAI/bge-en-icl",
        llm_model_name: str = "meta-llama/Llama-3.3-70B-Instruct",
        qdrant_path: str = "./qdrant_data",
        api_base_url: str = "https://api.studio.nebius.com/v1/",
        collection_name: str = "ltu_documents"
    ):
        self.embedding_model_name = embedding_model_name
        self.llm_model_name = llm_model_name
        self.qdrant_path = qdrant_path
        self.api_base_url = api_base_url
        self.collection_name = collection_name
        
        # Load the API key from the NEBIUS_API_KEY environment variable
        api_key = os.getenv("NEBIUS_API_KEY")
        if not api_key:
            raise ValueError("The NEBIUS_API_KEY environment variable must be set")
        self.api_key = Secret.from_token(api_key)
        
        # Initialize the RAG pipeline and the Ragas evaluation components
        self.init_components()
        
    def init_components(self):
        """Initialize the existing RAG pipeline and Ragas components"""
        logger.info("Initializing components...")
        
        # Initialize the existing RAG pipeline
        self.rag_pipeline = RAGPipeline(
            embedding_model_name=self.embedding_model_name,
            llm_model_name=self.llm_model_name,
            qdrant_path=self.qdrant_path
        )
        
        # Initialize Ragas wrappers
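        # (HaystackLLMWrapper / HaystackEmbeddingsWrapper let Ragas run its metric
        # prompts and embeddings through the same Nebius-hosted Haystack components)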
        self.llm_wrapper = HaystackLLMWrapper(
            OpenAIGenerator(
                api_base_url="https://api.studio.nebius.com/v1/",
                model=self.llm_model_name,
                api_key=self.api_key,
                generation_kwargs={
                    "max_tokens": 1024,
                    "temperature": 0.1,
                    "top_p": 0.95,
                }
            )
        )
        
        self.embedding_wrapper = HaystackEmbeddingsWrapper(
            OpenAITextEmbedder(
                api_base_url="https://api.studio.nebius.com/v1/",
                model=self.embedding_model_name,
                api_key=self.api_key,
            )
        )
        
        logger.info("Components initialized successfully")
    
    def load_testset(self, testset_path: str) -> List[Dict[str, Any]]:
        """Load test set from a JSONL file"""
        logger.info(f"Loading test set from {testset_path}...")
        
        test_data = []
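        # Each line is assumed to be one JSON object in the shape produced by the
        # Ragas testset generator, e.g.:
        #   {"user_input": "...", "reference": "...", "reference_contexts": ["..."]}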
        with open(testset_path, 'r', encoding='utf-8') as f:
            for line in f:
                test_data.append(json.loads(line))
                
        logger.info(f"Loaded {len(test_data)} test samples")
        return test_data
    
    def prepare_ragas_dataset(self, test_data: List[Dict[str, Any]], results: List[Dict[str, Any]]) -> EvaluationDataset:
        """Build a Ragas EvaluationDataset from the test samples and pipeline results"""
        logger.info("Preparing data for Ragas evaluation...")
        
        eval_data = []
        
        for test_sample, result in zip(test_data, results):
            question = test_sample["user_input"]
            reference_answer = test_sample["reference"]
            
            # Get generated answer and contexts from pipeline result
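            # (each result is assumed to be a dict with an "answer" string and a
            # "documents" list of Haystack Document objects, as returned by
            # RAGPipeline.query)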
            generated_answer = result["answer"]
            contexts = [doc.content for doc in result["documents"]]
            
            # Get reference contexts
            reference_contexts = test_sample.get("reference_contexts", [])
            
            eval_data.append(SingleTurnSample(
                user_input=question,
                response=generated_answer,
                retrieved_contexts=contexts,
                reference=reference_answer,
                reference_contexts=reference_contexts
            ))
        
        return EvaluationDataset(eval_data)
    
    def run_evaluation(self, testset_path: str = "testset.jsonl") -> Any:
        """Run the full evaluation process and return the Ragas evaluation result"""
        logger.info("Starting RAG pipeline evaluation...")
        
        # Load test set
        test_data = self.load_testset(testset_path)
        
        # Run pipeline for each test sample
        results = []
        for i, test_sample in enumerate(test_data):
            logger.info(f"Processing test sample {i+1}/{len(test_data)}")
            question = test_sample["user_input"]
            
            # Run the existing RAG pipeline
            result = self.rag_pipeline.query(question)
            results.append(result)
        
        # Prepare data for Ragas
        eval_ds = self.prepare_ragas_dataset(test_data, results)
        
        # Run Ragas evaluation
        logger.info("Running Ragas evaluation...")
        evaluation_result = evaluate(
            eval_ds,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_precision,
                context_recall,
            ],
            llm=self.llm_wrapper,
            embeddings=self.embedding_wrapper,
        )
        
        # Print and return results
        logger.info("Evaluation complete!")
        logger.info(f"Results: {evaluation_result}")
        
        return evaluation_result

if __name__ == "__main__":
    # Create and run evaluator
    evaluator = RAGEvaluator()
    results = evaluator.run_evaluation()
    print(repr(results))
    # Optionally persist the per-sample scores via the Ragas result's to_pandas():
    # results.to_pandas().to_csv("ragas_evaluation_results.csv", index=False)
# Example output:
#   {'answer_relevancy': 0.8558, 'context_precision': 0.9033,
#    'faithfulness': 0.8000, 'context_recall': 0.9417}