|
import json
import time
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import requests

from utils.error_handling import IntegrationError, handle_exceptions
from utils.logging import setup_logger
from utils.storage import load_data, save_data
|
|
|
|
|
# Module-level logger used by CryptoIntegration for error reporting.
logger = setup_logger(__name__)
|
|
|
class CryptoIntegration:
    """Cryptocurrency API integration for market data.

    Wraps the CoinGecko, CoinMarketCap and Binance HTTP APIs behind a single
    interface and normalises their responses into common dictionary shapes.
    Successful results are cached in memory, keyed by request and provider,
    for the provider's ``cache_duration`` (seconds).
    """

    # Seconds to wait on any single HTTP request; without a timeout
    # ``requests`` blocks indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 10

    # Friendly interval names -> provider-specific interval codes.
    # NOTE(review): CoinMarketCap's finest documented granularity appears to
    # be 5 minutes, so "minutely" maps to "5m" there - confirm against the
    # CoinMarketCap API reference.
    _CMC_INTERVALS = {"daily": "1d", "hourly": "1h", "minutely": "5m"}
    _BINANCE_INTERVALS = {"daily": "1d", "hourly": "1h", "minutely": "1m"}

    # Used to turn a "days" window into a number of Binance candles.
    _BINANCE_CANDLES_PER_DAY = {"1d": 1, "1h": 24, "1m": 1440}
    _BINANCE_MAX_KLINES = 1000

    def __init__(self, api_key: Optional[str] = None, provider: str = "coingecko"):
        """Initialize Cryptocurrency API integration

        Args:
            api_key: API key for the crypto provider (optional for some providers)
            provider: Crypto data provider (default: coingecko)
        """
        self.api_key = api_key
        self.provider = provider.lower()
        self.cache = {}
        self.cache_expiry = {}

        # Per-provider endpoints and cache lifetime in seconds.
        self.providers = {
            "coingecko": {
                "base_url": "https://api.coingecko.com/api/v3",
                "pro_url": "https://pro-api.coingecko.com/api/v3",
                "cache_duration": 300,
            },
            "coinmarketcap": {
                "base_url": "https://pro-api.coinmarketcap.com/v1",
                "cache_duration": 300,
            },
            "binance": {
                "base_url": "https://api.binance.com/api/v3",
                "cache_duration": 60,
            },
        }

    # ------------------------------------------------------------------
    # Configuration
    # ------------------------------------------------------------------

    @handle_exceptions
    def set_api_key(self, api_key: str) -> None:
        """Set API key for the crypto provider and drop all cached results

        Args:
            api_key: API key
        """
        self.api_key = api_key
        self._clear_cache()

    @handle_exceptions
    def set_provider(self, provider: str) -> None:
        """Set crypto data provider and drop all cached results

        Args:
            provider: Crypto data provider (case-insensitive)

        Raises:
            IntegrationError: If the provider is not one of the configured ones
        """
        provider = provider.lower()
        if provider not in self.providers:
            raise IntegrationError(f"Unsupported crypto provider: {provider}")

        self.provider = provider
        self._clear_cache()

    @handle_exceptions
    def test_connection(self) -> bool:
        """Test cryptocurrency API connection

        Returns:
            True if connection is successful, False otherwise (including when
            the request itself raises or the provider is unknown)
        """
        try:
            if self.provider in ("coingecko", "binance"):
                # Both expose an unauthenticated /ping endpoint on base_url.
                url = f"{self.providers[self.provider]['base_url']}/ping"
                response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
                return response.status_code == 200

            if self.provider == "coinmarketcap":
                if not self.api_key:
                    logger.error("CoinMarketCap API key not set")
                    return False

                # Cheapest authenticated call: ask for a single map entry.
                url = f"{self.providers[self.provider]['base_url']}/cryptocurrency/map"
                response = requests.get(
                    url,
                    headers={"X-CMC_PRO_API_KEY": self.api_key},
                    params={"limit": 1},
                    timeout=self.REQUEST_TIMEOUT,
                )
                return response.status_code == 200

            return False

        except Exception as e:
            logger.error(f"Crypto API connection test failed: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Public data API
    # ------------------------------------------------------------------

    @handle_exceptions
    def get_market_data(self, coins: Optional[List[str]] = None, vs_currency: str = "usd",
                        page: int = 1, per_page: int = 100) -> List[Dict[str, Any]]:
        """Get cryptocurrency market data

        Args:
            coins: List of coin IDs (optional)
            vs_currency: Currency to compare against (default: usd)
            page: Page number (default: 1)
            per_page: Number of results per page (default: 100)

        Returns:
            List of cryptocurrency market data

        Raises:
            IntegrationError: On a non-200 response, a missing CoinMarketCap
                key, an unsupported provider, or any unexpected failure
        """
        cache_key = f"market_{','.join(coins) if coins else 'all'}_{vs_currency}_{page}_{per_page}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        failure = "Failed to get market data"
        try:
            if self.provider == "coingecko":
                params = {
                    "vs_currency": vs_currency,
                    "page": page,
                    "per_page": per_page,
                    "order": "market_cap_desc",
                }
                self._add_coingecko_key(params)
                if coins:
                    params["ids"] = ",".join(coins)

                data = self._fetch_json(self._endpoint("/coins/markets"), failure, params=params)
                market_data = self._process_coingecko_market_data(data)

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()
                params = {
                    "start": (page - 1) * per_page + 1,
                    "limit": per_page,
                    "convert": vs_currency.upper(),
                }
                data = self._fetch_json(
                    self._endpoint("/cryptocurrency/listings/latest"),
                    failure, params=params, headers=headers,
                )
                market_data = self._process_coinmarketcap_market_data(data, vs_currency)

                # The listings endpoint cannot filter by coin, so narrow the
                # requested page locally (ids are lower-cased symbols).
                if coins:
                    market_data = self._filter_by_ids(market_data, coins)

            elif self.provider == "binance":
                # Binance returns every ticker at once; filter and paginate locally.
                data = self._fetch_json(self._endpoint("/ticker/24hr"), failure)
                market_data = self._process_binance_market_data(data, vs_currency)

                if coins:
                    market_data = self._filter_by_ids(market_data, coins)

                start_idx = (page - 1) * per_page
                market_data = market_data[start_idx:start_idx + per_page]

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            self._store_in_cache(cache_key, market_data)
            return market_data

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"{failure}: {str(e)}")
            raise IntegrationError(f"{failure}: {str(e)}") from e

    @handle_exceptions
    def get_coin_details(self, coin_id: str) -> Dict[str, Any]:
        """Get detailed information about a specific cryptocurrency

        Args:
            coin_id: Coin ID (CoinGecko id; ticker symbol for CoinMarketCap
                and Binance)

        Returns:
            Detailed coin information

        Raises:
            IntegrationError: On a non-200 response, unknown coin, missing
                CoinMarketCap key, unsupported provider, or unexpected failure
        """
        cache_key = f"coin_{coin_id}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        failure = "Failed to get coin details"
        try:
            if self.provider == "coingecko":
                params = {
                    "localization": "false",
                    "tickers": "false",
                    "market_data": "true",
                    "community_data": "true",
                    "developer_data": "true",
                }
                self._add_coingecko_key(params)

                data = self._fetch_json(self._endpoint(f"/coins/{coin_id}"), failure, params=params)
                coin_details = self._process_coingecko_coin_details(data)

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()
                # CoinMarketCap's info endpoint is keyed by numeric id, so the
                # symbol has to be resolved first.
                cmc_id = self._resolve_cmc_id(coin_id, headers)

                data = self._fetch_json(
                    self._endpoint("/cryptocurrency/info"),
                    failure, params={"id": cmc_id}, headers=headers,
                )
                coin_details = self._process_coinmarketcap_coin_details(data, cmc_id)

            elif self.provider == "binance":
                data = self._fetch_json(
                    self._endpoint("/ticker/24hr"),
                    failure, params={"symbol": f"{coin_id.upper()}USDT"},
                )
                coin_details = self._process_binance_coin_details(data, coin_id)

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            self._store_in_cache(cache_key, coin_details)
            return coin_details

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"{failure}: {str(e)}")
            raise IntegrationError(f"{failure}: {str(e)}") from e

    @handle_exceptions
    def get_price_history(self, coin_id: str, vs_currency: str = "usd",
                          days: str = "30", interval: str = "daily") -> Dict[str, Any]:
        """Get price history for a specific cryptocurrency

        Args:
            coin_id: Coin ID
            vs_currency: Currency to compare against (default: usd)
            days: Number of days of data to retrieve (default: 30). CoinGecko
                also accepts "max"; CoinMarketCap requires a number; Binance
                treats a non-numeric value as "as many candles as allowed".
            interval: Data interval (default: daily)

        Returns:
            Dict with "prices", "market_caps" and "total_volumes" lists

        Raises:
            IntegrationError: On a non-200 response, unknown coin, missing
                CoinMarketCap key, non-numeric ``days`` for CoinMarketCap,
                unsupported provider, or unexpected failure
        """
        cache_key = f"history_{coin_id}_{vs_currency}_{days}_{interval}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        failure = "Failed to get price history"
        try:
            if self.provider == "coingecko":
                params = {
                    "vs_currency": vs_currency,
                    "days": days,
                    "interval": interval if days != "max" else "daily",
                }
                self._add_coingecko_key(params)

                data = self._fetch_json(
                    self._endpoint(f"/coins/{coin_id}/market_chart"), failure, params=params
                )
                price_history = self._process_coingecko_price_history(data)

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()

                # Validate before spending an API call on the id lookup.
                try:
                    day_count = int(days)
                except (TypeError, ValueError) as err:
                    raise IntegrationError(
                        f"{failure}: CoinMarketCap requires a numeric 'days' value, got {days!r}"
                    ) from err

                cmc_id = self._resolve_cmc_id(coin_id, headers)

                # The trailing "Z" declares UTC, so the window must be built
                # from UTC time rather than the local clock.
                time_end = datetime.now(timezone.utc)
                time_start = time_end - timedelta(days=day_count)
                params = {
                    "id": cmc_id,
                    "time_start": time_start.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                    "time_end": time_end.strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                    "interval": self._convert_interval(interval),
                    "convert": vs_currency.upper(),
                }
                data = self._fetch_json(
                    self._endpoint("/cryptocurrency/quotes/historical"),
                    failure, params=params, headers=headers,
                )
                price_history = self._process_coinmarketcap_price_history(data, vs_currency)

            elif self.provider == "binance":
                binance_interval = self._convert_binance_interval(interval)
                params = {
                    "symbol": f"{coin_id.upper()}{self._binance_quote_asset(vs_currency)}",
                    "interval": binance_interval,
                    "limit": self._binance_kline_limit(days, binance_interval),
                }
                data = self._fetch_json(self._endpoint("/klines"), failure, params=params)
                price_history = self._process_binance_price_history(data)

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            self._store_in_cache(cache_key, price_history)
            return price_history

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"{failure}: {str(e)}")
            raise IntegrationError(f"{failure}: {str(e)}") from e

    @handle_exceptions
    def search_coins(self, query: str) -> List[Dict[str, Any]]:
        """Search for cryptocurrencies

        Args:
            query: Search query (matched case-insensitively as a substring
                for CoinMarketCap and Binance)

        Returns:
            List of matching cryptocurrencies, each with "id", "symbol",
            "name" and "market_cap_rank"

        Raises:
            IntegrationError: On a non-200 response, missing CoinMarketCap
                key, unsupported provider, or unexpected failure
        """
        cache_key = f"search_{query}_{self.provider}"
        if self._is_cached(cache_key):
            return self.cache[cache_key]

        failure = "Failed to search coins"
        try:
            needle = query.lower()

            if self.provider == "coingecko":
                params = {"query": query}
                self._add_coingecko_key(params)
                data = self._fetch_json(self._endpoint("/search"), failure, params=params)

                search_results = [
                    {
                        "id": coin.get("id", ""),
                        "symbol": coin.get("symbol", "").upper(),
                        "name": coin.get("name", ""),
                        "market_cap_rank": coin.get("market_cap_rank", None),
                    }
                    for coin in data.get("coins", [])
                ]

            elif self.provider == "coinmarketcap":
                headers = self._cmc_headers()
                # No search endpoint is used here: fetch the map and match locally.
                data = self._fetch_json(
                    self._endpoint("/cryptocurrency/map"), failure, headers=headers
                )

                search_results = [
                    {
                        "id": coin.get("symbol", "").lower(),
                        "symbol": coin.get("symbol", "").upper(),
                        "name": coin.get("name", ""),
                        "market_cap_rank": coin.get("rank", None),
                    }
                    for coin in data.get("data", [])
                    if needle in coin.get("name", "").lower()
                    or needle in coin.get("symbol", "").lower()
                ]

            elif self.provider == "binance":
                data = self._fetch_json(self._endpoint("/exchangeInfo"), failure)

                search_results = []
                seen_ids = set()
                for symbol in data.get("symbols", []):
                    base_asset = symbol.get("baseAsset", "")
                    if needle not in base_asset.lower() or symbol.get("quoteAsset") != "USDT":
                        continue
                    asset_id = base_asset.lower()
                    if asset_id in seen_ids:
                        continue  # keep only the first entry per base asset
                    seen_ids.add(asset_id)
                    search_results.append({
                        "id": asset_id,
                        "symbol": base_asset.upper(),
                        "name": base_asset.upper(),
                        "market_cap_rank": None,
                    })

            else:
                raise IntegrationError(f"Unsupported crypto provider: {self.provider}")

            self._store_in_cache(cache_key, search_results)
            return search_results

        except IntegrationError:
            raise
        except Exception as e:
            logger.error(f"{failure}: {str(e)}")
            raise IntegrationError(f"{failure}: {str(e)}") from e

    # ------------------------------------------------------------------
    # Internal helpers: cache, HTTP, provider quirks
    # ------------------------------------------------------------------

    def _clear_cache(self) -> None:
        """Forget every cached response and its expiry time."""
        self.cache = {}
        self.cache_expiry = {}

    def _is_cached(self, cache_key: str) -> bool:
        """Return True when *cache_key* has an entry that has not expired."""
        return cache_key in self.cache and time.time() < self.cache_expiry.get(cache_key, 0)

    def _store_in_cache(self, cache_key: str, value: Any) -> None:
        """Cache *value* for the current provider's ``cache_duration`` seconds."""
        self.cache[cache_key] = value
        self.cache_expiry[cache_key] = time.time() + self.providers[self.provider]["cache_duration"]

    def _endpoint(self, path: str) -> str:
        """Build the full URL for *path* on the current provider.

        CoinGecko requests go to ``pro_url`` whenever an API key is set and to
        ``base_url`` otherwise; the other providers only have ``base_url``.
        """
        settings = self.providers[self.provider]
        if self.provider == "coingecko" and self.api_key:
            return f"{settings['pro_url']}{path}"
        return f"{settings['base_url']}{path}"

    def _add_coingecko_key(self, params: Dict[str, Any]) -> None:
        """Add the CoinGecko API key to *params* in place when one is set."""
        if self.api_key:
            params["x_cg_pro_api_key"] = self.api_key

    def _cmc_headers(self) -> Dict[str, str]:
        """Return CoinMarketCap auth headers.

        Raises:
            IntegrationError: If no API key has been configured
        """
        if not self.api_key:
            raise IntegrationError("CoinMarketCap API key not set")
        return {"X-CMC_PRO_API_KEY": self.api_key}

    def _fetch_json(self, url: str, failure: str,
                    params: Optional[Dict[str, Any]] = None,
                    headers: Optional[Dict[str, str]] = None) -> Any:
        """GET *url* and return the decoded JSON body.

        Raises:
            IntegrationError: ``"<failure>: <response text>"`` on any
                non-200 status
        """
        response = requests.get(url, params=params, headers=headers, timeout=self.REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise IntegrationError(f"{failure}: {response.text}")
        return response.json()

    def _resolve_cmc_id(self, coin_id: str, headers: Dict[str, str]) -> Any:
        """Look up the CoinMarketCap id for the symbol *coin_id*.

        Returns the id of the first entry the map endpoint reports.

        Raises:
            IntegrationError: If the lookup fails or returns no entries
        """
        data = self._fetch_json(
            self._endpoint("/cryptocurrency/map"),
            "Failed to get coin ID",
            params={"symbol": coin_id.upper()},
            headers=headers,
        )
        matches = data.get("data")
        if not matches:
            raise IntegrationError(f"Coin not found: {coin_id}")
        return matches[0]["id"]

    @staticmethod
    def _binance_quote_asset(vs_currency: str) -> str:
        """Translate a fiat-style currency code into a Binance quote asset.

        "USD" becomes "USDT" so the default currency agrees with the USDT
        pairs the other Binance calls in this class use; any other code is
        upper-cased and used as-is.
        """
        quote = vs_currency.upper()
        return "USDT" if quote == "USD" else quote

    def _binance_kline_limit(self, days: Any, binance_interval: str) -> int:
        """Number of candles covering *days*, clamped to 1..1000.

        A non-numeric *days* (for example "max") yields the maximum.
        """
        try:
            candles = int(days) * self._BINANCE_CANDLES_PER_DAY.get(binance_interval, 1)
        except (TypeError, ValueError):
            return self._BINANCE_MAX_KLINES
        return max(1, min(candles, self._BINANCE_MAX_KLINES))

    @staticmethod
    def _filter_by_ids(items: List[Dict[str, Any]], coins: List[str]) -> List[Dict[str, Any]]:
        """Keep the items whose "id" is in *coins* (case-insensitive)."""
        wanted = {coin.lower() for coin in coins}
        return [item for item in items if item["id"].lower() in wanted]

    @staticmethod
    def _first(values: Any, default: Any = "") -> Any:
        """Return the first element of *values*, or *default* when it is empty or None."""
        return values[0] if values else default

    @staticmethod
    def _absolute_change(current: Any, percent_change: Any) -> Any:
        """Derive the absolute change from a current value and its percent change.

        Returns 0 when either input is missing or the percentage makes the
        previous value undefined (-100% or lower).
        """
        if not current or percent_change is None or percent_change <= -100:
            return 0
        previous = current / (1 + percent_change / 100)
        return current - previous

    @staticmethod
    def _history_point(timestamp_ms: Any, field: str, value: Any) -> Dict[str, Any]:
        """Build one history entry: millisecond timestamp, ISO datetime, and *field*."""
        return {
            "timestamp": timestamp_ms,
            "datetime": datetime.fromtimestamp(timestamp_ms / 1000).isoformat(),
            field: value,
        }

    @staticmethod
    def _blank_coin_details() -> Dict[str, Any]:
        """Return a coin-details dict with every field at its empty default.

        Providers that cannot supply a field leave the default in place, so
        all providers return the same keys in the same order.
        """
        return {
            "id": "",
            "symbol": "",
            "name": "",
            "description": "",
            "image": "",
            "current_price": 0,
            "market_cap": 0,
            "market_cap_rank": 0,
            "total_volume": 0,
            "high_24h": 0,
            "low_24h": 0,
            "price_change_24h": 0,
            "price_change_percentage_24h": 0,
            "market_cap_change_24h": 0,
            "market_cap_change_percentage_24h": 0,
            "circulating_supply": 0,
            "total_supply": 0,
            "max_supply": 0,
            "ath": 0,
            "ath_change_percentage": 0,
            "ath_date": "",
            "atl": 0,
            "atl_change_percentage": 0,
            "atl_date": "",
            "last_updated": "",
            "links": {
                "homepage": "",
                "blockchain_site": "",
                "official_forum_url": "",
                "chat_url": "",
                "announcement_url": "",
                "twitter_screen_name": "",
                "facebook_username": "",
                "github_url": "",
                "subreddit_url": "",
            },
            "categories": [],
            "genesis_date": "",
            "sentiment_votes_up_percentage": 0,
            "sentiment_votes_down_percentage": 0,
            "developer_data": {
                "forks": 0,
                "stars": 0,
                "subscribers": 0,
                "total_issues": 0,
                "closed_issues": 0,
                "pull_requests_merged": 0,
                "pull_request_contributors": 0,
                "commit_count_4_weeks": 0,
            },
            "community_data": {
                "twitter_followers": 0,
                "reddit_subscribers": 0,
                "reddit_average_posts_48h": 0,
                "reddit_average_comments_48h": 0,
                "telegram_channel_user_count": 0,
            },
        }

    # ------------------------------------------------------------------
    # Response normalisation
    # ------------------------------------------------------------------

    def _process_coingecko_market_data(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Process CoinGecko market data

        Args:
            data: Raw API response data

        Returns:
            Processed market data
        """
        return [
            {
                "id": coin.get("id", ""),
                "symbol": coin.get("symbol", "").upper(),
                "name": coin.get("name", ""),
                "image": coin.get("image", ""),
                "current_price": coin.get("current_price", 0),
                "market_cap": coin.get("market_cap", 0),
                "market_cap_rank": coin.get("market_cap_rank", 0),
                "total_volume": coin.get("total_volume", 0),
                "high_24h": coin.get("high_24h", 0),
                "low_24h": coin.get("low_24h", 0),
                "price_change_24h": coin.get("price_change_24h", 0),
                "price_change_percentage_24h": coin.get("price_change_percentage_24h", 0),
                "market_cap_change_24h": coin.get("market_cap_change_24h", 0),
                "market_cap_change_percentage_24h": coin.get("market_cap_change_percentage_24h", 0),
                "circulating_supply": coin.get("circulating_supply", 0),
                "total_supply": coin.get("total_supply", 0),
                "max_supply": coin.get("max_supply", 0),
                "last_updated": coin.get("last_updated", ""),
            }
            for coin in data
        ]

    def _process_coinmarketcap_market_data(self, data: Dict[str, Any], vs_currency: str) -> List[Dict[str, Any]]:
        """Process CoinMarketCap market data

        Args:
            data: Raw API response data
            vs_currency: Currency to compare against

        Returns:
            Processed market data
        """
        vs_currency = vs_currency.upper()
        market_data = []

        for coin in data.get("data", []):
            quote = (coin.get("quote") or {}).get(vs_currency, {})
            price = quote.get("price", 0)
            percent_change = quote.get("percent_change_24h", 0)

            market_data.append({
                "id": coin.get("symbol", "").lower(),
                "symbol": coin.get("symbol", "").upper(),
                "name": coin.get("name", ""),
                "image": "",
                "current_price": price,
                "market_cap": quote.get("market_cap", 0),
                "market_cap_rank": coin.get("cmc_rank", 0),
                "total_volume": quote.get("volume_24h", 0),
                "high_24h": 0,
                "low_24h": 0,
                # The quote carries only the percentage, so the absolute
                # price change is derived from it (volume_change_24h is a
                # different metric and must not be used here).
                "price_change_24h": self._absolute_change(price, percent_change),
                "price_change_percentage_24h": percent_change,
                "market_cap_change_24h": 0,
                "market_cap_change_percentage_24h": quote.get("market_cap_change_percentage_24h", 0),
                "circulating_supply": coin.get("circulating_supply", 0),
                "total_supply": coin.get("total_supply", 0),
                "max_supply": coin.get("max_supply", 0),
                "last_updated": coin.get("last_updated", ""),
            })

        return market_data

    def _process_binance_market_data(self, data: List[Dict[str, Any]], vs_currency: str) -> List[Dict[str, Any]]:
        """Process Binance market data

        Only tickers quoted in *vs_currency* are kept ("usd" is treated as
        "USDT"); the base asset is whatever precedes the quote asset in the
        ticker symbol.

        Args:
            data: Raw API response data
            vs_currency: Currency to compare against

        Returns:
            Processed market data
        """
        quote_asset = self._binance_quote_asset(vs_currency)
        market_data = []

        for ticker in data:
            symbol = ticker.get("symbol", "")
            if not symbol.endswith(quote_asset):
                continue

            base_asset = symbol[:-len(quote_asset)].lower()

            market_data.append({
                "id": base_asset,
                "symbol": base_asset.upper(),
                "name": base_asset.upper(),
                "image": "",
                "current_price": float(ticker.get("lastPrice", 0)),
                "market_cap": 0,
                "market_cap_rank": 0,
                "total_volume": float(ticker.get("volume", 0)),
                "high_24h": float(ticker.get("highPrice", 0)),
                "low_24h": float(ticker.get("lowPrice", 0)),
                "price_change_24h": float(ticker.get("priceChange", 0)),
                "price_change_percentage_24h": float(ticker.get("priceChangePercent", 0)),
                "market_cap_change_24h": 0,
                "market_cap_change_percentage_24h": 0,
                "circulating_supply": 0,
                "total_supply": 0,
                "max_supply": 0,
                # closeTime is in milliseconds.
                "last_updated": datetime.fromtimestamp(int(ticker.get("closeTime", 0)) / 1000).isoformat(),
            })

        return market_data

    def _process_coingecko_coin_details(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process CoinGecko coin details

        Price-like fields are taken from the "usd" entry of each
        per-currency mapping. Missing or empty sections and link lists fall
        back to empty defaults instead of raising.

        Args:
            data: Raw API response data

        Returns:
            Processed coin details
        """
        market = data.get("market_data") or {}
        links = data.get("links") or {}
        developer = data.get("developer_data") or {}
        community = data.get("community_data") or {}

        def usd(field: str, default: Any = 0) -> Any:
            return (market.get(field) or {}).get("usd", default)

        return {
            "id": data.get("id", ""),
            "symbol": data.get("symbol", "").upper(),
            "name": data.get("name", ""),
            "description": (data.get("description") or {}).get("en", ""),
            "image": (data.get("image") or {}).get("large", ""),
            "current_price": usd("current_price"),
            "market_cap": usd("market_cap"),
            "market_cap_rank": market.get("market_cap_rank", 0),
            "total_volume": usd("total_volume"),
            "high_24h": usd("high_24h"),
            "low_24h": usd("low_24h"),
            "price_change_24h": market.get("price_change_24h", 0),
            "price_change_percentage_24h": market.get("price_change_percentage_24h", 0),
            "market_cap_change_24h": market.get("market_cap_change_24h", 0),
            "market_cap_change_percentage_24h": market.get("market_cap_change_percentage_24h", 0),
            "circulating_supply": market.get("circulating_supply", 0),
            "total_supply": market.get("total_supply", 0),
            "max_supply": market.get("max_supply", 0),
            "ath": usd("ath"),
            "ath_change_percentage": usd("ath_change_percentage"),
            "ath_date": usd("ath_date", ""),
            "atl": usd("atl"),
            "atl_change_percentage": usd("atl_change_percentage"),
            "atl_date": usd("atl_date", ""),
            "last_updated": data.get("last_updated", ""),
            "links": {
                "homepage": self._first(links.get("homepage")),
                "blockchain_site": self._first(links.get("blockchain_site")),
                "official_forum_url": self._first(links.get("official_forum_url")),
                "chat_url": self._first(links.get("chat_url")),
                "announcement_url": self._first(links.get("announcement_url")),
                "twitter_screen_name": links.get("twitter_screen_name", ""),
                "facebook_username": links.get("facebook_username", ""),
                "github_url": self._first((links.get("repos_url") or {}).get("github")),
                "subreddit_url": links.get("subreddit_url", ""),
            },
            "categories": data.get("categories", []),
            "genesis_date": data.get("genesis_date", ""),
            "sentiment_votes_up_percentage": data.get("sentiment_votes_up_percentage", 0),
            "sentiment_votes_down_percentage": data.get("sentiment_votes_down_percentage", 0),
            "developer_data": {
                key: developer.get(key, 0)
                for key in (
                    "forks",
                    "stars",
                    "subscribers",
                    "total_issues",
                    "closed_issues",
                    "pull_requests_merged",
                    "pull_request_contributors",
                    "commit_count_4_weeks",
                )
            },
            "community_data": {
                key: community.get(key, 0)
                for key in (
                    "twitter_followers",
                    "reddit_subscribers",
                    "reddit_average_posts_48h",
                    "reddit_average_comments_48h",
                    "telegram_channel_user_count",
                )
            },
        }

    def _process_coinmarketcap_coin_details(self, data: Dict[str, Any], cmc_id: str) -> Dict[str, Any]:
        """Process CoinMarketCap coin details

        Only descriptive metadata is filled in; price, supply, developer and
        community figures stay at their zero defaults.

        Args:
            data: Raw API response data
            cmc_id: CoinMarketCap ID

        Returns:
            Processed coin details
        """
        coin = (data.get("data") or {}).get(str(cmc_id)) or {}
        urls = coin.get("urls") or {}

        details = self._blank_coin_details()
        details.update({
            "id": coin.get("symbol", "").lower(),
            "symbol": coin.get("symbol", "").upper(),
            "name": coin.get("name", ""),
            "description": coin.get("description", ""),
            "image": coin.get("logo", ""),
            "last_updated": coin.get("last_updated", ""),
            # Each url group is a list that may be empty; take the first or "".
            "links": {
                "homepage": self._first(urls.get("website")),
                "blockchain_site": self._first(urls.get("explorer")),
                "official_forum_url": self._first(urls.get("message_board")),
                "chat_url": self._first(urls.get("chat")),
                "announcement_url": self._first(urls.get("announcement")),
                "twitter_screen_name": self._first(urls.get("twitter")),
                "facebook_username": self._first(urls.get("facebook")),
                "github_url": self._first(urls.get("source_code")),
                "subreddit_url": self._first(urls.get("reddit")),
            },
            "categories": coin.get("category", "").split(",") if coin.get("category") else [],
            "genesis_date": coin.get("date_added", ""),
        })
        return details

    def _process_binance_coin_details(self, data: Dict[str, Any], coin_id: str) -> Dict[str, Any]:
        """Process Binance coin details

        Only the 24h ticker figures are filled in; everything else stays at
        its empty default.

        Args:
            data: Raw API response data
            coin_id: Coin ID

        Returns:
            Processed coin details
        """
        details = self._blank_coin_details()
        details.update({
            "id": coin_id.lower(),
            "symbol": coin_id.upper(),
            "name": coin_id.upper(),
            "current_price": float(data.get("lastPrice", 0)),
            "total_volume": float(data.get("volume", 0)),
            "high_24h": float(data.get("highPrice", 0)),
            "low_24h": float(data.get("lowPrice", 0)),
            "price_change_24h": float(data.get("priceChange", 0)),
            "price_change_percentage_24h": float(data.get("priceChangePercent", 0)),
            # closeTime is in milliseconds.
            "last_updated": datetime.fromtimestamp(int(data.get("closeTime", 0)) / 1000).isoformat(),
        })
        return details

    def _process_coingecko_price_history(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Process CoinGecko price history data

        Each series is a list of ``[timestamp_ms, value]`` pairs; entries
        with fewer than two elements are skipped.

        Args:
            data: Raw API response data

        Returns:
            Processed price history data
        """
        def series(source_key: str, field: str) -> List[Dict[str, Any]]:
            return [
                self._history_point(pair[0], field, pair[1])
                for pair in data.get(source_key, [])
                if len(pair) >= 2
            ]

        return {
            "prices": series("prices", "price"),
            "market_caps": series("market_caps", "market_cap"),
            "total_volumes": series("total_volumes", "volume"),
        }

    def _process_coinmarketcap_price_history(self, data: Dict[str, Any], vs_currency: str) -> Dict[str, Any]:
        """Process CoinMarketCap price history data

        Quotes without a timestamp are skipped.

        Args:
            data: Raw API response data
            vs_currency: Currency to compare against

        Returns:
            Processed price history data
        """
        prices = []
        market_caps = []
        total_volumes = []
        vs_currency = vs_currency.upper()

        for entry in (data.get("data") or {}).get("quotes", []):
            raw_timestamp = entry.get("timestamp")
            if not raw_timestamp:
                continue

            # fromisoformat needs an explicit offset instead of the "Z" suffix.
            moment = datetime.fromisoformat(raw_timestamp.replace("Z", "+00:00"))
            timestamp = moment.timestamp() * 1000
            quote = (entry.get("quote") or {}).get(vs_currency, {})

            prices.append(self._history_point(timestamp, "price", quote.get("price", 0)))
            market_caps.append(self._history_point(timestamp, "market_cap", quote.get("market_cap", 0)))
            total_volumes.append(self._history_point(timestamp, "volume", quote.get("volume_24h", 0)))

        return {
            "prices": prices,
            "market_caps": market_caps,
            "total_volumes": total_volumes,
        }

    def _process_binance_price_history(self, data: List[List[Any]]) -> Dict[str, Any]:
        """Process Binance price history data

        Each kline is read positionally: open time (ms), open, high, low,
        close, volume. Rows with fewer than 8 elements are skipped. The
        close is reported as the "price".

        Args:
            data: Raw API response data

        Returns:
            Processed price history data ("market_caps" is always empty)
        """
        prices = []
        total_volumes = []

        for kline in data:
            if len(kline) < 8:
                continue

            timestamp = kline[0]
            open_price, high_price, low_price, close_price, volume = (
                float(value) for value in kline[1:6]
            )

            point = self._history_point(timestamp, "price", close_price)
            point.update({
                "open": open_price,
                "high": high_price,
                "low": low_price,
                "close": close_price,
            })
            prices.append(point)
            total_volumes.append(self._history_point(timestamp, "volume", volume))

        return {
            "prices": prices,
            "market_caps": [],
            "total_volumes": total_volumes,
        }

    def _convert_interval(self, interval: str) -> str:
        """Convert interval to CoinMarketCap format

        Args:
            interval: Interval string ("daily", "hourly" or "minutely")

        Returns:
            CoinMarketCap interval format; unknown values fall back to "1d"
        """
        return self._CMC_INTERVALS.get(interval, "1d")

    def _convert_binance_interval(self, interval: str) -> str:
        """Convert interval to Binance format

        Args:
            interval: Interval string ("daily", "hourly" or "minutely")

        Returns:
            Binance interval format; unknown values fall back to "1d"
        """
        return self._BINANCE_INTERVALS.get(interval, "1d")