Trad / app.py
Riy777's picture
Rename app (32).py to app.py
f1316bd
raw
history blame
46 kB
# app.py - updated release with fixes for critical bugs
import os
import traceback
import signal
import sys
import uvicorn
import asyncio
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, HTTPException
from datetime import datetime, timedelta
from r2 import R2Service
from LLM import LLMService, local_analyze_opportunity, local_re_analyze_trade
from data_manager import DataManager
from ML import MLProcessor as FeatureProcessor
from learning_engine import LearningEngine
import time
import json
import state
import re
# System settings
TOP_N_SYMBOLS, OPPORTUNITY_COUNT, CHUNK_SIZE = 100, 10, 5

# Shared service handles. They all start unset; lifespan() assigns them at startup.
r2_service_global = data_manager_global = llm_service_global = None
learning_engine_global = realtime_monitor = None
# Real-time trade monitoring with enhanced risk management
class RealTimeTradeMonitor:
    """Real-time watcher for open trades with stop-loss / take-profit handling.

    ``start_monitoring`` polls the trade store for open trades and spawns one
    watcher coroutine per symbol. Each watcher polls the latest price, checks
    the stop-loss and take-profit levels, ratchets a trailing stop upward
    while the price is above entry, and closes the trade when a level is hit.

    Attributes:
        monitoring_tasks: Maps symbol -> trade dict for every symbol that
            currently has a watcher registered.
        is_running: Flag polled by every loop; cleared by ``stop_monitoring``.
    """

    def __init__(self):
        self.monitoring_tasks = {}
        self.is_running = False
        # The event loop only keeps weak references to tasks, so an
        # unreferenced task can be garbage-collected before it finishes.
        self._background_tasks = set()

    def _spawn(self, coroutine):
        """Schedule *coroutine* as a task and hold a reference until it is done."""
        task = asyncio.create_task(coroutine)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start_monitoring(self):
        """Start watching every open trade.

        Loops while ``is_running`` is set: fetches the open trades, spawns a
        watcher for each symbol that has none, drops symbols that are no
        longer open, then sleeps 10 seconds (30 seconds after an error).
        """
        self.is_running = True
        print("🔍 Starting real-time trade monitoring...")
        while self.is_running:
            try:
                open_trades = await r2_service_global.get_open_trades_async()
                for trade in open_trades:
                    symbol = trade['symbol']
                    if symbol not in self.monitoring_tasks:
                        # Register first: the watcher loop exits as soon as
                        # its symbol is missing from monitoring_tasks.
                        self.monitoring_tasks[symbol] = trade
                        self._spawn(self._monitor_single_trade(trade))
                current_symbols = {trade['symbol'] for trade in open_trades}
                for symbol in list(self.monitoring_tasks.keys()):
                    if symbol not in current_symbols:
                        del self.monitoring_tasks[symbol]
                await asyncio.sleep(10)
            except Exception as error:
                print(f"❌ Real-time monitor error: {error}")
                await asyncio.sleep(30)

    async def _monitor_single_trade(self, trade):
        """Watch a single trade in real time until it is closed or unregistered.

        Args:
            trade: Trade dict; reads ``symbol``, ``entry_price`` and the
                optional ``strategy``, ``stop_loss`` and ``take_profit`` keys.
                ``stop_loss`` is updated in place by the trailing-stop logic
                (in memory only; nothing here persists it).
        """
        symbol = trade['symbol']
        strategy = trade.get('strategy', 'GENERIC')
        print(f"📊 Starting real-time monitoring for {symbol} (Strategy: {strategy})")
        while symbol in self.monitoring_tasks and self.is_running:
            try:
                current_price = await data_manager_global.get_latest_price_async(symbol)
                if not current_price:
                    await asyncio.sleep(15)
                    continue
                entry_price = trade['entry_price']
                stop_loss = trade.get('stop_loss')
                take_profit = trade.get('take_profit')
                should_close = False
                close_reason = ""
                # NOTE(review): these comparisons treat every trade as a long
                # position (close when price falls to the stop or rises to the target).
                if stop_loss and current_price <= stop_loss:
                    should_close = True
                    close_reason = f"Stop loss hit: {current_price} <= {stop_loss}"
                elif take_profit and current_price >= take_profit:
                    should_close = True
                    close_reason = f"Take profit hit: {current_price} >= {take_profit}"
                if not should_close and current_price > entry_price:
                    # Trailing stop: 2% below the current price, only ever moved up.
                    dynamic_stop = current_price * 0.98
                    if dynamic_stop > (stop_loss or 0):
                        trade['stop_loss'] = dynamic_stop
                        print(f"🔒 Updated trailing stop for {symbol}: {dynamic_stop:.4f}")
                if should_close:
                    print(f"🚨 IMMEDIATE CLOSE: {symbol} - {close_reason} - Strategy: {strategy}")
                    if r2_service_global.acquire_lock():
                        try:
                            await r2_service_global.close_trade_async(trade, current_price)
                            print(f"✅ Trade {symbol} closed immediately at {current_price}. Strategy: {strategy}")
                            if learning_engine_global and learning_engine_global.initialized:
                                await learning_engine_global.analyze_trade_outcome(trade, 'CLOSED_BY_MONITOR')
                            # Kick off a fresh cycle; it starts after the lock is released below.
                            self._spawn(run_bot_cycle_async())
                        finally:
                            r2_service_global.release_lock()
                    # If the lock was busy the close is skipped; start_monitoring
                    # re-registers the still-open trade on its next pass.
                    if symbol in self.monitoring_tasks:
                        del self.monitoring_tasks[symbol]
                    break
                await asyncio.sleep(15)
            except Exception as error:
                print(f"❌ Real-time monitoring error for {symbol}: {error}")
                await asyncio.sleep(30)

    def stop_monitoring(self):
        """Stop all monitoring loops by clearing the run flag and the registry."""
        self.is_running = False
        self.monitoring_tasks.clear()
        print("🛑 Real-time trade monitoring stopped")
async def monitor_market_async():
    """Background loop that polls market health and toggles ``state.MARKET_STATE_OK``.

    Waits up to 10 x 3 seconds for the data manager to be initialised, then
    checks the market context once a minute. Trading is flagged as halted on
    a critical whale alert, or on bearish BTC sentiment combined with a
    Fear & Greed index below 30. Any failure leaves the flag set to True.
    """
    global data_manager_global

    attempt = 0
    while data_manager_global is None and attempt < 10:
        print(f"⏳ Waiting for data manager initialization... (attempt {attempt + 1}/10)")
        await asyncio.sleep(3)
        attempt += 1
    if data_manager_global is None:
        print("❌ Data manager failed to initialize after 10 attempts")
        return

    while True:
        try:
            print("👁️ Monitoring market sentiment...")
            try:
                context = await data_manager_global.get_market_context_async()
            except Exception as error:
                print(f"⚠️ Failed to get market context: {error}")
                context = await get_fallback_market_context()

            if not context:
                print("❌ Failed to get market context. Assuming neutral state.")
                state.MARKET_STATE_OK = True
                await asyncio.sleep(60)
                continue

            whales = context.get('general_whale_activity', {})
            whale_mood = whales.get('sentiment', 'NEUTRAL')
            critical = whales.get('critical_alert', False)
            volume_usd = whales.get('total_volume_usd', 0)
            print(f"🐋 Whale Analysis: {whale_mood} | Critical: {critical} | Volume: ${volume_usd:,.0f}")
            print(f"📈 Whale Description: {whales.get('description', 'No data')}")

            btc_mood = context.get('btc_sentiment')
            fear_greed = context.get('fear_and_greed_index')

            # A non-empty reason doubles as the "halt" flag.
            reason = ""
            if critical:
                reason = f"CRITICAL whale activity detected: {whales.get('description')}"
            elif btc_mood == 'BEARISH' and (fear_greed is not None and fear_greed < 30):
                reason = f"Bearish market conditions (BTC: {btc_mood}, F&G: {fear_greed})"

            if reason:
                print(f"🚨🚨🚨 MARKET HALT: {reason} 🚨🚨🚨")
                state.MARKET_STATE_OK = False
                try:
                    await r2_service_global.save_system_logs_async({
                        "market_halt": True,
                        "reason": reason,
                        "whale_sentiment": whale_mood,
                        "is_critical": critical
                    })
                except Exception as log_error:
                    print(f"⚠️ Failed to save market halt log: {log_error}")
            else:
                if not state.MARKET_STATE_OK:
                    print("🟢 Market conditions improved. Resuming normal operations.")
                state.MARKET_STATE_OK = True

            await asyncio.sleep(60)
        except Exception as error:
            print(f"❌ An error occurred during COMPREHENSIVE market monitoring: {error}")
            traceback.print_exc()
            state.MARKET_STATE_OK = True
            await asyncio.sleep(60)
async def get_fallback_market_context():
    """Return a neutral placeholder market context.

    Used when the primary market-context fetch raises. Every sentiment is
    NEUTRAL, there is no critical alert, and the Fear & Greed index is 50.
    """
    neutral_whales = dict(
        sentiment='NEUTRAL',
        description='Fallback mode - system initializing',
        critical_alert=False,
        transaction_count=0,
        total_volume_usd=0,
    )
    context = {'timestamp': datetime.now().isoformat()}
    context['general_whale_activity'] = neutral_whales
    context['btc_sentiment'] = 'NEUTRAL'
    context['fear_and_greed_index'] = 50
    return context
def safe_float_conversion(value, default=0.0):
    """Safely convert *value* to a float.

    Args:
        value: ``None``, a number, or a string. Strings may carry decoration
            such as currency symbols or thousands separators.
        default: Returned when no finite number can be extracted.

    Returns:
        float: The parsed number, or *default*.
    """
    import math

    if value is None:
        return default
    try:
        if isinstance(value, (int, float)):
            return float(value)
        if not isinstance(value, str):
            return default
        # Try the plain parse first so scientific notation survives. Stripping
        # non-digits from "1e5" would otherwise yield the wrong number 15.
        try:
            parsed = float(value.strip())
        except ValueError:
            pass
        else:
            # Spelled-out "nan"/"inf" carry no usable price; keep returning default.
            return parsed if math.isfinite(parsed) else default
        cleaned = ''.join(character for character in value if character.isdigit() or character in '.-')
        return float(cleaned) if cleaned else default
    except (ValueError, TypeError, OverflowError):
        # OverflowError: an int too large to be represented as a float.
        return default
async def validate_candidate_data_enhanced(candidate):
    """Normalise a candidate dict in place so downstream code can rely on its keys.

    Fills in missing required fields, coerces the price and score fields to
    floats, and supplies neutral defaults for the optional sections.

    Args:
        candidate: Candidate dict; it is mutated in place.

    Returns:
        bool: True when the candidate was normalised, False when validation
        raised (including when *candidate* is not a mapping).
    """
    try:
        required_fields = ['symbol', 'current_price', 'final_score', 'enhanced_final_score']
        for field in required_fields:
            if field not in candidate:
                candidate[field] = 0.0 if field.endswith('_score') or field == 'current_price' else 'UNKNOWN'
        candidate['current_price'] = safe_float_conversion(candidate.get('current_price'), 0.0)
        candidate['final_score'] = safe_float_conversion(candidate.get('final_score'), 0.5)
        candidate['enhanced_final_score'] = safe_float_conversion(candidate.get('enhanced_final_score'), candidate['final_score'])
        if 'reasons_for_candidacy' not in candidate or not candidate['reasons_for_candidacy']:
            candidate['reasons_for_candidacy'] = ['unknown_reason']
        if 'sentiment_data' not in candidate or not candidate['sentiment_data']:
            candidate['sentiment_data'] = {
                'btc_sentiment': 'NEUTRAL',
                'fear_and_greed_index': 50,
                'general_whale_activity': {'sentiment': 'NEUTRAL', 'critical_alert': False}
            }
        if 'advanced_indicators' not in candidate:
            candidate['advanced_indicators'] = {}
        if 'strategy_scores' not in candidate:
            candidate['strategy_scores'] = {}
        if 'recommended_strategy' not in candidate:
            candidate['recommended_strategy'] = 'unknown'
        # Make sure a valid target strategy is always present.
        if 'target_strategy' not in candidate or not candidate['target_strategy'] or candidate['target_strategy'] == 'unknown':
            candidate['target_strategy'] = 'GENERIC'
        return True
    except Exception as error:
        # The candidate may not be a mapping at all; the log line itself must not raise.
        symbol = candidate.get('symbol') if isinstance(candidate, dict) else None
        print(f"❌ Failed to validate candidate data for {symbol}: {error}")
        return False
def _coerce_bounded_int(value, default, lower, upper):
    """Return *value* as an int clamped to [lower, upper], or *default* if it is not numeric."""
    try:
        number = int(float(value))
    except (TypeError, ValueError, OverflowError):
        return default
    return max(lower, min(upper, number))


async def analyze_market_strategy(market_context):
    """Ask the LLM for the best trading strategy under current market conditions.

    Args:
        market_context: Dict with market data; reads ``bitcoin_price_usd``,
            ``btc_sentiment``, ``fear_and_greed_index`` and
            ``general_whale_activity``.

    Returns:
        dict: Always contains ``primary_strategy``, ``reasoning``,
        ``risk_tolerance`` (1-10) and ``optimal_scan_count`` (50-200). A
        GENERIC fallback is returned when the LLM call fails or its answer
        cannot be parsed.
    """
    def _fallback(reasoning):
        return {
            "primary_strategy": "GENERIC",
            "reasoning": reasoning,
            "risk_tolerance": 5,
            "optimal_scan_count": 100,
        }

    try:
        prompt = f"""
You are a professional crypto portfolio manager. Analyze the current market conditions and determine the most suitable strategy.
**Market Data:**
- BTC Price: {market_context.get('bitcoin_price_usd')}
- BTC Sentiment: {market_context.get('btc_sentiment')}
- Fear & Greed Index: {market_context.get('fear_and_greed_index')}
- Whale Analysis: {market_context.get('general_whale_activity', {}).get('sentiment')}
- Critical Whale Alert: {market_context.get('general_whale_activity', {}).get('critical_alert')}
**Available Strategies:**
1. AGGRESSIVE_GROWTH - For strong bull markets.
2. DEFENSIVE_GROWTH - For volatile or uncertain markets.
3. CONSERVATIVE - For bearish or high-risk markets.
4. HIGH_FREQUENCY - For sideways markets.
5. WHALE_FOLLOWING - When whale activity is high and clear.
6. GENERIC - Balanced approach for normal conditions.
**Required:**
- Choose one primary strategy.
- Explain why in a single sentence.
- Set an acceptable risk tolerance (1 to 10).
- Determine the optimal number of coins to scan (50 to 200).
**Output (JSON only):**
{{
"primary_strategy": "STRATEGY_NAME",
"reasoning": "Brief reasoning.",
"risk_tolerance": 5,
"optimal_scan_count": 100
}}
"""
        response = await llm_service_global._call_llm(prompt)
        try:
            json_match = re.search(r'\{.*\}', response, re.DOTALL)
            strategy_data = json.loads(json_match.group())
        except (AttributeError, TypeError, ValueError):
            # AttributeError: no JSON object found; TypeError: non-string
            # response; ValueError: malformed JSON (JSONDecodeError).
            return _fallback("Fallback strategy for market stability")

        # Callers index 'primary_strategy' and 'reasoning' directly, so a
        # partial or non-object answer must not be passed through.
        if not isinstance(strategy_data, dict) or not strategy_data.get("primary_strategy"):
            return _fallback("Fallback strategy for market stability")
        strategy_data.setdefault("reasoning", "No reasoning provided.")
        # The prompt asks for 1-10 and 50-200; enforce that, since the scan
        # count is later used in arithmetic.
        strategy_data["risk_tolerance"] = _coerce_bounded_int(
            strategy_data.get("risk_tolerance"), 5, 1, 10)
        strategy_data["optimal_scan_count"] = _coerce_bounded_int(
            strategy_data.get("optimal_scan_count"), 100, 50, 200)
        return strategy_data
    except Exception as error:
        print(f"❌ Failed to analyze market strategy: {error}")
        return _fallback("Fallback due to analysis error")
async def find_strategy_specific_candidates(strategy, scan_count):
    """Select the best candidates for *strategy* using lowered thresholds.

    Args:
        strategy: Strategy name; ``'GENERIC'`` ranks by ``enhanced_final_score``,
            anything else ranks by that strategy's base score.
        scan_count: Twice this value is requested from the general scanner.

    Returns:
        list: Up to 15 processed candidates, or ``[]`` when nothing could be
        found or processed, or when an error occurred.
    """
    try:
        # 1. Fetch the initial candidate list.
        raw_candidates = await data_manager_global.find_high_potential_candidates(scan_count * 2)
        if not raw_candidates:
            print(f"⚠️ الماسح العام لم يجد أي مرشحين أوليين.")
            return []

        # 2. Fetch the market context before processing.
        context = await data_manager_global.get_market_context_async()
        if not context:
            print("❌ Failed to get market context for strategy analysis")
            return []

        processor = FeatureProcessor(context, data_manager_global, learning_engine_global)
        scored = []
        # Only the first 30 candidates are processed (reduced from 50 for performance).
        for raw in raw_candidates[:30]:
            try:
                # Turn the raw entry into processed data.
                request = [{'symbol': raw['symbol'], 'reasons': raw.get('reasons', [])}]
                fetched = await data_manager_global.get_fast_pass_data_async(request)
                if not fetched or not fetched[0]:
                    continue
                # Refresh the market context before each symbol is processed.
                try:
                    fresh_context = await data_manager_global.get_market_context_async()
                    if fresh_context:
                        processor.market_context = fresh_context
                except Exception as e:
                    # Carry on with the previous context if the refresh fails.
                    print(f"⚠️ Failed to update market context for {raw['symbol']}: {e}")
                outcome = await processor.process_and_score_symbol_enhanced(fetched[0])
                if outcome:
                    scored.append(outcome)
            except Exception as e:
                print(f"⚠️ فشل معالجة {raw.get('symbol')}: {e}")

        if not scored:
            print("⚠️ لم يتم معالجة أي مرشح بنجاح")
            return []

        # 3. Rank the candidates for the requested strategy.
        if strategy == 'GENERIC':
            # The generic strategy ranks by the enhanced score.
            ranked = sorted(scored, key=lambda item: item.get('enhanced_final_score', 0), reverse=True)
            top_candidates = ranked[:15]  # reduced from 20 to 15
            print(f"✅ تم اختيار {len(top_candidates)} مرشحًا للاستراتيجية العامة")
            return top_candidates

        matches = []
        for item in scored:
            # Base (unweighted) scores are used, with the acceptance threshold at 0.2.
            match_score = item.get('base_strategy_scores', {}).get(strategy, 0)
            if match_score > 0.2:
                item['strategy_match_score'] = match_score
                matches.append(item)
                print(f"✅ {item['symbol']} مناسب لـ {strategy} (درجة: {match_score:.3f})")
        matches.sort(key=lambda item: item.get('strategy_match_score', 0), reverse=True)
        top_candidates = matches[:15]  # reduced from 20 to 15
        print(f"✅ تم اختيار {len(top_candidates)} مرشحًا لاستراتيجية {strategy}")
        return top_candidates
    except Exception as error:
        print(f"❌ فشل في نظام الفلترة المتقدم: {error}")
        traceback.print_exc()
        return []
async def find_new_opportunities_async():
    """Scan for a new trade: pick a strategy, rank candidates, ask the LLM.

    Steps: determine the market strategy, collect strategy-specific candidates
    (falling back to generic ones), score them in chunks of ``CHUNK_SIZE``,
    keep the top ``OPPORTUNITY_COUNT``, then query the LLM for each until one
    yields a BUY or SELL decision.

    Returns:
        dict | None: ``{"symbol", "decision", "current_price", "strategy"}``
        for the first actionable candidate, otherwise None.
    """
    print("🔍 Scanning for new opportunities with reduced thresholds...")
    try:
        await r2_service_global.save_system_logs_async({
            "opportunity_scan_started": True, "timestamp": datetime.now().isoformat()
        })
        print("🧠 Determining trading strategy...")
        market_context = await data_manager_global.get_market_context_async()
        if not market_context:
            print("❌ Failed to fetch market context. Cannot determine strategy.")
            return
        strategy_decision = await analyze_market_strategy(market_context)
        print(f"🎯 Selected Strategy: {strategy_decision['primary_strategy']}")
        print(f"📝 Reasoning: {strategy_decision['reasoning']}")
        print(f"⚡ Risk Tolerance: {strategy_decision.get('risk_tolerance', 5)}/10")
        print(f"🔍 Optimal Scan Count: {strategy_decision.get('optimal_scan_count', 100)}")
        print(f"🔍 Finding top candidates using dynamic ranking...")
        high_potential_candidates = await find_strategy_specific_candidates(
            strategy_decision['primary_strategy'],
            strategy_decision.get('optimal_scan_count', 100)
        )
        if not high_potential_candidates:
            print("🔄 لا توجد مرشحين متخصصين، جلب مرشحين عامين...")
            # Fall back to generic candidates.
            high_potential_candidates = await data_manager_global.find_high_potential_candidates(20)
            if high_potential_candidates:
                for candidate in high_potential_candidates:
                    candidate['target_strategy'] = 'GENERIC'
                print(f"✅ تم تحميل {len(high_potential_candidates)} مرشح عام")
            else:
                print("✅ No new candidates found after dynamic ranking.")
                await r2_service_global.save_system_logs_async({
                    "no_candidates_found": True, "strategy": strategy_decision['primary_strategy'],
                    "reason": "Scanner did not return any initial candidates."
                })
                return

        all_processed_candidates = []
        for index in range(0, len(high_potential_candidates), CHUNK_SIZE):
            chunk = high_potential_candidates[index:index+CHUNK_SIZE]
            chunk_data = await data_manager_global.get_fast_pass_data_async(chunk)
            print(f"⏳ Processing and scoring chunk {index//CHUNK_SIZE + 1}...")
            # Refresh the market context before each chunk is processed.
            updated_market_context = await data_manager_global.get_market_context_async()
            if not updated_market_context:
                updated_market_context = market_context  # fall back to the earlier context
            feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
            # return_exceptions=True: one failing symbol must not abort the
            # whole scan, matching the per-candidate handling used elsewhere.
            processed_chunk = await asyncio.gather(
                *[feature_processor.process_and_score_symbol_enhanced(data) for data in chunk_data],
                return_exceptions=True
            )
            for outcome in processed_chunk:
                if isinstance(outcome, BaseException):
                    print(f"⚠️ Failed to process a candidate in chunk {index//CHUNK_SIZE + 1}: {outcome}")
                elif outcome is not None:
                    all_processed_candidates.append(outcome)
            await asyncio.sleep(1)

        if not all_processed_candidates:
            print("❌ No candidates were processed successfully.")
            return

        # Use a refreshed context for the final filtering.
        updated_market_context = await data_manager_global.get_market_context_async()
        if not updated_market_context:
            updated_market_context = market_context
        feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
        top_candidates = feature_processor.filter_top_candidates(all_processed_candidates, OPPORTUNITY_COUNT)
        print(f"✅ Identified {len(top_candidates)} top candidates after final scoring.")
        await r2_service_global.save_candidates_data_async(
            candidates_data=top_candidates,
            reanalysis_data={"strategy_used": strategy_decision, "market_conditions": market_context}
        )
        if not top_candidates:
            print("❌ No strong candidates left after final filtering.")
            await r2_service_global.save_system_logs_async({
                "no_strong_candidates": True, "strategy": strategy_decision['primary_strategy'],
                "initial_candidates_count": len(high_potential_candidates)
            })
            return

        print("🧠 Getting LLM analysis for top candidates...")
        for candidate in top_candidates:
            try:
                if not await validate_candidate_data_enhanced(candidate):
                    print(f"⚠️ Skipping {candidate.get('symbol')} due to quality issues")
                    continue
                llm_analysis_data = await llm_service_global.get_trading_decision(candidate)
                if not llm_analysis_data:
                    print(f"⚠️ LLM analysis failed for {candidate['symbol']}. Moving to next.")
                    continue
                if llm_analysis_data.get('action') == "HOLD":
                    print(f"🧠 LLM decided to HOLD on {candidate['symbol']}. Moving to next.")
                    continue
                if llm_analysis_data.get('action') in ["BUY", "SELL"]:
                    # Final check of the strategy attached to the decision.
                    final_strategy = llm_analysis_data.get('strategy')
                    candidate_strategy = candidate.get('target_strategy', 'GENERIC')
                    # If the LLM's strategy is unusable, take the candidate's own.
                    if not final_strategy or final_strategy == 'unknown' or final_strategy == 'GENERIC':
                        final_strategy = candidate_strategy
                        llm_analysis_data['strategy'] = final_strategy
                        print(f"🔧 تصحيح استراتيجية LLM لـ {candidate['symbol']}: {final_strategy}")
                    print(f"🎯 الاستراتيجية النهائية: {final_strategy}")
                    print("\n========================================================")
                    print(f"💎💎💎 New Trading Opportunity Identified! 💎💎💎")
                    print(f" Symbol: {candidate['symbol']}")
                    print(f" Action: {llm_analysis_data.get('action')}")
                    print(f" Strategy: {final_strategy}")
                    print(f" Reasoning: {llm_analysis_data.get('reasoning')}")
                    print(f" Confidence: {llm_analysis_data.get('confidence_level')}")
                    print("========================================================\n")
                    await r2_service_global.save_system_logs_async({
                        "new_opportunity_found": True, "symbol": candidate['symbol'],
                        "action": llm_analysis_data.get('action'), "strategy": final_strategy,
                        "confidence": llm_analysis_data.get('confidence_level', 0)
                    })
                    return {
                        "symbol": candidate['symbol'],
                        "decision": llm_analysis_data,
                        "current_price": candidate['current_price'],
                        "strategy": final_strategy
                    }
            except Exception as error:
                print(f"❌ LLM/Fallback error for {candidate.get('symbol', 'unknown')}: {error}")
                traceback.print_exc()
        print("✅ Cycle finished. No actionable BUY/SELL opportunities found by LLM.")
        return None
    except Exception as error:
        print(f"❌ An error occurred while scanning for opportunities: {error}")
        traceback.print_exc()
        # Recording the failure is best-effort; it must not raise out of the handler.
        try:
            await r2_service_global.save_system_logs_async({"opportunity_scan_error": True, "error": str(error)})
        except Exception as log_error:
            print(f"⚠️ Failed to save opportunity scan error log: {log_error}")
        return None
async def re_analyze_open_trade_async(trade_data):
    """Re-analyse an open trade while preserving its original strategy.

    Args:
        trade_data: Trade dict; reads ``symbol``, ``entry_timestamp`` (ISO
            format), ``entry_price``, ``strategy`` and ``decision_data``.

    Returns:
        dict | None: ``{"symbol", "decision", "current_price", "hold_minutes"}``
        on success, or None when data could not be fetched or processed, or
        when an error occurred.
    """
    symbol = trade_data.get('symbol')
    try:
        entry_time = datetime.fromisoformat(trade_data['entry_timestamp'])
        current_time = datetime.now()
        hold_minutes = (current_time - entry_time).total_seconds() / 60
        print(f"⏳ Re-analyzing trade: {symbol} (held for {hold_minutes:.1f} minutes)")
        # Preserve the original strategy, recovering it from the decision data if needed.
        original_strategy = trade_data.get('strategy')
        if not original_strategy or original_strategy == 'unknown':
            original_strategy = trade_data.get('decision_data', {}).get('strategy', 'GENERIC')
            print(f"🔧 Fixed original strategy for {symbol}: {original_strategy}")
        try:
            market_context = await data_manager_global.get_market_context_async()
        except Exception as error:
            print(f"⚠️ Failed to get market context: {error}. Using basic market data...")
            market_context = {'btc_sentiment': 'NEUTRAL'}
        if not market_context:
            # An empty result gets the same basic fallback as a failed fetch.
            market_context = {'btc_sentiment': 'NEUTRAL'}
        symbol_with_reasons = [{'symbol': symbol, 'reasons': ['re-analysis']}]
        ohlcv_data_list = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
        # The list may be non-empty yet hold an empty first entry.
        if not ohlcv_data_list or not ohlcv_data_list[0]:
            print(f"❌ Failed to fetch latest data for {symbol}.")
            return None
        raw_data = ohlcv_data_list[0]
        # Refresh the market context right before processing.
        try:
            updated_market_context = await data_manager_global.get_market_context_async()
            if updated_market_context:
                market_context = updated_market_context
        except Exception as e:
            print(f"⚠️ Failed to update market context for re-analysis: {e}")
        feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
        processed_data = await feature_processor.process_and_score_symbol(raw_data)
        if not processed_data:
            print(f"❌ Failed to process latest data for {symbol}.")
            return None
        await r2_service_global.save_candidates_data_async(
            candidates_data=None,
            reanalysis_data={'market_context': market_context, 'processed_data': processed_data}
        )
        print(f"🧠 Getting LLM re-analysis for {symbol}...")
        try:
            re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
            source = re_analysis_decision.get('model_source', 'LLM')
        except Exception as error:
            print(f"❌ LLM re-analysis error: {error}. Falling back to local.")
            re_analysis_decision = local_re_analyze_trade(trade_data, processed_data)
            source = 'local_fallback'
        final_decision = _apply_patience_logic(re_analysis_decision, hold_minutes, trade_data, processed_data)
        # Make sure the final decision always carries a strategy.
        if not final_decision.get('strategy') or final_decision['strategy'] == 'unknown':
            final_decision['strategy'] = original_strategy
            print(f"🔧 Final re-analysis strategy fix for {symbol}: {original_strategy}")
        print(f"✅ Re-analysis decision for {symbol}: {final_decision.get('action')}. Strategy: {final_decision.get('strategy')}. Source: {source}")
        await r2_service_global.save_system_logs_async({
            "trade_reanalyzed": True, "symbol": symbol, "action": final_decision.get('action'),
            "hold_minutes": hold_minutes, "source": source, "strategy": final_decision.get('strategy')
        })
        return {
            "symbol": symbol, "decision": final_decision,
            "current_price": processed_data.get('current_price'), "hold_minutes": hold_minutes
        }
    except Exception as error:
        print(f"❌ An error occurred during trade re-analysis: {error}")
        traceback.print_exc()
        # Recording the failure is best-effort; it must not raise out of the handler.
        try:
            await r2_service_global.save_system_logs_async({"reanalysis_error": True, "symbol": symbol, "error": str(error)})
        except Exception as log_error:
            print(f"⚠️ Failed to save re-analysis error log: {log_error}")
        return None
def _apply_patience_logic(decision, hold_minutes, trade_data, processed_data):
"""Apply patience logic to prevent premature selling decisions"""
action = decision.get('action')
if action == "CLOSE_TRADE" and hold_minutes < 20:
current_price = processed_data.get('current_price', 0)
entry_price = trade_data.get('entry_price', 0)
try:
profit_loss_percent = ((current_price - entry_price) / entry_price) * 100
except (TypeError, ZeroDivisionError):
profit_loss_percent = 0
if profit_loss_percent < 2:
print(f"🛑 Blocked premature selling! Only {hold_minutes:.1f} minutes held, PnL: {profit_loss_percent:.2f}%")
decision['action'] = "HOLD"
decision['reasoning'] = f"Patience Filter: Blocked premature sell. Held for {hold_minutes:.1f}m. Giving trade more time."
return decision
return decision
# Strong references to watcher tasks started by the bot cycle; the event loop
# itself only holds weak references to tasks.
_cycle_background_tasks = set()


def _trade_due_for_reanalysis(trade, now):
    """Return True when the trade's ``expected_target_time`` has passed.

    A missing, empty or unparseable target time counts as due, so one bad
    record cannot abort the whole cycle.
    """
    raw_target = trade.get('expected_target_time')
    if not raw_target:
        return True
    try:
        target = datetime.fromisoformat(raw_target)
    except (TypeError, ValueError):
        return True
    if target.tzinfo is not None:
        # Naive and aware datetimes cannot be compared; use an aware "now".
        return datetime.now(target.tzinfo) >= target
    return now >= target


def _start_realtime_watch(trade):
    """Register *trade* with the real-time monitor and start its watcher.

    The watcher loop only runs while its symbol is registered in
    ``monitoring_tasks``, so the symbol is registered before the task is
    created. Nothing is started when a watcher is already registered or the
    monitor is not running; the monitor's own scan picks the trade up then.
    """
    if realtime_monitor is None or not realtime_monitor.is_running:
        return
    symbol = trade['symbol']
    if symbol in realtime_monitor.monitoring_tasks:
        return
    realtime_monitor.monitoring_tasks[symbol] = trade
    task = asyncio.create_task(realtime_monitor._monitor_single_trade(trade))
    _cycle_background_tasks.add(task)
    task.add_done_callback(_cycle_background_tasks.discard)


async def run_bot_cycle_async():
    """Run one bot cycle: repair, re-analyse and possibly open a trade.

    Under the R2 lock the cycle (1) fills in missing strategies on open
    trades, (2) re-analyses trades whose target time has passed and closes or
    updates them, and (3) scans for a new opportunity when no trade is open
    or one was just closed, and capital is available.
    """
    print(f"\n{'='*70}")
    print(f"⏳ New cycle initiated at: {datetime.now().isoformat()}")
    print(f"{'='*70}")
    try:
        await r2_service_global.save_system_logs_async({"cycle_started": True})
        if not r2_service_global.acquire_lock():
            print("❌ Failed to acquire lock. Skipping cycle.")
            return
        open_trades = []
        try:
            open_trades = await r2_service_global.get_open_trades_async()
            print(f"✅ Found {len(open_trades)} open trade(s).")
            # Check for and repair empty strategies on open trades.
            trades_fixed = 0
            for trade in open_trades:
                if not trade.get('strategy') or trade['strategy'] == 'unknown':
                    original_strategy = trade.get('decision_data', {}).get('strategy', 'GENERIC')
                    trade['strategy'] = original_strategy
                    trades_fixed += 1
                    print(f"🔧 Fixed missing strategy for {trade['symbol']}: {trade['strategy']}")
            if trades_fixed > 0:
                print(f"✅ Fixed strategies for {trades_fixed} trades.")
                await r2_service_global.save_open_trades_async(open_trades)
            should_look_for_new_trade = not open_trades
            if open_trades:
                now = datetime.now()
                trades_to_reanalyze = [
                    trade for trade in open_trades
                    if _trade_due_for_reanalysis(trade, now)
                ]
                if trades_to_reanalyze:
                    print(f"🔍 Re-analyzing {len(trades_to_reanalyze)} trade(s)...")
                    for trade in trades_to_reanalyze:
                        result = await re_analyze_open_trade_async(trade)
                        if result and result['decision'].get('action') == "CLOSE_TRADE":
                            await r2_service_global.close_trade_async(trade, result['current_price'])
                            print(f"✅ Trade for {trade['symbol']} CLOSED. Strategy: {trade.get('strategy', 'unknown')}")
                            if learning_engine_global and learning_engine_global.initialized:
                                trade_with_strategy = trade.copy()
                                strategy = result['decision'].get('strategy', trade.get('strategy', 'GENERIC'))
                                trade_with_strategy['strategy'] = strategy
                                await learning_engine_global.analyze_trade_outcome(trade_with_strategy, 'CLOSED_BY_REANALYSIS')
                            should_look_for_new_trade = True
                        elif result and result['decision'].get('action') == "UPDATE_TRADE":
                            await r2_service_global.update_trade_async(trade, result['decision'])
                            print(f"✅ Trade for {trade['symbol']} UPDATED. Strategy: {trade.get('strategy', 'unknown')}")
                        else:
                            print(f"✅ Trade for {trade['symbol']} is on HOLD. Strategy: {trade.get('strategy', 'unknown')}")
                else:
                    print("✅ No trades due for re-analysis yet.")
            if should_look_for_new_trade:
                portfolio_state = await r2_service_global.get_portfolio_state_async()
                if portfolio_state.get("current_capital_usd", 0) > 1:
                    print(f"✅ Capital available (${portfolio_state['current_capital_usd']:.2f}). Scanning...")
                    new_opportunity = await find_new_opportunities_async()
                    if new_opportunity:
                        print(f"✅ Opportunity for {new_opportunity['symbol']} confirmed! Saving trade. Strategy: {new_opportunity.get('strategy')}")
                        # Final check before saving.
                        if not new_opportunity['decision'].get('strategy'):
                            new_opportunity['decision']['strategy'] = new_opportunity.get('strategy', 'GENERIC')
                            print(f"🔧 Final pre-save strategy fix: {new_opportunity['decision']['strategy']}")
                        await r2_service_global.save_new_trade_async(
                            new_opportunity['symbol'],
                            new_opportunity['decision'],
                            new_opportunity['current_price']
                        )
                        newly_opened_trades = await r2_service_global.get_open_trades_async()
                        for trade in newly_opened_trades:
                            if trade['symbol'] == new_opportunity['symbol']:
                                _start_realtime_watch(trade)
                                break
                    else:
                        print("✅ Scan complete. No actionable opportunities identified.")
                else:
                    print("😴 No available capital. Waiting for current trade to close.")
        finally:
            print("✅ Cycle finished. Releasing lock.")
            r2_service_global.release_lock()
            await r2_service_global.save_system_logs_async({"cycle_completed": True, "open_trades": len(open_trades)})
    except Exception as error:
        print(f"❌ Unhandled error in main cycle: {error}")
        traceback.print_exc()
        # Both recovery steps are best-effort and independent: a failing log
        # write must neither raise nor prevent the lock from being released.
        try:
            await r2_service_global.save_system_logs_async({"cycle_error": True, "error": str(error)})
        except Exception as log_error:
            print(f"⚠️ Failed to save cycle error log: {log_error}")
        try:
            if r2_service_global.lock_acquired:
                r2_service_global.release_lock()
        except Exception as lock_error:
            print(f"⚠️ Failed to release lock after cycle error: {lock_error}")
@asynccontextmanager
async def lifespan(application: FastAPI):
    """Application lifespan: build all services, start background loops, clean up.

    Startup creates the R2, LLM, data-manager and learning-engine services,
    then starts the market monitor and the real-time trade monitor as
    background tasks. On exit those tasks are cancelled and
    ``cleanup_on_shutdown`` runs. A startup failure is logged and re-raised.
    """
    global r2_service_global, data_manager_global, llm_service_global, learning_engine_global, realtime_monitor
    # Held so the tasks are not garbage-collected while running and so they
    # can be cancelled when the application shuts down.
    background_tasks = []
    print("===== Application Startup =====")
    try:
        r2_service_global = R2Service()
        llm_service_global = LLMService()
        contracts_database = await r2_service_global.load_contracts_db_async()
        data_manager_global = DataManager(contracts_database)
        await data_manager_global.initialize()
        # Initialise the learning engine, handing it the data manager.
        learning_engine_global = LearningEngine(r2_service_global, data_manager_global)
        await learning_engine_global.initialize_enhanced()
        # Force a strategy refresh from the current data.
        await learning_engine_global.force_strategy_learning()
        # Check that the weights are actually loaded.
        if learning_engine_global.initialized:
            weights = await learning_engine_global.get_optimized_strategy_weights("bull_market")
            print(f"🎯 الأوزان المحملة: {weights}")
        realtime_monitor = RealTimeTradeMonitor()
        background_tasks.append(asyncio.create_task(monitor_market_async()))
        background_tasks.append(asyncio.create_task(realtime_monitor.start_monitoring()))
        await r2_service_global.save_system_logs_async({"application_started": True})
        print("\n✅ All services initialized. Application is ready.\n")
        yield
    except Exception as error:
        print(f"❌ Application startup failed: {error}")
        traceback.print_exc()
        if r2_service_global:
            # Best-effort: a failing log write must not mask the original error.
            try:
                await r2_service_global.save_system_logs_async({"application_startup_failed": True, "error": str(error)})
            except Exception as log_error:
                print(f"⚠️ Failed to save startup failure log: {log_error}")
        raise
    finally:
        # Stop the endless background loops before tearing the services down.
        for task in background_tasks:
            task.cancel()
        await asyncio.gather(*background_tasks, return_exceptions=True)
        await cleanup_on_shutdown()


application = FastAPI(lifespan=lifespan)
# Strong references to cycles started through the API; the event loop only
# keeps weak references, so an unreferenced task may be collected mid-run.
_pending_cycle_tasks = set()


@application.get("/run-cycle")
async def run_cycle_api():
    """API endpoint that starts a bot cycle in the background and returns at once."""
    task = asyncio.create_task(run_bot_cycle_async())
    _pending_cycle_tasks.add(task)
    task.add_done_callback(_pending_cycle_tasks.discard)
    return {"message": "Bot cycle initiated in the background."}
@application.get("/health")
async def health_check():
    """Report service initialisation state, market state and learning metrics."""
    def _init_label(service):
        return "initialized" if service else "uninitialized"

    metrics = {}
    if learning_engine_global and learning_engine_global.initialized:
        metrics = await learning_engine_global.calculate_performance_metrics()

    engine_active = learning_engine_global and learning_engine_global.initialized
    monitor_running = realtime_monitor and realtime_monitor.is_running
    services = {
        "r2_service": _init_label(r2_service_global),
        "llm_service": _init_label(llm_service_global),
        "data_manager": _init_label(data_manager_global),
        "learning_engine": "active" if engine_active else "inactive",
        "realtime_monitor": "running" if monitor_running else "stopped",
    }
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "services": services,
        "market_state_ok": state.MARKET_STATE_OK,
        "learning_engine": metrics,
    }
@application.get("/stats")
async def get_performance_stats():
    """Collect performance statistics from every service.

    Raises:
        HTTPException: 500 when any of the underlying calls fails.
    """
    try:
        context = {}
        if data_manager_global:
            context = await data_manager_global.get_market_context_async()

        metrics, suggestions = {}, []
        if learning_engine_global and learning_engine_global.initialized:
            metrics = await learning_engine_global.calculate_performance_metrics()
            suggestions = await learning_engine_global.suggest_improvements()

        report = {"timestamp": datetime.now().isoformat()}
        report["data_manager"] = data_manager_global.get_performance_stats() if data_manager_global else {}

        market_ok = state.MARKET_STATE_OK
        if market_ok:
            market_note = "Market is healthy for trading"
        else:
            market_note = "Market conditions are unfavorable"
        report["market_state"] = {
            "is_healthy": market_ok,
            "description": market_note,
            "context": context,
        }

        if realtime_monitor:
            watched, running = len(realtime_monitor.monitoring_tasks), realtime_monitor.is_running
        else:
            watched, running = 0, False
        report["realtime_monitoring"] = {"active_trades": watched, "is_running": running}

        report["learning_engine"] = metrics
        report["improvement_suggestions"] = suggestions
        return report
    except Exception as error:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve stats: {error}")
@application.get("/logs/status")
async def get_logs_status():
    """Summarise the logging system: open trade count and portfolio figures.

    Raises:
        HTTPException: 500 when the trade or portfolio lookup fails.
    """
    try:
        trades = await r2_service_global.get_open_trades_async()
        portfolio = await r2_service_global.get_portfolio_state_async()
        summary = {"logging_system": "active", "open_trades_count": len(trades)}
        summary["current_capital"] = portfolio.get("current_capital_usd", 0)
        summary["total_trades"] = portfolio.get("total_trades", 0)
        summary["timestamp"] = datetime.now().isoformat()
        return summary
    except Exception as error:
        raise HTTPException(status_code=500, detail=f"Failed to get logs status: {error}")
async def cleanup_on_shutdown():
    """Release resources for a graceful shutdown.

    Each step is best-effort, so a failure in one does not skip the rest:
    write the shutdown log, persist learning-engine data, stop the real-time
    monitor, release the R2 lock if held, and close the data manager.
    """
    print("\n🛑 Shutdown signal received. Cleaning up...")
    if r2_service_global:
        try:
            await r2_service_global.save_system_logs_async({"application_shutdown": True})
        except Exception as log_error:
            print(f"⚠️ Failed to save shutdown log: {log_error}")
    if learning_engine_global and learning_engine_global.initialized:
        try:
            await learning_engine_global.save_weights_to_r2()
            await learning_engine_global.save_performance_history()
            print("✅ Learning engine data saved.")
        except Exception as e:
            print(f"⚠️ Failed to save learning engine data: {e}")
    if realtime_monitor:
        realtime_monitor.stop_monitoring()
    if r2_service_global and r2_service_global.lock_acquired:
        r2_service_global.release_lock()
        print("✅ Lock released.")
    if data_manager_global:
        # Guarded like the other steps so a close failure cannot abort shutdown.
        try:
            await data_manager_global.close()
        except Exception as close_error:
            print(f"⚠️ Failed to close data manager: {close_error}")
    print("✅ Cleanup completed.")
def signal_handler(signum, frame):
    """Handle shutdown signals by running the cleanup, then exiting with code 0.

    Scheduling the cleanup and calling ``sys.exit`` straight away would end
    the process before the cleanup coroutine ever ran. With an event loop
    running, the exit is therefore deferred until the cleanup task finishes.
    Without one, the cleanup runs synchronously first.
    """
    print(f"\n⚠️ Received signal {signum}")
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop is None:
        try:
            asyncio.run(cleanup_on_shutdown())
        except Exception as error:
            print(f"⚠️ Cleanup during signal handling failed: {error}")
        sys.exit(0)

    pending = getattr(signal_handler, "_cleanup_task", None)
    if pending is not None and not pending.done():
        # Second signal while cleanup is still in progress: exit right away.
        sys.exit(0)

    def _exit_after_cleanup(finished_task):
        if not finished_task.cancelled() and finished_task.exception() is not None:
            print(f"⚠️ Cleanup during signal handling failed: {finished_task.exception()}")
        sys.exit(0)

    task = loop.create_task(cleanup_on_shutdown())
    # Keep a strong reference; the loop only holds a weak one.
    signal_handler._cleanup_task = task
    task.add_done_callback(_exit_after_cleanup)
# Route both the interrupt and the termination signal to the same handler.
for _shutdown_signal in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_shutdown_signal, signal_handler)

if __name__ == "__main__":
    # Serve the FastAPI application on all interfaces, port 7860.
    uvicorn.run(application, host="0.0.0.0", port=7860)