Trad / learning_hub /statistical_analyzer.py
Riy777's picture
Update learning_hub/statistical_analyzer.py
c5478c2
# learning_hub/statistical_analyzer.py
# (V12.3 Full - Adaptive Hybrid Weights + VADER Learning)
import json
import asyncio
import traceback
from datetime import datetime
from typing import Dict, Any, List
import numpy as np
# دوال مساعدة
def normalize_weights(weights_dict):
total = sum(weights_dict.values())
if total > 0:
for key in weights_dict:
weights_dict[key] /= total
return weights_dict
def should_update_weights(history_length):
return history_length % 5 == 0
class StatisticalAnalyzer:
def __init__(self, r2_service: Any, data_manager: Any):
self.r2_service = r2_service
self.data_manager = data_manager
# حالة التعلم
self.weights = {}
self.performance_history = []
self.strategy_effectiveness = {}
self.exit_profile_effectiveness = {}
self.market_patterns = {}
self.vader_bin_effectiveness = {} # لتعلم الأخبار
# 🔴 جديد: تتبع أداء مكونات النظام الهجين
self.component_performance = {
"titan": {"correct_calls": 0, "total_calls": 0, "accuracy": 0.5},
"patterns": {"correct_calls": 0, "total_calls": 0, "accuracy": 0.5},
"monte_carlo": {"correct_calls": 0, "total_calls": 0, "accuracy": 0.5}
}
self.initialized = False
self.lock = asyncio.Lock()
print("✅ Learning Hub: Statistical Analyzer (Adaptive Hybrid) loaded")
async def initialize(self):
async with self.lock:
if self.initialized: return
print("🔄 [StatsAnalyzer] تهيئة التعلم الإحصائي المتكيف...")
try:
await self.load_weights_from_r2()
await self.load_performance_history()
await self.load_exit_profile_effectiveness()
await self.load_vader_effectiveness()
if not self.weights:
await self.initialize_default_weights()
self.initialized = True
print("✅ [StatsAnalyzer] جاهز.")
except Exception as e:
print(f"❌ [StatsAnalyzer] فشل التهيئة: {e}")
await self.initialize_default_weights()
self.initialized = True
async def initialize_default_weights(self):
"""إعادة تعيين الأوزان للافتراضيات"""
self.weights = {
# 🔴 الأوزان الهجينة الديناميكية (الافتراضية)
"hybrid_weights": {
"titan": 0.50,
"patterns": 0.40,
"monte_carlo": 0.10
},
# أوزان الاستراتيجيات القديمة (للمرجعية)
"strategy_weights": {
"trend_following": 0.18, "mean_reversion": 0.15, "breakout_momentum": 0.22,
"volume_spike": 0.12, "whale_tracking": 0.15, "pattern_recognition": 0.10,
"hybrid_ai": 0.08
},
"entry_trigger_threshold": 0.90 # عتبة الدخول الافتراضية
}
# إعادة تعيين سجلات الأداء
self.strategy_effectiveness = {}
self.vader_bin_effectiveness = {k: {"total_trades": 0, "total_pnl_percent": 0}
for k in ["Strong_Positive", "Positive", "Neutral", "Negative", "Strong_Negative"]}
self.component_performance = {k: {"correct_calls": 0, "total_calls": 0, "accuracy": 0.5}
for k in ["titan", "patterns", "monte_carlo"]}
async def update_statistics(self, trade_object: Dict[str, Any], close_reason: str):
"""تحديث الإحصائيات وتكييف الأوزان الهجينة"""
if not self.initialized: await self.initialize()
try:
strategy = trade_object.get('strategy', 'unknown')
pnl_percent = trade_object.get('pnl_percent', 0)
is_success = pnl_percent > 0.1
decision_data = trade_object.get('decision_data', {})
vader_score = decision_data.get('news_score', 0.0)
vader_bin = self._get_vader_bin(vader_score)
# 1. تحديث السجل العام
self.performance_history.append({
"timestamp": datetime.now().isoformat(),
"symbol": trade_object.get('symbol'),
"pnl_percent": pnl_percent,
"strategy": strategy,
"vader_bin": vader_bin
})
# 2. تحديث أداء VADER
if vader_bin not in self.vader_bin_effectiveness:
self.vader_bin_effectiveness[vader_bin] = {"total_trades": 0, "total_pnl_percent": 0}
self.vader_bin_effectiveness[vader_bin]["total_trades"] += 1
self.vader_bin_effectiveness[vader_bin]["total_pnl_percent"] += pnl_percent
# 🔴 3. تحديث أداء مكونات النظام الهجين (الجديد)
components = decision_data.get('components', {})
if components:
# هل كان تيتان على حق؟ (نعتبر > 0.6 توقعاً للصعود)
titan_bullish = components.get('titan_score', 0.5) >= 0.6
if (titan_bullish and is_success) or (not titan_bullish and not is_success):
self.component_performance["titan"]["correct_calls"] += 1
self.component_performance["titan"]["total_calls"] += 1
# هل كانت الأنماط على حق؟
pat_bullish = components.get('patterns_score', 0.5) >= 0.6
if (pat_bullish and is_success) or (not pat_bullish and not is_success):
self.component_performance["patterns"]["correct_calls"] += 1
self.component_performance["patterns"]["total_calls"] += 1
# هل كان مونت كارلو على حق؟
mc_bullish = components.get('mc_score', 0.5) >= 0.6
if (mc_bullish and is_success) or (not mc_bullish and not is_success):
self.component_performance["monte_carlo"]["correct_calls"] += 1
self.component_performance["monte_carlo"]["total_calls"] += 1
# 4. تكييف الأوزان دورياً
if should_update_weights(len(self.performance_history)):
await self.adapt_hybrid_weights()
await self.save_weights_to_r2()
await self.save_performance_history()
await self.save_vader_effectiveness()
print(f"✅ [StatsAnalyzer] Stats updated for {strategy}. Hybrid weights adapted.")
except Exception as e:
print(f"❌ [StatsAnalyzer] Update failed: {e}")
traceback.print_exc()
async def adapt_hybrid_weights(self):
"""تعديل الأوزان الهجينة (0.5/0.4/0.1) بناءً على الدقة الحقيقية"""
print("⚖️ [StatsAnalyzer] تكييف الأوزان الهجينة...")
try:
# حساب الدقة الحالية لكل مكون
for data in self.component_performance.values():
if data["total_calls"] > 5: # نحتاج عينة صغيرة على الأقل
data["accuracy"] = data["correct_calls"] / data["total_calls"]
# حساب الأوزان الجديدة النسبية (مع حد أدنى 0.05 لعدم إلغاء أي مكون تماماً)
t_acc = max(self.component_performance["titan"]["accuracy"], 0.05)
p_acc = max(self.component_performance["patterns"]["accuracy"], 0.05)
m_acc = max(self.component_performance["monte_carlo"]["accuracy"], 0.05)
total_acc = t_acc + p_acc + m_acc
new_weights = {
"titan": t_acc / total_acc,
"patterns": p_acc / total_acc,
"monte_carlo": m_acc / total_acc
}
# تطبيق التغيير بنعومة (80% قديم + 20% جديد) لتجنب التقلب الشديد
current = self.weights.get("hybrid_weights", {"titan":0.5, "patterns":0.4, "monte_carlo":0.1})
final_weights = {k: (current.get(k,0.33) * 0.8) + (new_weights[k] * 0.2) for k in new_weights}
self.weights["hybrid_weights"] = normalize_weights(final_weights)
print(f"✅ [StatsAnalyzer] New Hybrid Weights: {self.weights['hybrid_weights']}")
except Exception as e:
print(f"❌ [StatsAnalyzer] Weight adaptation failed: {e}")
# --- دوال مساعدة وتحميل/حفظ (R2) ---
def _get_vader_bin(self, score):
if score > 0.5: return "Strong_Positive"
if score > 0.05: return "Positive"
if score < -0.5: return "Strong_Negative"
if score < -0.05: return "Negative"
return "Neutral"
async def load_weights_from_r2(self):
try:
resp = self.r2_service.s3_client.get_object(Bucket="trading", Key="learning_statistical_weights.json")
data = json.loads(resp['Body'].read())
self.weights = data.get("weights", {})
self.component_performance = data.get("component_performance", self.component_performance)
except: pass # استخدام الافتراضيات
async def save_weights_to_r2(self):
try:
data = {"weights": self.weights, "component_performance": self.component_performance, "last_updated": datetime.now().isoformat()}
self.r2_service.s3_client.put_object(Bucket="trading", Key="learning_statistical_weights.json", Body=json.dumps(data).encode('utf-8'))
except Exception as e: print(f"❌ Failed to save weights: {e}")
async def load_performance_history(self):
try:
resp = self.r2_service.s3_client.get_object(Bucket="trading", Key="learning_performance_history.json")
self.performance_history = json.loads(resp['Body'].read()).get("history", [])
except: self.performance_history = []
async def save_performance_history(self):
try:
self.r2_service.s3_client.put_object(Bucket="trading", Key="learning_performance_history.json", Body=json.dumps({"history": self.performance_history[-1000:]}).encode('utf-8'))
except: pass
async def load_vader_effectiveness(self):
try:
resp = self.r2_service.s3_client.get_object(Bucket="trading", Key="learning_vader_effectiveness.json")
self.vader_bin_effectiveness = json.loads(resp['Body'].read()).get("effectiveness", {})
except: pass
async def save_vader_effectiveness(self):
try:
self.r2_service.s3_client.put_object(Bucket="trading", Key="learning_vader_effectiveness.json", Body=json.dumps({"effectiveness": self.vader_bin_effectiveness}).encode('utf-8'))
except: pass
async def load_exit_profile_effectiveness(self): pass # (تم تبسيطها للتركيز على الجديد)
async def save_exit_profile_effectiveness(self): pass
async def get_statistical_vader_pnl(self, score):
bin_data = self.vader_bin_effectiveness.get(self._get_vader_bin(score))
if bin_data and bin_data["total_trades"] >= 3:
return bin_data["total_pnl_percent"] / bin_data["total_trades"]
return 0.0
async def get_best_exit_profile(self, strategy): return "unknown" # (مبسطة)