# Hosting-page residue (not part of the module): uploaded by mrradix, "Upload 48 files", commit 8e4018d (verified)
import requests
import json
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import time
from utils.logging import setup_logger
from utils.error_handling import handle_exceptions, IntegrationError
from utils.storage import load_data, save_data
# Initialize logger
logger = setup_logger(__name__)
class CryptoIntegration:
    """Cryptocurrency API integration for market data"""

    def __init__(self, api_key: Optional[str] = None, provider: str = "coingecko"):
        """Set up the integration for a single crypto data provider

        Args:
            api_key: API key for the crypto provider (optional for some providers)
            provider: Crypto data provider, stored lower-cased (default: coingecko)
        """
        self.api_key = api_key
        self.provider = provider.lower()

        # Cache lifetimes, in seconds
        five_minutes = 300
        one_minute = 60

        # Endpoint roots and cache lifetime for every known provider
        self.providers = dict(
            coingecko=dict(
                base_url="https://api.coingecko.com/api/v3",
                pro_url="https://pro-api.coingecko.com/api/v3",
                cache_duration=five_minutes,
            ),
            coinmarketcap=dict(
                base_url="https://pro-api.coinmarketcap.com/v1",
                cache_duration=five_minutes,
            ),
            binance=dict(
                base_url="https://api.binance.com/api/v3",
                cache_duration=one_minute,
            ),
        )

        # Cached responses and their expiry timestamps, both keyed by cache key
        self.cache, self.cache_expiry = {}, {}
@handle_exceptions
def set_api_key(self, api_key: str) -> None:
    """Store a new API key for the crypto provider

    Args:
        api_key: API key
    """
    # Drop every cached response whenever the key changes; the dictionaries
    # are rebound to fresh objects rather than emptied in place
    self.cache, self.cache_expiry = {}, {}
    self.api_key = api_key
@handle_exceptions
def set_provider(self, provider: str) -> None:
    """Switch to another crypto data provider

    Args:
        provider: Crypto data provider (matched case-insensitively)

    Raises:
        IntegrationError: Raised inside the method for a provider that is not
            configured; how it reaches the caller depends on the
            handle_exceptions decorator
    """
    provider = provider.lower()
    if provider in self.providers:
        self.provider = provider
        # Clear cache when provider changes
        self.cache, self.cache_expiry = {}, {}
        return
    raise IntegrationError(f"Unsupported crypto provider: {provider}")
@handle_exceptions
def test_connection(self) -> bool:
    """Test cryptocurrency API connection

    CoinGecko and Binance are probed through their ``/ping`` endpoint;
    CoinMarketCap needs an API key and is probed with a one-item
    ``/cryptocurrency/map`` request.

    Returns:
        True if the provider answered with HTTP 200, False otherwise
        (including a missing CoinMarketCap key, an unknown provider, or any
        exception raised while connecting)
    """
    # requests waits indefinitely unless a timeout is given, so bound the probe
    request_timeout = 10  # seconds

    try:
        if self.provider in ("coingecko", "binance"):
            # Both providers expose a simple ping endpoint
            url = f"{self.providers[self.provider]['base_url']}/ping"
            response = requests.get(url, timeout=request_timeout)
            return response.status_code == 200

        if self.provider == "coinmarketcap":
            # API key is required for CoinMarketCap
            if not self.api_key:
                logger.error("CoinMarketCap API key not set")
                return False

            url = f"{self.providers[self.provider]['base_url']}/cryptocurrency/map"
            headers = {"X-CMC_PRO_API_KEY": self.api_key}
            params = {"limit": 1}
            response = requests.get(url, headers=headers, params=params, timeout=request_timeout)
            return response.status_code == 200

        return False
    except Exception as e:
        # Best-effort probe: any failure is reported as "not connected"
        logger.error(f"Crypto API connection test failed: {str(e)}")
        return False
@handle_exceptions
def get_market_data(self, coins: Optional[List[str]] = None, vs_currency: str = "usd",
                    page: int = 1, per_page: int = 100) -> List[Dict[str, Any]]:
    """Get cryptocurrency market data

    Results are cached per (coins, vs_currency, page, per_page, provider)
    for the provider's configured cache duration.

    Args:
        coins: List of coin IDs (optional)
        vs_currency: Currency to compare against (default: usd)
        page: Page number (default: 1)
        per_page: Number of results per page (default: 100)

    Returns:
        List of cryptocurrency market data in the standardized format

    Raises:
        IntegrationError: Raised inside the method on a non-200 response, a
            missing CoinMarketCap key, an unsupported provider, or any other
            failure (wrapped); how it reaches the caller depends on the
            handle_exceptions decorator
    """
    # requests waits indefinitely unless a timeout is given
    request_timeout = 10  # seconds

    # Check cache
    cache_key = f"market_{','.join(coins) if coins else 'all'}_{vs_currency}_{page}_{per_page}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # The pro endpoint is used whenever an API key is configured
            root = config["pro_url"] if self.api_key else config["base_url"]
            url = f"{root}/coins/markets"
            params = {
                "vs_currency": vs_currency,
                "page": page,
                "per_page": per_page,
                "order": "market_cap_desc"
            }
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key
            if coins:
                params["ids"] = ','.join(coins)

            response = requests.get(url, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get market data: {response.text}")

            market_data = self._process_coingecko_market_data(response.json())

        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")

            url = f"{self.providers[self.provider]['base_url']}/cryptocurrency/listings/latest"
            headers = {"X-CMC_PRO_API_KEY": self.api_key}
            # NOTE(review): `coins` is not applied on this branch; the
            # listing page is returned unfiltered
            params = {
                "start": (page - 1) * per_page + 1,  # CoinMarketCap offsets are 1-based
                "limit": per_page,
                "convert": vs_currency.upper()
            }

            response = requests.get(url, headers=headers, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get market data: {response.text}")

            market_data = self._process_coinmarketcap_market_data(response.json(), vs_currency)

        elif self.provider == "binance":
            # Binance has no market-data endpoint like CoinGecko's, so the
            # 24h ticker list is fetched and filtered/paginated locally
            url = f"{self.providers[self.provider]['base_url']}/ticker/24hr"
            response = requests.get(url, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get market data: {response.text}")

            market_data = self._process_binance_market_data(response.json(), vs_currency)

            # Filter by coins if specified (set lookup instead of list scan)
            if coins:
                wanted = set(coins)
                market_data = [item for item in market_data if item["id"] in wanted]

            # Apply pagination
            start_idx = (page - 1) * per_page
            market_data = market_data[start_idx:start_idx + per_page]

        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = market_data
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return market_data

    except IntegrationError:
        # Already in the integration's own error type; pass through untouched
        raise
    except Exception as e:
        logger.error(f"Failed to get market data: {str(e)}")
        # Keep the original exception attached as the cause
        raise IntegrationError(f"Failed to get market data: {str(e)}") from e
@handle_exceptions
def get_coin_details(self, coin_id: str) -> Dict[str, Any]:
    """Get detailed information about a specific cryptocurrency

    Results are cached per (coin_id, provider) for the provider's configured
    cache duration.

    Args:
        coin_id: Coin ID. Sent as-is to CoinGecko, upper-cased and used as a
            symbol for CoinMarketCap, and upper-cased with a "USDT" suffix
            for Binance

    Returns:
        Detailed coin information in the standardized format

    Raises:
        IntegrationError: Raised inside the method on a non-200 response, an
            unknown coin, a missing CoinMarketCap key, an unsupported
            provider, or any other failure (wrapped); how it reaches the
            caller depends on the handle_exceptions decorator
    """
    # requests waits indefinitely unless a timeout is given
    request_timeout = 10  # seconds

    # Check cache
    cache_key = f"coin_{coin_id}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # The pro endpoint is used whenever an API key is configured
            root = config["pro_url"] if self.api_key else config["base_url"]
            url = f"{root}/coins/{coin_id}"
            params = {
                "localization": "false",
                "tickers": "false",
                "market_data": "true",
                "community_data": "true",
                "developer_data": "true"
            }
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key

            response = requests.get(url, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin details: {response.text}")

            coin_details = self._process_coingecko_coin_details(response.json())

        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")

            base_url = self.providers[self.provider]["base_url"]
            headers = {"X-CMC_PRO_API_KEY": self.api_key}

            # First, resolve the symbol to a CoinMarketCap ID
            response = requests.get(
                f"{base_url}/cryptocurrency/map",
                headers=headers,
                params={"symbol": coin_id.upper()},
                timeout=request_timeout,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin ID: {response.text}")

            data = response.json()
            if not data.get("data"):
                raise IntegrationError(f"Coin not found: {coin_id}")
            # The first match is used when several coins share the symbol
            cmc_id = data["data"][0]["id"]

            # Now get the coin details
            response = requests.get(
                f"{base_url}/cryptocurrency/info",
                headers=headers,
                params={"id": cmc_id},
                timeout=request_timeout,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin details: {response.text}")

            coin_details = self._process_coinmarketcap_coin_details(response.json(), cmc_id)

        elif self.provider == "binance":
            # Binance has no coin-info endpoint; the 24h ticker statistics of
            # the USDT pair are used instead
            url = f"{self.providers[self.provider]['base_url']}/ticker/24hr"
            params = {
                "symbol": f"{coin_id.upper()}USDT"  # Assuming USDT pair
            }

            response = requests.get(url, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin details: {response.text}")

            coin_details = self._process_binance_coin_details(response.json(), coin_id)

        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = coin_details
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return coin_details

    except IntegrationError:
        # Already in the integration's own error type; pass through untouched
        raise
    except Exception as e:
        logger.error(f"Failed to get coin details: {str(e)}")
        # Keep the original exception attached as the cause
        raise IntegrationError(f"Failed to get coin details: {str(e)}") from e
@handle_exceptions
def get_price_history(self, coin_id: str, vs_currency: str = "usd",
                      days: str = "30", interval: str = "daily") -> Dict[str, Any]:
    """Get price history for a specific cryptocurrency

    Results are cached per (coin_id, vs_currency, days, interval, provider)
    for the provider's configured cache duration.

    Args:
        coin_id: Coin ID
        vs_currency: Currency to compare against (default: usd)
        days: Number of days of data to retrieve (default: 30)
        interval: Data interval: "daily", "hourly" or "minutely" (default: daily)

    Returns:
        Price history data with "prices", "market_caps" and "total_volumes"

    Raises:
        IntegrationError: Raised inside the method on a non-200 response, an
            unknown coin, a missing CoinMarketCap key, an unsupported
            provider, or any other failure (wrapped); how it reaches the
            caller depends on the handle_exceptions decorator
    """
    # Local import: the module imports only datetime/timedelta from datetime
    from datetime import timezone

    # requests waits indefinitely unless a timeout is given
    request_timeout = 10  # seconds
    # Largest number of candles Binance returns per klines request
    binance_max_candles = 1000

    # Check cache
    cache_key = f"history_{coin_id}_{vs_currency}_{days}_{interval}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # The pro endpoint is used whenever an API key is configured
            root = config["pro_url"] if self.api_key else config["base_url"]
            url = f"{root}/coins/{coin_id}/market_chart"
            params = {
                "vs_currency": vs_currency,
                "days": days,
                # The full history is always requested at daily granularity
                "interval": interval if days != "max" else "daily"
            }
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key

            response = requests.get(url, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get price history: {response.text}")

            price_history = self._process_coingecko_price_history(response.json())

        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")

            base_url = self.providers[self.provider]["base_url"]
            headers = {"X-CMC_PRO_API_KEY": self.api_key}

            # First, resolve the symbol to a CoinMarketCap ID
            response = requests.get(
                f"{base_url}/cryptocurrency/map",
                headers=headers,
                params={"symbol": coin_id.upper()},
                timeout=request_timeout,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get coin ID: {response.text}")

            data = response.json()
            if not data.get("data"):
                raise IntegrationError(f"Coin not found: {coin_id}")
            cmc_id = data["data"][0]["id"]

            # The timestamps are sent with a "Z" (UTC) suffix, so the range
            # must be computed in UTC rather than in naive local time
            time_end = datetime.now(timezone.utc)
            time_start = time_end - timedelta(days=int(days))
            stamp_format = "%Y-%m-%dT%H:%M:%S.%fZ"

            params = {
                "id": cmc_id,
                "time_start": time_start.strftime(stamp_format),
                "time_end": time_end.strftime(stamp_format),
                "interval": self._convert_interval(interval),
                "convert": vs_currency.upper()
            }

            response = requests.get(
                f"{base_url}/cryptocurrency/quotes/historical",
                headers=headers,
                params=params,
                timeout=request_timeout,
            )
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get price history: {response.text}")

            price_history = self._process_coinmarketcap_price_history(response.json(), vs_currency)

        elif self.provider == "binance":
            # Convert interval to Binance format
            binance_interval = self._convert_binance_interval(interval)

            # Honor `days`: ask for just enough of the most recent candles to
            # cover the requested period, capped at the per-request maximum
            candles_per_day = {"1d": 1, "1h": 24, "1m": 1440}.get(binance_interval, 1)
            try:
                wanted = int(days) * candles_per_day
            except (TypeError, ValueError):
                # Non-numeric values such as "max": fetch as much as allowed
                wanted = binance_max_candles
            limit = min(binance_max_candles, max(1, wanted))

            url = f"{self.providers[self.provider]['base_url']}/klines"
            params = {
                "symbol": f"{coin_id.upper()}USDT",  # Assuming USDT pair
                "interval": binance_interval,
                "limit": limit
            }

            response = requests.get(url, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to get price history: {response.text}")

            price_history = self._process_binance_price_history(response.json())

        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = price_history
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return price_history

    except IntegrationError:
        # Already in the integration's own error type; pass through untouched
        raise
    except Exception as e:
        logger.error(f"Failed to get price history: {str(e)}")
        # Keep the original exception attached as the cause
        raise IntegrationError(f"Failed to get price history: {str(e)}") from e
@handle_exceptions
def search_coins(self, query: str) -> List[Dict[str, Any]]:
    """Search for cryptocurrencies

    CoinGecko is searched server-side; for CoinMarketCap and Binance the
    full coin/symbol list is downloaded and filtered locally with a
    case-insensitive substring match. Results are cached per (query,
    provider) for the provider's configured cache duration.

    Args:
        query: Search query

    Returns:
        List of matching cryptocurrencies, each with "id", "symbol", "name"
        and "market_cap_rank"

    Raises:
        IntegrationError: Raised inside the method on a non-200 response, a
            missing CoinMarketCap key, an unsupported provider, or any other
            failure (wrapped); how it reaches the caller depends on the
            handle_exceptions decorator
    """
    # requests waits indefinitely unless a timeout is given
    request_timeout = 10  # seconds

    # Check cache
    cache_key = f"search_{query}_{self.provider}"
    if cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0):
        return self.cache[cache_key]

    try:
        if self.provider == "coingecko":
            config = self.providers[self.provider]
            # The pro endpoint is used whenever an API key is configured
            root = config["pro_url"] if self.api_key else config["base_url"]
            url = f"{root}/search"
            params = {"query": query}
            if self.api_key:
                params["x_cg_pro_api_key"] = self.api_key

            response = requests.get(url, params=params, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to search coins: {response.text}")

            search_results = [
                {
                    "id": coin.get("id", ""),
                    "symbol": coin.get("symbol", "").upper(),
                    "name": coin.get("name", ""),
                    "market_cap_rank": coin.get("market_cap_rank", None)
                }
                for coin in response.json().get("coins", [])
            ]

        elif self.provider == "coinmarketcap":
            if not self.api_key:
                raise IntegrationError("CoinMarketCap API key not set")

            url = f"{self.providers[self.provider]['base_url']}/cryptocurrency/map"
            headers = {"X-CMC_PRO_API_KEY": self.api_key}

            response = requests.get(url, headers=headers, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to search coins: {response.text}")

            # Lower-case the query once instead of once per listed coin
            needle = query.lower()
            search_results = []
            for coin in response.json().get("data", []):
                if needle in coin.get("name", "").lower() or needle in coin.get("symbol", "").lower():
                    search_results.append({
                        "id": coin.get("symbol", "").lower(),
                        "symbol": coin.get("symbol", "").upper(),
                        "name": coin.get("name", ""),
                        "market_cap_rank": coin.get("rank", None)
                    })

        elif self.provider == "binance":
            # Binance has no search endpoint; the exchange info is fetched
            # and its USDT-quoted symbols are filtered locally
            url = f"{self.providers[self.provider]['base_url']}/exchangeInfo"
            response = requests.get(url, timeout=request_timeout)
            if response.status_code != 200:
                raise IntegrationError(f"Failed to search coins: {response.text}")

            needle = query.lower()
            search_results = []
            seen_ids = set()
            for symbol in response.json().get("symbols", []):
                base_asset = symbol.get("baseAsset", "")
                if needle not in base_asset.lower() or symbol.get("quoteAsset") != "USDT":
                    continue
                asset_id = base_asset.lower()
                # Keep only the first entry per base asset
                if asset_id in seen_ids:
                    continue
                seen_ids.add(asset_id)
                search_results.append({
                    "id": asset_id,
                    "symbol": base_asset.upper(),
                    "name": base_asset.upper(),  # Binance doesn't provide full names
                    "market_cap_rank": None
                })

        else:
            raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

        # Cache the result
        self.cache[cache_key] = search_results
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]
        return search_results

    except IntegrationError:
        # Already in the integration's own error type; pass through untouched
        raise
    except Exception as e:
        logger.error(f"Failed to search coins: {str(e)}")
        # Keep the original exception attached as the cause
        raise IntegrationError(f"Failed to search coins: {str(e)}") from e
def _process_coingecko_market_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Process CoinGecko market data
Args:
data: Raw API response data
Returns:
Processed market data
"""
market_data = []
for coin in data:
processed_coin = {
"id": coin.get("id", ""),
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"image": coin.get("image", ""),
"current_price": coin.get("current_price", 0),
"market_cap": coin.get("market_cap", 0),
"market_cap_rank": coin.get("market_cap_rank", 0),
"total_volume": coin.get("total_volume", 0),
"high_24h": coin.get("high_24h", 0),
"low_24h": coin.get("low_24h", 0),
"price_change_24h": coin.get("price_change_24h", 0),
"price_change_percentage_24h": coin.get("price_change_percentage_24h", 0),
"market_cap_change_24h": coin.get("market_cap_change_24h", 0),
"market_cap_change_percentage_24h": coin.get("market_cap_change_percentage_24h", 0),
"circulating_supply": coin.get("circulating_supply", 0),
"total_supply": coin.get("total_supply", 0),
"max_supply": coin.get("max_supply", 0),
"last_updated": coin.get("last_updated", "")
}
market_data.append(processed_coin)
return market_data
def _process_coinmarketcap_market_data(self, data: Dict[str, Any], vs_currency: str) -> List[Dict[str, Any]]:
"""Process CoinMarketCap market data
Args:
data: Raw API response data
vs_currency: Currency to compare against
Returns:
Processed market data
"""
market_data = []
vs_currency = vs_currency.upper()
for coin in data.get("data", []):
quote = coin.get("quote", {}).get(vs_currency, {})
processed_coin = {
"id": coin.get("symbol", "").lower(),
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"image": "", # CoinMarketCap doesn't provide images in this endpoint
"current_price": quote.get("price", 0),
"market_cap": quote.get("market_cap", 0),
"market_cap_rank": coin.get("cmc_rank", 0),
"total_volume": quote.get("volume_24h", 0),
"high_24h": 0, # Not provided
"low_24h": 0, # Not provided
"price_change_24h": quote.get("volume_change_24h", 0),
"price_change_percentage_24h": quote.get("percent_change_24h", 0),
"market_cap_change_24h": 0, # Not provided
"market_cap_change_percentage_24h": quote.get("market_cap_change_percentage_24h", 0),
"circulating_supply": coin.get("circulating_supply", 0),
"total_supply": coin.get("total_supply", 0),
"max_supply": coin.get("max_supply", 0),
"last_updated": coin.get("last_updated", "")
}
market_data.append(processed_coin)
return market_data
def _process_binance_market_data(self, data: List[Dict[str, Any]], vs_currency: str) -> List[Dict[str, Any]]:
"""Process Binance market data
Args:
data: Raw API response data
vs_currency: Currency to compare against
Returns:
Processed market data
"""
market_data = []
vs_currency = vs_currency.upper()
for ticker in data:
symbol = ticker.get("symbol", "")
# Only include pairs with the specified currency
if not symbol.endswith(vs_currency):
continue
base_asset = symbol[:-len(vs_currency)].lower()
processed_coin = {
"id": base_asset,
"symbol": base_asset.upper(),
"name": base_asset.upper(), # Binance doesn't provide full names
"image": "", # Not provided
"current_price": float(ticker.get("lastPrice", 0)),
"market_cap": 0, # Not provided
"market_cap_rank": 0, # Not provided
"total_volume": float(ticker.get("volume", 0)),
"high_24h": float(ticker.get("highPrice", 0)),
"low_24h": float(ticker.get("lowPrice", 0)),
"price_change_24h": float(ticker.get("priceChange", 0)),
"price_change_percentage_24h": float(ticker.get("priceChangePercent", 0)),
"market_cap_change_24h": 0, # Not provided
"market_cap_change_percentage_24h": 0, # Not provided
"circulating_supply": 0, # Not provided
"total_supply": 0, # Not provided
"max_supply": 0, # Not provided
"last_updated": datetime.fromtimestamp(int(ticker.get("closeTime", 0)) / 1000).isoformat()
}
market_data.append(processed_coin)
return market_data
def _process_coingecko_coin_details(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process CoinGecko coin details
Args:
data: Raw API response data
Returns:
Processed coin details
"""
market_data = data.get("market_data", {})
return {
"id": data.get("id", ""),
"symbol": data.get("symbol", "").upper(),
"name": data.get("name", ""),
"description": data.get("description", {}).get("en", ""),
"image": data.get("image", {}).get("large", ""),
"current_price": market_data.get("current_price", {}).get("usd", 0),
"market_cap": market_data.get("market_cap", {}).get("usd", 0),
"market_cap_rank": market_data.get("market_cap_rank", 0),
"total_volume": market_data.get("total_volume", {}).get("usd", 0),
"high_24h": market_data.get("high_24h", {}).get("usd", 0),
"low_24h": market_data.get("low_24h", {}).get("usd", 0),
"price_change_24h": market_data.get("price_change_24h", 0),
"price_change_percentage_24h": market_data.get("price_change_percentage_24h", 0),
"market_cap_change_24h": market_data.get("market_cap_change_24h", 0),
"market_cap_change_percentage_24h": market_data.get("market_cap_change_percentage_24h", 0),
"circulating_supply": market_data.get("circulating_supply", 0),
"total_supply": market_data.get("total_supply", 0),
"max_supply": market_data.get("max_supply", 0),
"ath": market_data.get("ath", {}).get("usd", 0),
"ath_change_percentage": market_data.get("ath_change_percentage", {}).get("usd", 0),
"ath_date": market_data.get("ath_date", {}).get("usd", ""),
"atl": market_data.get("atl", {}).get("usd", 0),
"atl_change_percentage": market_data.get("atl_change_percentage", {}).get("usd", 0),
"atl_date": market_data.get("atl_date", {}).get("usd", ""),
"last_updated": data.get("last_updated", ""),
"links": {
"homepage": data.get("links", {}).get("homepage", [""])[0],
"blockchain_site": data.get("links", {}).get("blockchain_site", [""])[0],
"official_forum_url": data.get("links", {}).get("official_forum_url", [""])[0],
"chat_url": data.get("links", {}).get("chat_url", [""])[0],
"announcement_url": data.get("links", {}).get("announcement_url", [""])[0],
"twitter_screen_name": data.get("links", {}).get("twitter_screen_name", ""),
"facebook_username": data.get("links", {}).get("facebook_username", ""),
"github_url": data.get("links", {}).get("repos_url", {}).get("github", [""])[0],
"subreddit_url": data.get("links", {}).get("subreddit_url", "")
},
"categories": data.get("categories", []),
"genesis_date": data.get("genesis_date", ""),
"sentiment_votes_up_percentage": data.get("sentiment_votes_up_percentage", 0),
"sentiment_votes_down_percentage": data.get("sentiment_votes_down_percentage", 0),
"developer_data": {
"forks": data.get("developer_data", {}).get("forks", 0),
"stars": data.get("developer_data", {}).get("stars", 0),
"subscribers": data.get("developer_data", {}).get("subscribers", 0),
"total_issues": data.get("developer_data", {}).get("total_issues", 0),
"closed_issues": data.get("developer_data", {}).get("closed_issues", 0),
"pull_requests_merged": data.get("developer_data", {}).get("pull_requests_merged", 0),
"pull_request_contributors": data.get("developer_data", {}).get("pull_request_contributors", 0),
"commit_count_4_weeks": data.get("developer_data", {}).get("commit_count_4_weeks", 0)
},
"community_data": {
"twitter_followers": data.get("community_data", {}).get("twitter_followers", 0),
"reddit_subscribers": data.get("community_data", {}).get("reddit_subscribers", 0),
"reddit_average_posts_48h": data.get("community_data", {}).get("reddit_average_posts_48h", 0),
"reddit_average_comments_48h": data.get("community_data", {}).get("reddit_average_comments_48h", 0),
"telegram_channel_user_count": data.get("community_data", {}).get("telegram_channel_user_count", 0)
}
}
def _process_coinmarketcap_coin_details(self, data: Dict[str, Any], cmc_id: str) -> Dict[str, Any]:
"""Process CoinMarketCap coin details
Args:
data: Raw API response data
cmc_id: CoinMarketCap ID
Returns:
Processed coin details
"""
coin_data = data.get("data", {}).get(str(cmc_id), {})
return {
"id": coin_data.get("symbol", "").lower(),
"symbol": coin_data.get("symbol", "").upper(),
"name": coin_data.get("name", ""),
"description": coin_data.get("description", ""),
"image": coin_data.get("logo", ""),
"current_price": 0, # Not provided in this endpoint
"market_cap": 0, # Not provided in this endpoint
"market_cap_rank": 0, # Not provided in this endpoint
"total_volume": 0, # Not provided in this endpoint
"high_24h": 0, # Not provided in this endpoint
"low_24h": 0, # Not provided in this endpoint
"price_change_24h": 0, # Not provided in this endpoint
"price_change_percentage_24h": 0, # Not provided in this endpoint
"market_cap_change_24h": 0, # Not provided in this endpoint
"market_cap_change_percentage_24h": 0, # Not provided in this endpoint
"circulating_supply": 0, # Not provided in this endpoint
"total_supply": 0, # Not provided in this endpoint
"max_supply": 0, # Not provided in this endpoint
"ath": 0, # Not provided in this endpoint
"ath_change_percentage": 0, # Not provided in this endpoint
"ath_date": "", # Not provided in this endpoint
"atl": 0, # Not provided in this endpoint
"atl_change_percentage": 0, # Not provided in this endpoint
"atl_date": "", # Not provided in this endpoint
"last_updated": coin_data.get("last_updated", ""),
"links": {
"homepage": coin_data.get("urls", {}).get("website", [""])[0],
"blockchain_site": coin_data.get("urls", {}).get("explorer", [""])[0],
"official_forum_url": coin_data.get("urls", {}).get("message_board", [""])[0],
"chat_url": coin_data.get("urls", {}).get("chat", [""])[0],
"announcement_url": coin_data.get("urls", {}).get("announcement", [""])[0],
"twitter_screen_name": coin_data.get("urls", {}).get("twitter", [""])[0],
"facebook_username": coin_data.get("urls", {}).get("facebook", [""])[0],
"github_url": coin_data.get("urls", {}).get("source_code", [""])[0],
"subreddit_url": coin_data.get("urls", {}).get("reddit", [""])[0]
},
"categories": coin_data.get("category", "").split(",") if coin_data.get("category") else [],
"genesis_date": coin_data.get("date_added", ""),
"sentiment_votes_up_percentage": 0, # Not provided
"sentiment_votes_down_percentage": 0, # Not provided
"developer_data": {
"forks": 0, # Not provided
"stars": 0, # Not provided
"subscribers": 0, # Not provided
"total_issues": 0, # Not provided
"closed_issues": 0, # Not provided
"pull_requests_merged": 0, # Not provided
"pull_request_contributors": 0, # Not provided
"commit_count_4_weeks": 0 # Not provided
},
"community_data": {
"twitter_followers": 0, # Not provided
"reddit_subscribers": 0, # Not provided
"reddit_average_posts_48h": 0, # Not provided
"reddit_average_comments_48h": 0, # Not provided
"telegram_channel_user_count": 0 # Not provided
}
}
def _process_binance_coin_details(self, data: Dict[str, Any], coin_id: str) -> Dict[str, Any]:
"""Process Binance coin details
Args:
data: Raw API response data
coin_id: Coin ID
Returns:
Processed coin details
"""
return {
"id": coin_id.lower(),
"symbol": coin_id.upper(),
"name": coin_id.upper(), # Binance doesn't provide full names
"description": "", # Not provided
"image": "", # Not provided
"current_price": float(data.get("lastPrice", 0)),
"market_cap": 0, # Not provided
"market_cap_rank": 0, # Not provided
"total_volume": float(data.get("volume", 0)),
"high_24h": float(data.get("highPrice", 0)),
"low_24h": float(data.get("lowPrice", 0)),
"price_change_24h": float(data.get("priceChange", 0)),
"price_change_percentage_24h": float(data.get("priceChangePercent", 0)),
"market_cap_change_24h": 0, # Not provided
"market_cap_change_percentage_24h": 0, # Not provided
"circulating_supply": 0, # Not provided
"total_supply": 0, # Not provided
"max_supply": 0, # Not provided
"ath": 0, # Not provided
"ath_change_percentage": 0, # Not provided
"ath_date": "", # Not provided
"atl": 0, # Not provided
"atl_change_percentage": 0, # Not provided
"atl_date": "", # Not provided
"last_updated": datetime.fromtimestamp(int(data.get("closeTime", 0)) / 1000).isoformat(),
"links": {
"homepage": "", # Not provided
"blockchain_site": "", # Not provided
"official_forum_url": "", # Not provided
"chat_url": "", # Not provided
"announcement_url": "", # Not provided
"twitter_screen_name": "", # Not provided
"facebook_username": "", # Not provided
"github_url": "", # Not provided
"subreddit_url": "" # Not provided
},
"categories": [], # Not provided
"genesis_date": "", # Not provided
"sentiment_votes_up_percentage": 0, # Not provided
"sentiment_votes_down_percentage": 0, # Not provided
"developer_data": {
"forks": 0, # Not provided
"stars": 0, # Not provided
"subscribers": 0, # Not provided
"total_issues": 0, # Not provided
"closed_issues": 0, # Not provided
"pull_requests_merged": 0, # Not provided
"pull_request_contributors": 0, # Not provided
"commit_count_4_weeks": 0 # Not provided
},
"community_data": {
"twitter_followers": 0, # Not provided
"reddit_subscribers": 0, # Not provided
"reddit_average_posts_48h": 0, # Not provided
"reddit_average_comments_48h": 0, # Not provided
"telegram_channel_user_count": 0 # Not provided
}
}
def _process_coingecko_price_history(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""Process CoinGecko price history data
Args:
data: Raw API response data
Returns:
Processed price history data
"""
prices = []
market_caps = []
total_volumes = []
for price_data in data.get("prices", []):
if len(price_data) >= 2:
timestamp = price_data[0]
price = price_data[1]
prices.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"price": price
})
for market_cap_data in data.get("market_caps", []):
if len(market_cap_data) >= 2:
timestamp = market_cap_data[0]
market_cap = market_cap_data[1]
market_caps.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"market_cap": market_cap
})
for volume_data in data.get("total_volumes", []):
if len(volume_data) >= 2:
timestamp = volume_data[0]
volume = volume_data[1]
total_volumes.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"volume": volume
})
return {
"prices": prices,
"market_caps": market_caps,
"total_volumes": total_volumes
}
def _process_coinmarketcap_price_history(self, data: Dict[str, Any], vs_currency: str) -> Dict[str, Any]:
"""Process CoinMarketCap price history data
Args:
data: Raw API response data
vs_currency: Currency to compare against
Returns:
Processed price history data
"""
prices = []
market_caps = []
total_volumes = []
vs_currency = vs_currency.upper()
for quote_data in data.get("data", {}).get("quotes", []):
timestamp = datetime.fromisoformat(quote_data.get("timestamp", "").replace("Z", "+00:00")).timestamp() * 1000
quote = quote_data.get("quote", {}).get(vs_currency, {})
prices.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"price": quote.get("price", 0)
})
market_caps.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"market_cap": quote.get("market_cap", 0)
})
total_volumes.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"volume": quote.get("volume_24h", 0)
})
return {
"prices": prices,
"market_caps": market_caps,
"total_volumes": total_volumes
}
def _process_binance_price_history(self, data: List[List[Any]]) -> Dict[str, Any]:
"""Process Binance price history data
Args:
data: Raw API response data
Returns:
Processed price history data
"""
prices = []
total_volumes = []
for kline in data:
if len(kline) >= 8:
timestamp = kline[0]
open_price = float(kline[1])
high_price = float(kline[2])
low_price = float(kline[3])
close_price = float(kline[4])
volume = float(kline[5])
prices.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"price": close_price,
"open": open_price,
"high": high_price,
"low": low_price,
"close": close_price
})
total_volumes.append({
"timestamp": timestamp,
"datetime": datetime.fromtimestamp(timestamp / 1000).isoformat(),
"volume": volume
})
return {
"prices": prices,
"market_caps": [], # Not provided by Binance
"total_volumes": total_volumes
}
def _convert_interval(self, interval: str) -> str:
"""Convert interval to CoinMarketCap format
Args:
interval: Interval string
Returns:
CoinMarketCap interval format
"""
if interval == "daily":
return "1d"
elif interval == "hourly":
return "1h"
elif interval == "minutely":
return "1m"
else:
return "1d"
def _convert_binance_interval(self, interval: str) -> str:
"""Convert interval to Binance format
Args:
interval: Interval string
Returns:
Binance interval format
"""
if interval == "daily":
return "1d"
elif interval == "hourly":
return "1h"
elif interval == "minutely":
return "1m"
else:
return "1d"