File size: 6,896 Bytes
e8051be
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
"""

Query Expansion Module for Advanced RAG

Now uses Groq's llama3-8b-8192 model directly for generating focused sub-queries.

"""

import re
import time
import asyncio
from typing import List
from groq import Groq
from config.config import (
    ENABLE_QUERY_EXPANSION,
    QUERY_EXPANSION_COUNT,
    GROQ_API_KEY_LITE,
    GROQ_MODEL_LITE,
)


class QueryExpansionManager:
    """Manages query expansion for better information retrieval."""
    
    def __init__(self):
        """Initialize the query expansion manager with Groq client."""
        # Initialize Groq client with the lite key and llama3-8b-8192 model
        self.model = GROQ_MODEL_LITE or "llama3-8b-8192"
        if not GROQ_API_KEY_LITE:
            print("⚠️ GROQ_API_KEY_LITE is not set. Query expansion will fall back to original query.")
            self.client = None
        else:
            self.client = Groq(api_key=GROQ_API_KEY_LITE)
        print(f"✅ Query Expansion Manager initialized using Groq model: {self.model}")
    
    async def expand_query(self, original_query: str) -> List[str]:
        """Break complex queries into focused parts for better information retrieval using Groq."""
        if not ENABLE_QUERY_EXPANSION:
            return [original_query]
        if not self.client:
            return [original_query]
        
        try:
            expansion_prompt = f"""Analyze this question and break it down into exactly {QUERY_EXPANSION_COUNT} specific, focused sub-questions that can be searched independently in a document. Each sub-question should target a distinct piece of information or process.



For complex questions with multiple parts, identify:

1. Different processes or procedures mentioned

2. Specific information requests (emails, contact details, forms, etc.)

3. Different entities or subjects involved

4. Sequential steps that might be documented separately



Original question: {original_query}



Break this into exactly {QUERY_EXPANSION_COUNT} focused search queries that target different aspects:



Examples of good breakdown:

- "What is the dental claim submission process?"

- "How to update surname/name in policy records?"

- "What are the company contact details and grievance email?"



Provide only {QUERY_EXPANSION_COUNT} focused sub-questions, one per line, without numbering or additional formatting:"""

            # Call Groq's chat completions in a thread to avoid blocking the event loop
            response = await asyncio.to_thread(
                self.client.chat.completions.create,
                messages=[{"role": "user", "content": expansion_prompt}],
                model=self.model,
                temperature=0.3,
                max_tokens=300,
            )
            
            expanded_queries = []  # Start with empty list - don't include original

            if response and response.choices:
                content = response.choices[0].message.content if response.choices[0].message else ""
                sub_queries = (content or "").strip().split('\n')
                for query in sub_queries:
                    if len(expanded_queries) >= QUERY_EXPANSION_COUNT:  # Stop when we have enough
                        break
                    query = query.strip()
                    # Remove any numbering or bullet points that might be added
                    query = re.sub(r'^[\d\.\-\*\s]+', '', query).strip()
                    if query and len(query) > 10:
                        expanded_queries.append(query)
            
            # If we don't have enough sub-queries, fall back to using the original
            if len(expanded_queries) < QUERY_EXPANSION_COUNT:
                expanded_queries = [original_query] * QUERY_EXPANSION_COUNT
            
            # Ensure we have exactly QUERY_EXPANSION_COUNT queries
            final_queries = expanded_queries[:QUERY_EXPANSION_COUNT]
            
            print(f"🔄 Query broken down from 1 complex question to {len(final_queries)} focused sub-queries using Groq {self.model}")
            print(f"📌 Original query will be used for final LLM generation only")
            for i, q in enumerate(final_queries):
                print(f"   Sub-query {i+1}: {q[:80]}...")
            
            return final_queries
            
        except Exception as e:
            print(f"⚠️ Query expansion failed: {e}")
            return [original_query]
    
    def _identify_query_components(self, query: str) -> dict:
        """Identify different components in a complex query for better breakdown."""
        components = {
            'processes': [],
            'documents': [],
            'contacts': [],
            'eligibility': [],
            'timelines': [],
            'benefits': []
        }
        
        # Define keywords for different component types
        process_keywords = ['process', 'procedure', 'steps', 'how to', 'submit', 'apply', 'claim', 'update', 'change', 'enroll']
        document_keywords = ['documents', 'forms', 'papers', 'certificate', 'proof', 'evidence', 'requirements']
        contact_keywords = ['email', 'phone', 'contact', 'grievance', 'customer service', 'support', 'helpline']
        eligibility_keywords = ['eligibility', 'criteria', 'qualify', 'eligible', 'conditions', 'requirements']
        timeline_keywords = ['timeline', 'period', 'duration', 'time', 'days', 'months', 'waiting', 'grace']
        benefit_keywords = ['benefits', 'coverage', 'limits', 'amount', 'reimbursement', 'claim amount']
        
        query_lower = query.lower()
        
        # Check for process-related content
        if any(keyword in query_lower for keyword in process_keywords):
            components['processes'].append('process identification')
        
        # Check for document-related content
        if any(keyword in query_lower for keyword in document_keywords):
            components['documents'].append('document requirements')
        
        # Check for contact-related content
        if any(keyword in query_lower for keyword in contact_keywords):
            components['contacts'].append('contact information')
        
        # Check for eligibility-related content
        if any(keyword in query_lower for keyword in eligibility_keywords):
            components['eligibility'].append('eligibility criteria')
        
        # Check for timeline-related content
        if any(keyword in query_lower for keyword in timeline_keywords):
            components['timelines'].append('timeline information')
        
        # Check for benefit-related content
        if any(keyword in query_lower for keyword in benefit_keywords):
            components['benefits'].append('benefit details')
        
        return components