Spaces:
Running
Running
| # app.py - الإصدار المحدث مع إصلاح الأخطاء الحرجة | |
| import os | |
| import traceback | |
| import signal | |
| import sys | |
| import uvicorn | |
| import asyncio | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from datetime import datetime, timedelta | |
| from r2 import R2Service | |
| from LLM import LLMService, local_analyze_opportunity, local_re_analyze_trade | |
| from data_manager import DataManager | |
| from ML import MLProcessor as FeatureProcessor | |
| from learning_engine import LearningEngine | |
| import time | |
| import json | |
| import state | |
| import re | |
# System settings
TOP_N_SYMBOLS = 100  # not referenced in the visible part of this file
OPPORTUNITY_COUNT = 10  # number of finalists requested from filter_top_candidates
CHUNK_SIZE = 5  # candidates fetched and scored per batch in find_new_opportunities_async
# Module-wide service handles; they start as None and are assigned in lifespan()
r2_service_global = None
data_manager_global = None
llm_service_global = None
learning_engine_global = None
realtime_monitor = None
| # Real-time trade monitoring with enhanced risk management | |
class RealTimeTradeMonitor:
    """Poll prices for open trades and close them on stop-loss / take-profit.

    ``monitoring_tasks`` maps a symbol to the trade dict being watched. A
    per-trade coroutine keeps running only while its symbol is present in
    that mapping and ``is_running`` is true.
    """

    def __init__(self):
        self.monitoring_tasks = {}
        self.is_running = False
        # Strong references to spawned tasks: the event loop only keeps weak
        # references, so an unreferenced task may be garbage-collected early.
        self._background_tasks = set()

    def _spawn(self, coroutine):
        """Schedule *coroutine* and keep it referenced until it finishes."""
        task = asyncio.create_task(coroutine)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start_monitoring(self):
        """Keep the set of watched symbols in sync with the open trades.

        Runs until ``stop_monitoring`` clears ``is_running``. Every pass
        starts a watcher for each newly opened trade and forgets symbols
        that are no longer open.
        """
        self.is_running = True
        print("🔍 Starting real-time trade monitoring...")
        while self.is_running:
            try:
                open_trades = await r2_service_global.get_open_trades_async()
                for trade in open_trades:
                    symbol = trade['symbol']
                    if symbol not in self.monitoring_tasks:
                        # Register first: the watcher loop exits as soon as
                        # its symbol is missing from monitoring_tasks.
                        self.monitoring_tasks[symbol] = trade
                        self._spawn(self._monitor_single_trade(trade))
                current_symbols = {trade['symbol'] for trade in open_trades}
                for symbol in list(self.monitoring_tasks):
                    if symbol not in current_symbols:
                        del self.monitoring_tasks[symbol]
                await asyncio.sleep(10)
            except Exception as error:
                print(f"❌ Real-time monitor error: {error}")
                await asyncio.sleep(30)

    async def _monitor_single_trade(self, trade):
        """Watch one trade and close it when a price level is hit.

        The stop is trailed to 2% below the latest price whenever the price
        is above entry and that level is higher than the current stop. The
        trailed stop is only written to the in-memory ``trade`` dict.
        """
        symbol = trade['symbol']
        strategy = trade.get('strategy', 'GENERIC')
        print(f"📊 Starting real-time monitoring for {symbol} (Strategy: {strategy})")
        while symbol in self.monitoring_tasks and self.is_running:
            try:
                current_price = await data_manager_global.get_latest_price_async(symbol)
                if not current_price:
                    await asyncio.sleep(15)
                    continue
                entry_price = trade['entry_price']
                stop_loss = trade.get('stop_loss')
                take_profit = trade.get('take_profit')
                should_close = False
                close_reason = ""
                if stop_loss and current_price <= stop_loss:
                    should_close = True
                    close_reason = f"Stop loss hit: {current_price} <= {stop_loss}"
                elif take_profit and current_price >= take_profit:
                    should_close = True
                    close_reason = f"Take profit hit: {current_price} >= {take_profit}"
                if not should_close and current_price > entry_price:
                    dynamic_stop = current_price * 0.98
                    if dynamic_stop > (stop_loss or 0):
                        trade['stop_loss'] = dynamic_stop
                        print(f"🔒 Updated trailing stop for {symbol}: {dynamic_stop:.4f}")
                if should_close:
                    print(f"🚨 IMMEDIATE CLOSE: {symbol} - {close_reason} - Strategy: {strategy}")
                    if not r2_service_global.acquire_lock():
                        # Another holder has the lock. Keep watching and retry
                        # instead of dropping a trade that was never closed.
                        print(f"⏳ Lock busy, will retry closing {symbol} shortly")
                        await asyncio.sleep(15)
                        continue
                    try:
                        await r2_service_global.close_trade_async(trade, current_price)
                        print(f"✅ Trade {symbol} closed immediately at {current_price}. Strategy: {strategy}")
                        if learning_engine_global and learning_engine_global.initialized:
                            try:
                                await learning_engine_global.analyze_trade_outcome(trade, 'CLOSED_BY_MONITOR')
                            except Exception as learning_error:
                                # The trade is already closed; a learning failure
                                # must not send the loop back to close it again.
                                print(f"⚠️ Learning update failed for {symbol}: {learning_error}")
                        self._spawn(run_bot_cycle_async())
                    finally:
                        r2_service_global.release_lock()
                    self.monitoring_tasks.pop(symbol, None)
                    break
                await asyncio.sleep(15)
            except Exception as error:
                print(f"❌ Real-time monitoring error for {symbol}: {error}")
                await asyncio.sleep(30)

    def stop_monitoring(self):
        """Stop all monitoring loops; each exits at its next loop check."""
        self.is_running = False
        self.monitoring_tasks.clear()
        print("🛑 Real-time trade monitoring stopped")
async def monitor_market_async():
    """Background task that continuously evaluates market health.

    Waits up to 10 x 3 seconds for the data manager to be initialised, then
    loops forever. Each pass sets ``state.MARKET_STATE_OK`` to False when a
    critical whale alert is present, or when BTC sentiment is BEARISH and
    the fear-and-greed index is below 30. Otherwise it sets it to True.
    Any error inside a pass also leaves the flag True (fail-open).
    """
    init_attempts = 0
    while data_manager_global is None and init_attempts < 10:
        print(f"⏳ Waiting for data manager initialization... (attempt {init_attempts + 1}/10)")
        await asyncio.sleep(3)
        init_attempts += 1
    if data_manager_global is None:
        print("❌ Data manager failed to initialize after 10 attempts")
        return
    while True:
        try:
            print("👁️ Monitoring market sentiment...")
            try:
                market_context = await data_manager_global.get_market_context_async()
            except Exception as error:
                print(f"⚠️ Failed to get market context: {error}")
                market_context = await get_fallback_market_context()
            if not market_context:
                print("❌ Failed to get market context. Assuming neutral state.")
                state.MARKET_STATE_OK = True
                await asyncio.sleep(60)
                continue
            # `or {}` / `or 0`: a key that is present but None must not raise,
            # otherwise every pass would end in the fail-open handler below.
            whale_analysis = market_context.get('general_whale_activity') or {}
            whale_sentiment = whale_analysis.get('sentiment', 'NEUTRAL')
            is_critical = whale_analysis.get('critical_alert', False)
            total_volume = whale_analysis.get('total_volume_usd', 0) or 0
            print(f"🐋 Whale Analysis: {whale_sentiment} | Critical: {is_critical} | Volume: ${total_volume:,.0f}")
            print(f"📈 Whale Description: {whale_analysis.get('description', 'No data')}")
            bitcoin_sentiment = market_context.get('btc_sentiment')
            fear_greed_index = market_context.get('fear_and_greed_index')
            try:
                fear_greed_value = float(fear_greed_index)
            except (TypeError, ValueError):
                # Missing or non-numeric index: the bearish rule cannot apply.
                fear_greed_value = None
            should_halt_trading = False
            halt_reason = ""
            if is_critical:
                should_halt_trading = True
                halt_reason = f"CRITICAL whale activity detected: {whale_analysis.get('description')}"
            elif bitcoin_sentiment == 'BEARISH' and fear_greed_value is not None and fear_greed_value < 30:
                should_halt_trading = True
                halt_reason = f"Bearish market conditions (BTC: {bitcoin_sentiment}, F&G: {fear_greed_index})"
            if should_halt_trading:
                print(f"🚨🚨🚨 MARKET HALT: {halt_reason} 🚨🚨🚨")
                state.MARKET_STATE_OK = False
                try:
                    await r2_service_global.save_system_logs_async({
                        "market_halt": True,
                        "reason": halt_reason,
                        "whale_sentiment": whale_sentiment,
                        "is_critical": is_critical
                    })
                except Exception as log_error:
                    print(f"⚠️ Failed to save market halt log: {log_error}")
            else:
                if not state.MARKET_STATE_OK:
                    print("🟢 Market conditions improved. Resuming normal operations.")
                state.MARKET_STATE_OK = True
            await asyncio.sleep(60)
        except Exception as error:
            print(f"❌ An error occurred during COMPREHENSIVE market monitoring: {error}")
            traceback.print_exc()
            state.MARKET_STATE_OK = True
            await asyncio.sleep(60)
async def get_fallback_market_context():
    """Return a neutral market snapshot for use when the live context fails.

    The snapshot carries the current local timestamp, a NEUTRAL whale
    summary with no critical alert, NEUTRAL BTC sentiment and a mid-scale
    fear-and-greed index of 50.
    """
    created_at = datetime.now().isoformat()
    neutral_whale_activity = {
        'sentiment': 'NEUTRAL',
        'description': 'Fallback mode - system initializing',
        'critical_alert': False,
        'transaction_count': 0,
        'total_volume_usd': 0,
    }
    fallback_context = {'timestamp': created_at}
    fallback_context['general_whale_activity'] = neutral_whale_activity
    fallback_context['btc_sentiment'] = 'NEUTRAL'
    fallback_context['fear_and_greed_index'] = 50
    return fallback_context
def safe_float_conversion(value, default=0.0):
    """Convert *value* to ``float``, returning *default* when that is impossible.

    Args:
        value: ``None``, a number, or a string. Strings may carry decoration
            such as currency symbols, thousands separators or a percent sign.
        default: value returned for ``None``, unsupported types and
            unparseable strings.

    Returns:
        The parsed float, or *default*.
    """
    import math

    try:
        if value is None:
            return default
        if isinstance(value, (int, float)):
            return float(value)
        if not isinstance(value, str):
            return default
        # Try the string as written first, so scientific notation ("1e5",
        # "1e-3") is honoured instead of being mangled by the filter below.
        try:
            parsed = float(value)
        except ValueError:
            parsed = None
        # "nan" / "inf" strings are not accepted here; they fall through to
        # the filter, which turns them into the default.
        if parsed is not None and math.isfinite(parsed):
            return parsed
        # Fallback for decorated numbers: keep only digits, dots and minus signs.
        cleaned = ''.join(character for character in value if character.isdigit() or character in '.-')
        return float(cleaned) if cleaned else default
    except (ValueError, TypeError, OverflowError):
        return default
async def validate_candidate_data_enhanced(candidate):
    """Normalise a candidate dict in place so later stages can rely on its keys.

    Missing required fields are filled with defaults, numeric fields are
    coerced with ``safe_float_conversion``, and empty or unknown strategy
    fields are replaced.

    Returns:
        True when the candidate was normalised. False when normalisation
        raised, including when *candidate* is not a dict.
    """
    try:
        for field in ('symbol', 'current_price', 'final_score', 'enhanced_final_score'):
            if field not in candidate:
                is_numeric_field = field.endswith('_score') or field == 'current_price'
                candidate[field] = 0.0 if is_numeric_field else 'UNKNOWN'
        candidate['current_price'] = safe_float_conversion(candidate.get('current_price'), 0.0)
        candidate['final_score'] = safe_float_conversion(candidate.get('final_score'), 0.5)
        candidate['enhanced_final_score'] = safe_float_conversion(
            candidate.get('enhanced_final_score'), candidate['final_score']
        )
        if not candidate.get('reasons_for_candidacy'):
            candidate['reasons_for_candidacy'] = ['unknown_reason']
        if not candidate.get('sentiment_data'):
            candidate['sentiment_data'] = {
                'btc_sentiment': 'NEUTRAL',
                'fear_and_greed_index': 50,
                'general_whale_activity': {'sentiment': 'NEUTRAL', 'critical_alert': False}
            }
        if 'advanced_indicators' not in candidate:
            candidate['advanced_indicators'] = {}
        if 'strategy_scores' not in candidate:
            candidate['strategy_scores'] = {}
        if 'recommended_strategy' not in candidate:
            candidate['recommended_strategy'] = 'unknown'
        # Make sure a usable target strategy is always present.
        target_strategy = candidate.get('target_strategy')
        if not target_strategy or target_strategy == 'unknown':
            candidate['target_strategy'] = 'GENERIC'
        return True
    except Exception as error:
        # The candidate may not be a dict at all, so the handler itself must
        # not call dict methods on it unconditionally.
        symbol = candidate.get('symbol') if isinstance(candidate, dict) else None
        print(f"❌ Failed to validate candidate data for {symbol}: {error}")
        return False
def _bounded_int(value, low, high, default):
    """Coerce *value* to an int clamped to [low, high]; *default* if not numeric."""
    try:
        number = int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default
    return max(low, min(high, number))


def _normalize_strategy_decision(response):
    """Turn the raw LLM reply into a complete, type-safe strategy dict."""
    try:
        json_match = re.search(r'\{.*\}', response, re.DOTALL)
        parsed = json.loads(json_match.group()) if json_match else None
    except (TypeError, ValueError):
        # Non-string reply or invalid JSON (JSONDecodeError is a ValueError).
        parsed = None
    if not isinstance(parsed, dict):
        return {
            "primary_strategy": "GENERIC",
            "reasoning": "Fallback strategy for market stability",
            "risk_tolerance": 5,
            "optimal_scan_count": 100,
        }
    decision = dict(parsed)
    strategy = parsed.get('primary_strategy')
    has_strategy = isinstance(strategy, str) and strategy.strip()
    decision['primary_strategy'] = strategy.strip() if has_strategy else 'GENERIC'
    reasoning = parsed.get('reasoning')
    has_reasoning = isinstance(reasoning, str) and reasoning
    decision['reasoning'] = reasoning if has_reasoning else 'No reasoning provided by the model.'
    # Ranges mirror what the prompt asks the model for.
    decision['risk_tolerance'] = _bounded_int(parsed.get('risk_tolerance'), 1, 10, 5)
    decision['optimal_scan_count'] = _bounded_int(parsed.get('optimal_scan_count'), 50, 200, 100)
    return decision


async def analyze_market_strategy(market_context):
    """Ask the LLM which trading strategy suits the current market.

    Args:
        market_context: dict with the market fields interpolated into the prompt.

    Returns:
        A dict that always contains ``primary_strategy`` (str), ``reasoning``
        (str), ``risk_tolerance`` (int, 1-10) and ``optimal_scan_count``
        (int, 50-200). A GENERIC fallback is returned when the reply cannot
        be parsed or the call fails.
    """
    try:
        prompt = f"""
You are a professional crypto portfolio manager. Analyze the current market conditions and determine the most suitable strategy.
**Market Data:**
- BTC Price: {market_context.get('bitcoin_price_usd')}
- BTC Sentiment: {market_context.get('btc_sentiment')}
- Fear & Greed Index: {market_context.get('fear_and_greed_index')}
- Whale Analysis: {market_context.get('general_whale_activity', {}).get('sentiment')}
- Critical Whale Alert: {market_context.get('general_whale_activity', {}).get('critical_alert')}
**Available Strategies:**
1. AGGRESSIVE_GROWTH - For strong bull markets.
2. DEFENSIVE_GROWTH - For volatile or uncertain markets.
3. CONSERVATIVE - For bearish or high-risk markets.
4. HIGH_FREQUENCY - For sideways markets.
5. WHALE_FOLLOWING - When whale activity is high and clear.
6. GENERIC - Balanced approach for normal conditions.
**Required:**
- Choose one primary strategy.
- Explain why in a single sentence.
- Set an acceptable risk tolerance (1 to 10).
- Determine the optimal number of coins to scan (50 to 200).
**Output (JSON only):**
{{
"primary_strategy": "STRATEGY_NAME",
"reasoning": "Brief reasoning.",
"risk_tolerance": 5,
"optimal_scan_count": 100
}}
"""
        response = await llm_service_global._call_llm(prompt)
        # Callers index the result directly, so never hand back raw model
        # output with missing keys or wrongly typed numbers.
        return _normalize_strategy_decision(response)
    except Exception as error:
        print(f"❌ Failed to analyze market strategy: {error}")
        return {
            "primary_strategy": "GENERIC",
            "reasoning": "Fallback due to analysis error",
            "risk_tolerance": 5,
            "optimal_scan_count": 100,
        }
async def find_strategy_specific_candidates(strategy, scan_count, *,
                                            max_candidates_to_process=30,
                                            min_strategy_score=0.2,
                                            top_n=15):
    """Fetch, score and rank candidates for one trading strategy.

    Args:
        strategy: strategy name. ``'GENERIC'`` ranks by
            ``enhanced_final_score``; any other name ranks by that
            strategy's entry in ``base_strategy_scores``.
        scan_count: requested scan size; twice this many raw candidates
            are requested from the data manager.
        max_candidates_to_process: how many raw candidates are scored.
        min_strategy_score: a non-generic candidate is kept only when its
            strategy score is strictly above this value.
        top_n: maximum number of candidates returned.

    Returns:
        The ranked candidate dicts, or ``[]`` when nothing qualifies or an
        error occurs.
    """
    try:
        # The scan count may come straight from an LLM reply, so coerce it
        # before doing arithmetic with it.
        try:
            scan_limit = int(scan_count) * 2
        except (TypeError, ValueError):
            scan_limit = 200
        # 1. Fetch the initial candidate list.
        all_candidates = await data_manager_global.find_high_potential_candidates(scan_limit)
        if not all_candidates:
            print("⚠️ الماسح العام لم يجد أي مرشحين أوليين.")
            return []
        # 2. Load the market context before processing.
        market_context = await data_manager_global.get_market_context_async()
        if not market_context:
            print("❌ Failed to get market context for strategy analysis")
            return []
        feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
        processed_candidates = []
        for candidate in all_candidates[:max_candidates_to_process]:
            try:
                # Turn the raw candidate into processed, scored data.
                symbol_with_reasons = [{'symbol': candidate['symbol'], 'reasons': candidate.get('reasons', [])}]
                ohlcv_data = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
                if ohlcv_data and ohlcv_data[0]:
                    # Refresh the market context before each symbol.
                    try:
                        updated_market_context = await data_manager_global.get_market_context_async()
                        if updated_market_context:
                            feature_processor.market_context = updated_market_context
                    except Exception as e:
                        # Carry on with the previous context if the refresh fails.
                        print(f"⚠️ Failed to update market context for {candidate['symbol']}: {e}")
                    processed = await feature_processor.process_and_score_symbol_enhanced(ohlcv_data[0])
                    if processed:
                        processed_candidates.append(processed)
            except Exception as e:
                print(f"⚠️ فشل معالجة {candidate.get('symbol')}: {e}")
        if not processed_candidates:
            print("⚠️ لم يتم معالجة أي مرشح بنجاح")
            return []
        # 3. Rank according to the requested strategy.
        if strategy != 'GENERIC':
            strategy_candidates = []
            for candidate in processed_candidates:
                # Use the base (unweighted) strategy scores.
                base_scores = candidate.get('base_strategy_scores') or {}
                try:
                    strategy_score = float(base_scores.get(strategy, 0))
                except (TypeError, ValueError):
                    # One malformed score must not discard every candidate.
                    strategy_score = 0.0
                if strategy_score > min_strategy_score:
                    candidate['strategy_match_score'] = strategy_score
                    strategy_candidates.append(candidate)
                    print(f"✅ {candidate['symbol']} مناسب لـ {strategy} (درجة: {strategy_score:.3f})")
            sorted_candidates = sorted(strategy_candidates,
                                       key=lambda x: x.get('strategy_match_score', 0),
                                       reverse=True)
            top_candidates = sorted_candidates[:top_n]
            print(f"✅ تم اختيار {len(top_candidates)} مرشحًا لاستراتيجية {strategy}")
        else:
            # For the generic strategy, rank by the enhanced score.
            sorted_candidates = sorted(processed_candidates,
                                       key=lambda x: x.get('enhanced_final_score') or 0,
                                       reverse=True)
            top_candidates = sorted_candidates[:top_n]
            print(f"✅ تم اختيار {len(top_candidates)} مرشحًا للاستراتيجية العامة")
        return top_candidates
    except Exception as error:
        print(f"❌ فشل في نظام الفلترة المتقدم: {error}")
        traceback.print_exc()
        return []
async def find_new_opportunities_async():
    """Scan the market and return the first actionable opportunity, if any.

    Steps: pick a strategy from the market context, collect candidates for
    it (falling back to generic candidates), score them in chunks of
    ``CHUNK_SIZE``, keep the top ``OPPORTUNITY_COUNT``, then ask the LLM for
    a decision on each finalist in order.

    Returns:
        A dict with ``symbol``, ``decision``, ``current_price`` and
        ``strategy`` for the first BUY/SELL decision, or ``None`` when
        nothing actionable is found or an error occurs.
    """
    print("🔍 Scanning for new opportunities with reduced thresholds...")
    try:
        await r2_service_global.save_system_logs_async({
            "opportunity_scan_started": True, "timestamp": datetime.now().isoformat()
        })
        print("🧠 Determining trading strategy...")
        market_context = await data_manager_global.get_market_context_async()
        if not market_context:
            print("❌ Failed to fetch market context. Cannot determine strategy.")
            return None
        strategy_decision = await analyze_market_strategy(market_context)
        # Read defensively: the decision may originate from raw LLM output.
        primary_strategy = strategy_decision.get('primary_strategy') or 'GENERIC'
        scan_count = strategy_decision.get('optimal_scan_count', 100)
        print(f"🎯 Selected Strategy: {primary_strategy}")
        print(f"📝 Reasoning: {strategy_decision.get('reasoning')}")
        print(f"⚡ Risk Tolerance: {strategy_decision.get('risk_tolerance', 5)}/10")
        print(f"🔍 Optimal Scan Count: {scan_count}")
        print("🔍 Finding top candidates using dynamic ranking...")
        high_potential_candidates = await find_strategy_specific_candidates(primary_strategy, scan_count)
        if not high_potential_candidates:
            print("🔄 لا توجد مرشحين متخصصين، جلب مرشحين عامين...")
            # Fall back to generic candidates.
            high_potential_candidates = await data_manager_global.find_high_potential_candidates(20)
            if high_potential_candidates:
                for candidate in high_potential_candidates:
                    candidate['target_strategy'] = 'GENERIC'
                print(f"✅ تم تحميل {len(high_potential_candidates)} مرشح عام")
            else:
                print("✅ No new candidates found after dynamic ranking.")
                await r2_service_global.save_system_logs_async({
                    "no_candidates_found": True, "strategy": primary_strategy,
                    "reason": "Scanner did not return any initial candidates."
                })
                return None
        all_processed_candidates = []
        for index in range(0, len(high_potential_candidates), CHUNK_SIZE):
            chunk = high_potential_candidates[index:index + CHUNK_SIZE]
            chunk_data = await data_manager_global.get_fast_pass_data_async(chunk)
            print(f"⏳ Processing and scoring chunk {index//CHUNK_SIZE + 1}...")
            # Refresh the market context per chunk; keep the old one on failure.
            updated_market_context = await data_manager_global.get_market_context_async()
            if not updated_market_context:
                updated_market_context = market_context
            feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
            # Skip empty fetch results, and collect per-symbol exceptions so a
            # single failure cannot abort the whole scan.
            processed_chunk = await asyncio.gather(*[
                feature_processor.process_and_score_symbol_enhanced(data)
                for data in (chunk_data or []) if data
            ], return_exceptions=True)
            for outcome in processed_chunk:
                if isinstance(outcome, BaseException):
                    print(f"⚠️ Candidate scoring failed: {outcome}")
                elif outcome is not None:
                    all_processed_candidates.append(outcome)
            await asyncio.sleep(1)
        if not all_processed_candidates:
            print("❌ No candidates were processed successfully.")
            return None
        # Use a fresh context for the final filtering step.
        updated_market_context = await data_manager_global.get_market_context_async()
        if not updated_market_context:
            updated_market_context = market_context
        feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
        top_candidates = feature_processor.filter_top_candidates(all_processed_candidates, OPPORTUNITY_COUNT)
        print(f"✅ Identified {len(top_candidates)} top candidates after final scoring.")
        await r2_service_global.save_candidates_data_async(
            candidates_data=top_candidates,
            reanalysis_data={"strategy_used": strategy_decision, "market_conditions": market_context}
        )
        if not top_candidates:
            print("❌ No strong candidates left after final filtering.")
            await r2_service_global.save_system_logs_async({
                "no_strong_candidates": True, "strategy": primary_strategy,
                "initial_candidates_count": len(high_potential_candidates)
            })
            return None
        print("🧠 Getting LLM analysis for top candidates...")
        for candidate in top_candidates:
            try:
                if not await validate_candidate_data_enhanced(candidate):
                    print(f"⚠️ Skipping {candidate.get('symbol')} due to quality issues")
                    continue
                llm_analysis_data = await llm_service_global.get_trading_decision(candidate)
                if not llm_analysis_data:
                    print(f"⚠️ LLM analysis failed for {candidate['symbol']}. Moving to next.")
                    continue
                if llm_analysis_data.get('action') == "HOLD":
                    print(f"🧠 LLM decided to HOLD on {candidate['symbol']}. Moving to next.")
                    continue
                if llm_analysis_data.get('action') in ["BUY", "SELL"]:
                    # Final strategy check: when the LLM's strategy is empty,
                    # 'unknown' or 'GENERIC', use the candidate's own.
                    final_strategy = llm_analysis_data.get('strategy')
                    candidate_strategy = candidate.get('target_strategy', 'GENERIC')
                    if not final_strategy or final_strategy == 'unknown' or final_strategy == 'GENERIC':
                        final_strategy = candidate_strategy
                        llm_analysis_data['strategy'] = final_strategy
                        print(f"🔧 تصحيح استراتيجية LLM لـ {candidate['symbol']}: {final_strategy}")
                    print(f"🎯 الاستراتيجية النهائية: {final_strategy}")
                    print("\n========================================================")
                    print("💎💎💎 New Trading Opportunity Identified! 💎💎💎")
                    print(f" Symbol: {candidate['symbol']}")
                    print(f" Action: {llm_analysis_data.get('action')}")
                    print(f" Strategy: {final_strategy}")
                    print(f" Reasoning: {llm_analysis_data.get('reasoning')}")
                    print(f" Confidence: {llm_analysis_data.get('confidence_level')}")
                    print("========================================================\n")
                    await r2_service_global.save_system_logs_async({
                        "new_opportunity_found": True, "symbol": candidate['symbol'],
                        "action": llm_analysis_data.get('action'), "strategy": final_strategy,
                        "confidence": llm_analysis_data.get('confidence_level', 0)
                    })
                    return {
                        "symbol": candidate['symbol'],
                        "decision": llm_analysis_data,
                        "current_price": candidate['current_price'],
                        "strategy": final_strategy
                    }
            except Exception as error:
                print(f"❌ LLM/Fallback error for {candidate.get('symbol', 'unknown')}: {error}")
                traceback.print_exc()
        print("✅ Cycle finished. No actionable BUY/SELL opportunities found by LLM.")
        return None
    except Exception as error:
        print(f"❌ An error occurred while scanning for opportunities: {error}")
        traceback.print_exc()
        try:
            await r2_service_global.save_system_logs_async({"opportunity_scan_error": True, "error": str(error)})
        except Exception as log_error:
            # Logging is best-effort; it must not raise out of this handler.
            print(f"⚠️ Failed to save scan error log: {log_error}")
        return None
async def re_analyze_open_trade_async(trade_data):
    """Re-evaluate an open trade while preserving its original strategy.

    Args:
        trade_data: open-trade dict. ``entry_timestamp`` must be an ISO-8601
            string; ``symbol``, ``entry_price``, ``strategy`` and
            ``decision_data`` are read when present.

    Returns:
        A dict with ``symbol``, ``decision``, ``current_price`` and
        ``hold_minutes``, or ``None`` when data could not be fetched or
        processed, or an error occurred.
    """
    symbol = trade_data.get('symbol')
    try:
        entry_time = datetime.fromisoformat(trade_data['entry_timestamp'])
        # Match the entry timestamp's awareness: subtracting a naive "now"
        # from an aware entry time raises TypeError. With tzinfo=None this is
        # the same naive local time as before.
        current_time = datetime.now(tz=entry_time.tzinfo)
        hold_minutes = (current_time - entry_time).total_seconds() / 60
        print(f"⏳ Re-analyzing trade: {symbol} (held for {hold_minutes:.1f} minutes)")
        # Recover the original strategy if the trade record lost it.
        original_strategy = trade_data.get('strategy')
        if not original_strategy or original_strategy == 'unknown':
            original_strategy = (trade_data.get('decision_data') or {}).get('strategy', 'GENERIC')
            print(f"🔧 Fixed original strategy for {symbol}: {original_strategy}")
        try:
            market_context = await data_manager_global.get_market_context_async()
        except Exception as error:
            print(f"⚠️ Failed to get market context: {error}. Using basic market data...")
            market_context = {'btc_sentiment': 'NEUTRAL'}
        symbol_with_reasons = [{'symbol': symbol, 'reasons': ['re-analysis']}]
        ohlcv_data_list = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
        if not ohlcv_data_list or not ohlcv_data_list[0]:
            print(f"❌ Failed to fetch latest data for {symbol}.")
            return None
        raw_data = ohlcv_data_list[0]
        # Refresh the market context right before processing.
        try:
            updated_market_context = await data_manager_global.get_market_context_async()
            if updated_market_context:
                market_context = updated_market_context
        except Exception as e:
            print(f"⚠️ Failed to update market context for re-analysis: {e}")
        feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
        processed_data = await feature_processor.process_and_score_symbol(raw_data)
        if not processed_data:
            print(f"❌ Failed to process latest data for {symbol}.")
            return None
        await r2_service_global.save_candidates_data_async(
            candidates_data=None,
            reanalysis_data={'market_context': market_context, 'processed_data': processed_data}
        )
        print(f"🧠 Getting LLM re-analysis for {symbol}...")
        try:
            re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
            source = re_analysis_decision.get('model_source', 'LLM')
        except Exception as error:
            print(f"❌ LLM re-analysis error: {error}. Falling back to local.")
            re_analysis_decision = local_re_analyze_trade(trade_data, processed_data)
            source = 'local_fallback'
        final_decision = _apply_patience_logic(re_analysis_decision, hold_minutes, trade_data, processed_data)
        # Make sure the final decision always carries a strategy.
        if not final_decision.get('strategy') or final_decision['strategy'] == 'unknown':
            final_decision['strategy'] = original_strategy
            print(f"🔧 Final re-analysis strategy fix for {symbol}: {original_strategy}")
        print(f"✅ Re-analysis decision for {symbol}: {final_decision.get('action')}. Strategy: {final_decision.get('strategy')}. Source: {source}")
        await r2_service_global.save_system_logs_async({
            "trade_reanalyzed": True, "symbol": symbol, "action": final_decision.get('action'),
            "hold_minutes": hold_minutes, "source": source, "strategy": final_decision.get('strategy')
        })
        return {
            "symbol": symbol, "decision": final_decision,
            "current_price": processed_data.get('current_price'), "hold_minutes": hold_minutes
        }
    except Exception as error:
        print(f"❌ An error occurred during trade re-analysis: {error}")
        traceback.print_exc()
        try:
            await r2_service_global.save_system_logs_async({"reanalysis_error": True, "symbol": symbol, "error": str(error)})
        except Exception as log_error:
            # Logging is best-effort; it must not raise out of this handler.
            print(f"⚠️ Failed to save re-analysis error log: {log_error}")
        return None
def _apply_patience_logic(decision, hold_minutes, trade_data, processed_data, *,
                          min_hold_minutes=20, min_profit_percent=2):
    """Turn a premature CLOSE_TRADE decision into HOLD.

    A close is blocked when the trade has been held for less than
    *min_hold_minutes* and its profit is below *min_profit_percent*. When
    the profit cannot be computed (missing or zero entry price, non-numeric
    prices) it is treated as 0%, which also blocks the close.

    Args:
        decision: decision dict; mutated in place when the close is blocked.
        hold_minutes: minutes the trade has been open.
        trade_data: trade dict providing ``entry_price``.
        processed_data: processed market data providing ``current_price``.
        min_hold_minutes: minimum holding time before a low-profit close is allowed.
        min_profit_percent: profit (in percent) that permits an early close.

    Returns:
        The same *decision* object, possibly rewritten to HOLD.
    """
    is_early_close = decision.get('action') == "CLOSE_TRADE" and hold_minutes < min_hold_minutes
    if not is_early_close:
        return decision
    current_price = processed_data.get('current_price', 0)
    entry_price = trade_data.get('entry_price', 0)
    try:
        profit_loss_percent = ((current_price - entry_price) / entry_price) * 100
    except (TypeError, ZeroDivisionError):
        profit_loss_percent = 0
    if profit_loss_percent < min_profit_percent:
        print(f"🛑 Blocked premature selling! Only {hold_minutes:.1f} minutes held, PnL: {profit_loss_percent:.2f}%")
        decision['action'] = "HOLD"
        decision['reasoning'] = f"Patience Filter: Blocked premature sell. Held for {hold_minutes:.1f}m. Giving trade more time."
    return decision
def _is_due_for_reanalysis(trade, now):
    """Return True when the trade's ``expected_target_time`` has been reached."""
    raw_target = trade.get('expected_target_time')
    if not raw_target:
        # No target time recorded: treat the trade as due.
        return True
    try:
        target_time = datetime.fromisoformat(raw_target)
    except (TypeError, ValueError):
        # A malformed timestamp must not abort the whole cycle; re-analyse.
        return True
    if target_time.tzinfo is not None:
        # Compare like with like: aware and naive datetimes cannot be ordered.
        return datetime.now(tz=target_time.tzinfo) >= target_time
    return now >= target_time


async def run_bot_cycle_async():
    """Run one bot cycle under the R2 lock.

    The cycle repairs missing strategies on open trades, re-analyses the
    trades whose target time has passed (closing or updating them as
    decided), and, when no trade is open or one was just closed and capital
    is available, scans for a new opportunity and saves it as a trade.
    """
    print(f"\n{'='*70}")
    print(f"⏳ New cycle initiated at: {datetime.now().isoformat()}")
    print(f"{'='*70}")
    try:
        await r2_service_global.save_system_logs_async({"cycle_started": True})
        if not r2_service_global.acquire_lock():
            print("❌ Failed to acquire lock. Skipping cycle.")
            return
        # Initialised before the try so the finally block can always report it.
        open_trades = []
        try:
            open_trades = await r2_service_global.get_open_trades_async()
            print(f"✅ Found {len(open_trades)} open trade(s).")
            # Repair empty or unknown strategies on open trades.
            trades_fixed = 0
            for trade in open_trades:
                if not trade.get('strategy') or trade['strategy'] == 'unknown':
                    original_strategy = (trade.get('decision_data') or {}).get('strategy', 'GENERIC')
                    trade['strategy'] = original_strategy
                    trades_fixed += 1
                    print(f"🔧 Fixed missing strategy for {trade['symbol']}: {trade['strategy']}")
            if trades_fixed > 0:
                print(f"✅ Fixed strategies for {trades_fixed} trades.")
                await r2_service_global.save_open_trades_async(open_trades)
            should_look_for_new_trade = not open_trades
            if open_trades:
                now = datetime.now()
                trades_to_reanalyze = [
                    trade for trade in open_trades if _is_due_for_reanalysis(trade, now)
                ]
                if trades_to_reanalyze:
                    print(f"🔍 Re-analyzing {len(trades_to_reanalyze)} trade(s)...")
                    for trade in trades_to_reanalyze:
                        result = await re_analyze_open_trade_async(trade)
                        action = result['decision'].get('action') if result else None
                        if action == "CLOSE_TRADE":
                            await r2_service_global.close_trade_async(trade, result['current_price'])
                            print(f"✅ Trade for {trade['symbol']} CLOSED. Strategy: {trade.get('strategy', 'unknown')}")
                            if learning_engine_global and learning_engine_global.initialized:
                                trade_with_strategy = trade.copy()
                                strategy = result['decision'].get('strategy', trade.get('strategy', 'GENERIC'))
                                trade_with_strategy['strategy'] = strategy
                                await learning_engine_global.analyze_trade_outcome(trade_with_strategy, 'CLOSED_BY_REANALYSIS')
                            should_look_for_new_trade = True
                        elif action == "UPDATE_TRADE":
                            await r2_service_global.update_trade_async(trade, result['decision'])
                            print(f"✅ Trade for {trade['symbol']} UPDATED. Strategy: {trade.get('strategy', 'unknown')}")
                        else:
                            print(f"✅ Trade for {trade['symbol']} is on HOLD. Strategy: {trade.get('strategy', 'unknown')}")
                else:
                    print("✅ No trades due for re-analysis yet.")
            if should_look_for_new_trade:
                portfolio_state = await r2_service_global.get_portfolio_state_async()
                if portfolio_state.get("current_capital_usd", 0) > 1:
                    print(f"✅ Capital available (${portfolio_state['current_capital_usd']:.2f}). Scanning...")
                    new_opportunity = await find_new_opportunities_async()
                    if new_opportunity:
                        print(f"✅ Opportunity for {new_opportunity['symbol']} confirmed! Saving trade. Strategy: {new_opportunity.get('strategy')}")
                        # Final check before saving: the decision must carry a strategy.
                        if not new_opportunity['decision'].get('strategy'):
                            new_opportunity['decision']['strategy'] = new_opportunity.get('strategy', 'GENERIC')
                            print(f"🔧 Final pre-save strategy fix: {new_opportunity['decision']['strategy']}")
                        await r2_service_global.save_new_trade_async(
                            new_opportunity['symbol'],
                            new_opportunity['decision'],
                            new_opportunity['current_price']
                        )
                        newly_opened_trades = await r2_service_global.get_open_trades_async()
                        for trade in newly_opened_trades:
                            if trade['symbol'] == new_opportunity['symbol']:
                                # The watcher loop only runs while its symbol is in
                                # monitoring_tasks, so register it before spawning.
                                # Skip if the monitor already picked the trade up.
                                if realtime_monitor is not None and trade['symbol'] not in realtime_monitor.monitoring_tasks:
                                    realtime_monitor.monitoring_tasks[trade['symbol']] = trade
                                    asyncio.create_task(realtime_monitor._monitor_single_trade(trade))
                                break
                    else:
                        print("✅ Scan complete. No actionable opportunities identified.")
                else:
                    print("😴 No available capital. Waiting for current trade to close.")
        finally:
            print("✅ Cycle finished. Releasing lock.")
            r2_service_global.release_lock()
            await r2_service_global.save_system_logs_async({"cycle_completed": True, "open_trades": len(open_trades)})
    except Exception as error:
        print(f"❌ Unhandled error in main cycle: {error}")
        traceback.print_exc()
        try:
            await r2_service_global.save_system_logs_async({"cycle_error": True, "error": str(error)})
        except Exception as log_error:
            # Logging is best-effort; a storage failure here must not skip
            # the lock release below or escape from the background task.
            print(f"⚠️ Failed to save cycle error log: {log_error}")
        if r2_service_global.lock_acquired:
            r2_service_global.release_lock()
@asynccontextmanager
async def lifespan(application: FastAPI):
    """Create the shared services on startup and clean up on shutdown.

    Startup builds the R2, LLM, data-manager and learning-engine services,
    then starts the market monitor and the real-time trade monitor as
    background tasks. A startup failure is logged and re-raised. Shutdown
    always runs ``cleanup_on_shutdown`` and cancels the background tasks.
    """
    global r2_service_global, data_manager_global, llm_service_global, learning_engine_global, realtime_monitor
    print("===== Application Startup =====")
    # Held here so the tasks stay referenced for the application's lifetime.
    background_tasks = []
    try:
        try:
            r2_service_global = R2Service()
            llm_service_global = LLMService()
            contracts_database = await r2_service_global.load_contracts_db_async()
            data_manager_global = DataManager(contracts_database)
            await data_manager_global.initialize()
            # The learning engine needs both the storage service and the data manager.
            learning_engine_global = LearningEngine(r2_service_global, data_manager_global)
            await learning_engine_global.initialize_enhanced()
            # Force a strategy refresh from the current data.
            await learning_engine_global.force_strategy_learning()
            # Sanity check that weights can be loaded.
            if learning_engine_global.initialized:
                weights = await learning_engine_global.get_optimized_strategy_weights("bull_market")
                print(f"🎯 الأوزان المحملة: {weights}")
            realtime_monitor = RealTimeTradeMonitor()
            background_tasks.append(asyncio.create_task(monitor_market_async()))
            background_tasks.append(asyncio.create_task(realtime_monitor.start_monitoring()))
            await r2_service_global.save_system_logs_async({"application_started": True})
            print("\n✅ All services initialized. Application is ready.\n")
        except Exception as error:
            print(f"❌ Application startup failed: {error}")
            traceback.print_exc()
            if r2_service_global:
                try:
                    await r2_service_global.save_system_logs_async({"application_startup_failed": True, "error": str(error)})
                except Exception as log_error:
                    # Do not let a logging failure mask the startup error.
                    print(f"⚠️ Failed to save startup failure log: {log_error}")
            raise
        # Outside the startup handler: an exception raised while the app is
        # serving is not a startup failure and is not reported as one.
        yield
    finally:
        try:
            await cleanup_on_shutdown()
        finally:
            for task in background_tasks:
                task.cancel()


application = FastAPI(lifespan=lifespan)
# Strong references to in-flight bot-cycle tasks. The event loop only keeps
# weak references, so a task that nobody else holds may be garbage-collected
# before it finishes.
_pending_cycle_tasks = set()


# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `application`.
async def run_cycle_api():
    """API endpoint to trigger the bot cycle.

    Schedules ``run_bot_cycle_async`` as a background task and returns
    immediately without waiting for the cycle to finish.

    Returns:
        dict: A confirmation message.
    """
    cycle_task = asyncio.create_task(run_bot_cycle_async())
    _pending_cycle_tasks.add(cycle_task)
    # Drop the reference once the cycle is done so the set does not grow.
    cycle_task.add_done_callback(_pending_cycle_tasks.discard)
    return {"message": "Bot cycle initiated in the background."}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `application`.
async def health_check():
    """Detailed health check.

    Returns:
        dict: A fixed ``"healthy"`` status, the current timestamp, a
        per-service state map, the market-state flag and, when the learning
        engine is initialised, its performance metrics (otherwise ``{}``).
    """

    def _label(service):
        # A service counts as initialised as soon as its global is truthy.
        return "initialized" if service else "uninitialized"

    metrics = {}
    if learning_engine_global and learning_engine_global.initialized:
        metrics = await learning_engine_global.calculate_performance_metrics()

    engine_active = learning_engine_global and learning_engine_global.initialized
    monitor_running = realtime_monitor and realtime_monitor.is_running
    services = {
        "r2_service": _label(r2_service_global),
        "llm_service": _label(llm_service_global),
        "data_manager": _label(data_manager_global),
        "learning_engine": "active" if engine_active else "inactive",
        "realtime_monitor": "running" if monitor_running else "stopped",
    }

    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": services,
        "market_state_ok": state.MARKET_STATE_OK,
        "learning_engine": metrics,
    }
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `application`.
async def get_performance_stats():
    """Get performance statistics for all services.

    Returns:
        dict: Timestamp, data-manager statistics, market state with its
        context, real-time monitoring counters, learning-engine metrics and
        improvement suggestions. Sections whose service is missing fall back
        to empty or zero values.

    Raises:
        HTTPException: With status 500 when any part of the collection fails.
    """
    try:
        if data_manager_global:
            market_context = await data_manager_global.get_market_context_async()
        else:
            market_context = {}

        learning_stats, improvement_suggestions = {}, []
        if learning_engine_global and learning_engine_global.initialized:
            learning_stats = await learning_engine_global.calculate_performance_metrics()
            improvement_suggestions = await learning_engine_global.suggest_improvements()

        if data_manager_global:
            data_manager_stats = data_manager_global.get_performance_stats()
        else:
            data_manager_stats = {}

        market_flag = state.MARKET_STATE_OK
        if market_flag:
            market_description = "Market is healthy for trading"
        else:
            market_description = "Market conditions are unfavorable"

        if realtime_monitor:
            monitoring = {
                "active_trades": len(realtime_monitor.monitoring_tasks),
                "is_running": realtime_monitor.is_running,
            }
        else:
            monitoring = {"active_trades": 0, "is_running": False}

        return {
            "timestamp": datetime.now().isoformat(),
            "data_manager": data_manager_stats,
            "market_state": {
                "is_healthy": market_flag,
                "description": market_description,
                "context": market_context,
            },
            "realtime_monitoring": monitoring,
            "learning_engine": learning_stats,
            "improvement_suggestions": improvement_suggestions,
        }
    except Exception as failure:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve stats: {str(failure)}")
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm that it is registered on `application`.
async def get_logs_status():
    """Get status of logging system.

    Returns:
        dict: Number of open trades, current capital and total trade count
        read from the R2 service, plus a timestamp.

    Raises:
        HTTPException: With status 500 when the R2 service cannot be queried.
    """
    try:
        trades = await r2_service_global.get_open_trades_async()
        portfolio = await r2_service_global.get_portfolio_state_async()

        trade_count = len(trades)
        capital = portfolio.get("current_capital_usd", 0)
        total_trades = portfolio.get("total_trades", 0)

        return {
            "logging_system": "active",
            "open_trades_count": trade_count,
            "current_capital": capital,
            "total_trades": total_trades,
            "timestamp": datetime.now().isoformat(),
        }
    except Exception as failure:
        raise HTTPException(status_code=500, detail=f"Failed to get logs status: {str(failure)}")
async def cleanup_on_shutdown():
    """Cleanup function for graceful shutdown.

    Runs these steps in order: write the shutdown log entry, persist the
    learning engine's weights and history, stop the real-time monitor,
    release the R2 lock if it is held, and close the data manager.

    Every step is best-effort: a failure is printed and the remaining steps
    still run, so an error while stopping the monitor can no longer prevent
    the lock from being released or the data manager from being closed.
    """
    print("\n🛑 Shutdown signal received. Cleaning up...")

    if r2_service_global:
        try:
            await r2_service_global.save_system_logs_async({"application_shutdown": True})
        except Exception as log_error:
            print(f"⚠️ Failed to save shutdown log: {log_error}")

    if learning_engine_global and learning_engine_global.initialized:
        try:
            await learning_engine_global.save_weights_to_r2()
            await learning_engine_global.save_performance_history()
            print("✅ Learning engine data saved.")
        except Exception as e:
            print(f"⚠️ Failed to save learning engine data: {e}")

    if realtime_monitor:
        try:
            realtime_monitor.stop_monitoring()
        except Exception as monitor_error:
            print(f"⚠️ Failed to stop real-time monitor: {monitor_error}")

    if r2_service_global and r2_service_global.lock_acquired:
        try:
            r2_service_global.release_lock()
            print("✅ Lock released.")
        except Exception as lock_error:
            print(f"⚠️ Failed to release lock: {lock_error}")

    if data_manager_global:
        try:
            await data_manager_global.close()
        except Exception as close_error:
            print(f"⚠️ Failed to close data manager: {close_error}")

    print("✅ Cleanup completed.")
def signal_handler(signum, frame):
    """Handle shutdown signals.

    When no event loop is running in this thread, the asynchronous cleanup is
    driven to completion with ``asyncio.run`` before the process exits;
    ``asyncio.create_task`` would raise ``RuntimeError`` in that situation.

    When a loop is running, no task is scheduled: it would be abandoned as
    soon as ``sys.exit`` raises ``SystemExit``, before the loop could run it.
    NOTE(review): in that case cleanup is expected to come from the
    ``finally`` block of ``lifespan`` as the server unwinds; confirm this for
    the server used in deployment.

    Args:
        signum: Number of the received signal.
        frame: Current stack frame (unused).

    Raises:
        SystemExit: Always, with exit code 0.
    """
    print(f"\n⚠️ Received signal {signum}")
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is active, so the cleanup can safely run to completion here.
        asyncio.run(cleanup_on_shutdown())
    sys.exit(0)
# Install the shutdown handler for interrupt and termination signals. This
# runs at import time, exactly as before.
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, signal_handler)

# Script entry point: serve the application on all interfaces, port 7860.
if __name__ == "__main__":
    uvicorn.run(application, host="0.0.0.0", port=7860)