Trad / data_manager.py
Riy777's picture
Update data_manager.py
ed04390
raw
history blame
66.7 kB
# data_manager.py - الإصدار المحسن مع تحليل صافي التدفق الذكي
import os, asyncio, httpx, json, traceback, backoff, re, time, math
from datetime import datetime, timedelta
from functools import wraps
from collections import defaultdict, deque
import ccxt.pro as ccxt
from ccxt.base.errors import RateLimitExceeded, DDoSProtection, NetworkError
import pandas as pd
import numpy as np
from state import MARKET_STATE_OK
# --- 🐋 نظام تتبع الحيتان المحسن مع تحليل صافي التدفق ---
class EnhancedWhaleMonitor:
def __init__(self, contracts_db=None):
self.http_client = httpx.AsyncClient(timeout=15.0, limits=httpx.Limits(max_connections=50, max_keepalive_connections=10))
# 🔑 استخدام مفاتيح API من متغيرات البيئة فقط
self.moralis_key = os.getenv("MORALIS_KEY")
self.etherscan_key = os.getenv("ETHERSCAN_KEY")
self.infura_key = os.getenv("INFURA_KEY")
self.whale_threshold_usd = 100000
self.contracts_db = contracts_db or {}
# قاعدة بيانات العناوين المصنفة ديناميكياً
self.address_labels = {}
self._initialize_dynamic_labels()
# تخزين بيانات صافي التدفق
self.netflow_data = {
'ethereum': defaultdict(lambda: {
'inflow': deque(maxlen=288),
'outflow': deque(maxlen=288),
'netflow': deque(maxlen=288),
'timestamps': deque(maxlen=288),
'volume_24h': 0
}),
'bsc': defaultdict(lambda: {
'inflow': deque(maxlen=288),
'outflow': deque(maxlen=288),
'netflow': deque(maxlen=288),
'timestamps': deque(maxlen=288),
'volume_24h': 0
})
}
# إحصائيات استخدام APIs
self.api_usage_stats = {
'etherscan': {
'requests_today': 0,
'requests_per_second': 0,
'last_request_time': time.time(),
'last_reset': datetime.now().date()
},
'infura': {
'requests_today': 0,
'requests_per_second': 0,
'last_request_time': time.time(),
'last_reset': datetime.now().date()
}
}
# مصادر RPC متعددة
self.rpc_endpoints = {
'ethereum': [
'https://rpc.ankr.com/eth',
'https://cloudflare-eth.com',
'https://eth.llamarpc.com'
],
'bsc': [
'https://bsc-dataseed.binance.org/',
'https://bsc-dataseed1.defibit.io/',
'https://bsc-dataseed1.ninicoin.io/'
]
}
# إضافة Infura إذا كان المفتاح متوفرًا
if self.infura_key:
infura_endpoint = f"https://mainnet.infura.io/v3/{self.infura_key}"
self.rpc_endpoints['ethereum'].insert(0, infura_endpoint)
print(f"✅ تم تكوين Infura بنجاح - الشبكة: Ethereum")
self.current_rpc_index = {network: 0 for network in self.rpc_endpoints.keys()}
self.rpc_failures = {network: 0 for network in self.rpc_endpoints.keys()}
self.price_cache = {}
self.last_scan_time = {}
# رموز KuCoin للعملات
self.kucoin_symbols = {
'ethereum': 'ETH',
'bsc': 'BNB'
}
print("🎯 نظام تتبع الحيتان المحسن - تحليل صافي التدفق الذكي مفعل")
def _initialize_dynamic_labels(self):
"""تهيئة التصنيفات الديناميكية للعناوين"""
# فئات التصنيف
self.address_categories = {
'exchange': set(),
'cex': set(),
'dex': set(),
'institution': set(),
'whale': set(),
'contract': set(),
'unknown': set()
}
# أنماط العناوين للمنصات المركزية
self.exchange_patterns = {
'binance': ['0x3f5ce5fbfe3e9af3971dd833d26ba9b5c936f0be', '0xd551234ae421e3bcba99a0da6d736074f22192ff'],
'coinbase': ['0x71660c4005ba85c37ccec55d0c4493e66fe775d3', '0x503828976d22510aad0201ac7ec88293211d23da'],
'kraken': ['0x2910543af39aba0cd09dbb2d50200b3e800a63d2', '0xa160cdab225685da1d56aa342ad8841c3b53f291'],
'kucoin': ['0x2b5634c42055806a59e9107ed44d43c426e58258', '0x689c56aef474df92d44a1b70850f808488f9769c']
}
# تحميل العناوين الأولية
self._load_initial_exchange_addresses()
def _load_initial_exchange_addresses(self):
"""تحميل عناوين المنصات المعروفة مبدئياً"""
for exchange, addresses in self.exchange_patterns.items():
for address in addresses:
self.address_labels[address.lower()] = 'cex'
self.address_categories['cex'].add(address.lower())
self.address_categories['exchange'].add(address.lower())
def _classify_address_dynamic(self, address, transaction_history=None):
"""تصنيف ديناميكي للعناوين بناءً على الأنماط السلوكية"""
address_lower = address.lower()
# التحقق من العناوين المصنفة مسبقاً
if address_lower in self.address_labels:
return self.address_labels[address_lower]
# تصنيف بناءً على أنماط المعاملات
if transaction_history:
if self._detect_exchange_pattern(transaction_history):
self.address_labels[address_lower] = 'suspected_cex'
self.address_categories['cex'].add(address_lower)
return 'suspected_cex'
if self._detect_whale_pattern(transaction_history):
self.address_labels[address_lower] = 'suspected_whale'
self.address_categories['whale'].add(address_lower)
return 'suspected_whale'
if self._detect_contract_pattern(transaction_history):
self.address_labels[address_lower] = 'contract_user'
self.address_categories['contract'].add(address_lower)
return 'contract_user'
# إذا لم يتم التصنيف، نضيفه للمجهول
self.address_labels[address_lower] = 'unknown'
self.address_categories['unknown'].add(address_lower)
return 'unknown'
def _detect_exchange_pattern(self, transactions):
"""اكتشاف نمط المنصات"""
if len(transactions) < 10:
return False
unique_senders = set()
unique_receivers = set()
for tx in transactions[-20:]:
if 'from' in tx:
unique_senders.add(tx['from'])
if 'to' in tx:
unique_receivers.add(tx['to'])
if len(unique_senders) > 15 and len(unique_receivers) < 5:
return True
return False
def _detect_whale_pattern(self, transactions):
"""اكتشاف نمط الحيتان"""
large_txs = [tx for tx in transactions if tx.get('value_usd', 0) > 100000]
return len(large_txs) >= 3
def _detect_contract_pattern(self, transactions):
"""اكتشاف نمط العقود"""
contract_interactions = [tx for tx in transactions if tx.get('to', '') and len(tx.get('to', '')) == 42 and tx.get('input', '0x') != '0x']
return len(contract_interactions) > len(transactions) * 0.7
def _is_exchange_address(self, address):
"""التحقق إذا كان العنوان ينتمي لمنصة"""
address_lower = address.lower()
return (address_lower in self.address_categories['cex'] or
address_lower in self.address_categories['exchange'] or
self.address_labels.get(address_lower) in ['cex', 'suspected_cex'])
async def _update_netflow_metrics(self, network, token_symbol, from_address, to_address, value_usd, transaction_hash):
"""تحديث مقاييس صافي التدفق مع التصنيف الذكي"""
try:
# تصنيف العناوين
from_label = self._classify_address_dynamic(from_address)
to_label = self._classify_address_dynamic(to_address)
# تحديث التدفق الداخل إلى المنصات
if self._is_exchange_address(to_address):
if token_symbol not in self.netflow_data[network]:
self._initialize_token_metrics(network, token_symbol)
self.netflow_data[network][token_symbol]['inflow'].append(value_usd)
self.netflow_data[network][token_symbol]['timestamps'].append(datetime.now())
print(f"📥 تدفق إلى منصة: {value_usd:,.0f} USD ({token_symbol})")
# تحديث التدفق الخارج من المنصات
if self._is_exchange_address(from_address):
if token_symbol not in self.netflow_data[network]:
self._initialize_token_metrics(network, token_symbol)
self.netflow_data[network][token_symbol]['outflow'].append(value_usd)
self.netflow_data[network][token_symbol]['timestamps'].append(datetime.now())
print(f"📤 تدفق من منصة: {value_usd:,.0f} USD ({token_symbol})")
# حساب صافي التدفق الحالي
if token_symbol in self.netflow_data[network]:
current_inflow = sum(list(self.netflow_data[network][token_symbol]['inflow'])[-12:])
current_outflow = sum(list(self.netflow_data[network][token_symbol]['outflow'])[-12:])
current_netflow = current_inflow - current_outflow
self.netflow_data[network][token_symbol]['netflow'].append(current_netflow)
except Exception as e:
print(f"⚠️ خطأ في تحديث مقاييس صافي التدفق: {e}")
def _initialize_token_metrics(self, network, token_symbol):
"""تهيئة مقاييس الرمز المميز"""
self.netflow_data[network][token_symbol] = {
'inflow': deque(maxlen=288),
'outflow': deque(maxlen=288),
'netflow': deque(maxlen=288),
'timestamps': deque(maxlen=288),
'volume_24h': 0
}
def _calculate_netflow_zscore(self, network, token_symbol, window_hours=24):
"""حساب Z-score لصافي التدفق"""
try:
if token_symbol not in self.netflow_data[network]:
return 0
data = self.netflow_data[network][token_symbol]
netflow_values = list(data['netflow'])
if len(netflow_values) < 10:
return 0
window_size = min(len(netflow_values), window_hours * 12)
recent_values = netflow_values[-window_size:]
if len(recent_values) < 5:
return 0
mean_val = np.mean(recent_values)
std_val = np.std(recent_values)
if std_val == 0:
return 0
current_netflow = recent_values[-1] if recent_values else 0
zscore = (current_netflow - mean_val) / std_val
return zscore
except Exception as e:
print(f"⚠️ خطأ في حساب Z-score: {e}")
return 0
def _generate_netflow_signal(self, network, token_symbol):
"""توليد إشارات تداول بناءً على صافي التدفق"""
try:
if token_symbol not in self.netflow_data[network]:
return None
data = self.netflow_data[network][token_symbol]
netflow_values = list(data['netflow'])
if len(netflow_values) < 12:
return None
recent_inflow = sum(list(data['inflow'])[-12:])
recent_outflow = sum(list(data['outflow'])[-12:])
recent_netflow = recent_inflow - recent_outflow
zscore = self._calculate_netflow_zscore(network, token_symbol)
signal = {
'symbol': token_symbol,
'network': network,
'netflow_1h': recent_netflow,
'inflow_1h': recent_inflow,
'outflow_1h': recent_outflow,
'z_score': zscore,
'timestamp': datetime.now().isoformat()
}
if recent_netflow < -500000 and zscore < -2.5:
signal.update({
'action': 'STRONG_SELL',
'confidence': min(0.95, abs(zscore) / 3),
'reason': f'تدفق بيعي قوي: ${abs(recent_netflow):,.0f} إلى المنصات',
'critical_alert': abs(recent_netflow) > 1000000
})
return signal
elif recent_netflow < -100000 and zscore < -1.5:
signal.update({
'action': 'SELL',
'confidence': min(0.8, abs(zscore) / 2),
'reason': f'تدفق بيعي: ${abs(recent_netflow):,.0f} إلى المنصات',
'critical_alert': False
})
return signal
elif recent_netflow > 500000 and zscore > 2.5:
signal.update({
'action': 'STRONG_BUY',
'confidence': min(0.95, zscore / 3),
'reason': f'تدفق شرائي قوي: ${recent_netflow:,.0f} من المنصات',
'critical_alert': recent_netflow > 1000000
})
return signal
elif recent_netflow > 100000 and zscore > 1.5:
signal.update({
'action': 'BUY',
'confidence': min(0.8, zscore / 2),
'reason': f'تدفق شرائي: ${recent_netflow:,.0f} من المنصات',
'critical_alert': False
})
return signal
signal.update({
'action': 'HOLD',
'confidence': 0.5,
'reason': f'تدفق متوازن: ${recent_netflow:,.0f}',
'critical_alert': False
})
return signal
except Exception as e:
print(f"⚠️ خطأ في توليد إشارة التداول: {e}")
return None
async def _scan_single_evm_network(self, network):
"""مسح شبكة EVM واحدة مع تحليل صافي التدفق"""
whale_alerts = []
trading_signals = []
try:
price_usd = await self._get_native_coin_price(network)
if price_usd is None:
print(f"⚠️ سعر {network} غير متوفر، تخطي المسح")
return [], []
latest_block_hex = await self._call_rpc_async(network, 'eth_blockNumber')
if not latest_block_hex:
return [], []
latest_block = int(latest_block_hex, 16)
blocks_to_scan = 15
scanned_blocks = 0
for block_offset in range(blocks_to_scan):
block_number = latest_block - block_offset
if block_number < 0:
break
block_data = await self._call_rpc_async(network, 'eth_getBlockByNumber', [hex(block_number), True])
if not block_data or 'transactions' not in block_data:
continue
scanned_blocks += 1
block_timestamp_hex = block_data.get('timestamp', '0x0')
block_timestamp = int(block_timestamp_hex, 16)
block_time = datetime.fromtimestamp(block_timestamp)
time_ago = datetime.now() - block_time
for tx in block_data.get('transactions', []):
value_wei = int(tx.get('value', '0x0'), 16)
if value_wei > 0:
value_native = value_wei / 1e18
value_usd = value_native * price_usd
if value_usd >= self.whale_threshold_usd:
from_address = tx.get('from', '')
to_address = tx.get('to', '')
tx_hash = tx.get('hash', '')
await self._update_netflow_metrics(network, 'NATIVE', from_address, to_address, value_usd, tx_hash)
from_label = self._classify_address_dynamic(from_address)
to_label = self._classify_address_dynamic(to_address)
whale_alerts.append({
'network': network,
'value_usd': value_usd,
'from': from_address,
'to': to_address,
'from_label': from_label,
'to_label': to_label,
'hash': tx_hash,
'block_number': block_number,
'timestamp': block_timestamp,
'human_time': block_time.isoformat(),
'minutes_ago': time_ago.total_seconds() / 60,
'transaction_type': 'native_transfer',
'flow_direction': 'TO_EXCHANGE' if self._is_exchange_address(to_address) else
'FROM_EXCHANGE' if self._is_exchange_address(from_address) else 'UNKNOWN'
})
if block_offset % 3 == 0:
await asyncio.sleep(0.1)
signal = self._generate_netflow_signal(network, 'NATIVE')
if signal:
trading_signals.append(signal)
print(f"✅ مسح {network}: {scanned_blocks} كتل، {len(whale_alerts)} تنبيهات، {len(trading_signals)} إشارات")
except Exception as e:
print(f"⚠️ خطأ في مسح شبكة {network}: {e}")
return whale_alerts, trading_signals
async def get_general_whale_activity(self):
"""الوظيفة الرئيسية لمراقبة الحيتان"""
print("🌊 بدء مراقبة الحيتان وتحليل صافي التدفق...")
try:
tasks = []
networks_to_scan = ['ethereum', 'bsc']
for network in networks_to_scan:
tasks.append(self._scan_single_evm_network(network))
results = await asyncio.gather(*tasks, return_exceptions=True)
all_alerts = []
all_signals = []
successful_networks = 0
for res in results:
if isinstance(res, tuple) and len(res) == 2:
alerts, signals = res
all_alerts.extend(alerts)
all_signals.extend(signals)
successful_networks += 1
all_alerts.sort(key=lambda x: x['timestamp'], reverse=True)
total_volume = sum(alert['value_usd'] for alert in all_alerts)
alert_count = len(all_alerts)
exchange_inflow = sum(alert['value_usd'] for alert in all_alerts
if alert['flow_direction'] == 'TO_EXCHANGE')
exchange_outflow = sum(alert['value_usd'] for alert in all_alerts
if alert['flow_direction'] == 'FROM_EXCHANGE')
net_exchange_flow = exchange_inflow - exchange_outflow
critical_signals = [s for s in all_signals if s.get('critical_alert', False)]
if not all_alerts:
return {
'data_available': False,
'description': 'غير متوفر - لم يتم اكتشاف نشاط حيتان كبير',
'critical_alert': False,
'sentiment': 'UNKNOWN',
'total_volume_usd': 0,
'transaction_count': 0,
'data_quality': 'HIGH',
'networks_scanned': successful_networks,
'trading_signals': all_signals,
'netflow_analysis': {
'inflow_to_exchanges': 0,
'outflow_from_exchanges': 0,
'net_flow': 0,
'flow_direction': 'BALANCED'
}
}
latest_alert = all_alerts[0] if all_alerts else None
latest_time_info = f"آخر نشاط منذ {latest_alert['minutes_ago']:.1f} دقيقة" if latest_alert else ""
if net_exchange_flow < -1000000:
sentiment = 'BEARISH'
flow_description = f"ضغط بيعي قوي: ${abs(net_exchange_flow):,.0f} إلى المنصات"
market_impact = "HIGH"
elif net_exchange_flow < -500000:
sentiment = 'SLIGHTLY_BEARISH'
flow_description = f"ضغط بيعي: ${abs(net_exchange_flow):,.0f} إلى المنصات"
market_impact = "MEDIUM"
elif net_exchange_flow > 1000000:
sentiment = 'BULLISH'
flow_description = f"تراكم شرائي قوي: ${net_exchange_flow:,.0f} من المنصات"
market_impact = "HIGH"
elif net_exchange_flow > 500000:
sentiment = 'SLIGHTLY_BULLISH'
flow_description = f"تراكم شرائي: ${net_exchange_flow:,.0f} من المنصات"
market_impact = "MEDIUM"
else:
sentiment = 'NEUTRAL'
flow_description = f"تدفق متوازن: ${net_exchange_flow:,.0f} صافي"
market_impact = "LOW"
critical_alert = (
total_volume > 10_000_000 or
any(tx['value_usd'] > 5_000_000 for tx in all_alerts) or
abs(net_exchange_flow) > 5_000_000 or
len(critical_signals) > 0
)
description = f"تم اكتشاف {alert_count} معاملة حوت بإجمالي ${total_volume:,.0f} عبر {successful_networks} شبكات. {flow_description}. {latest_time_info}"
return {
'data_available': True,
'description': description,
'critical_alert': critical_alert,
'sentiment': sentiment,
'market_impact': market_impact,
'total_volume_usd': total_volume,
'transaction_count': alert_count,
'netflow_analysis': {
'inflow_to_exchanges': exchange_inflow,
'outflow_from_exchanges': exchange_outflow,
'net_flow': net_exchange_flow,
'flow_direction': 'TO_EXCHANGES' if net_exchange_flow < 0 else 'FROM_EXCHANGES',
'market_impact': market_impact
},
'recent_alerts': all_alerts[:10],
'latest_activity': latest_alert['human_time'] if latest_alert else None,
'trading_signals': all_signals,
'critical_signals_count': len(critical_signals),
'address_classification_stats': {
'total_classified': len(self.address_labels),
'exchange_addresses': len(self.address_categories['cex']),
'whale_addresses': len(self.address_categories['whale']),
'unknown_addresses': len(self.address_categories['unknown'])
},
'data_quality': 'HIGH',
'networks_scanned': successful_networks
}
except Exception as e:
print(f"❌ فشل مراقبة الحيتان العامة: {e}")
return {
'data_available': False,
'description': f'غير متوفر - فشل في مراقبة الحيتان: {str(e)}',
'critical_alert': False,
'sentiment': 'UNKNOWN',
'total_volume_usd': 0,
'transaction_count': 0,
'data_quality': 'LOW',
'error': str(e),
'trading_signals': []
}
async def _get_native_coin_price(self, network):
"""جلب الأسعار من مصادر حقيقية"""
now = time.time()
cache_key = f"{network}_price"
if cache_key in self.price_cache and (now - self.price_cache[cache_key]['timestamp']) < 300:
return self.price_cache[cache_key]['price']
symbol = self.kucoin_symbols.get(network)
if not symbol:
return await self._get_price_from_coingecko_fallback(network)
try:
price = await self._get_price_from_kucoin(symbol)
if price and price > 0:
self.price_cache[cache_key] = {'price': price, 'timestamp': now, 'source': 'kucoin'}
return price
price = await self._get_price_from_coingecko_fallback(network)
if price and price > 0:
self.price_cache[cache_key] = {'price': price, 'timestamp': now, 'source': 'coingecko'}
return price
return None
except Exception as e:
print(f"⚠️ فشل جلب سعر {network}: {e}")
return None
async def _get_price_from_kucoin(self, symbol):
"""جلب السعر من KuCoin"""
try:
exchange = ccxt.kucoin({
'sandbox': False,
'enableRateLimit': True
})
trading_symbol = f"{symbol}/USDT"
try:
ticker = await exchange.fetch_ticker(trading_symbol)
price = ticker.get('last')
if price and price > 0:
print(f"✅ سعر {symbol} من KuCoin: ${price:.2f}")
await exchange.close()
return float(price)
except Exception as e:
print(f"⚠️ رمز التداول {trading_symbol} غير مدعوم في KuCoin: {e}")
await exchange.close()
return None
except Exception as e:
print(f"⚠️ فشل جلب السعر من KuCoin لـ {symbol}: {e}")
return None
async def _get_price_from_coingecko_fallback(self, network):
"""الاحتياطي: جلب السعر من CoinGecko"""
coin_map = {
'ethereum': 'ethereum',
'bsc': 'binancecoin'
}
coin_id = coin_map.get(network)
if not coin_id:
return None
try:
await asyncio.sleep(0.5)
url = f"https://api.coingecko.com/api/v3/simple/price?ids={coin_id}&vs_currencies=usd"
response = await self.http_client.get(url)
response.raise_for_status()
price = response.json().get(coin_id, {}).get('usd', 0)
if price > 0:
print(f"✅ سعر {network} من CoinGecko: ${price:.2f}")
return price
return None
except Exception as e:
print(f"⚠️ فشل جلب سعر {network} من CoinGecko: {e}")
return None
async def _call_rpc_async(self, network, method, params=[]):
"""اتصال RPC غير متزامن"""
max_retries = 2
for attempt in range(max_retries):
endpoint = self._get_next_rpc_endpoint(network)
if not endpoint:
print(f"❌ لا توجد نقاط نهاية RPC متاحة لـ {network}")
return None
try:
if 'infura' in endpoint and self.infura_key:
self._update_api_usage_stats('infura')
if await self._api_rate_limit_delay('infura'):
continue
payload = {"jsonrpc": "2.0", "method": method, "params": params, "id": 1}
timeout = 25.0 if method == 'eth_getBlockByNumber' else 12.0
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(endpoint, json=payload)
if response.status_code == 401:
print(f"🔐 خطأ مصادقة في {endpoint}")
self.rpc_failures[network] += 1
continue
elif response.status_code == 429:
print(f"⏳ Rate limit على {endpoint}")
await asyncio.sleep(2 * (attempt + 1))
continue
response.raise_for_status()
result = response.json().get('result')
self.rpc_failures[network] = 0
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
print(f"⚠️ Rate limit على {endpoint} لـ {network}")
self.rpc_failures[network] += 1
await asyncio.sleep(3 * (attempt + 1))
continue
elif e.response.status_code == 401:
print(f"🔐 خطأ مصادقة في {endpoint}")
self.rpc_failures[network] += 1
continue
else:
print(f"⚠️ خطأ HTTP {e.response.status_code} في {endpoint}")
self.rpc_failures[network] += 1
except Exception as e:
print(f"⚠️ فشل اتصال RPC لـ {network}: {e}")
self.rpc_failures[network] += 1
if attempt < max_retries - 1:
await asyncio.sleep(1 * (attempt + 1))
print(f"❌ فشل جميع محاولات RPC لـ {network}")
return None
def _get_next_rpc_endpoint(self, network):
"""الحصول على عنوان RPC التالي"""
if network not in self.rpc_endpoints:
return None
endpoints = self.rpc_endpoints[network]
if not endpoints:
return None
index = self.current_rpc_index[network]
endpoint = endpoints[index]
self.current_rpc_index[network] = (index + 1) % len(endpoints)
return endpoint
def _update_api_usage_stats(self, api_name):
"""تحديث إحصائيات استخدام API"""
now = datetime.now()
current_date = now.date()
stats = self.api_usage_stats[api_name]
if current_date != stats['last_reset']:
stats['requests_today'] = 0
stats['last_reset'] = current_date
current_time = time.time()
time_diff = current_time - stats['last_request_time']
if time_diff < 1.0:
stats['requests_per_second'] += 1
else:
stats['requests_per_second'] = 1
stats['last_request_time'] = current_time
stats['requests_today'] += 1
if api_name == 'etherscan':
if stats['requests_today'] > 90000:
print(f"🚨 تحذير: طلبات {api_name} اليومية تقترب من الحد")
if stats['requests_per_second'] > 4:
print(f"🚨 تحذير: طلبات {api_name} في الثانية تقترب من الحد")
elif api_name == 'infura':
if stats['requests_today'] > 2500000:
print(f"🚨 تحذير: طلبات {api_name} اليومية تقترب من الحد")
if stats['requests_per_second'] > 450:
print(f"🚨 تحذير: طلبات {api_name} في الثانية تقترب من الحد")
async def _api_rate_limit_delay(self, api_name):
"""تأخير ذكي لتجنب تجاوز حدود API"""
stats = self.api_usage_stats[api_name]
if api_name == 'etherscan':
if stats['requests_per_second'] > 4:
delay = 0.2 * (stats['requests_per_second'] - 4)
print(f"⏳ تأخير {delay:.2f} ثانية لـ {api_name}")
await asyncio.sleep(delay)
if stats['requests_today'] > 95000:
print(f"🚨 تجاوز الحد اليومي لطلبات {api_name}")
return True
elif api_name == 'infura':
if stats['requests_per_second'] > 400:
delay = 0.1 * (stats['requests_per_second'] - 400)
print(f"⏳ تأخير {delay:.2f} ثانية لـ {api_name}")
await asyncio.sleep(delay)
if stats['requests_today'] > 2800000:
print(f"🚨 تجاوز الحد اليومي لطلبات {api_name}")
return True
return False
async def get_symbol_specific_whale_data(self, symbol, contract_address=None):
"""جلب بيانات الحيتان الخاصة بعملة محددة"""
try:
base_symbol = symbol.split("/")[0] if '/' in symbol else symbol
if not contract_address:
contract_address = await self._find_contract_address(base_symbol)
if not contract_address:
return await self._scan_networks_for_symbol(symbol, base_symbol)
print(f"🔍 جلب بيانات الحيتان لـ {symbol}")
api_data = await self._get_combined_api_data(contract_address)
if api_data:
enriched_data = await self._enrich_api_data_with_timing(api_data)
return self._analyze_symbol_specific_data(enriched_data, symbol)
else:
return await self._scan_networks_for_symbol(symbol, base_symbol)
except Exception as e:
print(f"❌ فشل جلب بيانات الحيتان لـ {symbol}: {e}")
return {
'data_available': False,
'description': f'غير متوفر - خطأ في جلب بيانات الحيتان',
'total_volume': 0,
'transfer_count': 0,
'source': 'error'
}
async def _get_combined_api_data(self, contract_address):
"""جلب البيانات المجمعة من مصادر API"""
tasks = []
if self.moralis_key:
tasks.append(self._get_moralis_token_data(contract_address))
if self.etherscan_key:
tasks.append(self._get_etherscan_token_data_v2(contract_address))
if not tasks:
return []
results = await asyncio.gather(*tasks, return_exceptions=True)
all_transfers = []
for res in results:
if isinstance(res, list):
all_transfers.extend(res)
return all_transfers
async def _get_etherscan_token_data_v2(self, contract_address):
"""جلب بيانات Etherscan"""
if not self.etherscan_key:
return []
try:
self._update_api_usage_stats('etherscan')
if await self._api_rate_limit_delay('etherscan'):
print("⚠️ تجاوز حدود Etherscan، تخطي الطلب")
return []
params = {
"module": "account",
"action": "tokentx",
"contractaddress": contract_address,
"page": 1,
"offset": 10,
"sort": "desc",
"apikey": self.etherscan_key
}
base_url = "https://api.etherscan.io/api"
print(f"🔍 جلب بيانات Etherscan للعقد: {contract_address[:10]}...")
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.get(base_url, params=params)
if response.status_code == 429:
print("⏳ تجاوز حد معدل Etherscan")
await asyncio.sleep(2)
return []
response.raise_for_status()
data = response.json()
if data.get('status') == '1' and data.get('message') == 'OK':
result = data.get('result', [])
print(f"✅ بيانات Etherscan: {len(result)} تحويل")
return result
else:
error_message = data.get('message', 'Unknown error')
print(f"⚠️ خطأ في استجابة Etherscan: {error_message}")
return []
except httpx.HTTPStatusError as e:
print(f"⚠️ خطأ HTTP في Etherscan API: {e.response.status_code}")
return []
except Exception as e:
print(f"⚠️ فشل جلب بيانات Etherscan: {e}")
return []
async def _get_moralis_token_data(self, contract_address):
"""جلب بيانات Moralis"""
if not self.moralis_key:
return []
try:
response = await self.http_client.get(
f"https://deep-index.moralis.io/api/v2/erc20/{contract_address}/transfers",
headers={"X-API-Key": self.moralis_key},
params={"chain": "eth", "limit": 10}
)
if response.status_code == 200:
result = response.json().get('result', [])
print(f"✅ بيانات Moralis: {len(result)} تحويل")
return result
else:
print(f"⚠️ خطأ Moralis API: {response.status_code}")
return []
except Exception as e:
print(f"⚠️ Moralis API error: {e}")
return []
async def _enrich_api_data_with_timing(self, api_data):
"""إثراء بيانات API بتوقيتات إضافية"""
enriched_data = []
for transfer in api_data:
try:
if 'timeStamp' in transfer:
timestamp = int(transfer['timeStamp'])
elif 'block_timestamp' in transfer:
timestamp = int(transfer['block_timestamp'])
else:
timestamp = int(time.time())
transfer_time = datetime.fromtimestamp(timestamp)
time_ago = datetime.now() - transfer_time
enriched_transfer = {
**transfer,
'human_time': transfer_time.isoformat(),
'minutes_ago': time_ago.total_seconds() / 60,
'timestamp': timestamp
}
enriched_data.append(enriched_transfer)
except Exception as e:
print(f"⚠️ خطأ في إثراء بيانات التحويل: {e}")
continue
return enriched_data
def _analyze_symbol_specific_data(self, enriched_data, symbol):
"""تحليل بيانات الرمز المحدد"""
if not enriched_data:
return {
'data_available': False,
'description': f'غير متوفر - لا توجد بيانات تحويل لـ {symbol}',
'total_volume': 0,
'transfer_count': 0,
'source': 'no_data'
}
try:
volumes = []
large_transfers = []
for transfer in enriched_data:
value = float(transfer.get('value', 0))
volumes.append(value)
if value > 10000:
large_transfers.append(transfer)
total_volume = sum(volumes)
transfer_count = len(volumes)
avg_volume = total_volume / transfer_count if transfer_count > 0 else 0
latest_transfer = max(enriched_data, key=lambda x: x['timestamp'])
oldest_transfer = min(enriched_data, key=lambda x: x['timestamp'])
time_range_hours = (latest_transfer['timestamp'] - oldest_transfer['timestamp']) / 3600
if len(large_transfers) > 5:
activity_level = 'HIGH'
description = f"نشاط حيتان مرتفع لـ {symbol}: {len(large_transfers)} تحويل كبير"
elif len(large_transfers) > 2:
activity_level = 'MEDIUM'
description = f"نشاط حيتان متوسط لـ {symbol}: {len(large_transfers)} تحويل كبير"
else:
activity_level = 'LOW'
description = f"نشاط حيتان منخفض لـ {symbol}: {len(large_transfers)} تحويل كبير"
return {
'data_available': True,
'description': description,
'total_volume': total_volume,
'transfer_count': transfer_count,
'average_volume': avg_volume,
'large_transfers_count': len(large_transfers),
'activity_level': activity_level,
'latest_transfer_time': latest_transfer['human_time'],
'time_range_hours': time_range_hours,
'source': 'api_combined',
'recent_large_transfers': large_transfers[:5]
}
except Exception as e:
print(f"❌ خطأ في تحليل بيانات {symbol}: {e}")
return {
'data_available': False,
'description': f'غير متوفر - خطأ في تحليل البيانات',
'total_volume': 0,
'transfer_count': 0,
'source': 'error'
}
async def _find_contract_address(self, symbol):
"""البحث عن عنوان العقد للرمز المحدد"""
symbol_lower = symbol.lower()
for key, address in self.contracts_db.items():
if symbol_lower in key.lower():
return address
print(f"🔍 لم يتم العثور على عقد لـ {symbol} في قاعدة البيانات")
return None
async def _scan_networks_for_symbol(self, symbol, base_symbol):
"""مسح الشبكات للعثور على الرمز"""
print(f"🔍 مسح الشبكات للعثور على {symbol}...")
networks_to_scan = ['ethereum', 'bsc']
for network in networks_to_scan:
try:
price = await self._get_native_coin_price(network)
if price:
print(f"✅ تم العثور على {symbol} على شبكة {network} بسعر ${price:.2f}")
return {
'data_available': True,
'description': f'تم اكتشاف {symbol} على شبكة {network}',
'network': network,
'price_usd': price,
'source': 'network_scan'
}
except Exception as e:
print(f"⚠️ فشل مسح {network} لـ {symbol}: {e}")
continue
return {
'data_available': False,
'description': f'غير متوفر - لم يتم العثور على {symbol} على أي شبكة',
'source': 'not_found'
}
def get_api_usage_stats(self):
"""الحصول على إحصائيات استخدام APIs"""
stats = {}
for api_name, api_stats in self.api_usage_stats.items():
if api_name == 'etherscan':
daily_limit = 100000
per_second_limit = 5
elif api_name == 'infura':
daily_limit = 3000000
per_second_limit = 500
else:
continue
stats[api_name] = {
'requests_today': api_stats['requests_today'],
'requests_per_second': api_stats['requests_per_second'],
'daily_limit_remaining': daily_limit - api_stats['requests_today'],
'usage_percentage': (api_stats['requests_today'] / daily_limit) * 100,
'per_second_usage_percentage': (api_stats['requests_per_second'] / per_second_limit) * 100,
'last_reset': api_stats['last_reset'].isoformat(),
'api_available': getattr(self, f'{api_name}_key') is not None
}
return stats
# إنشاء نسخة عالمية
whale_monitor_global = EnhancedWhaleMonitor()
class DataManager:
def __init__(self, contracts_db):
self.contracts_db = contracts_db or {}
try:
self.exchange = ccxt.kucoin({
'sandbox': False,
'enableRateLimit': True
})
self.exchange.rateLimit = 800
print("✅ تم تهيئة KuCoin (الوضع العام)")
except Exception as e:
print(f"⚠️ فشل تهيئة KuCoin: {e}")
self.exchange = None
self._whale_data_cache = {}
self.http_client = None
self.fetch_stats = {'successful_fetches': 0, 'failed_fetches': 0, 'rate_limit_hits': 0}
self.whale_monitor = EnhancedWhaleMonitor(contracts_db)
self.price_cache = {}
async def initialize(self):
self.http_client = httpx.AsyncClient(timeout=20.0)
api_status = {
'KUCOIN': '🟢 عام (بدون مفتاح)',
'MORALIS_KEY': "🟢 متوفر" if os.getenv('MORALIS_KEY') else "🔴 غير متوفر",
'ETHERSCAN_KEY': "🟢 متوفر" if os.getenv('ETHERSCAN_KEY') else "🔴 غير متوفر",
'INFURA_KEY': "🟢 متوفر" if os.getenv('INFURA_KEY') else "🔴 غير متوفر"
}
print("✅ DataManager initialized - تحليل صافي التدفق المتقدم")
for key, status in api_status.items():
print(f" {key}: {status}")
async def close(self):
if self.http_client:
await self.http_client.aclose()
if self.exchange:
await self.exchange.close()
async def get_sentiment_safe_async(self):
"""جلب بيانات المشاعر من مصادر حقيقية"""
max_retries = 2
for attempt in range(max_retries):
try:
async with httpx.AsyncClient(timeout=8) as client:
print(f"🎭 جلب بيانات المشاعر - المحاولة {attempt + 1}/{max_retries}...")
r = await client.get("https://api.alternative.me/fng/")
r.raise_for_status()
data = r.json()
if 'data' not in data or not data['data']:
raise ValueError("بيانات المشاعر غير متوفرة في الاستجابة")
latest_data = data['data'][0]
return {
"feargreed_value": int(latest_data['value']),
"feargreed_class": latest_data['value_classification'],
"source": "alternative.me",
"timestamp": datetime.now().isoformat()
}
except Exception as e:
print(f"⚠️ فشل محاولة {attempt + 1}/{max_retries} لجلب بيانات المشاعر: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(1)
return None
async def get_market_context_async(self):
"""جلب سياق السوق من مصادر حقيقية"""
max_retries = 2
for attempt in range(max_retries):
try:
print(f"📊 جلب سياق السوق - المحاولة {attempt + 1}/{max_retries}")
sentiment_task = asyncio.wait_for(self.get_sentiment_safe_async(), timeout=10)
price_task = asyncio.wait_for(self._get_prices_with_fallback(), timeout=15)
whale_task = asyncio.wait_for(self.whale_monitor.get_general_whale_activity(), timeout=30)
results = await asyncio.gather(sentiment_task, price_task, whale_task,
return_exceptions=True)
sentiment_data = results[0] if not isinstance(results[0], Exception) else None
price_data = results[1] if not isinstance(results[1], Exception) else {}
general_whale_activity = results[2] if not isinstance(results[2], Exception) else None
bitcoin_price = price_data.get('bitcoin')
ethereum_price = price_data.get('ethereum')
if bitcoin_price is None or ethereum_price is None:
if attempt < max_retries - 1:
await asyncio.sleep(2)
continue
else:
return self._get_minimal_market_context()
market_trend = self._determine_market_trend(bitcoin_price, sentiment_data, general_whale_activity)
trading_decision = self._analyze_advanced_trading_signals(general_whale_activity, sentiment_data)
market_context = {
'timestamp': datetime.now().isoformat(),
'bitcoin_price_usd': bitcoin_price,
'ethereum_price_usd': ethereum_price,
'fear_and_greed_index': sentiment_data.get('feargreed_value') if sentiment_data else None,
'sentiment_class': sentiment_data.get('feargreed_class') if sentiment_data else 'UNKNOWN',
'general_whale_activity': general_whale_activity or {
'data_available': False,
'description': 'غير متوفر - فشل في مراقبة الحيتان',
'critical_alert': False,
'sentiment': 'UNKNOWN',
'trading_signals': []
},
'market_trend': market_trend,
'trading_decision': trading_decision,
'btc_sentiment': self._get_btc_sentiment(bitcoin_price),
'data_sources': {
'prices': bitcoin_price is not None and ethereum_price is not None,
'sentiment': sentiment_data is not None,
'general_whale_data': general_whale_activity.get('data_available', False) if general_whale_activity else False,
'netflow_analysis': general_whale_activity.get('netflow_analysis', {}).get('market_impact', 'UNKNOWN') if general_whale_activity else 'UNKNOWN'
},
'data_quality': 'HIGH',
'risk_assessment': self._assess_market_risk(general_whale_activity, sentiment_data)
}
print(f"📊 سياق السوق: BTC=${bitcoin_price:,.0f}, ETH=${ethereum_price:,.0f}")
if general_whale_activity and general_whale_activity.get('netflow_analysis'):
netflow = general_whale_activity['netflow_analysis']
print(f"📈 تحليل التدفق: صافي ${netflow['net_flow']:,.0f}")
if general_whale_activity and general_whale_activity.get('trading_signals'):
for signal in general_whale_activity['trading_signals']:
print(f"🎯 {signal['action']}: {signal['reason']}")
return market_context
except Exception as e:
print(f"❌ فشل محاولة {attempt + 1}/{max_retries} لجلب سياق السوق: {e}")
if attempt < max_retries - 1:
await asyncio.sleep(3)
return self._get_minimal_market_context()
def _analyze_advanced_trading_signals(self, whale_activity, sentiment_data):
"""تحليل إشارات التداول المتقدمة"""
if not whale_activity or not whale_activity.get('data_available'):
return {
'action': 'HOLD',
'confidence': 0.0,
'reason': 'غير متوفر - لا توجد بيانات كافية عن الحيتان',
'risk_level': 'UNKNOWN'
}
signals = whale_activity.get('trading_signals', [])
netflow_analysis = whale_activity.get('netflow_analysis', {})
whale_sentiment = whale_activity.get('sentiment', 'NEUTRAL')
strongest_signal = None
for signal in signals:
if not strongest_signal or signal.get('confidence', 0) > strongest_signal.get('confidence', 0):
strongest_signal = signal
if strongest_signal and strongest_signal.get('confidence', 0) > 0.7:
action = strongest_signal['action']
reason = strongest_signal['reason']
confidence = strongest_signal['confidence']
if 'STRONG_' in action:
risk_level = 'HIGH' if 'SELL' in action else 'LOW'
else:
risk_level = 'MEDIUM' if 'SELL' in action else 'LOW'
return {
'action': action,
'confidence': confidence,
'reason': reason,
'risk_level': risk_level,
'source': 'netflow_analysis'
}
net_flow = netflow_analysis.get('net_flow', 0)
flow_direction = netflow_analysis.get('flow_direction', 'BALANCED')
if flow_direction == 'TO_EXCHANGES' and abs(net_flow) > 500000:
return {
'action': 'SELL',
'confidence': 0.6,
'reason': f'تدفق بيعي إلى المنصات: ${abs(net_flow):,.0f}',
'risk_level': 'MEDIUM',
'source': 'netflow_direction'
}
elif flow_direction == 'FROM_EXCHANGES' and net_flow > 500000:
return {
'action': 'BUY',
'confidence': 0.6,
'reason': f'تراكم شرائي من المنصات: ${net_flow:,.0f}',
'risk_level': 'LOW',
'source': 'netflow_direction'
}
return {
'action': 'HOLD',
'confidence': 0.5,
'reason': f'أنماط التدفق طبيعية - مشاعر الحيتان: {whale_sentiment}',
'risk_level': 'LOW',
'source': 'balanced_flow'
}
def _assess_market_risk(self, whale_activity, sentiment_data):
"""تقييم مخاطر السوق"""
risk_factors = []
risk_score = 0
if whale_activity and whale_activity.get('data_available'):
if whale_activity.get('critical_alert', False):
risk_factors.append("نشاط حيتان حرج")
risk_score += 3
netflow = whale_activity.get('netflow_analysis', {})
if netflow.get('flow_direction') == 'TO_EXCHANGES' and abs(netflow.get('net_flow', 0)) > 1000000:
risk_factors.append("تدفق بيعي كبير إلى المنصات")
risk_score += 2
if whale_activity.get('sentiment') == 'BEARISH':
risk_factors.append("مشاعر حيتان هبوطية")
risk_score += 1
if sentiment_data and sentiment_data.get('feargreed_value', 50) < 30:
risk_factors.append("مخاوف السوق عالية")
risk_score += 2
elif sentiment_data and sentiment_data.get('feargreed_value', 50) > 70:
risk_factors.append("جشع السوق مرتفع")
risk_score += 1
if risk_score >= 4:
return {'level': 'HIGH', 'score': risk_score, 'factors': risk_factors}
elif risk_score >= 2:
return {'level': 'MEDIUM', 'score': risk_score, 'factors': risk_factors}
else:
return {'level': 'LOW', 'score': risk_score, 'factors': risk_factors}
def _get_btc_sentiment(self, bitcoin_price):
"""تحديد اتجاه البيتكوين"""
if bitcoin_price is None:
return 'UNKNOWN'
elif bitcoin_price > 60000:
return 'BULLISH'
elif bitcoin_price < 55000:
return 'BEARISH'
else:
return 'NEUTRAL'
async def _get_prices_with_fallback(self):
"""جلب الأسعار من مصادر حقيقية"""
try:
prices = await self._get_prices_from_kucoin_safe()
if prices.get('bitcoin') and prices.get('ethereum'):
return prices
prices = await self._get_prices_from_coingecko()
if prices.get('bitcoin') and prices.get('ethereum'):
return prices
return {'bitcoin': None, 'ethereum': None}
except Exception as e:
print(f"❌ فشل جلب الأسعار: {e}")
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_kucoin_safe(self):
"""جلب الأسعار من KuCoin"""
if not self.exchange:
return {'bitcoin': None, 'ethereum': None}
try:
prices = {'bitcoin': None, 'ethereum': None}
try:
btc_ticker = await self.exchange.fetch_ticker('BTC/USDT')
btc_price = float(btc_ticker.get('last', 0)) if btc_ticker.get('last') else None
if btc_price and btc_price > 0:
prices['bitcoin'] = btc_price
self.price_cache['bitcoin'] = btc_price
print(f"✅ BTC من KuCoin: ${btc_price:.0f}")
except Exception as e:
print(f"⚠️ فشل جلب سعر BTC من KuCoin: {e}")
try:
eth_ticker = await self.exchange.fetch_ticker('ETH/USDT')
eth_price = float(eth_ticker.get('last', 0)) if eth_ticker.get('last') else None
if eth_price and eth_price > 0:
prices['ethereum'] = eth_price
self.price_cache['ethereum'] = eth_price
print(f"✅ ETH من KuCoin: ${eth_price:.0f}")
except Exception as e:
print(f"⚠️ فشل جلب سعر ETH من KuCoin: {e}")
return prices
except Exception as e:
print(f"⚠️ فشل جلب الأسعار من KuCoin: {e}")
return {'bitcoin': None, 'ethereum': None}
async def _get_prices_from_coingecko(self):
"""جلب الأسعار من CoinGecko"""
try:
await asyncio.sleep(0.5)
url = "https://api.coingecko.com/api/v3/simple/price?ids=bitcoin,ethereum&vs_currencies=usd"
async with httpx.AsyncClient() as client:
response = await client.get(url, timeout=10)
response.raise_for_status()
data = response.json()
btc_price = data.get('bitcoin', {}).get('usd')
eth_price = data.get('ethereum', {}).get('usd')
if btc_price and eth_price:
self.price_cache['bitcoin'] = btc_price
self.price_cache['ethereum'] = eth_price
print(f"✅ الأسعار من CoinGecko: BTC=${btc_price:.0f}, ETH=${eth_price:.0f}")
return {'bitcoin': btc_price, 'ethereum': eth_price}
return {'bitcoin': None, 'ethereum': None}
except Exception as e:
print(f"⚠️ فشل جلب الأسعار من CoinGecko: {e}")
return {'bitcoin': None, 'ethereum': None}
def _get_minimal_market_context(self):
"""إرجاع سياق سوق أساسي"""
return {
'timestamp': datetime.now().isoformat(),
'data_available': False,
'data_sources': {'prices': False, 'sentiment': False, 'general_whale_data': False},
'error': 'غير متوفر - فشل في جلب بيانات السوق من المصادر الخارجية',
'market_trend': 'UNKNOWN',
'btc_sentiment': 'UNKNOWN',
'data_quality': 'LOW',
'general_whale_activity': {
'data_available': False,
'description': 'غير متوفر - فشل في مراقبة الحيتان',
'critical_alert': False,
'sentiment': 'UNKNOWN'
},
'bitcoin_price_usd': None,
'ethereum_price_usd': None,
'fear_and_greed_index': None,
'sentiment_class': 'UNKNOWN',
'missing_data': ['غير متوفر - أسعار البيتكوين', 'غير متوفر - أسعار الإيثيريوم', 'غير متوفر - بيانات المشاعر', 'غير متوفر - بيانات الحيتان']
}
def _determine_market_trend(self, bitcoin_price, sentiment_data, whale_activity):
"""تحديد اتجاه السوق"""
try:
if bitcoin_price is None:
return "UNKNOWN"
score = 0
data_points = 1
if bitcoin_price > 60000:
score += 1
elif bitcoin_price < 55000:
score -= 1
if sentiment_data and sentiment_data.get('feargreed_value') is not None:
fear_greed = sentiment_data.get('feargreed_value')
if fear_greed > 60:
score += 1
elif fear_greed < 40:
score -= 1
data_points += 1
if (whale_activity and whale_activity.get('data_available') and
whale_activity.get('sentiment') != 'UNKNOWN'):
whale_sentiment = whale_activity.get('sentiment')
if whale_sentiment == 'BULLISH':
score += 1
elif whale_sentiment == 'BEARISH':
score -= 1
data_points += 1
if whale_activity.get('critical_alert', False):
score = -2
if data_points < 2:
return "UNKNOWN"
if score >= 2:
return "bull_market"
elif score <= -2:
return "bear_market"
elif -1 <= score <= 1:
return "sideways_market"
else:
return "volatile_market"
except Exception as e:
print(f"⚠️ خطأ في تحديد اتجاه السوق: {e}")
return "UNKNOWN"
def get_performance_stats(self):
"""الحصول على إحصائيات الأداء"""
total_attempts = self.fetch_stats['successful_fetches'] + self.fetch_stats['failed_fetches']
success_rate = (self.fetch_stats['successful_fetches'] / total_attempts * 100) if total_attempts > 0 else 0
stats = {
'total_attempts': total_attempts,
'successful_fetches': self.fetch_stats['successful_fetches'],
'failed_fetches': self.fetch_stats['failed_fetches'],
'rate_limit_hits': self.fetch_stats['rate_limit_hits'],
'success_rate': f"{success_rate:.1f}%",
'timestamp': datetime.now().isoformat(),
'exchange_available': self.exchange is not None
}
api_stats = self.whale_monitor.get_api_usage_stats()
stats['api_usage'] = api_stats
return stats
async def get_symbol_specific_whale_data(self, symbol, contract_address=None):
"""جلب بيانات الحيتان الخاصة بعملة محددة"""
return await self.whale_monitor.get_symbol_specific_whale_data(symbol, contract_address)
print("✅ Enhanced Data Manager Loaded - جميع البيانات حقيقية")