# learning_hub/hub_manager.py
# (Fully updated - V4 - Adaptive Hybrid Weights Support)
import asyncio
import traceback
from typing import Any, Dict, List, Optional
from datetime import datetime, timezone
from collections import defaultdict

# Import all of the hub's internal components.
from .schemas import *
from .policy_engine import PolicyEngine
from .memory_store import MemoryStore
from .statistical_analyzer import StatisticalAnalyzer
from .reflector import Reflector
from .curator import Curator

# Optional imports used for correlation analysis (whale learning).
try:
    import numpy as np
    from scipy.stats import pearsonr
    NUMPY_AVAILABLE = True
except ImportError:
    print("❌ [HubManager] مكتبة numpy أو scipy غير مثبتة! لن يعمل تعلم الحيتان.")
    NUMPY_AVAILABLE = False


class LearningHubManager:
    """Facade that wires together the learning hub's sub-modules.

    It owns a PolicyEngine, MemoryStore, Reflector, Curator and
    StatisticalAnalyzer, exposes learning/feedback entry points, and runs the
    background "whale learning" loop that correlates whale metrics with
    subsequent price changes.
    """

    def __init__(self, r2_service: Any, llm_service: Any, data_manager: Any):
        """Construct all sub-modules; no I/O happens until ``initialize()``.

        Args:
            r2_service: Storage service handed to the memory store and the
                statistical analyzer, and used directly for whale records.
            llm_service: LLM service handed to the memory store, reflector
                and curator.
            data_manager: Data source handed to the statistical analyzer and
                used to fetch latest prices in the whale loop.
        """
        print("🚀 Initializing Learning Hub Manager (V4 - Adaptive)...")

        # 1. Core services
        self.r2_service = r2_service
        self.llm_service = llm_service
        self.data_manager = data_manager

        # 2. Component construction
        self.policy_engine = PolicyEngine()
        self.memory_store = MemoryStore(
            r2_service=self.r2_service,
            policy_engine=self.policy_engine,
            llm_service=self.llm_service
        )
        self.reflector = Reflector(
            llm_service=self.llm_service,
            memory_store=self.memory_store
        )
        self.curator = Curator(
            llm_service=self.llm_service,
            memory_store=self.memory_store
        )
        self.statistical_analyzer = StatisticalAnalyzer(
            r2_service=self.r2_service,
            data_manager=self.data_manager
        )

        # State for whale learning
        self.whale_learning_lock = asyncio.Lock()
        self.optimal_whale_config = {}

        self.initialized = False
        print("✅ Learning Hub Manager constructed. Ready for initialization.")

    async def initialize(self):
        """Initialize the sub-modules and load the stored whale config.

        Does nothing if the hub has already been initialized.
        """
        if self.initialized:
            return

        print("🔄 [HubManager] Initializing all sub-modules...")

        # The statistical analyzer is the component that holds the adaptive weights.
        await self.statistical_analyzer.initialize()

        # Load the whale-learning configuration when the storage service supports it.
        if hasattr(self.r2_service, 'load_whale_learning_config_async'):
            loaded_config = await self.r2_service.load_whale_learning_config_async()
            # Keep the attribute a dict even when storage returns nothing.
            self.optimal_whale_config = loaded_config or {}
            if self.optimal_whale_config:
                print(f"✅ [HubManager] Loaded optimal whale config: {self.optimal_whale_config.get('best_metric', 'N/A')}")

        self.initialized = True
        print("✅ [HubManager] All sub-modules initialized. Learning Hub is LIVE.")

    async def analyze_trade_and_learn(self, trade_object: Dict[str, Any], close_reason: str):
        """Main entry point for learning from a closed trade.

        Runs the reflector and then the statistical analyzer. A failure in
        one is printed and does not prevent the other from running.

        Args:
            trade_object: The closed trade; only ``symbol`` is read here, the
                rest is passed through to the sub-modules.
            close_reason: Why the trade was closed (passed through).
        """
        if not self.initialized:
            return

        print(f"🧠 [HubManager] Learning from trade {trade_object.get('symbol')}...")

        # 1. Fast learning (Reflector)
        try:
            await self.reflector.analyze_trade_outcome(trade_object, close_reason)
        except Exception as e:
            print(f"❌ [HubManager] Reflector failed: {e}")

        # 2. Slow learning and weight adaptation (StatisticalAnalyzer)
        try:
            await self.statistical_analyzer.update_statistics(trade_object, close_reason)
        except Exception as e:
            print(f"❌ [HubManager] StatisticalAnalyzer failed: {e}")

        print(f"✅ [HubManager] Learning complete for {trade_object.get('symbol')}.")

    async def get_optimized_weights(self, market_condition: Optional[str] = None) -> Dict[str, Any]:
        """Return the optimized weights held by the statistical analyzer.

        Args:
            market_condition: Accepted for interface compatibility; not used
                by this method.

        Returns:
            The analyzer's full ``weights`` dictionary, or the analyzer's
            default strategy weights when the hub is not initialized.
        """
        if not self.initialized:
            return await self.statistical_analyzer.get_default_strategy_weights()

        # Return the complete weights dictionary from the statistical analyzer.
        return self.statistical_analyzer.weights

    # --- Other helper functions (for the LLM and others) ---

    async def get_active_context_for_llm(self, domain: str, query: str) -> str:
        """Return the memory store's active context for ``domain``/``query``."""
        if not self.initialized:
            return "Learning Hub not initialized."
        return await self.memory_store.get_active_context(domain, query)

    async def get_statistical_feedback_for_llm(self, entry_strategy: str) -> str:
        """Return a one-line summary of the best exit profile for a strategy.

        Falls back to a fixed message when the analyzer reports "unknown".
        """
        if not self.initialized:
            return "Learning Hub not initialized."

        best_profile = await self.statistical_analyzer.get_best_exit_profile(entry_strategy)
        if best_profile != "unknown":
            return f"Statistical Feedback: For '{entry_strategy}', '{best_profile}' exit profile performed best."
        return "No statistical feedback available yet."

    async def get_statistical_news_score(self, raw_vader_score: float) -> float:
        """Return the analyzer's statistical value for a raw VADER score.

        Returns 0.0 when the hub is not initialized.
        """
        if not self.initialized:
            return 0.0
        return await self.statistical_analyzer.get_statistical_vader_pnl(raw_vader_score)

    async def run_distillation_check(self):
        """Periodic distillation pass: ask the curator to check every domain."""
        if not self.initialized:
            return
        # Iterate over a snapshot of the keys: the loop awaits between
        # iterations, so the mapping could otherwise change mid-iteration.
        for domain in list(self.memory_store.domain_files.keys()):
            await self.curator.check_and_distill_domain(domain)

    async def shutdown(self):
        """Persist all learning data on shutdown.

        Each save step runs independently so that one failing step does not
        prevent the remaining data from being saved.
        """
        if not self.initialized:
            return

        print("🔄 [HubManager] Shutting down... Saving learning data.")

        # The statistical analyzer persists the adaptive weights and the
        # performance records.
        save_method_names = (
            "save_weights_to_r2",
            "save_performance_history",
            "save_exit_profile_effectiveness",
            "save_vader_effectiveness",
        )
        all_saved = True
        for method_name in save_method_names:
            try:
                await getattr(self.statistical_analyzer, method_name)()
            except Exception as e:
                all_saved = False
                print(f"❌ [HubManager] Save failed ({method_name}): {e}")

        if all_saved:
            print("✅ [HubManager] Data saved successfully.")

    # --- Whale Learning Loop (unchanged in purpose from V3) ---

    async def run_whale_learning_check(self):
        """Background loop that completes pending whale-learning records.

        Runs forever: fetches pending records, completes those whose target
        time has passed, refreshes the optimal whale window, then sleeps.
        Errors are printed and the loop keeps running.
        """
        # One-off grace period if the hub is not initialized yet; the loop
        # then starts regardless.
        if not self.initialized:
            await asyncio.sleep(60)

        print("🧠 [Whale-Logger] Starting background learning loop...")
        await asyncio.sleep(600)

        while True:
            try:
                pending = await self.r2_service.get_pending_whale_learning_records_async()
                if not pending:
                    await asyncio.sleep(600)
                    continue

                now_utc = datetime.now(timezone.utc)
                for record in pending:
                    try:
                        await self._complete_whale_record_if_due(record, now_utc)
                    except Exception as e:
                        # Best effort: one bad record must not stop the batch,
                        # but the failure is reported instead of hidden.
                        symbol = record.get('symbol') if isinstance(record, dict) else None
                        print(f"⚠️ [Whale-Logger] Skipping record for {symbol}: {e}")

                await self.update_optimal_whale_window()
                await asyncio.sleep(300)

            except Exception as e:
                print(f"❌ [Whale-Logger] Error: {e}")
                await asyncio.sleep(600)

    async def _complete_whale_record_if_due(self, record: Dict[str, Any], now_utc: datetime) -> None:
        """Mark one pending whale record as completed once its target time passed.

        Args:
            record: Pending record; reads ``target_time_utc``, ``symbol`` and
                ``start_price_usd`` and is updated in place on completion.
            now_utc: Timezone-aware current time used for the due check.
        """
        target_time = datetime.fromisoformat(record['target_time_utc'])
        if target_time.tzinfo is None:
            # A naive timestamp cannot be compared with the aware ``now_utc``;
            # the field name says UTC, so interpret it as such.
            target_time = target_time.replace(tzinfo=timezone.utc)
        if now_utc < target_time:
            return

        symbol = record['symbol']
        target_price = await self.data_manager.get_latest_price_async(symbol)
        if not (target_price and target_price > 0):
            return

        start_price = record['start_price_usd']
        if not (start_price > 0):
            return

        pct_change = ((target_price - start_price) / start_price) * 100
        record.update({
            'target_price_usd': target_price,
            'price_change_percentage': pct_change,
            'status': "COMPLETED",
        })
        await self.r2_service.update_completed_whale_learning_record_async(record)

    async def update_optimal_whale_window(self):
        """Pick the (window, metric) pair that best correlates with price change.

        Computes the Pearson correlation of every window/metric series against
        the recorded price changes, keeps the strongest one with p < 0.1, and
        stores it in ``optimal_whale_config`` and via the storage service.
        Does nothing without numpy/scipy or with fewer than 20 completed
        records. Errors are printed, never raised.
        """
        if not NUMPY_AVAILABLE:
            return

        async with self.whale_learning_lock:
            try:
                all_completed = await self.r2_service.get_all_completed_whale_records_async()
                if not all_completed or len(all_completed) < 20:
                    return

                windows = ['30m', '1h', '2h', '4h', '24h']
                metric_keys = ['relative_net_flow_percent', 'transaction_density', 'net_flow_usd']

                price_changes = []
                metrics_data = defaultdict(lambda: defaultdict(list))

                for r in all_completed:
                    if r.get('price_change_percentage') is None:
                        continue
                    price_changes.append(r['price_change_percentage'])
                    # Missing or null analysis data counts as 0.0 so every
                    # series stays numeric and aligned with price_changes.
                    analysis = r.get('window_analysis') or {}
                    for w in windows:
                        window_stats = analysis.get(w) or {}
                        for k in metric_keys:
                            value = window_stats.get(k)
                            metrics_data[w][k].append(0.0 if value is None else value)

                # pearsonr needs at least two samples.
                if len(price_changes) < 2:
                    return

                price_np = np.array(price_changes)
                best_corr = 0.0
                best_pair = None

                for w in windows:
                    for k in metric_keys:
                        metric_np = np.array(metrics_data[w][k])
                        if len(metric_np) != len(price_np):
                            continue
                        corr, p_val = pearsonr(metric_np, price_np)
                        # A constant series yields NaN; ignore it.
                        if not np.isnan(corr) and p_val < 0.1 and abs(corr) > best_corr:
                            best_corr = float(abs(corr))
                            best_pair = (w, k)

                if best_pair:
                    w, m = best_pair
                    new_config = {
                        "best_window": w,
                        "best_metric": m,
                        "correlation_score": best_corr,
                        "total_samples": len(price_np),
                        "last_updated_utc": datetime.now(timezone.utc).isoformat(),
                    }
                    self.optimal_whale_config = new_config
                    await self.r2_service.save_whale_learning_config_async(new_config)
                    print(f"🏆 [Whale-Teacher] New best signal: {m} on {w} (Corr: {best_corr:.2f})")

            except Exception as e:
                print(f"❌ [Whale-Teacher] Error: {e}")