Really-amin's picture
Upload 577 files
b190b45 verified
#!/usr/bin/env python3
"""
Hugging Face Unified Client
==================================
تمام درخواست‌ها از طریق این کلاینت به Hugging Face Space ارسال می‌شوند.
هیچ درخواست مستقیمی به API های خارجی ارسال نمی‌شود.
✅ تمام داده‌ها از Hugging Face
✅ بدون WebSocket (فقط HTTP)
✅ Cache و Retry مکانیزم
✅ Error Handling
References: crypto_resources_unified_2025-11-11.json
"""
import httpx
import asyncio
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import os
import hashlib
import json
logger = logging.getLogger(__name__)
class HuggingFaceUnifiedClient:
"""
کلاینت یکپارچه برای تمام درخواست‌های به Hugging Face Space
این کلاینت **تنها** منبع دریافت داده است و به جای API های دیگر،
تمام داده‌ها را از Hugging Face Space دریافت می‌کند.
"""
def __init__(self):
"""Initialize HuggingFace client with config"""
self.base_url = os.getenv(
"HF_SPACE_BASE_URL",
"https://really-amin-datasourceforcryptocurrency.hf.space"
)
self.api_token = os.getenv("HF_API_TOKEN", "")
self.timeout = httpx.Timeout(30.0, connect=10.0)
# Request headers
self.headers = {
"Content-Type": "application/json",
"User-Agent": "CryptoDataHub/1.0"
}
# Add auth token if available
if self.api_token:
self.headers["Authorization"] = f"Bearer {self.api_token}"
# Cache configuration
self.cache = {}
self.cache_ttl = {
"market": 30, # 30 seconds
"ohlcv": 60, # 1 minute
"news": 300, # 5 minutes
"sentiment": 0, # No cache for sentiment
"blockchain": 60, # 1 minute
}
logger.info(f"🚀 HuggingFace Unified Client initialized")
logger.info(f" Base URL: {self.base_url}")
logger.info(f" Auth: {'✅ Token configured' if self.api_token else '❌ No token'}")
def _get_cache_key(self, endpoint: str, params: Dict = None) -> str:
"""Generate cache key from endpoint and params"""
cache_str = f"{endpoint}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(cache_str.encode()).hexdigest()
def _get_cached(self, cache_key: str, cache_type: str) -> Optional[Dict]:
"""Get data from cache if available and not expired"""
if cache_key not in self.cache:
return None
cached_data, cached_time = self.cache[cache_key]
ttl = self.cache_ttl.get(cache_type, 0)
if ttl == 0:
# No caching
return None
age = (datetime.now() - cached_time).total_seconds()
if age < ttl:
logger.info(f"📦 Cache HIT: {cache_key} (age: {age:.1f}s)")
return cached_data
else:
# Expired
logger.info(f"⏰ Cache EXPIRED: {cache_key} (age: {age:.1f}s, ttl: {ttl}s)")
del self.cache[cache_key]
return None
def _set_cache(self, cache_key: str, data: Dict, cache_type: str):
"""Store data in cache"""
ttl = self.cache_ttl.get(cache_type, 0)
if ttl > 0:
self.cache[cache_key] = (data, datetime.now())
logger.info(f"💾 Cache SET: {cache_key} (ttl: {ttl}s)")
async def _request(
self,
method: str,
endpoint: str,
params: Optional[Dict] = None,
json_body: Optional[Dict] = None,
cache_type: Optional[str] = None,
retry: int = 3
) -> Dict[str, Any]:
"""
Make HTTP request to HuggingFace Space
Args:
method: HTTP method (GET, POST, etc.)
endpoint: API endpoint (e.g., "/api/market")
params: Query parameters
json_body: JSON body for POST requests
cache_type: Type of cache ("market", "ohlcv", etc.)
retry: Number of retry attempts
Returns:
Response data as dict
"""
# Check cache first (only for GET requests)
if method.upper() == "GET" and cache_type:
cache_key = self._get_cache_key(endpoint, params)
cached = self._get_cached(cache_key, cache_type)
if cached:
return cached
# Build full URL
url = f"{self.base_url}{endpoint}"
# Make request with retry
last_error = None
for attempt in range(retry):
try:
async with httpx.AsyncClient(timeout=self.timeout) as client:
if method.upper() == "GET":
response = await client.get(url, headers=self.headers, params=params)
elif method.upper() == "POST":
response = await client.post(url, headers=self.headers, json=json_body)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
# Check status
response.raise_for_status()
# Parse JSON
data = response.json()
# Cache if applicable
if method.upper() == "GET" and cache_type:
cache_key = self._get_cache_key(endpoint, params)
self._set_cache(cache_key, data, cache_type)
logger.info(f"✅ HF Request: {method} {endpoint} (attempt {attempt + 1}/{retry})")
return data
except httpx.HTTPStatusError as e:
last_error = e
logger.warning(f"❌ HF Request failed (attempt {attempt + 1}/{retry}): {e.response.status_code} - {e.response.text}")
if attempt < retry - 1:
await asyncio.sleep(1 * (attempt + 1)) # Exponential backoff
except Exception as e:
last_error = e
logger.error(f"❌ HF Request error (attempt {attempt + 1}/{retry}): {e}")
if attempt < retry - 1:
await asyncio.sleep(1 * (attempt + 1))
# All retries failed
raise Exception(f"HuggingFace API request failed after {retry} attempts: {last_error}")
# =========================================================================
# Market Data Methods
# =========================================================================
async def get_market_prices(
self,
symbols: Optional[List[str]] = None,
limit: int = 100
) -> Dict[str, Any]:
"""
دریافت قیمت‌های بازار از HuggingFace
Endpoint: GET /api/market
Args:
symbols: لیست سمبل‌ها (مثلاً ['BTC', 'ETH'])
limit: تعداد نتایج
Returns:
{
"success": True,
"data": [
{
"symbol": "BTC",
"price": 50000.0,
"market_cap": 1000000000.0,
"volume_24h": 50000000.0,
"change_24h": 2.5,
"last_updated": 1234567890000
},
...
],
"source": "hf_engine",
"timestamp": 1234567890000,
"cached": False
}
"""
params = {"limit": limit}
if symbols:
params["symbols"] = ",".join(symbols)
return await self._request(
"GET",
"/api/market",
params=params,
cache_type="market"
)
async def get_market_history(
self,
symbol: str,
timeframe: str = "1h",
limit: int = 1000
) -> Dict[str, Any]:
"""
دریافت داده‌های تاریخی OHLCV از HuggingFace
Endpoint: GET /api/market/history
Args:
symbol: سمبل (مثلاً "BTCUSDT")
timeframe: بازه زمانی ("1m", "5m", "15m", "1h", "4h", "1d")
limit: تعداد کندل‌ها
Returns:
{
"success": True,
"data": [
{
"timestamp": 1234567890000,
"open": 50000.0,
"high": 51000.0,
"low": 49500.0,
"close": 50500.0,
"volume": 1000000.0
},
...
],
"source": "hf_engine",
"timestamp": 1234567890000
}
"""
params = {
"symbol": symbol,
"timeframe": timeframe,
"limit": limit
}
return await self._request(
"GET",
"/api/market/history",
params=params,
cache_type="ohlcv"
)
# =========================================================================
# Sentiment Analysis Methods
# =========================================================================
async def analyze_sentiment(self, text: str) -> Dict[str, Any]:
"""
تحلیل احساسات متن با مدل‌های AI در HuggingFace
Endpoint: POST /api/sentiment/analyze
Args:
text: متن برای تحلیل
Returns:
{
"success": True,
"data": {
"label": "positive",
"score": 0.95,
"sentiment": "positive",
"confidence": 0.95,
"text": "Bitcoin is...",
"timestamp": 1234567890000
},
"source": "hf_engine",
"timestamp": 1234567890000
}
"""
json_body = {"text": text}
return await self._request(
"POST",
"/api/sentiment/analyze",
json_body=json_body,
cache_type=None # No cache for sentiment
)
# =========================================================================
# News Methods (از HuggingFace Space)
# =========================================================================
async def get_news(
self,
limit: int = 20,
source: Optional[str] = None
) -> Dict[str, Any]:
"""
دریافت اخبار رمز ارز از HuggingFace
Endpoint: GET /api/news
Args:
limit: تعداد خبر
source: منبع خبر (اختیاری)
Returns:
{
"articles": [
{
"id": "123",
"title": "Bitcoin reaches new high",
"url": "https://...",
"source": "CoinDesk",
"published_at": "2025-01-01T00:00:00"
},
...
],
"meta": {
"cache_ttl_seconds": 300,
"source": "hf"
}
}
"""
params = {"limit": limit}
if source:
params["source"] = source
return await self._request(
"GET",
"/api/news",
params=params,
cache_type="news"
)
# =========================================================================
# Blockchain Explorer Methods (از HuggingFace Space)
# =========================================================================
async def get_blockchain_gas_prices(self, chain: str = "ethereum") -> Dict[str, Any]:
"""
دریافت قیمت گس از HuggingFace
Endpoint: GET /api/crypto/blockchain/gas
Args:
chain: نام بلاکچین (ethereum, bsc, polygon, etc.)
Returns:
{
"chain": "ethereum",
"gas_prices": {
"fast": 50.0,
"standard": 30.0,
"slow": 20.0,
"unit": "gwei"
},
"timestamp": "2025-01-01T00:00:00",
"meta": {...}
}
"""
params = {"chain": chain}
return await self._request(
"GET",
"/api/crypto/blockchain/gas",
params=params,
cache_type="blockchain"
)
async def get_blockchain_stats(
self,
chain: str = "ethereum",
hours: int = 24
) -> Dict[str, Any]:
"""
دریافت آمار بلاکچین از HuggingFace
Endpoint: GET /api/crypto/blockchain/stats
Args:
chain: نام بلاکچین
hours: بازه زمانی (ساعت)
Returns:
{
"chain": "ethereum",
"blocks_24h": 7000,
"transactions_24h": 1200000,
"avg_gas_price": 25.0,
"mempool_size": 100000,
"meta": {...}
}
"""
params = {"chain": chain, "hours": hours}
return await self._request(
"GET",
"/api/crypto/blockchain/stats",
params=params,
cache_type="blockchain"
)
# =========================================================================
# Whale Tracking Methods
# =========================================================================
async def get_whale_transactions(
self,
limit: int = 50,
chain: Optional[str] = None,
min_amount_usd: float = 100000
) -> Dict[str, Any]:
"""
دریافت تراکنش‌های نهنگ‌ها از HuggingFace
Endpoint: GET /api/crypto/whales/transactions
"""
params = {
"limit": limit,
"min_amount_usd": min_amount_usd
}
if chain:
params["chain"] = chain
return await self._request(
"GET",
"/api/crypto/whales/transactions",
params=params,
cache_type="market"
)
async def get_whale_stats(self, hours: int = 24) -> Dict[str, Any]:
"""
دریافت آمار نهنگ‌ها از HuggingFace
Endpoint: GET /api/crypto/whales/stats
"""
params = {"hours": hours}
return await self._request(
"GET",
"/api/crypto/whales/stats",
params=params,
cache_type="market"
)
# =========================================================================
# Health & Status Methods
# =========================================================================
async def health_check(self) -> Dict[str, Any]:
"""
بررسی سلامت HuggingFace Space
Endpoint: GET /api/health
Returns:
{
"success": True,
"status": "healthy",
"timestamp": 1234567890000,
"version": "1.0.0",
"database": "connected",
"cache": {
"market_data_count": 100,
"ohlc_count": 5000
},
"ai_models": {
"loaded": 3,
"failed": 0,
"total": 3
},
"source": "hf_engine"
}
"""
return await self._request(
"GET",
"/api/health",
cache_type=None
)
async def get_system_status(self) -> Dict[str, Any]:
"""
دریافت وضعیت کل سیستم
Endpoint: GET /api/status
"""
return await self._request(
"GET",
"/api/status",
cache_type=None
)
# Global singleton instance
_hf_client_instance = None
def get_hf_client() -> HuggingFaceUnifiedClient:
"""Get singleton instance of HuggingFace Unified Client"""
global _hf_client_instance
if _hf_client_instance is None:
_hf_client_instance = HuggingFaceUnifiedClient()
return _hf_client_instance