File size: 3,418 Bytes
f104fee
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from typing import Dict, Any, List
import asyncio

class QueryPlanner:
    def __init__(self, llm):
        self.llm = llm
    
    async def plan_research(self, query: str) -> Dict[str, Any]:
        try:
            planning_prompt = f"""
            Analyze this research query and create a structured plan:
            Query: {query}
            
            Determine:
            1. What type of analysis is needed (price, market, defi, comparison, etc.)
            2. Which data sources would be most relevant
            3. What specific steps should be taken
            4. Priority focus area
            
            Respond in JSON format with keys: type, steps, priority, data_sources
            """
            
            response = await asyncio.to_thread(
                self.llm.invoke,
                planning_prompt
            )
            
            # Simple categorization based on keywords
            query_lower = query.lower()
            plan = {
                "type": self._categorize_query(query_lower),
                "steps": self._generate_steps(query_lower),
                "priority": self._determine_priority(query_lower),
                "data_sources": self._identify_sources(query_lower)
            }
            
            return plan
            
        except Exception:
            return {
                "type": "general",
                "steps": ["Analyze query", "Gather data", "Provide insights"],
                "priority": "general analysis",
                "data_sources": ["coingecko", "defillama"]
            }
    
    def _categorize_query(self, query: str) -> str:
        if any(word in query for word in ["price", "chart", "value"]):
            return "price_analysis"
        elif any(word in query for word in ["defi", "tvl", "protocol", "yield"]):
            return "defi_analysis"
        elif any(word in query for word in ["compare", "vs", "versus"]):
            return "comparison"
        elif any(word in query for word in ["market", "overview", "trending"]):
            return "market_overview"
        else:
            return "general"
    
    def _generate_steps(self, query: str) -> List[str]:
        steps = ["Gather relevant data"]
        
        if "price" in query:
            steps.extend(["Get current price data", "Analyze price trends"])
        if "defi" in query:
            steps.extend(["Fetch DeFi protocol data", "Analyze TVL trends"])
        if any(word in query for word in ["compare", "vs"]):
            steps.append("Perform comparative analysis")
        
        steps.append("Synthesize insights and recommendations")
        return steps
    
    def _determine_priority(self, query: str) -> str:
        if "urgent" in query or "now" in query:
            return "high"
        elif "overview" in query:
            return "comprehensive"
        else:
            return "standard"
    
    def _identify_sources(self, query: str) -> List[str]:
        sources = []
        if any(word in query for word in ["price", "market", "coin", "token"]):
            sources.append("coingecko")
        if any(word in query for word in ["defi", "tvl", "protocol"]):
            sources.append("defillama")
        if any(word in query for word in ["transaction", "address", "gas"]):
            sources.append("etherscan")
        
        return sources if sources else ["coingecko", "defillama"]