Spaces:
Sleeping
Sleeping
| from langchain.agents import AgentExecutor, create_tool_calling_agent | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from typing import List, Dict, Any | |
| import asyncio | |
| from datetime import datetime | |
| from src.tools.coingecko_tool import CoinGeckoTool | |
| from src.tools.defillama_tool import DeFiLlamaTool | |
| from src.tools.etherscan_tool import EtherscanTool | |
| from src.agent.query_planner import QueryPlanner | |
| from src.utils.config import config | |
| from src.utils.logger import get_logger | |
| logger = get_logger(__name__) | |
| class Web3ResearchAgent: | |
| def __init__(self): | |
| self.llm = None | |
| self.tools = [] | |
| self.agent = None | |
| self.executor = None | |
| self.enabled = False | |
| if not config.GEMINI_API_KEY: | |
| logger.warning("GEMINI_API_KEY not configured - AI agent disabled") | |
| return | |
| try: | |
| self.llm = ChatGoogleGenerativeAI( | |
| model="gemini-1.5-flash", | |
| google_api_key=config.GEMINI_API_KEY, | |
| temperature=0.1, | |
| max_tokens=2048 | |
| ) | |
| self.tools = self._initialize_tools() | |
| self.query_planner = QueryPlanner(self.llm) | |
| self.memory = ConversationBufferWindowMemory( | |
| memory_key="chat_history", return_messages=True, k=10 | |
| ) | |
| self.agent = self._create_agent() | |
| self.executor = AgentExecutor( | |
| agent=self.agent, tools=self.tools, memory=self.memory, | |
| verbose=False, max_iterations=5, handle_parsing_errors=True | |
| ) | |
| self.enabled = True | |
| logger.info("Web3ResearchAgent initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Agent init failed: {e}") | |
| self.enabled = False | |
| def _initialize_tools(self): | |
| tools = [] | |
| try: | |
| tools.append(CoinGeckoTool()) | |
| logger.info("CoinGecko tool initialized") | |
| except Exception as e: | |
| logger.warning(f"CoinGecko tool failed: {e}") | |
| try: | |
| tools.append(DeFiLlamaTool()) | |
| logger.info("DeFiLlama tool initialized") | |
| except Exception as e: | |
| logger.warning(f"DeFiLlama tool failed: {e}") | |
| try: | |
| tools.append(EtherscanTool()) | |
| logger.info("Etherscan tool initialized") | |
| except Exception as e: | |
| logger.warning(f"Etherscan tool failed: {e}") | |
| return tools | |
| def _create_agent(self): | |
| prompt = ChatPromptTemplate.from_messages([ | |
| ("system", """You are an expert Web3 research assistant. Use available tools to provide accurate, | |
| data-driven insights about cryptocurrency markets, DeFi protocols, and blockchain data. | |
| Format responses with clear sections, emojis, and actionable insights."""), | |
| MessagesPlaceholder("chat_history"), | |
| ("human", "{input}"), | |
| MessagesPlaceholder("agent_scratchpad") | |
| ]) | |
| return create_tool_calling_agent(self.llm, self.tools, prompt) | |
| async def research_query(self, query: str) -> Dict[str, Any]: | |
| if not self.enabled: | |
| return { | |
| "success": False, | |
| "query": query, | |
| "error": "AI agent not configured. Please set GEMINI_API_KEY environment variable.", | |
| "result": "β **Service Unavailable**\n\nThe AI research agent requires a GEMINI_API_KEY to function.\n\nPlease:\n1. Get a free API key from [Google AI Studio](https://makersuite.google.com/app/apikey)\n2. Set environment variable: `export GEMINI_API_KEY='your_key'`\n3. Restart the application", | |
| "sources": [], | |
| "metadata": {"timestamp": datetime.now().isoformat()} | |
| } | |
| try: | |
| logger.info(f"Processing: {query}") | |
| research_plan = await self.query_planner.plan_research(query) | |
| enhanced_query = f""" | |
| Research Query: {query} | |
| Research Plan: {research_plan.get('steps', [])} | |
| Priority: {research_plan.get('priority', 'general')} | |
| Execute systematic research and provide comprehensive analysis. | |
| """ | |
| result = await asyncio.to_thread( | |
| self.executor.invoke, {"input": enhanced_query} | |
| ) | |
| return { | |
| "success": True, | |
| "query": query, | |
| "research_plan": research_plan, | |
| "result": result.get("output", "No response"), | |
| "sources": self._extract_sources(result.get("output", "")), | |
| "metadata": { | |
| "tools_used": [tool.name for tool in self.tools], | |
| "timestamp": datetime.now().isoformat() | |
| } | |
| } | |
| except Exception as e: | |
| logger.error(f"Research error: {e}") | |
| return { | |
| "success": False, | |
| "query": query, | |
| "error": str(e), | |
| "result": f"β **Research Error**: {str(e)}\n\nPlease try a different query or check your API configuration.", | |
| "sources": [], | |
| "metadata": {"timestamp": datetime.now().isoformat()} | |
| } | |
| async def get_price_history(self, symbol: str, days: int = 30) -> Dict[str, Any]: | |
| try: | |
| coingecko_tool = next(t for t in self.tools if isinstance(t, CoinGeckoTool)) | |
| return await coingecko_tool._arun(symbol, {"type": "price_history", "days": days}) | |
| except Exception as e: | |
| logger.error(f"Price history error: {e}") | |
| return {} | |
| async def get_comprehensive_market_data(self) -> Dict[str, Any]: | |
| try: | |
| tasks = [] | |
| for tool in self.tools: | |
| if isinstance(tool, CoinGeckoTool): | |
| tasks.append(tool._arun("", {"type": "market_overview"})) | |
| elif isinstance(tool, DeFiLlamaTool): | |
| tasks.append(tool._arun("", {"type": "tvl_overview"})) | |
| results = await asyncio.gather(*tasks, return_exceptions=True) | |
| data = {} | |
| for i, result in enumerate(results): | |
| if not isinstance(result, Exception): | |
| if i == 0: | |
| data["market"] = result | |
| elif i == 1: | |
| data["defi"] = result | |
| return data | |
| except Exception as e: | |
| logger.error(f"Market data error: {e}") | |
| return {} | |
| def _extract_sources(self, result_text: str) -> List[str]: | |
| sources = [] | |
| if "CoinGecko" in result_text or "coingecko" in result_text.lower(): | |
| sources.append("CoinGecko API") | |
| if "DeFiLlama" in result_text or "defillama" in result_text.lower(): | |
| sources.append("DeFiLlama API") | |
| if "Etherscan" in result_text or "etherscan" in result_text.lower(): | |
| sources.append("Etherscan API") | |
| return sources | |