web3-copilot / src /research_agent.py
“Transcendental-Programmer”
feat: Initial commit with project structure and initial files
20eee66
raw
history blame
8.77 kB
from google import genai
from google.genai import types
import json
from typing import Dict, Any, List
from src.api_clients import coingecko_client, cryptocompare_client
from src.cache_manager import cache_manager
from src.config import config
import asyncio
class ResearchAgent:
def __init__(self):
self.client = genai.Client(api_key=config.GEMINI_API_KEY)
self.symbol_map = {
"btc": "bitcoin", "eth": "ethereum", "sol": "solana",
"ada": "cardano", "dot": "polkadot", "bnb": "binancecoin",
"usdc": "usd-coin", "usdt": "tether", "xrp": "ripple",
"avax": "avalanche-2", "link": "chainlink", "matic": "matic-network"
}
def _format_coin_id(self, symbol: str) -> str:
return self.symbol_map.get(symbol.lower(), symbol.lower())
async def get_market_overview(self) -> Dict[str, Any]:
cache_key = "market_overview"
cached = cache_manager.get(cache_key)
if cached:
return cached
try:
market_data = await coingecko_client.get_market_data(per_page=20)
global_data = await coingecko_client.get_global_data()
trending = await coingecko_client.get_trending()
result = {
"market_data": market_data,
"global_data": global_data,
"trending": trending
}
cache_manager.set(cache_key, result)
return result
except Exception as e:
raise Exception(f"Failed to fetch market overview: {str(e)}")
async def get_price_history(self, symbol: str) -> Dict[str, Any]:
cache_key = f"price_history_{symbol.lower()}"
cached = cache_manager.get(cache_key)
if cached:
return cached
try:
coin_id = self._format_coin_id(symbol)
data = await coingecko_client.get_price_history(coin_id, days=30)
cache_manager.set(cache_key, data)
return data
except Exception as e:
raise Exception(f"Failed to fetch price history for {symbol}: {str(e)}")
async def get_coin_analysis(self, symbol: str) -> Dict[str, Any]:
cache_key = f"coin_analysis_{symbol.lower()}"
cached = cache_manager.get(cache_key)
if cached:
return cached
try:
coin_id = self._format_coin_id(symbol)
tasks = [
coingecko_client.get_coin_data(coin_id),
coingecko_client.get_price_history(coin_id, days=7),
cryptocompare_client.get_social_data(symbol.upper())
]
coin_data, price_history, social_data = await asyncio.gather(*tasks, return_exceptions=True)
result = {}
if not isinstance(coin_data, Exception):
result["coin_data"] = coin_data
if not isinstance(price_history, Exception):
result["price_history"] = price_history
if not isinstance(social_data, Exception):
result["social_data"] = social_data
cache_manager.set(cache_key, result)
return result
except Exception as e:
raise Exception(f"Failed to analyze {symbol}: {str(e)}")
def _format_market_data(self, data: Dict[str, Any]) -> str:
if not data:
return "No market data available"
formatted = "📊 MARKET OVERVIEW\n\n"
if "global_data" in data and "data" in data["global_data"]:
global_info = data["global_data"]["data"]
total_mcap = global_info.get("total_market_cap", {}).get("usd", 0)
total_volume = global_info.get("total_volume", {}).get("usd", 0)
btc_dominance = global_info.get("market_cap_percentage", {}).get("btc", 0)
formatted += f"Total Market Cap: ${total_mcap:,.0f}\n"
formatted += f"24h Volume: ${total_volume:,.0f}\n"
formatted += f"Bitcoin Dominance: {btc_dominance:.1f}%\n\n"
if "trending" in data and "coins" in data["trending"]:
formatted += "🔥 TRENDING COINS\n"
for i, coin in enumerate(data["trending"]["coins"][:5], 1):
name = coin.get("item", {}).get("name", "Unknown")
symbol = coin.get("item", {}).get("symbol", "")
formatted += f"{i}. {name} ({symbol.upper()})\n"
formatted += "\n"
if "market_data" in data:
formatted += "💰 TOP CRYPTOCURRENCIES\n"
for i, coin in enumerate(data["market_data"][:10], 1):
name = coin.get("name", "Unknown")
symbol = coin.get("symbol", "").upper()
price = coin.get("current_price", 0)
change = coin.get("price_change_percentage_24h", 0)
change_symbol = "📈" if change >= 0 else "📉"
formatted += f"{i:2d}. {name} ({symbol}): ${price:,.4f} {change_symbol} {change:+.2f}%\n"
return formatted
async def research(self, query: str) -> str:
try:
if not config.GEMINI_API_KEY:
return "❌ Gemini API key not configured. Please set GEMINI_API_KEY environment variable."
system_prompt = """You are an expert Web3 and cryptocurrency research analyst.
Provide comprehensive, accurate, and actionable insights based on real market data.
Guidelines:
- Give specific, data-driven analysis
- Include price targets and risk assessments when relevant
- Explain technical concepts clearly
- Provide actionable recommendations
- Use emojis for better readability
- Be concise but thorough
"""
market_context = ""
try:
if any(keyword in query.lower() for keyword in ["market", "overview", "trending", "top"]):
market_data = await self.get_market_overview()
market_context = f"\n\nCURRENT MARKET DATA:\n{self._format_market_data(market_data)}"
for symbol in ["btc", "eth", "sol", "ada", "dot", "bnb", "avax", "link"]:
if symbol in query.lower() or symbol.upper() in query:
analysis_data = await self.get_coin_analysis(symbol)
if "coin_data" in analysis_data:
coin_info = analysis_data["coin_data"]
market_data = coin_info.get("market_data", {})
current_price = market_data.get("current_price", {}).get("usd", 0)
price_change = market_data.get("price_change_percentage_24h", 0)
market_cap = market_data.get("market_cap", {}).get("usd", 0)
volume = market_data.get("total_volume", {}).get("usd", 0)
market_context += f"\n\n{symbol.upper()} DATA:\n"
market_context += f"Price: ${current_price:,.4f}\n"
market_context += f"24h Change: {price_change:+.2f}%\n"
market_context += f"Market Cap: ${market_cap:,.0f}\n"
market_context += f"Volume: ${volume:,.0f}\n"
break
except Exception as e:
market_context = f"\n\nNote: Some market data unavailable ({str(e)})"
full_prompt = f"{query}{market_context}"
response = self.client.models.generate_content(
model="gemini-2.5-flash",
contents=[
types.Content(
role="user",
parts=[types.Part(text=full_prompt)]
)
],
config=types.GenerateContentConfig(
system_instruction=system_prompt,
temperature=0.3,
max_output_tokens=2000
)
)
if response.text:
return response.text
else:
return "❌ No response generated. Please try rephrasing your query."
except Exception as e:
return f"❌ Research failed: {str(e)}"
async def close(self):
await coingecko_client.close()
await cryptocompare_client.close()