Tradcloneai / ml_engine /processor.py
Riy777's picture
Auto-Fix by Architect: Auto-Fix: ASSIGN_ARCHITECT | Quant Silence during SYSTEM_BOOT suggests a critical code failure preventing analysis. System boot events should not inherently block quant analysis unless a bug exists in initialization logic or data pipelines. by Agent Architect
85d7e9d verified
# ============================================================
# 🧠 ml_engine/processor.py
# (V68.1 - GEM-Architect: Realistic Oracle Fallback)
# ============================================================
import asyncio
import traceback
import logging
import os
import sys
import numpy as np
from typing import Dict, Any, List, Optional
# --- Engine imports (unchanged): each engine is optional; a failed import leaves its name bound to None ---
try: from .titan_engine import TitanEngine
except ImportError: TitanEngine = None
try: from .patterns import ChartPatternAnalyzer
except ImportError: ChartPatternAnalyzer = None
try: from .monte_carlo import MonteCarloEngine
except ImportError: MonteCarloEngine = None
try: from .oracle_engine import OracleEngine
except ImportError: OracleEngine = None
try: from .sniper_engine import SniperEngine
except ImportError: SniperEngine = None
try: from .hybrid_guardian import HybridDeepSteward
except ImportError: HybridDeepSteward = None
try: from .guardian_hydra import GuardianHydra
except ImportError: GuardianHydra = None
# ============================================================
# 📂 Model paths
# ============================================================
# Project root: two directory levels above this file's absolute path.
_THIS_FILE = os.path.abspath(__file__)
BASE_DIR = os.path.dirname(os.path.dirname(_THIS_FILE))

# Every model artifact lives under <BASE_DIR>/ml_models.
_ML_MODELS_ROOT = os.path.join(BASE_DIR, "ml_models")

# Per-engine model directories.
MODELS_L2_DIR = os.path.join(_ML_MODELS_ROOT, "layer2")
MODELS_PATTERN_DIR = os.path.join(_ML_MODELS_ROOT, "xgboost_pattern2")
MODELS_UNIFIED_DIR = os.path.join(_ML_MODELS_ROOT, "Unified_Models_V1")
MODELS_SNIPER_DIR = os.path.join(_ML_MODELS_ROOT, "guard_v2")
MODELS_HYDRA_DIR = os.path.join(_ML_MODELS_ROOT, "guard_v1")

# Single-file artifacts handed to the legacy guardian (HybridDeepSteward).
MODEL_V2_PATH = os.path.join(_ML_MODELS_ROOT, "DeepSteward_V2_Production.json")
MODEL_V3_PATH = os.path.join(_ML_MODELS_ROOT, "DeepSteward_V3_Production.json")
MODEL_V3_FEAT = os.path.join(_ML_MODELS_ROOT, "DeepSteward_V3_Features.json")
# ============================================================
# 🎛️ SYSTEM LIMITS (Realistic Defaults)
# ============================================================
class SystemLimits:
    """
    GEM-Architect: Adjusted Defaults based on real-world calibration.
    Oracle Ceiling detected at ~0.75, so Threshold set to 0.60.

    Static default thresholds and weights. The processor uses them as
    fallbacks whenever a request carries no ``dynamic_limits`` override.
    """

    # --- Layer 1 ---
    L1_MIN_AFFINITY_SCORE = 15.0

    # --- Layer 2 Hard Gates (Loosened) ---
    L2_GATE_TITAN = 0.60
    L2_GATE_PATTERN = 0.50
    L2_GATE_MC = 0.50

    # --- Layer 2 Weights & Min Score ---
    L2_MIN_SCORE = 0.65
    L2_WEIGHT_TITAN = 0.40
    L2_WEIGHT_PATTERNS = 0.40
    L2_WEIGHT_MC = 0.20

    # Pattern Config
    PATTERN_TF_WEIGHTS = {'1h': 0.35, '15m': 0.25, '1d': 0.20, '5m': 0.10, '4h': 0.10}
    PATTERN_THRESH_BULLISH = 0.50
    PATTERN_THRESH_BEARISH = 0.40

    # --- Layer 3 ---
    L3_CONFIDENCE_THRESHOLD = 0.60  # Adjusted: realistic entry threshold
    L3_WHALE_IMPACT_MAX = 0.10
    L3_NEWS_IMPACT_MAX = 0.05
    L3_MC_ADVANCED_MAX = 0.10

    # --- Layer 4 ---
    L4_ENTRY_THRESHOLD = 0.40
    L4_WEIGHT_ML = 0.60
    L4_WEIGHT_OB = 0.40
    L4_OB_WALL_RATIO = 0.35

    # --- Layer 0: Hydra & Guardian Defaults ---
    HYDRA_CRASH_THRESH = 0.60
    HYDRA_GIVEBACK_THRESH = 0.80
    HYDRA_STAGNATION_THRESH = 0.60

    # Fixed Legacy Guards
    LEGACY_V2_PANIC_THRESH = 0.98
    LEGACY_V3_HARD_THRESH = 0.95
    LEGACY_V3_SOFT_THRESH = 0.88
    LEGACY_V3_ULTRA_THRESH = 0.99

    @classmethod
    def to_dict(cls) -> Dict[str, Any]:
        """Return the configuration values declared directly on this class.

        Dunder entries, callables and method descriptors are excluded.
        ``classmethod`` objects stored in the class ``__dict__`` are not
        callable, so they need an explicit ``isinstance`` check; without
        it this method would list itself under the ``'to_dict'`` key.

        Returns:
            Mapping of attribute name to value. Mutable values such as
            ``PATTERN_TF_WEIGHTS`` are returned by reference, not copied.
        """
        return {
            name: value
            for name, value in vars(cls).items()
            if not name.startswith('__')
            and not callable(value)
            and not isinstance(value, (classmethod, staticmethod, property))
        }
# ============================================================
# 🧠 MLProcessor Class
# ============================================================
class MLProcessor:
    """Facade that wires the optional ML engines into one processing pipeline.

    Each engine (Titan, pattern analyzer, Monte Carlo, Oracle, Sniper, Hydra,
    Legacy guardian) is instantiated only when its class was importable;
    otherwise the matching attribute stays ``None`` and the corresponding
    step falls back to a neutral result.
    """

    # Components whose failed initialization aborts the whole boot.
    _CRITICAL_COMPONENTS = ("Oracle", "Titan")

    def __init__(self, data_manager=None):
        """Instantiate every available engine; no model loading happens here.

        Args:
            data_manager: Optional object exposing an awaitable
                ``get_latest_ohlcv(symbol, timeframe, limit=...)``; only used
                by ``run_advanced_monte_carlo``.
        """
        self.data_manager = data_manager
        self.initialized = False
        self.initialization_attempted = False
        # Created lazily inside initialize() so it is built while an event
        # loop is running.
        self._init_lock = None

        self.titan = TitanEngine(model_dir=MODELS_L2_DIR) if TitanEngine else None
        self.pattern_engine = ChartPatternAnalyzer(models_dir=MODELS_PATTERN_DIR) if ChartPatternAnalyzer else None
        self.mc_analyzer = MonteCarloEngine() if MonteCarloEngine else None
        self.oracle = OracleEngine(model_dir=MODELS_UNIFIED_DIR) if OracleEngine else None
        self.sniper = SniperEngine(models_dir=MODELS_SNIPER_DIR) if SniperEngine else None

        self.guardian_hydra = None
        if GuardianHydra:
            self.guardian_hydra = GuardianHydra(model_dir=MODELS_HYDRA_DIR)

        self.guardian_legacy = None
        if HybridDeepSteward:
            self.guardian_legacy = HybridDeepSteward(
                v2_model_path=MODEL_V2_PATH,
                v3_model_path=MODEL_V3_PATH,
                v3_features_map_path=MODEL_V3_FEAT
            )
        print("🧠 [MLProcessor V68.1] Realistic Mode Loaded (Oracle 0.60).")

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_float(value, default=0.0):
        """Coerce ``value`` to float, returning ``default`` when it cannot be."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _dynamic_limits(context) -> Dict[str, Any]:
        """Return ``context['dynamic_limits']``, or ``{}`` if missing or None."""
        limits = context.get('dynamic_limits') if context else None
        return limits if limits is not None else {}

    @staticmethod
    async def _settle(result):
        """Await ``result`` when it is awaitable; otherwise return it as-is.

        Lets an engine expose ``initialize`` either as a plain function or as
        a coroutine function.
        """
        import inspect  # local import: keeps the file-level import block untouched
        if inspect.isawaitable(result):
            return await result
        return result

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------
    async def initialize(self):
        """Initialize all available engines once.

        Concurrent callers wait on a lock for the in-flight boot instead of
        being told that initialization failed. After one completed attempt
        the stored outcome is returned without retrying.

        Returns:
            True when initialization succeeded (now or earlier), False when
            the single attempt failed.
        """
        if self.initialized:
            return True

        # No await between the check and the assignment, so only one lock is
        # ever created per instance on a single event loop.
        if getattr(self, '_init_lock', None) is None:
            self._init_lock = asyncio.Lock()

        async with self._init_lock:
            if self.initialized:
                return True
            # Prevent multiple initialization attempts
            if self.initialization_attempted:
                return self.initialized
            self.initialization_attempted = True

            print("⚙️ [Processor] Initializing Neural Grid...")
            try:
                results = await self._boot_engines()

                # Check if critical components are initialized
                critical_ok = True
                for component_name, success, _ in results:
                    if component_name in self._CRITICAL_COMPONENTS and not success:
                        critical_ok = False
                        print(f"❌ [Processor CRITICAL] {component_name} failed to initialize")
                if not critical_ok:
                    raise RuntimeError("Critical system components failed to initialize")

                self.initialized = True
                print("✅ [Processor] All Systems Operational.")
                return True
            except Exception as e:
                print(f"❌ [Processor FATAL] Init failed: {e}")
                traceback.print_exc()
                self.initialized = False  # Ensure we don't mark as initialized on failure
                return False

    async def _boot_engines(self) -> List[tuple]:
        """Run each present engine's setup and collect ``(name, ok, message)``."""
        plan = [
            # (result name, warning label, engine, setup coroutine, success banner)
            ("Titan", "Titan", self.titan, self._init_titan, None),
            ("Pattern", "Pattern engine", self.pattern_engine, self._init_patterns, None),
            ("MonteCarlo", "Monte Carlo", self.mc_analyzer, self._init_monte_carlo, None),
            ("Oracle", "Oracle", self.oracle, self._init_oracle, None),
            ("Sniper", "Sniper", self.sniper, self._init_sniper, None),
            ("Hydra", "Hydra", self.guardian_hydra, self._init_hydra,
             " 🛡️ [Guard 1] Hydra X-Ray: Active"),
            ("Legacy", "Legacy guardian", self.guardian_legacy, self._init_legacy,
             " 🛡️ [Guard 2] Legacy Steward: Active"),
        ]
        results = []
        for name, label, engine, setup, banner in plan:
            if not engine:
                continue
            try:
                await setup()
                results.append((name, True, "Success"))
                if banner:
                    print(banner)
            except Exception as e:
                # A single engine failure is recorded, not fatal by itself.
                results.append((name, False, str(e)))
                print(f"⚠️ [Processor] {label} initialization warning: {e}")
        return results

    async def _init_titan(self):
        """Initialize the Titan engine."""
        await self._settle(self.titan.initialize())

    async def _init_patterns(self):
        """Push pattern thresholds, then initialize the pattern engine."""
        self.pattern_engine.configure_thresholds(
            weights=SystemLimits.PATTERN_TF_WEIGHTS,
            bull_thresh=SystemLimits.PATTERN_THRESH_BULLISH,
            bear_thresh=SystemLimits.PATTERN_THRESH_BEARISH
        )
        await self._settle(self.pattern_engine.initialize())

    async def _init_monte_carlo(self):
        """Initialize the Monte Carlo engine if it exposes ``initialize``."""
        if hasattr(self.mc_analyzer, 'initialize'):
            await self._settle(self.mc_analyzer.initialize())

    async def _init_oracle(self):
        """Set the default confidence threshold, then initialize the Oracle."""
        if hasattr(self.oracle, 'set_threshold'):
            self.oracle.set_threshold(SystemLimits.L3_CONFIDENCE_THRESHOLD)
        await self._settle(self.oracle.initialize())

    async def _init_sniper(self):
        """Push default entry settings, then initialize the Sniper."""
        if hasattr(self.sniper, 'configure_settings'):
            self.sniper.configure_settings(
                threshold=SystemLimits.L4_ENTRY_THRESHOLD,
                wall_ratio=SystemLimits.L4_OB_WALL_RATIO,
                w_ml=SystemLimits.L4_WEIGHT_ML,
                w_ob=SystemLimits.L4_WEIGHT_OB
            )
        await self._settle(self.sniper.initialize())

    async def _init_hydra(self):
        """Initialize Hydra (sync or async ``initialize`` both accepted)."""
        await self._settle(self.guardian_hydra.initialize())

    async def _init_legacy(self):
        """Initialize the legacy guardian, then apply the default thresholds."""
        await self._settle(self.guardian_legacy.initialize())
        self.guardian_legacy.configure_thresholds(
            v2_panic=SystemLimits.LEGACY_V2_PANIC_THRESH,
            v3_hard=SystemLimits.LEGACY_V3_HARD_THRESH,
            v3_soft=SystemLimits.LEGACY_V3_SOFT_THRESH,
            v3_ultra=SystemLimits.LEGACY_V3_ULTRA_THRESH
        )

    # ------------------------------------------------------------------
    # Layer 2
    # ------------------------------------------------------------------
    async def process_compound_signal(self, raw_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        L2 Processing with Hybrid Gated Scoring (Full Visibility).

        Reads ``symbol``, ``ohlcv``, ``current_price``, ``dynamic_limits``,
        ``asset_regime`` and ``strategy_type`` from ``raw_data``. Each engine
        contributes a score that defaults to 0.5 when the engine is missing
        or fails; the scores pass through hard gates and a weighted average.

        Returns:
            A result dict that always carries ``is_valid`` and
            ``rejection_reason`` (rejected signals are returned, not dropped),
            or None when initialization failed, ``symbol``/``ohlcv`` is
            missing, or an unexpected error occurred.
        """
        if not self.initialized:
            if not await self.initialize():
                print("❌ [Processor] Cannot process signal - initialization failed")
                return None

        symbol = raw_data.get('symbol')
        ohlcv_data = raw_data.get('ohlcv')
        current_price = raw_data.get('current_price', 0.0)
        # Directly injected per-request overrides
        limits = self._dynamic_limits(raw_data)

        if not symbol or not ohlcv_data:
            return None

        try:
            # 1. Titan Prediction
            score_titan = 0.5
            titan_res = {}
            if self.titan:
                try:
                    titan_out = await asyncio.to_thread(self.titan.predict, ohlcv_data)
                    score_titan = titan_out.get('score', 0.5)
                    titan_res = titan_out
                except Exception as e:
                    print(f"⚠️ [Processor] Titan prediction error for {symbol}: {e}")
                    score_titan = 0.5

            # 2. Pattern Analysis
            score_patterns = 0.5
            pattern_res = {}
            pattern_name = "Neutral"
            if self.pattern_engine:
                try:
                    pattern_out = await self.pattern_engine.detect_chart_patterns(ohlcv_data)
                    score_patterns = pattern_out.get('pattern_confidence', 0.5)
                    pattern_name = pattern_out.get('pattern_detected', 'Neutral')
                    # Bound only after the reads succeed, so a bad result
                    # cannot break the final payload assembly.
                    pattern_res = pattern_out
                except Exception as e:
                    print(f"⚠️ [Processor] Pattern detection error for {symbol}: {e}")
                    score_patterns = 0.5
                    pattern_name = "Neutral"

            # 3. Monte Carlo Light
            mc_score = 0.5
            if self.mc_analyzer and '1h' in ohlcv_data:
                try:
                    # Index 4 of each candle is used (presumably the close).
                    closes = [c[4] for c in ohlcv_data['1h']]
                    raw_mc = self.mc_analyzer.run_light_check(closes)
                    # Re-center around 0.5 and clamp into [0, 1].
                    mc_score = 0.5 + (raw_mc * 5.0)
                    mc_score = max(0.0, min(1.0, mc_score))
                except Exception as e:
                    print(f"⚠️ [Processor] MC analysis error for {symbol}: {e}")
                    mc_score = 0.5

            # --- 4. Hybrid Gated Logic (Aggressive) ---
            # A) Extract Gates (Injectable, fallback to Aggressive SystemLimits)
            gate_titan = limits.get('l2_gate_titan', SystemLimits.L2_GATE_TITAN)
            gate_patt = limits.get('l2_gate_pattern', SystemLimits.L2_GATE_PATTERN)
            gate_mc = limits.get('l2_gate_mc', SystemLimits.L2_GATE_MC)

            rejection_reason = None
            is_valid = True

            # B) HARD GATES Check (first failing gate wins)
            if score_titan < gate_titan:
                is_valid = False
                rejection_reason = f"Titan {score_titan:.2f} < {gate_titan}"
            elif score_patterns < gate_patt:
                is_valid = False
                rejection_reason = f"Pattern {score_patterns:.2f} < {gate_patt}"
            elif mc_score < gate_mc:
                is_valid = False
                rejection_reason = f"MC {mc_score:.2f} < {gate_mc}"

            # C) Weighted Score Calculation
            w_titan = limits.get('w_titan', SystemLimits.L2_WEIGHT_TITAN)
            w_patt = limits.get('w_patt', SystemLimits.L2_WEIGHT_PATTERNS)
            w_mc = limits.get('w_mc', SystemLimits.L2_WEIGHT_MC)
            total_w = w_titan + w_patt + w_mc
            if total_w <= 0:
                total_w = 1.0  # avoid division by zero on degenerate weights
            hybrid_score = ((score_titan * w_titan) + (score_patterns * w_patt) + (mc_score * w_mc)) / total_w

            # D) Final Score Gate
            min_l2_score = limits.get('l2_min_score', SystemLimits.L2_MIN_SCORE)
            if is_valid and hybrid_score < min_l2_score:
                is_valid = False
                rejection_reason = f"Hybrid {hybrid_score:.2f} < {min_l2_score}"

            return {
                'symbol': symbol,
                'current_price': current_price,
                'enhanced_final_score': hybrid_score,
                'is_valid': is_valid,  # Validity flag
                'rejection_reason': rejection_reason,  # Reason
                'dynamic_limits': limits,
                'asset_regime': raw_data.get('asset_regime', 'UNKNOWN'),
                'strategy_type': raw_data.get('strategy_type', 'NORMAL'),
                'titan_score': score_titan,
                'patterns_score': score_patterns,
                'mc_score': mc_score,
                'components': {
                    'titan_score': score_titan,
                    'patterns_score': score_patterns,
                    'mc_score': mc_score
                },
                'pattern_name': pattern_name,
                'ohlcv': ohlcv_data,
                'titan_details': titan_res,
                'pattern_details': pattern_res.get('details', {})
            }
        except Exception as e:
            print(f"❌ [Processor] Error processing {symbol}: {e}")
            traceback.print_exc()
            return None

    # ------------------------------------------------------------------
    # Layer 3
    # ------------------------------------------------------------------
    async def consult_oracle(self, symbol_data: Dict[str, Any]) -> Dict[str, Any]:
        """Ask the Oracle for a decision and veto low-confidence entries.

        The threshold comes from ``dynamic_limits['l3_oracle_thresh']`` when
        present, else ``SystemLimits.L3_CONFIDENCE_THRESHOLD``. A ``WATCH`` or
        ``BUY`` decision whose ``confidence`` is below it is rewritten to
        ``WAIT``.

        Returns:
            The Oracle decision dict, or ``{'action': 'WAIT', 'reason': ...}``
            when initialization failed, the Oracle is missing, or it raised.
        """
        if not self.initialized:
            if not await self.initialize():
                return {'action': 'WAIT', 'reason': 'System initialization failed'}

        # Directly injected threshold
        limits = self._dynamic_limits(symbol_data)
        threshold = limits.get('l3_oracle_thresh', SystemLimits.L3_CONFIDENCE_THRESHOLD)

        if not self.oracle:
            return {'action': 'WAIT', 'reason': 'Oracle Engine Missing'}

        try:
            if hasattr(self.oracle, 'set_threshold'):
                self.oracle.set_threshold(threshold)
            decision = await self.oracle.predict(symbol_data)
            conf = decision.get('confidence', 0.0)
            if decision.get('action') in ['WATCH', 'BUY'] and conf < threshold:
                decision['action'] = 'WAIT'
                decision['reason'] = f"Context Veto: Conf {conf:.2f} < Limit {threshold:.2f}"
            return decision
        except Exception as e:
            print(f"❌ [Processor] Oracle consultation error: {e}")
            traceback.print_exc()
            return {'action': 'WAIT', 'reason': f'Oracle error: {str(e)}'}

    # ------------------------------------------------------------------
    # Layer 4
    # ------------------------------------------------------------------
    async def check_sniper_entry(self, ohlcv_1m_data: List, order_book_data: Dict[str, Any], context_data: Optional[Dict] = None) -> Dict[str, Any]:
        """Configure the Sniper for this request and ask for an entry signal.

        ``l4_sniper_thresh`` and ``l4_ob_wall_ratio`` are read from
        ``context_data['dynamic_limits']`` with SystemLimits fallbacks; the
        ML / order-book weights always come from SystemLimits.

        Returns:
            Whatever ``sniper.check_entry_signal_async`` returns, or
            ``{'signal': 'WAIT', 'reason': ...}`` when initialization failed,
            the Sniper is missing, or it raised.
        """
        if not self.initialized:
            if not await self.initialize():
                return {'signal': 'WAIT', 'reason': 'System initialization failed'}

        limits = self._dynamic_limits(context_data)
        thresh = limits.get('l4_sniper_thresh', SystemLimits.L4_ENTRY_THRESHOLD)
        wall_r = limits.get('l4_ob_wall_ratio', SystemLimits.L4_OB_WALL_RATIO)

        if not self.sniper:
            return {'signal': 'WAIT', 'reason': 'Sniper Engine Missing'}

        try:
            if hasattr(self.sniper, 'configure_settings'):
                self.sniper.configure_settings(
                    threshold=thresh,
                    wall_ratio=wall_r,
                    w_ml=SystemLimits.L4_WEIGHT_ML,
                    w_ob=SystemLimits.L4_WEIGHT_OB
                )
            return await self.sniper.check_entry_signal_async(ohlcv_1m_data, order_book_data)
        except Exception as e:
            print(f"❌ [Processor] Sniper entry check error: {e}")
            traceback.print_exc()
            return {'signal': 'WAIT', 'reason': f'Sniper error: {str(e)}'}

    # ------------------------------------------------------------------
    # Layer 0: guardians
    # ------------------------------------------------------------------
    def consult_dual_guardians(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context, order_book_snapshot=None):
        """
        💎 GEM-Architect: Conditional Hydra & Fixed Legacy Logic

        Runs Hydra (dynamic thresholds, conditional exits) and the legacy
        guardian (fixed thresholds), then arbitrates: a Hydra action in
        EXIT_HARD / EXIT_SOFT / TIGHTEN_SL / TRAIL_SL wins; otherwise a Legacy
        EXIT_HARD / EXIT_SOFT wins; otherwise HOLD.

        This method is synchronous and never triggers initialization itself.

        Returns:
            Dict with ``action``, ``reason``, ``detailed_log``, ``probs``
            (Hydra) and ``scores`` (Legacy).
        """
        if not self.initialized:
            print("⚠️ [Processor] Guardians consulted before initialization")
            return {
                'action': 'HOLD',
                'reason': 'System not initialized',
                'detailed_log': 'System not initialized',
                'probs': {},
                'scores': {}
            }

        if trade_context is None:
            trade_context = {}

        # Dynamic limits from the trade context, with safe fallbacks
        limits = self._dynamic_limits(trade_context)

        # Context data, coerced so a None / malformed value cannot crash here
        entry_price = self._as_float(trade_context.get('entry_price'), 0.0)
        highest_price = self._as_float(trade_context.get('highest_price'), entry_price)
        max_pnl_pct = ((highest_price - entry_price) / entry_price) * 100 if entry_price > 0 else 0.0
        time_in_trade_mins = self._as_float(trade_context.get('time_in_trade_mins'), 0.0)

        hydra_result = self._consult_hydra(
            symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context,
            limits, max_pnl_pct, time_in_trade_mins
        )
        legacy_result = self._consult_legacy(
            ohlcv_1m, ohlcv_5m, ohlcv_15m, entry_price, trade_context, order_book_snapshot
        )

        # -----------------------------------------------
        # 3. Final Arbitration
        # -----------------------------------------------
        h_probs = hydra_result.get('probs') or {}
        l_scores = legacy_result.get('scores') or {}
        h_c = self._as_float(h_probs.get('crash'), 0.0)
        h_g = self._as_float(h_probs.get('giveback'), 0.0)
        l_v2 = self._as_float(l_scores.get('v2'), 0.0)
        stamp_str = f"🐲[C:{h_c:.2f}|G:{h_g:.2f}] 🕸️[V2:{l_v2:.2f}]"

        final_action = 'HOLD'
        final_reason = f"Safe. {stamp_str}"

        hydra_action = hydra_result.get('action', 'HOLD')
        legacy_action = legacy_result.get('action', 'HOLD')
        if hydra_action in ['EXIT_HARD', 'EXIT_SOFT', 'TIGHTEN_SL', 'TRAIL_SL']:
            final_action = hydra_action
            final_reason = f"🐲 HYDRA: {hydra_result.get('reason', 'Unknown Hydra action')}"
        elif legacy_action in ['EXIT_HARD', 'EXIT_SOFT']:
            final_action = legacy_action
            final_reason = f"🕸️ LEGACY: {legacy_result.get('reason', 'Unknown Legacy action')}"

        return {
            'action': final_action,
            'reason': final_reason,
            'detailed_log': f"{final_action} | {stamp_str}",
            'probs': h_probs,
            'scores': l_scores
        }

    def _consult_hydra(self, symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context,
                       limits, max_pnl_pct, time_in_trade_mins):
        """Run Hydra and apply the conditional crash / giveback / stagnation rules."""
        h_crash_thresh = limits.get('hydra_crash', SystemLimits.HYDRA_CRASH_THRESH)
        h_giveback_thresh = limits.get('hydra_giveback', SystemLimits.HYDRA_GIVEBACK_THRESH)
        h_stag_thresh = limits.get('hydra_stagnation', SystemLimits.HYDRA_STAGNATION_THRESH)

        hydra_result = {'action': 'HOLD', 'reason': 'Disabled', 'probs': {}}
        if not (self.guardian_hydra and getattr(self.guardian_hydra, 'initialized', False)):
            return hydra_result

        try:
            hydra_result = self.guardian_hydra.analyze_position(symbol, ohlcv_1m, ohlcv_5m, ohlcv_15m, trade_context)
            h_probs = hydra_result.get('probs', {})
            p_crash = h_probs.get('crash', 0.0)
            p_giveback = h_probs.get('giveback', 0.0)
            p_stagnation = h_probs.get('stagnation', 0.0)

            # 🛑 CRASH: Always Active (Safety Net)
            if p_crash >= h_crash_thresh:
                hydra_result['action'] = 'EXIT_HARD'
                hydra_result['reason'] = f"Hydra Crash Risk {p_crash:.2f} >= {h_crash_thresh}"
            # 🛑 GIVEBACK: Conditional (Profit > 0.6%)
            elif p_giveback >= h_giveback_thresh:
                if max_pnl_pct >= 0.6:
                    hydra_result['action'] = 'EXIT_SOFT'
                    hydra_result['reason'] = f"Hydra Giveback {p_giveback:.2f} (Max PnL {max_pnl_pct:.2f}%)"
                else:
                    hydra_result['action'] = 'HOLD'  # Ignore noise
            # 🛑 STAGNATION: Conditional (Time > 90 mins)
            elif p_stagnation >= h_stag_thresh:
                if time_in_trade_mins > 90:
                    hydra_result['action'] = 'EXIT_SOFT'
                    hydra_result['reason'] = f"Hydra Stagnation {p_stagnation:.2f} (>90m)"
                else:
                    hydra_result['action'] = 'HOLD'  # Too early
            return hydra_result
        except Exception as e:
            print(f"⚠️ [Processor] Hydra analysis error: {e}")
            traceback.print_exc()
            return {'action': 'HOLD', 'reason': f'Hydra error: {str(e)}', 'probs': {}}

    def _consult_legacy(self, ohlcv_1m, ohlcv_5m, ohlcv_15m, entry_price, trade_context, order_book_snapshot):
        """Run the legacy guardian with its fixed thresholds."""
        disabled = {'action': 'HOLD', 'reason': 'Disabled', 'scores': {}}
        if not (self.guardian_legacy and getattr(self.guardian_legacy, 'initialized', False)):
            return disabled

        try:
            # Deliberately fixed values, re-applied before every analysis.
            self.guardian_legacy.configure_thresholds(
                v2_panic=0.98,
                v3_hard=0.95,
                v3_soft=0.88,
                v3_ultra=0.99
            )
            vol_30m = trade_context.get('volume_30m_usd', 0.0)
            verdict = self.guardian_legacy.analyze_position(
                ohlcv_1m, ohlcv_5m, ohlcv_15m, entry_price,
                order_book=order_book_snapshot,
                volume_30m_usd=vol_30m
            )
            if not isinstance(verdict, dict):
                # Arbitration needs a mapping; treat anything else as an error.
                raise TypeError(f"unexpected result type {type(verdict).__name__}")
            return verdict
        except Exception as e:
            print(f"⚠️ [Processor] Legacy guardian analysis error: {e}")
            traceback.print_exc()
            return {'action': 'HOLD', 'reason': f'Legacy error: {str(e)}', 'scores': {}}

    # ------------------------------------------------------------------
    # Advanced Monte Carlo
    # ------------------------------------------------------------------
    async def run_advanced_monte_carlo(self, symbol, timeframe='1h'):
        """Fetch up to 300 candles and run the advanced Monte Carlo simulation.

        Returns:
            The value of ``mc_analyzer.run_advanced_simulation`` over index 4
            of each candle, or 0.0 when the analyzer or data manager is
            missing, no candles came back, or an error occurred.
        """
        if self.mc_analyzer and self.data_manager:
            try:
                ohlcv = await self.data_manager.get_latest_ohlcv(symbol, timeframe, limit=300)
                if ohlcv:
                    return self.mc_analyzer.run_advanced_simulation([c[4] for c in ohlcv])
            except Exception as e:
                print(f"⚠️ [Processor] Advanced MC error for {symbol}: {e}")
                traceback.print_exc()
        return 0.0