# learning_hub/hub_manager.py
# (Fully updated - V4 - Adaptive Hybrid Weights Support)
import asyncio
import traceback
from typing import Any, Dict, List
from datetime import datetime, timezone
from collections import defaultdict
# (Import all internal hub components)
from .schemas import *
from .policy_engine import PolicyEngine
from .memory_store import MemoryStore
from .statistical_analyzer import StatisticalAnalyzer
from .reflector import Reflector
from .curator import Curator
# Imports for correlation analysis (Whale Learning)
try:
    import numpy as np
    from scipy.stats import pearsonr
    NUMPY_AVAILABLE = True
except ImportError:
    print("❌ [HubManager] numpy or scipy is not installed! Whale learning will not work.")
    NUMPY_AVAILABLE = False


class LearningHubManager:
    def __init__(self, r2_service: Any, llm_service: Any, data_manager: Any):
        print("🚀 Initializing Learning Hub Manager (V4 - Adaptive)...")
        # 1. Core services
        self.r2_service = r2_service
        self.llm_service = llm_service
        self.data_manager = data_manager
        # 2. Initialize components
        self.policy_engine = PolicyEngine()
        self.memory_store = MemoryStore(
            r2_service=self.r2_service,
            policy_engine=self.policy_engine,
            llm_service=self.llm_service
        )
        self.reflector = Reflector(
            llm_service=self.llm_service,
            memory_store=self.memory_store
        )
        self.curator = Curator(
            llm_service=self.llm_service,
            memory_store=self.memory_store
        )
        self.statistical_analyzer = StatisticalAnalyzer(
            r2_service=self.r2_service,
            data_manager=self.data_manager
        )
        # State variables for whale learning
        self.whale_learning_lock = asyncio.Lock()
        self.optimal_whale_config = {}
        self.initialized = False
        print("✅ Learning Hub Manager constructed. Ready for initialization.")

    async def initialize(self):
        """Initialize all sub-modules."""
        if self.initialized: return
        print("🔄 [HubManager] Initializing all sub-modules...")
        # Initialize the statistical analyzer (responsible for the adaptive weights)
        await self.statistical_analyzer.initialize()
        # Load the whale learning configuration
        if hasattr(self.r2_service, 'load_whale_learning_config_async'):
            self.optimal_whale_config = await self.r2_service.load_whale_learning_config_async()
            if self.optimal_whale_config:
                print(f"✅ [HubManager] Loaded optimal whale config: {self.optimal_whale_config.get('best_metric', 'N/A')}")
        self.initialized = True
        print("✅ [HubManager] All sub-modules initialized. Learning Hub is LIVE.")

    async def analyze_trade_and_learn(self, trade_object: Dict[str, Any], close_reason: str):
        """Main entry point for learning from closed trades."""
        if not self.initialized: return
        print(f"🧠 [HubManager] Learning from trade {trade_object.get('symbol')}...")
        # 1. Fast learning (Reflector)
        try:
            await self.reflector.analyze_trade_outcome(trade_object, close_reason)
        except Exception as e:
            print(f"❌ [HubManager] Reflector failed: {e}")
        # 2. Slow learning and weight adaptation (StatisticalAnalyzer)
        try:
            await self.statistical_analyzer.update_statistics(trade_object, close_reason)
        except Exception as e:
            print(f"❌ [HubManager] StatisticalAnalyzer failed: {e}")
        print(f"✅ [HubManager] Learning complete for {trade_object.get('symbol')}.")

    async def get_optimized_weights(self, market_condition: str = None) -> Dict[str, Any]:
        """
        Fetch the optimized weights (including the adaptive hybrid weights).
        Used by MLProcessor to update its hybrid equation.
        """
        if not self.initialized:
            return await self.statistical_analyzer.get_default_strategy_weights()
        # Return the full weights dictionary from the statistical analyzer
        return self.statistical_analyzer.weights
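    # Illustrative consumer-side note (an assumption, not taken from this module):
    # MLProcessor would typically poll this method after each learning cycle, e.g.
    #     weights = await learning_hub.get_optimized_weights()
    #     ml_processor.apply_hybrid_weights(weights)  # "apply_hybrid_weights" is a hypothetical name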

    # --- Other helper methods (for the LLM and others) ---
    async def get_active_context_for_llm(self, domain: str, query: str) -> str:
        if not self.initialized: return "Learning Hub not initialized."
        return await self.memory_store.get_active_context(domain, query)

    async def get_statistical_feedback_for_llm(self, entry_strategy: str) -> str:
        if not self.initialized: return "Learning Hub not initialized."
        best_profile = await self.statistical_analyzer.get_best_exit_profile(entry_strategy)
        if best_profile != "unknown":
            return f"Statistical Feedback: For '{entry_strategy}', '{best_profile}' exit profile performed best."
        return "No statistical feedback available yet."

    async def get_statistical_news_score(self, raw_vader_score: float) -> float:
        if not self.initialized: return 0.0
        return await self.statistical_analyzer.get_statistical_vader_pnl(raw_vader_score)

    async def run_distillation_check(self):
        """Periodically run knowledge distillation via the Curator."""
        if not self.initialized: return
        # print("ℹ️ [HubManager] Running distillation check...")
        for domain in self.memory_store.domain_files.keys():
            await self.curator.check_and_distill_domain(domain)

    async def shutdown(self):
        """Save all learning data on shutdown."""
        if not self.initialized: return
        print("🔄 [HubManager] Shutting down... Saving learning data.")
        try:
            # The statistical analyzer persists the adaptive weights and the performance history
            await self.statistical_analyzer.save_weights_to_r2()
            await self.statistical_analyzer.save_performance_history()
            await self.statistical_analyzer.save_exit_profile_effectiveness()
            await self.statistical_analyzer.save_vader_effectiveness()
            print("✅ [HubManager] Data saved successfully.")
        except Exception as e:
            print(f"❌ [HubManager] Save failed: {e}")

    # --- Whale Learning Loop (unchanged from V3) ---
    async def run_whale_learning_check(self):
        # Wait until the hub is initialized before starting the background loop
        while not self.initialized:
            await asyncio.sleep(60)
        print("🧠 [Whale-Logger] Starting background learning loop...")
        await asyncio.sleep(600)
        while True:
            try:
                pending = await self.r2_service.get_pending_whale_learning_records_async()
                if not pending:
                    await asyncio.sleep(600)
                    continue
                now_utc = datetime.now(timezone.utc)
                for record in pending:
                    try:
                        target_time = datetime.fromisoformat(record['target_time_utc'])
                        if now_utc >= target_time:
                            symbol = record['symbol']
                            target_price = await self.data_manager.get_latest_price_async(symbol)
                            if target_price and target_price > 0 and record['start_price_usd'] > 0:
                                pct_change = ((target_price - record['start_price_usd']) / record['start_price_usd']) * 100
                                record.update({'target_price_usd': target_price, 'price_change_percentage': pct_change, 'status': "COMPLETED"})
                                await self.r2_service.update_completed_whale_learning_record_async(record)
                    except Exception:
                        pass
                await self.update_optimal_whale_window()
                await asyncio.sleep(300)
            except Exception as e:
                print(f"❌ [Whale-Logger] Error: {e}")
                await asyncio.sleep(600)

    async def update_optimal_whale_window(self):
        if not NUMPY_AVAILABLE: return
        async with self.whale_learning_lock:
            try:
                all_completed = await self.r2_service.get_all_completed_whale_records_async()
                # Require a minimum sample size before trusting any correlation
                if len(all_completed) < 20: return
                price_changes = []
                metrics_data = defaultdict(lambda: defaultdict(list))
                windows = ['30m', '1h', '2h', '4h', '24h']
                metric_keys = ['relative_net_flow_percent', 'transaction_density', 'net_flow_usd']
                for r in all_completed:
                    if r.get('price_change_percentage') is None: continue
                    price_changes.append(r['price_change_percentage'])
                    analysis = r.get('window_analysis', {})
                    for w in windows:
                        for k in metric_keys:
                            metrics_data[w][k].append(analysis.get(w, {}).get(k, 0.0))
                price_np = np.array(price_changes)
                best_corr = 0
                best_key = None
                # Pick the (window, metric) pair whose Pearson correlation with the observed
                # price change is strongest and statistically meaningful (p < 0.1)
                for w in windows:
                    for k in metric_keys:
                        metric_np = np.array(metrics_data[w][k])
                        if len(metric_np) == len(price_np):
                            corr, p_val = pearsonr(metric_np, price_np)
                            if not np.isnan(corr) and p_val < 0.1 and abs(corr) > best_corr:
                                best_corr = abs(corr)
                                best_key = f"{w}_{k}"
                if best_key:
                    w, m = best_key.split('_', 1)
                    new_config = {
                        "best_window": w,
                        "best_metric": m,
                        "correlation_score": best_corr,
                        "total_samples": len(price_np),
                        "last_updated_utc": datetime.now(timezone.utc).isoformat()
                    }
                    self.optimal_whale_config = new_config
                    await self.r2_service.save_whale_learning_config_async(new_config)
                    print(f"🏆 [Whale-Teacher] New best signal: {m} on {w} (Corr: {best_corr:.2f})")
            except Exception as e:
                print(f"❌ [Whale-Teacher] Error: {e}")