Riy777 commited on
Commit
9c38b0c
·
verified ·
1 Parent(s): 2009274

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +288 -178
app.py CHANGED
@@ -8,119 +8,59 @@ from data_manager import DataManager
8
  from ML import MLProcessor as FeatureProcessor
9
  from learning_engine import LearningEngine
10
  from sentiment_news import SentimentAnalyzer
 
11
  import state
12
- from helpers import safe_float_conversion, _apply_patience_logic, local_analyze_opportunity, local_re_analyze_trade
13
 
14
  r2_service_global = None
15
  data_manager_global = None
16
  llm_service_global = None
17
  learning_engine_global = None
18
- realtime_monitor = None
19
  sentiment_analyzer_global = None
20
 
21
- class RealTimeTradeMonitor:
22
- def __init__(self):
23
- self.monitoring_tasks = {}
24
- self.is_running = False
25
-
26
- async def start_monitoring(self):
27
- self.is_running = True
28
- while self.is_running:
29
- try:
30
- open_trades = await r2_service_global.get_open_trades_async()
31
- for trade in open_trades:
32
- symbol = trade['symbol']
33
- if symbol not in self.monitoring_tasks:
34
- asyncio.create_task(self._monitor_single_trade(trade))
35
- self.monitoring_tasks[symbol] = trade
36
- current_symbols = {trade['symbol'] for trade in open_trades}
37
- for symbol in list(self.monitoring_tasks.keys()):
38
- if symbol not in current_symbols: del self.monitoring_tasks[symbol]
39
- await asyncio.sleep(10)
40
- except Exception as error:
41
- print(f"Real-time monitor error: {error}")
42
- await asyncio.sleep(30)
43
-
44
- async def _monitor_single_trade(self, trade):
45
- symbol = trade['symbol']
46
- while symbol in self.monitoring_tasks and self.is_running:
47
- try:
48
- current_price = await data_manager_global.get_latest_price_async(symbol)
49
- if not current_price: await asyncio.sleep(15); continue
50
- entry_price = trade['entry_price']
51
- stop_loss = trade.get('stop_loss')
52
- take_profit = trade.get('take_profit')
53
- should_close, close_reason = False, ""
54
- if stop_loss and current_price <= stop_loss: should_close, close_reason = True, f"Stop loss hit: {current_price} <= {stop_loss}"
55
- elif take_profit and current_price >= take_profit: should_close, close_reason = True, f"Take profit hit: {current_price} >= {take_profit}"
56
- if not should_close and current_price > entry_price:
57
- dynamic_stop = current_price * 0.98
58
- if dynamic_stop > (stop_loss or 0): trade['stop_loss'] = dynamic_stop
59
- if should_close:
60
- if r2_service_global.acquire_lock():
61
- try:
62
- await r2_service_global.close_trade_async(trade, current_price)
63
- if learning_engine_global and learning_engine_global.initialized:
64
- await learning_engine_global.analyze_trade_outcome(trade, 'CLOSED_BY_MONITOR')
65
- asyncio.create_task(run_bot_cycle_async())
66
- finally: r2_service_global.release_lock()
67
- if symbol in self.monitoring_tasks: del self.monitoring_tasks[symbol]
68
- break
69
- await asyncio.sleep(15)
70
- except Exception as error:
71
- print(f"Real-time monitoring error for {symbol}: {error}")
72
- await asyncio.sleep(30)
73
-
74
- def stop_monitoring(self):
75
- self.is_running = False
76
- self.monitoring_tasks.clear()
77
-
78
  async def monitor_market_async():
79
  global data_manager_global, sentiment_analyzer_global
80
  init_attempts = 0
81
- while data_manager_global is None and init_attempts < 10: await asyncio.sleep(3); init_attempts += 1
82
- if data_manager_global is None: return
 
 
 
 
83
  while True:
84
  try:
85
  market_context = await sentiment_analyzer_global.get_market_sentiment()
86
- if not market_context: state.MARKET_STATE_OK = True; await asyncio.sleep(60); continue
 
 
 
 
87
  whale_analysis = market_context.get('general_whale_activity', {})
88
  is_critical = whale_analysis.get('critical_alert', False)
89
  bitcoin_sentiment = market_context.get('btc_sentiment')
90
  fear_greed_index = market_context.get('fear_and_greed_index')
 
91
  should_halt_trading, halt_reason = False, ""
92
- if is_critical: should_halt_trading, halt_reason = True, f"CRITICAL whale activity detected"
93
- elif bitcoin_sentiment == 'BEARISH' and (fear_greed_index is not None and fear_greed_index < 30): should_halt_trading, halt_reason = True, f"Bearish market conditions"
 
 
 
94
  if should_halt_trading:
95
  state.MARKET_STATE_OK = False
96
  await r2_service_global.save_system_logs_async({"market_halt": True, "reason": halt_reason})
97
  else:
98
- if not state.MARKET_STATE_OK: print("Market conditions improved. Resuming normal operations.")
 
99
  state.MARKET_STATE_OK = True
 
100
  await asyncio.sleep(60)
101
  except Exception as error:
102
- print(f"Error during market monitoring: {error}")
103
  state.MARKET_STATE_OK = True
104
  await asyncio.sleep(60)
105
 
106
- async def validate_candidate_data_enhanced(candidate):
107
- try:
108
- required_fields = ['symbol', 'current_price', 'final_score', 'enhanced_final_score']
109
- for field in required_fields:
110
- if field not in candidate: candidate[field] = 0.0 if field.endswith('_score') or field == 'current_price' else 'UNKNOWN'
111
- candidate['current_price'] = safe_float_conversion(candidate.get('current_price'), 0.0)
112
- candidate['final_score'] = safe_float_conversion(candidate.get('final_score'), 0.5)
113
- candidate['enhanced_final_score'] = safe_float_conversion(candidate.get('enhanced_final_score'), candidate['final_score'])
114
- if 'reasons_for_candidacy' not in candidate: candidate['reasons_for_candidacy'] = ['unknown_reason']
115
- if 'sentiment_data' not in candidate: candidate['sentiment_data'] = {'btc_sentiment': 'NEUTRAL','fear_and_greed_index': 50,'general_whale_activity': {'sentiment': 'NEUTRAL', 'critical_alert': False}}
116
- if 'advanced_indicators' not in candidate: candidate['advanced_indicators'] = {}
117
- if 'strategy_scores' not in candidate: candidate['strategy_scores'] = {}
118
- if 'target_strategy' not in candidate: candidate['target_strategy'] = 'GENERIC'
119
- return True
120
- except Exception as error:
121
- print(f"Failed to validate candidate data for {candidate.get('symbol')}: {error}")
122
- return False
123
-
124
  async def analyze_market_strategy(market_context):
125
  try:
126
  whale_analysis = market_context.get('general_whale_activity', {})
@@ -133,33 +73,57 @@ async def analyze_market_strategy(market_context):
133
  strategy_data = json.loads(json_str)
134
  except:
135
  net_flow = netflow_analysis.get('net_flow', 0)
136
- if net_flow > 1000000: fallback_strategy = "AGGRESSIVE_GROWTH"
137
- elif net_flow < -1000000: fallback_strategy = "CONSERVATIVE"
138
- elif whale_analysis.get('critical_alert'): fallback_strategy = "CONSERVATIVE"
139
- else: fallback_strategy = "GENERIC"
140
- strategy_data = {"primary_strategy": fallback_strategy,"reasoning": "Fallback strategy","risk_tolerance": 5,"optimal_scan_count": 100}
 
 
 
 
 
 
 
 
 
141
  return strategy_data
142
  except Exception as error:
143
- print(f"Failed to analyze market strategy: {error}")
144
- return {"primary_strategy": "GENERIC","reasoning": "Fallback due to analysis error","risk_tolerance": 5,"optimal_scan_count": 100}
 
 
 
 
 
145
 
146
  async def find_strategy_specific_candidates(strategy, scan_count):
147
  try:
148
  all_candidates = await data_manager_global.find_high_potential_candidates(scan_count * 2)
149
- if not all_candidates: return []
 
 
150
  market_context = await data_manager_global.get_market_context_async()
151
- if not market_context: return []
 
 
152
  feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
153
  processed_candidates = []
 
154
  for candidate in all_candidates[:30]:
155
  try:
156
  symbol_with_reasons = [{'symbol': candidate['symbol'], 'reasons': candidate.get('reasons', [])}]
157
  ohlcv_data = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
158
  if ohlcv_data and ohlcv_data[0]:
159
  processed = await feature_processor.process_and_score_symbol_enhanced(ohlcv_data[0])
160
- if processed: processed_candidates.append(processed)
161
- except Exception as e: print(f"Failed to process {candidate.get('symbol')}: {e}")
162
- if not processed_candidates: return []
 
 
 
 
 
163
  if strategy != 'GENERIC':
164
  strategy_candidates = []
165
  for candidate in processed_candidates:
@@ -173,6 +137,7 @@ async def find_strategy_specific_candidates(strategy, scan_count):
173
  else:
174
  sorted_candidates = sorted(processed_candidates, key=lambda x: x.get('enhanced_final_score', 0), reverse=True)
175
  top_candidates = sorted_candidates[:15]
 
176
  return top_candidates
177
  except Exception as error:
178
  print(f"Advanced filtering failed: {error}")
@@ -182,50 +147,104 @@ async def find_new_opportunities_async():
182
  try:
183
  await r2_service_global.save_system_logs_async({"opportunity_scan_started": True})
184
  market_context = await data_manager_global.get_market_context_async()
185
- if not market_context: return
 
 
186
  strategy_decision = await analyze_market_strategy(market_context)
187
- high_potential_candidates = await find_strategy_specific_candidates(strategy_decision['primary_strategy'], strategy_decision.get('optimal_scan_count', 100))
 
 
 
 
188
  if not high_potential_candidates:
189
  high_potential_candidates = await data_manager_global.find_high_potential_candidates(20)
190
  if high_potential_candidates:
191
- for candidate in high_potential_candidates: candidate['target_strategy'] = 'GENERIC'
192
- else: return
 
 
 
193
  all_processed_candidates = []
194
  CHUNK_SIZE = 5
 
195
  for index in range(0, len(high_potential_candidates), CHUNK_SIZE):
196
  chunk = high_potential_candidates[index:index+CHUNK_SIZE]
197
  chunk_data = await data_manager_global.get_fast_pass_data_async(chunk)
198
  updated_market_context = await data_manager_global.get_market_context_async()
199
- if not updated_market_context: updated_market_context = market_context
 
 
200
  feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
201
- processed_chunk = await asyncio.gather(*[feature_processor.process_and_score_symbol_enhanced(data) for data in chunk_data])
 
 
202
  all_processed_candidates.extend([c for c in processed_chunk if c is not None])
203
  await asyncio.sleep(1)
204
- if not all_processed_candidates: return
 
 
 
205
  updated_market_context = await data_manager_global.get_market_context_async()
206
- if not updated_market_context: updated_market_context = market_context
 
 
207
  feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
208
  OPPORTUNITY_COUNT = 10
209
  top_candidates = feature_processor.filter_top_candidates(all_processed_candidates, OPPORTUNITY_COUNT)
210
- await r2_service_global.save_candidates_data_async(candidates_data=top_candidates, reanalysis_data={"strategy_used": strategy_decision, "market_conditions": market_context})
211
- if not top_candidates: return
 
 
 
 
 
 
 
 
 
 
212
  for candidate in top_candidates:
213
  try:
214
- if not await validate_candidate_data_enhanced(candidate): continue
 
 
215
  llm_analysis_data = await llm_service_global.get_trading_decision(candidate)
216
- if not llm_analysis_data: continue
217
- if llm_analysis_data.get('action') == "HOLD": continue
 
 
 
 
218
  if llm_analysis_data.get('action') in ["BUY", "SELL"]:
219
  final_strategy = llm_analysis_data.get('strategy')
220
  candidate_strategy = candidate.get('target_strategy', 'GENERIC')
221
- if not final_strategy or final_strategy == 'unknown': final_strategy = candidate_strategy; llm_analysis_data['strategy'] = final_strategy
222
- await r2_service_global.save_system_logs_async({"new_opportunity_found": True, "symbol": candidate['symbol'],"action": llm_analysis_data.get('action'), "strategy": final_strategy})
223
- return {"symbol": candidate['symbol'],"decision": llm_analysis_data,"current_price": candidate['current_price'],"strategy": final_strategy}
224
- except Exception as error: print(f"LLM error for {candidate.get('symbol', 'unknown')}: {error}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
  return None
226
  except Exception as error:
227
  print(f"Error while scanning for opportunities: {error}")
228
- await r2_service_global.save_system_logs_async({"opportunity_scan_error": True, "error": str(error)})
 
 
 
229
  return None
230
 
231
  async def re_analyze_open_trade_async(trade_data):
@@ -234,63 +253,109 @@ async def re_analyze_open_trade_async(trade_data):
234
  entry_time = datetime.fromisoformat(trade_data['entry_timestamp'])
235
  current_time = datetime.now()
236
  hold_minutes = (current_time - entry_time).total_seconds() / 60
 
237
  original_strategy = trade_data.get('strategy')
238
- if not original_strategy or original_strategy == 'unknown': original_strategy = trade_data.get('decision_data', {}).get('strategy', 'GENERIC')
239
- try: market_context = await data_manager_global.get_market_context_async()
240
- except Exception: market_context = {'btc_sentiment': 'NEUTRAL'}
 
 
 
 
 
241
  symbol_with_reasons = [{'symbol': symbol, 'reasons': ['re-analysis']}]
242
  ohlcv_data_list = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
243
- if not ohlcv_data_list: return None
 
 
244
  raw_data = ohlcv_data_list[0]
245
  try:
246
  updated_market_context = await data_manager_global.get_market_context_async()
247
- if updated_market_context: market_context = updated_market_context
248
- except Exception: pass
 
 
 
249
  feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
250
  processed_data = await feature_processor.process_and_score_symbol(raw_data)
251
- if not processed_data: return None
252
- await r2_service_global.save_candidates_data_async(candidates_data=None, reanalysis_data={'market_context': market_context, 'processed_data': processed_data})
253
- try: re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
254
- except Exception: re_analysis_decision = local_re_analyze_trade(trade_data, processed_data)
 
 
 
 
 
 
 
 
 
 
 
 
255
  final_decision = _apply_patience_logic(re_analysis_decision, hold_minutes, trade_data, processed_data)
256
- if not final_decision.get('strategy'): final_decision['strategy'] = original_strategy
257
- await r2_service_global.save_system_logs_async({"trade_reanalyzed": True, "symbol": symbol, "action": final_decision.get('action'),"hold_minutes": hold_minutes, "strategy": final_decision.get('strategy')})
258
- return {"symbol": symbol, "decision": final_decision,"current_price": processed_data.get('current_price'), "hold_minutes": hold_minutes}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
259
  except Exception as error:
260
  print(f"Error during trade re-analysis: {error}")
261
- await r2_service_global.save_system_logs_async({"reanalysis_error": True, "symbol": symbol, "error": str(error)})
 
 
 
 
262
  return None
263
 
264
  async def run_bot_cycle_async():
265
  try:
266
  await r2_service_global.save_system_logs_async({"cycle_started": True})
267
- if not r2_service_global.acquire_lock(): return
 
 
268
  open_trades = []
269
  try:
270
- open_trades = await r2_service_global.get_open_trades_async()
271
  trades_fixed = 0
272
  for trade in open_trades:
273
  if not trade.get('strategy') or trade['strategy'] == 'unknown':
274
  original_strategy = trade.get('decision_data', {}).get('strategy', 'GENERIC')
275
  trade['strategy'] = original_strategy
276
  trades_fixed += 1
277
- if trades_fixed > 0: await r2_service_global.save_open_trades_async(open_trades)
 
 
 
278
  should_look_for_new_trade = not open_trades
279
  if open_trades:
280
  now = datetime.now()
281
- trades_to_reanalyze = [trade for trade in open_trades if now >= datetime.fromisoformat(trade.get('expected_target_time', now.isoformat()))]
 
 
 
282
  if trades_to_reanalyze:
283
  for trade in trades_to_reanalyze:
284
  result = await re_analyze_open_trade_async(trade)
285
  if result and result['decision'].get('action') == "CLOSE_TRADE":
286
- await r2_service_global.close_trade_async(trade, result['current_price'])
287
- if learning_engine_global and learning_engine_global.initialized:
288
- trade_with_strategy = trade.copy()
289
- strategy = result['decision'].get('strategy', trade.get('strategy', 'GENERIC'))
290
- trade_with_strategy['strategy'] = strategy
291
- await learning_engine_global.analyze_trade_outcome(trade_with_strategy, 'CLOSED_BY_REANALYSIS')
292
  should_look_for_new_trade = True
293
- elif result and result['decision'].get('action') == "UPDATE_TRADE": await r2_service_global.update_trade_async(trade, result['decision'])
 
 
294
  if should_look_for_new_trade:
295
  portfolio_state = await r2_service_global.get_portfolio_state_async()
296
  current_capital = portfolio_state.get("current_capital_usd", 0)
@@ -302,27 +367,36 @@ async def run_bot_cycle_async():
302
  portfolio_state["invested_capital_usd"] = 0.0
303
  await r2_service_global.save_portfolio_state_async(portfolio_state)
304
  current_capital = initial_capital
 
305
  if current_capital > 1:
306
  new_opportunity = await find_new_opportunities_async()
307
  if new_opportunity:
308
- if not new_opportunity['decision'].get('strategy'): new_opportunity['decision']['strategy'] = new_opportunity.get('strategy', 'GENERIC')
309
- await r2_service_global.save_new_trade_async(new_opportunity['symbol'], new_opportunity['decision'], new_opportunity['current_price'])
310
- newly_opened_trades = await r2_service_global.get_open_trades_async()
311
- for trade in newly_opened_trades:
312
- if trade['symbol'] == new_opportunity['symbol']:
313
- asyncio.create_task(realtime_monitor._monitor_single_trade(trade))
314
- break
 
315
  finally:
316
  r2_service_global.release_lock()
317
- await r2_service_global.save_system_logs_async({"cycle_completed": True, "open_trades": len(open_trades)})
 
 
 
318
  except Exception as error:
319
  print(f"Unhandled error in main cycle: {error}")
320
- await r2_service_global.save_system_logs_async({"cycle_error": True, "error": str(error)})
321
- if r2_service_global.lock_acquired: r2_service_global.release_lock()
 
 
 
 
322
 
323
  @asynccontextmanager
324
  async def lifespan(application: FastAPI):
325
- global r2_service_global, data_manager_global, llm_service_global, learning_engine_global, realtime_monitor, sentiment_analyzer_global
326
  try:
327
  r2_service_global = R2Service()
328
  llm_service_global = LLMService()
@@ -333,16 +407,21 @@ async def lifespan(application: FastAPI):
333
  learning_engine_global = LearningEngine(r2_service_global, data_manager_global)
334
  await learning_engine_global.initialize_enhanced()
335
  await learning_engine_global.force_strategy_learning()
336
- realtime_monitor = RealTimeTradeMonitor()
337
  asyncio.create_task(monitor_market_async())
338
- asyncio.create_task(realtime_monitor.start_monitoring())
339
  await r2_service_global.save_system_logs_async({"application_started": True})
340
  yield
341
  except Exception as error:
342
  print(f"Application startup failed: {error}")
343
- if r2_service_global: await r2_service_global.save_system_logs_async({"application_startup_failed": True, "error": str(error)})
 
 
 
 
344
  raise
345
- finally: await cleanup_on_shutdown()
 
346
 
347
  application = FastAPI(lifespan=lifespan)
348
 
@@ -354,17 +433,25 @@ async def run_cycle_api():
354
  @application.get("/health")
355
  async def health_check():
356
  learning_metrics = {}
357
- if learning_engine_global and learning_engine_global.initialized: learning_metrics = await learning_engine_global.calculate_performance_metrics()
 
 
358
  api_stats = {}
359
- if data_manager_global: api_stats = data_manager_global.get_performance_stats()
 
 
360
  return {
361
- "status": "healthy", "timestamp": datetime.now().isoformat(), "services": {
 
 
362
  "r2_service": "initialized" if r2_service_global else "uninitialized",
363
  "llm_service": "initialized" if llm_service_global else "uninitialized",
364
  "data_manager": "initialized" if data_manager_global else "uninitialized",
365
  "learning_engine": "active" if learning_engine_global and learning_engine_global.initialized else "inactive",
366
- "realtime_monitor": "running" if realtime_monitor and realtime_monitor.is_running else "stopped"
367
- }, "market_state_ok": state.MARKET_STATE_OK, "learning_engine": learning_metrics
 
 
368
  }
369
 
370
  @application.get("/stats")
@@ -372,19 +459,29 @@ async def get_performance_stats():
372
  try:
373
  market_context = await data_manager_global.get_market_context_async() if data_manager_global else {}
374
  learning_stats = {}
375
- if learning_engine_global and learning_engine_global.initialized: learning_stats = await learning_engine_global.calculate_performance_metrics()
 
 
376
  api_stats = {}
377
- if data_manager_global: api_stats = data_manager_global.get_performance_stats()
 
 
378
  stats = {
379
- "timestamp": datetime.now().isoformat(), "data_manager": api_stats, "market_state": {
380
- "is_healthy": state.MARKET_STATE_OK, "context": market_context
381
- }, "realtime_monitoring": {
382
- "active_trades": len(realtime_monitor.monitoring_tasks) if realtime_monitor else 0,
383
- "is_running": realtime_monitor.is_running if realtime_monitor else False
384
- }, "learning_engine": learning_stats
 
 
 
 
 
385
  }
386
  return stats
387
- except Exception as error: raise HTTPException(status_code=500, detail=f"Failed to retrieve stats: {str(error)}")
 
388
 
389
  @application.get("/logs/status")
390
  async def get_logs_status():
@@ -392,27 +489,39 @@ async def get_logs_status():
392
  open_trades = await r2_service_global.get_open_trades_async()
393
  portfolio_state = await r2_service_global.get_portfolio_state_async()
394
  return {
395
- "logging_system": "active", "open_trades_count": len(open_trades),
 
396
  "current_capital": portfolio_state.get("current_capital_usd", 0),
397
  "total_trades": portfolio_state.get("total_trades", 0),
398
  "timestamp": datetime.now().isoformat()
399
  }
400
- except Exception as error: raise HTTPException(status_code=500, detail=f"Failed to get logs status: {str(error)}")
 
401
 
402
  async def cleanup_on_shutdown():
403
- global r2_service_global, data_manager_global, realtime_monitor, learning_engine_global
404
  print("Shutdown signal received. Cleaning up...")
405
  if r2_service_global:
406
- try: await r2_service_global.save_system_logs_async({"application_shutdown": True})
407
- except Exception: pass
 
 
 
408
  if learning_engine_global and learning_engine_global.initialized:
409
  try:
410
  await learning_engine_global.save_weights_to_r2()
411
  await learning_engine_global.save_performance_history()
412
- except Exception: pass
413
- if realtime_monitor: realtime_monitor.stop_monitoring()
414
- if r2_service_global and r2_service_global.lock_acquired: r2_service_global.release_lock()
415
- if data_manager_global: await data_manager_global.close()
 
 
 
 
 
 
 
416
 
417
  def signal_handler(signum, frame):
418
  asyncio.create_task(cleanup_on_shutdown())
@@ -421,4 +530,5 @@ def signal_handler(signum, frame):
421
  signal.signal(signal.SIGINT, signal_handler)
422
  signal.signal(signal.SIGTERM, signal_handler)
423
 
424
- if __name__ == "__main__": uvicorn.run(application, host="0.0.0.0", port=7860)
 
 
8
  from ML import MLProcessor as FeatureProcessor
9
  from learning_engine import LearningEngine
10
  from sentiment_news import SentimentAnalyzer
11
+ from trade_manager import TradeManager
12
  import state
13
+ from helpers import safe_float_conversion, _apply_patience_logic, local_analyze_opportunity, local_re_analyze_trade, validate_candidate_data_enhanced
14
 
15
  r2_service_global = None
16
  data_manager_global = None
17
  llm_service_global = None
18
  learning_engine_global = None
19
+ trade_manager_global = None
20
  sentiment_analyzer_global = None
21
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
  async def monitor_market_async():
23
  global data_manager_global, sentiment_analyzer_global
24
  init_attempts = 0
25
+ while data_manager_global is None and init_attempts < 10:
26
+ await asyncio.sleep(3)
27
+ init_attempts += 1
28
+ if data_manager_global is None:
29
+ return
30
+
31
  while True:
32
  try:
33
  market_context = await sentiment_analyzer_global.get_market_sentiment()
34
+ if not market_context:
35
+ state.MARKET_STATE_OK = True
36
+ await asyncio.sleep(60)
37
+ continue
38
+
39
  whale_analysis = market_context.get('general_whale_activity', {})
40
  is_critical = whale_analysis.get('critical_alert', False)
41
  bitcoin_sentiment = market_context.get('btc_sentiment')
42
  fear_greed_index = market_context.get('fear_and_greed_index')
43
+
44
  should_halt_trading, halt_reason = False, ""
45
+ if is_critical:
46
+ should_halt_trading, halt_reason = True, "نشاط حيتان حرج"
47
+ elif bitcoin_sentiment == 'BEARISH' and (fear_greed_index is not None and fear_greed_index < 30):
48
+ should_halt_trading, halt_reason = True, "ظروف سوق هابطة"
49
+
50
  if should_halt_trading:
51
  state.MARKET_STATE_OK = False
52
  await r2_service_global.save_system_logs_async({"market_halt": True, "reason": halt_reason})
53
  else:
54
+ if not state.MARKET_STATE_OK:
55
+ print("تحسنت ظروف السوق. استئناف العمليات العادية.")
56
  state.MARKET_STATE_OK = True
57
+
58
  await asyncio.sleep(60)
59
  except Exception as error:
60
+ print(f"خطأ أثناء مراقبة السوق: {error}")
61
  state.MARKET_STATE_OK = True
62
  await asyncio.sleep(60)
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  async def analyze_market_strategy(market_context):
65
  try:
66
  whale_analysis = market_context.get('general_whale_activity', {})
 
73
  strategy_data = json.loads(json_str)
74
  except:
75
  net_flow = netflow_analysis.get('net_flow', 0)
76
+ if net_flow > 1000000:
77
+ fallback_strategy = "AGGRESSIVE_GROWTH"
78
+ elif net_flow < -1000000:
79
+ fallback_strategy = "CONSERVATIVE"
80
+ elif whale_analysis.get('critical_alert'):
81
+ fallback_strategy = "CONSERVATIVE"
82
+ else:
83
+ fallback_strategy = "GENERIC"
84
+ strategy_data = {
85
+ "primary_strategy": fallback_strategy,
86
+ "reasoning": "Fallback strategy",
87
+ "risk_tolerance": 5,
88
+ "optimal_scan_count": 100
89
+ }
90
  return strategy_data
91
  except Exception as error:
92
+ print(f"فشل تحليل استراتيجية السوق: {error}")
93
+ return {
94
+ "primary_strategy": "GENERIC",
95
+ "reasoning": "Fallback due to analysis error",
96
+ "risk_tolerance": 5,
97
+ "optimal_scan_count": 100
98
+ }
99
 
100
  async def find_strategy_specific_candidates(strategy, scan_count):
101
  try:
102
  all_candidates = await data_manager_global.find_high_potential_candidates(scan_count * 2)
103
+ if not all_candidates:
104
+ return []
105
+
106
  market_context = await data_manager_global.get_market_context_async()
107
+ if not market_context:
108
+ return []
109
+
110
  feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
111
  processed_candidates = []
112
+
113
  for candidate in all_candidates[:30]:
114
  try:
115
  symbol_with_reasons = [{'symbol': candidate['symbol'], 'reasons': candidate.get('reasons', [])}]
116
  ohlcv_data = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
117
  if ohlcv_data and ohlcv_data[0]:
118
  processed = await feature_processor.process_and_score_symbol_enhanced(ohlcv_data[0])
119
+ if processed:
120
+ processed_candidates.append(processed)
121
+ except Exception as e:
122
+ print(f"Failed to process {candidate.get('symbol')}: {e}")
123
+
124
+ if not processed_candidates:
125
+ return []
126
+
127
  if strategy != 'GENERIC':
128
  strategy_candidates = []
129
  for candidate in processed_candidates:
 
137
  else:
138
  sorted_candidates = sorted(processed_candidates, key=lambda x: x.get('enhanced_final_score', 0), reverse=True)
139
  top_candidates = sorted_candidates[:15]
140
+
141
  return top_candidates
142
  except Exception as error:
143
  print(f"Advanced filtering failed: {error}")
 
147
  try:
148
  await r2_service_global.save_system_logs_async({"opportunity_scan_started": True})
149
  market_context = await data_manager_global.get_market_context_async()
150
+ if not market_context:
151
+ return
152
+
153
  strategy_decision = await analyze_market_strategy(market_context)
154
+ high_potential_candidates = await find_strategy_specific_candidates(
155
+ strategy_decision['primary_strategy'],
156
+ strategy_decision.get('optimal_scan_count', 100)
157
+ )
158
+
159
  if not high_potential_candidates:
160
  high_potential_candidates = await data_manager_global.find_high_potential_candidates(20)
161
  if high_potential_candidates:
162
+ for candidate in high_potential_candidates:
163
+ candidate['target_strategy'] = 'GENERIC'
164
+ else:
165
+ return
166
+
167
  all_processed_candidates = []
168
  CHUNK_SIZE = 5
169
+
170
  for index in range(0, len(high_potential_candidates), CHUNK_SIZE):
171
  chunk = high_potential_candidates[index:index+CHUNK_SIZE]
172
  chunk_data = await data_manager_global.get_fast_pass_data_async(chunk)
173
  updated_market_context = await data_manager_global.get_market_context_async()
174
+ if not updated_market_context:
175
+ updated_market_context = market_context
176
+
177
  feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
178
+ processed_chunk = await asyncio.gather(*[
179
+ feature_processor.process_and_score_symbol_enhanced(data) for data in chunk_data
180
+ ])
181
  all_processed_candidates.extend([c for c in processed_chunk if c is not None])
182
  await asyncio.sleep(1)
183
+
184
+ if not all_processed_candidates:
185
+ return
186
+
187
  updated_market_context = await data_manager_global.get_market_context_async()
188
+ if not updated_market_context:
189
+ updated_market_context = market_context
190
+
191
  feature_processor = FeatureProcessor(updated_market_context, data_manager_global, learning_engine_global)
192
  OPPORTUNITY_COUNT = 10
193
  top_candidates = feature_processor.filter_top_candidates(all_processed_candidates, OPPORTUNITY_COUNT)
194
+
195
+ await r2_service_global.save_candidates_data_async(
196
+ candidates_data=top_candidates,
197
+ reanalysis_data={
198
+ "strategy_used": strategy_decision,
199
+ "market_conditions": market_context
200
+ }
201
+ )
202
+
203
+ if not top_candidates:
204
+ return
205
+
206
  for candidate in top_candidates:
207
  try:
208
+ if not await validate_candidate_data_enhanced(candidate):
209
+ continue
210
+
211
  llm_analysis_data = await llm_service_global.get_trading_decision(candidate)
212
+ if not llm_analysis_data:
213
+ continue
214
+
215
+ if llm_analysis_data.get('action') == "HOLD":
216
+ continue
217
+
218
  if llm_analysis_data.get('action') in ["BUY", "SELL"]:
219
  final_strategy = llm_analysis_data.get('strategy')
220
  candidate_strategy = candidate.get('target_strategy', 'GENERIC')
221
+ if not final_strategy or final_strategy == 'unknown':
222
+ final_strategy = candidate_strategy
223
+ llm_analysis_data['strategy'] = final_strategy
224
+
225
+ await r2_service_global.save_system_logs_async({
226
+ "new_opportunity_found": True,
227
+ "symbol": candidate['symbol'],
228
+ "action": llm_analysis_data.get('action'),
229
+ "strategy": final_strategy
230
+ })
231
+
232
+ return {
233
+ "symbol": candidate['symbol'],
234
+ "decision": llm_analysis_data,
235
+ "current_price": candidate['current_price'],
236
+ "strategy": final_strategy
237
+ }
238
+ except Exception as error:
239
+ print(f"LLM error for {candidate.get('symbol', 'unknown')}: {error}")
240
+
241
  return None
242
  except Exception as error:
243
  print(f"Error while scanning for opportunities: {error}")
244
+ await r2_service_global.save_system_logs_async({
245
+ "opportunity_scan_error": True,
246
+ "error": str(error)
247
+ })
248
  return None
249
 
250
  async def re_analyze_open_trade_async(trade_data):
 
253
  entry_time = datetime.fromisoformat(trade_data['entry_timestamp'])
254
  current_time = datetime.now()
255
  hold_minutes = (current_time - entry_time).total_seconds() / 60
256
+
257
  original_strategy = trade_data.get('strategy')
258
+ if not original_strategy or original_strategy == 'unknown':
259
+ original_strategy = trade_data.get('decision_data', {}).get('strategy', 'GENERIC')
260
+
261
+ try:
262
+ market_context = await data_manager_global.get_market_context_async()
263
+ except Exception:
264
+ market_context = {'btc_sentiment': 'NEUTRAL'}
265
+
266
  symbol_with_reasons = [{'symbol': symbol, 'reasons': ['re-analysis']}]
267
  ohlcv_data_list = await data_manager_global.get_fast_pass_data_async(symbol_with_reasons)
268
+ if not ohlcv_data_list:
269
+ return None
270
+
271
  raw_data = ohlcv_data_list[0]
272
  try:
273
  updated_market_context = await data_manager_global.get_market_context_async()
274
+ if updated_market_context:
275
+ market_context = updated_market_context
276
+ except Exception:
277
+ pass
278
+
279
  feature_processor = FeatureProcessor(market_context, data_manager_global, learning_engine_global)
280
  processed_data = await feature_processor.process_and_score_symbol(raw_data)
281
+ if not processed_data:
282
+ return None
283
+
284
+ await r2_service_global.save_candidates_data_async(
285
+ candidates_data=None,
286
+ reanalysis_data={
287
+ 'market_context': market_context,
288
+ 'processed_data': processed_data
289
+ }
290
+ )
291
+
292
+ try:
293
+ re_analysis_decision = await llm_service_global.re_analyze_trade_async(trade_data, processed_data)
294
+ except Exception:
295
+ re_analysis_decision = local_re_analyze_trade(trade_data, processed_data)
296
+
297
  final_decision = _apply_patience_logic(re_analysis_decision, hold_minutes, trade_data, processed_data)
298
+ if not final_decision.get('strategy'):
299
+ final_decision['strategy'] = original_strategy
300
+
301
+ await r2_service_global.save_system_logs_async({
302
+ "trade_reanalyzed": True,
303
+ "symbol": symbol,
304
+ "action": final_decision.get('action'),
305
+ "hold_minutes": hold_minutes,
306
+ "strategy": final_decision.get('strategy')
307
+ })
308
+
309
+ return {
310
+ "symbol": symbol,
311
+ "decision": final_decision,
312
+ "current_price": processed_data.get('current_price'),
313
+ "hold_minutes": hold_minutes
314
+ }
315
  except Exception as error:
316
  print(f"Error during trade re-analysis: {error}")
317
+ await r2_service_global.save_system_logs_async({
318
+ "reanalysis_error": True,
319
+ "symbol": symbol,
320
+ "error": str(error)
321
+ })
322
  return None
323
 
324
  async def run_bot_cycle_async():
325
  try:
326
  await r2_service_global.save_system_logs_async({"cycle_started": True})
327
+ if not r2_service_global.acquire_lock():
328
+ return
329
+
330
  open_trades = []
331
  try:
332
+ open_trades = await trade_manager_global.get_open_trades()
333
  trades_fixed = 0
334
  for trade in open_trades:
335
  if not trade.get('strategy') or trade['strategy'] == 'unknown':
336
  original_strategy = trade.get('decision_data', {}).get('strategy', 'GENERIC')
337
  trade['strategy'] = original_strategy
338
  trades_fixed += 1
339
+
340
+ if trades_fixed > 0:
341
+ await r2_service_global.save_open_trades_async(open_trades)
342
+
343
  should_look_for_new_trade = not open_trades
344
  if open_trades:
345
  now = datetime.now()
346
+ trades_to_reanalyze = [
347
+ trade for trade in open_trades
348
+ if now >= datetime.fromisoformat(trade.get('expected_target_time', now.isoformat()))
349
+ ]
350
  if trades_to_reanalyze:
351
  for trade in trades_to_reanalyze:
352
  result = await re_analyze_open_trade_async(trade)
353
  if result and result['decision'].get('action') == "CLOSE_TRADE":
354
+ await trade_manager_global.close_trade(trade, result['current_price'], 'CLOSED_BY_REANALYSIS')
 
 
 
 
 
355
  should_look_for_new_trade = True
356
+ elif result and result['decision'].get('action') == "UPDATE_TRADE":
357
+ await trade_manager_global.update_trade(trade, result['decision'])
358
+
359
  if should_look_for_new_trade:
360
  portfolio_state = await r2_service_global.get_portfolio_state_async()
361
  current_capital = portfolio_state.get("current_capital_usd", 0)
 
367
  portfolio_state["invested_capital_usd"] = 0.0
368
  await r2_service_global.save_portfolio_state_async(portfolio_state)
369
  current_capital = initial_capital
370
+
371
  if current_capital > 1:
372
  new_opportunity = await find_new_opportunities_async()
373
  if new_opportunity:
374
+ if not new_opportunity['decision'].get('strategy'):
375
+ new_opportunity['decision']['strategy'] = new_opportunity.get('strategy', 'GENERIC')
376
+
377
+ await trade_manager_global.open_trade(
378
+ new_opportunity['symbol'],
379
+ new_opportunity['decision'],
380
+ new_opportunity['current_price']
381
+ )
382
  finally:
383
  r2_service_global.release_lock()
384
+ await r2_service_global.save_system_logs_async({
385
+ "cycle_completed": True,
386
+ "open_trades": len(open_trades)
387
+ })
388
  except Exception as error:
389
  print(f"Unhandled error in main cycle: {error}")
390
+ await r2_service_global.save_system_logs_async({
391
+ "cycle_error": True,
392
+ "error": str(error)
393
+ })
394
+ if r2_service_global.lock_acquired:
395
+ r2_service_global.release_lock()
396
 
397
  @asynccontextmanager
398
  async def lifespan(application: FastAPI):
399
+ global r2_service_global, data_manager_global, llm_service_global, learning_engine_global, trade_manager_global, sentiment_analyzer_global
400
  try:
401
  r2_service_global = R2Service()
402
  llm_service_global = LLMService()
 
407
  learning_engine_global = LearningEngine(r2_service_global, data_manager_global)
408
  await learning_engine_global.initialize_enhanced()
409
  await learning_engine_global.force_strategy_learning()
410
+ trade_manager_global = TradeManager(r2_service_global, learning_engine_global, data_manager_global)
411
  asyncio.create_task(monitor_market_async())
412
+ asyncio.create_task(trade_manager_global.start_trade_monitoring())
413
  await r2_service_global.save_system_logs_async({"application_started": True})
414
  yield
415
  except Exception as error:
416
  print(f"Application startup failed: {error}")
417
+ if r2_service_global:
418
+ await r2_service_global.save_system_logs_async({
419
+ "application_startup_failed": True,
420
+ "error": str(error)
421
+ })
422
  raise
423
+ finally:
424
+ await cleanup_on_shutdown()
425
 
426
  application = FastAPI(lifespan=lifespan)
427
 
 
433
  @application.get("/health")
434
  async def health_check():
435
  learning_metrics = {}
436
+ if learning_engine_global and learning_engine_global.initialized:
437
+ learning_metrics = await learning_engine_global.calculate_performance_metrics()
438
+
439
  api_stats = {}
440
+ if data_manager_global:
441
+ api_stats = data_manager_global.get_performance_stats()
442
+
443
  return {
444
+ "status": "healthy",
445
+ "timestamp": datetime.now().isoformat(),
446
+ "services": {
447
  "r2_service": "initialized" if r2_service_global else "uninitialized",
448
  "llm_service": "initialized" if llm_service_global else "uninitialized",
449
  "data_manager": "initialized" if data_manager_global else "uninitialized",
450
  "learning_engine": "active" if learning_engine_global and learning_engine_global.initialized else "inactive",
451
+ "trade_manager": "active" if trade_manager_global else "inactive"
452
+ },
453
+ "market_state_ok": state.MARKET_STATE_OK,
454
+ "learning_engine": learning_metrics
455
  }
456
 
457
  @application.get("/stats")
 
459
  try:
460
  market_context = await data_manager_global.get_market_context_async() if data_manager_global else {}
461
  learning_stats = {}
462
+ if learning_engine_global and learning_engine_global.initialized:
463
+ learning_stats = await learning_engine_global.calculate_performance_metrics()
464
+
465
  api_stats = {}
466
+ if data_manager_global:
467
+ api_stats = data_manager_global.get_performance_stats()
468
+
469
  stats = {
470
+ "timestamp": datetime.now().isoformat(),
471
+ "data_manager": api_stats,
472
+ "market_state": {
473
+ "is_healthy": state.MARKET_STATE_OK,
474
+ "context": market_context
475
+ },
476
+ "trade_monitoring": {
477
+ "active_trades": len(trade_manager_global.monitoring_tasks) if trade_manager_global else 0,
478
+ "is_running": trade_manager_global.is_running if trade_manager_global else False
479
+ },
480
+ "learning_engine": learning_stats
481
  }
482
  return stats
483
+ except Exception as error:
484
+ raise HTTPException(status_code=500, detail=f"Failed to retrieve stats: {str(error)}")
485
 
486
  @application.get("/logs/status")
487
  async def get_logs_status():
 
489
  open_trades = await r2_service_global.get_open_trades_async()
490
  portfolio_state = await r2_service_global.get_portfolio_state_async()
491
  return {
492
+ "logging_system": "active",
493
+ "open_trades_count": len(open_trades),
494
  "current_capital": portfolio_state.get("current_capital_usd", 0),
495
  "total_trades": portfolio_state.get("total_trades", 0),
496
  "timestamp": datetime.now().isoformat()
497
  }
498
+ except Exception as error:
499
+ raise HTTPException(status_code=500, detail=f"Failed to get logs status: {str(error)}")
500
 
501
  async def cleanup_on_shutdown():
502
+ global r2_service_global, data_manager_global, trade_manager_global, learning_engine_global
503
  print("Shutdown signal received. Cleaning up...")
504
  if r2_service_global:
505
+ try:
506
+ await r2_service_global.save_system_logs_async({"application_shutdown": True})
507
+ except Exception:
508
+ pass
509
+
510
  if learning_engine_global and learning_engine_global.initialized:
511
  try:
512
  await learning_engine_global.save_weights_to_r2()
513
  await learning_engine_global.save_performance_history()
514
+ except Exception:
515
+ pass
516
+
517
+ if trade_manager_global:
518
+ trade_manager_global.stop_monitoring()
519
+
520
+ if r2_service_global and r2_service_global.lock_acquired:
521
+ r2_service_global.release_lock()
522
+
523
+ if data_manager_global:
524
+ await data_manager_global.close()
525
 
526
  def signal_handler(signum, frame):
527
  asyncio.create_task(cleanup_on_shutdown())
 
530
  signal.signal(signal.SIGINT, signal_handler)
531
  signal.signal(signal.SIGTERM, signal_handler)
532
 
533
+ if __name__ == "__main__":
534
+ uvicorn.run(application, host="0.0.0.0", port=7860)