Riy777 commited on
Commit
07eb3a2
·
1 Parent(s): 3cdd892

Update ml_engine/processor.py

Browse files
Files changed (1) hide show
  1. ml_engine/processor.py +46 -53
ml_engine/processor.py CHANGED
@@ -22,11 +22,8 @@ class MLProcessor:
22
  self.strategy_engine = MultiStrategyEngine(data_manager, learning_engine)
23
  self.monte_carlo_analyzer = MonteCarloAnalyzer()
24
  self.pattern_analyzer = ChartPatternAnalyzer()
 
25
 
26
- # 🔴 تم إزالة منظم سرعة الحيتان من هنا
27
- # self.whale_data_semaphore = asyncio.Semaphore(1)
28
-
29
- # 🔴 --- تعديل جوهري: إضافة معلمة جديدة واستخدامها --- 🔴
30
  async def process_and_score_symbol_enhanced(self, raw_data, preloaded_whale_data: dict = None):
31
  """
32
  (معدلة) المعالجة المحسنة للرموز
@@ -44,7 +41,7 @@ class MLProcessor:
44
  return None
45
 
46
  try:
47
- # حساب المؤشرات (لا تغيير)
48
  advanced_indicators = {}
49
  ohlcv_available = raw_data.get('ohlcv', {})
50
  for timeframe, candles in ohlcv_available.items():
@@ -54,38 +51,34 @@ class MLProcessor:
54
  advanced_indicators[timeframe] = indicators
55
  base_analysis['advanced_indicators'] = advanced_indicators
56
 
57
- # حساب مونت كارلو (لا تغيير)
58
  monte_carlo_probability = await self.monte_carlo_analyzer.predict_1h_probability(ohlcv_available)
59
  if monte_carlo_probability is not None:
60
  base_analysis['monte_carlo_probability'] = monte_carlo_probability
61
  base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results
62
 
63
- # حساب الأنماط (لا تغيير)
64
  pattern_analysis = await self.pattern_analyzer.detect_chart_patterns(ohlcv_available)
65
  base_analysis['pattern_analysis'] = pattern_analysis
66
 
67
- # 🔴 --- تعديل: استخدام بيانات الحيتان المحملة مسبقاً --- 🔴
68
- # لا يوجد استدعاء شبكي هنا
69
  if preloaded_whale_data:
70
  base_analysis['whale_data'] = preloaded_whale_data.get(symbol, {'data_available': False, 'reason': 'Not preloaded'})
71
  else:
72
  base_analysis['whale_data'] = {'data_available': False, 'reason': 'Preloading disabled'}
73
- # 🔴 --- نهاية التعديل --- 🔴
74
 
75
- # حساب الاستراتيجيات والنتيجة النهائية (لا تغيير في المنطق هنا)
76
  strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(base_analysis, self.market_context)
77
  base_analysis['strategy_scores'] = strategy_scores
78
  base_analysis['base_strategy_scores'] = base_scores
79
 
80
  if base_scores:
81
- # ... (الكود الخاص بتحديد أفضل استراتيجية كما هو) ...
82
  best_strategy = max(base_scores.items(), key=lambda x: x[1])
83
  best_strategy_name = best_strategy[0]
84
  best_strategy_score = best_strategy[1]
85
  base_analysis['recommended_strategy'] = best_strategy_name
86
  base_analysis['strategy_confidence'] = best_strategy_score
87
  base_analysis['target_strategy'] = best_strategy_name if best_strategy_score > 0.3 else 'GENERIC'
88
-
89
 
90
  enhanced_score = self._calculate_enhanced_final_score(base_analysis)
91
  base_analysis['enhanced_final_score'] = enhanced_score
@@ -100,8 +93,6 @@ class MLProcessor:
100
  print(f"❌ خطأ فادح في المعالجة المحسنة للرمز {raw_data.get('symbol', 'unknown')}: {error}")
101
  return None
102
 
103
- # ...(بقية الدوال في الملف كما هي بدون تغيير، بما في ذلك _calculate_enhanced_final_score)...
104
-
105
  def _create_dataframe(self, candles):
106
  """إنشاء DataFrame من بيانات الشموع مع DatetimeIndex مرتب"""
107
  try:
@@ -199,7 +190,45 @@ class MLProcessor:
199
  print(f" 🐋 حيتان: {signal.get('action', 'HOLD')} (ثقة: {signal.get('confidence', 0):.2f})")
200
  return top_candidates
201
 
202
- # (safe_json_parse و process_multiple_symbols_parallel كما هي، لكن process_multiple_symbols_parallel لم تعد تُستخدم مباشرة في التدفق الجديد)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
203
  def safe_json_parse(json_string):
204
  if not json_string: return None
205
  try:
@@ -215,40 +244,4 @@ def safe_json_parse(json_string):
215
  return json.loads(s)
216
  except json.JSONDecodeError: return None
217
 
218
- async def process_multiple_symbols_parallel(self, symbols_data_list, preloaded_whale_data: dict, max_concurrent=5):
219
- """(معدلة) معالجة متعددة للرموز بشكل متوازٍ باستخدام بيانات الحيتان المحملة مسبقًا"""
220
- # 🔴 استخدام Semaphore داخلي للتحكم في مهام ML نفسها (وليس الحيتان)
221
- semaphore = asyncio.Semaphore(max_concurrent)
222
- tasks_results = []
223
-
224
- async def process_symbol_with_semaphore(symbol_data):
225
- async with semaphore:
226
- try:
227
- # تمرير بيانات الحيتان المحملة مسبقًا
228
- return await self.process_and_score_symbol_enhanced(symbol_data, preloaded_whale_data)
229
- except Exception as e:
230
- return e
231
-
232
- try:
233
- # print(f"🚀 بدء المعالجة المتوازية لـ {len(symbols_data_list)} رمز (ML بحد أقصى {max_concurrent})...") # Reduced logging
234
-
235
- batch_tasks = [asyncio.create_task(process_symbol_with_semaphore(sd)) for sd in symbols_data_list]
236
- batch_results = await asyncio.gather(*batch_tasks, return_exceptions=False)
237
-
238
- successful_results = []
239
- for result in batch_results:
240
- if isinstance(result, Exception):
241
- # print(f" ⚠️ خطأ في مهمة ML متوازية: {result}") # Reduced logging
242
- continue
243
- if isinstance(result, dict) and result.get('enhanced_final_score', 0) > 0.4:
244
- successful_results.append(result)
245
-
246
- # print(f"🎯 اكتملت المعالجة المتوازية للمجموعة: {len(successful_results)}/{len(symbols_data_list)} ناجحة") # Reduced logging
247
- return successful_results
248
-
249
- except Exception as error:
250
- print(f"❌ خطأ في المعالجة المتوازية لـ ML: {error}")
251
- return []
252
-
253
-
254
- print("✅ ML Processor loaded - Non-Blocking Whale Data Integration")
 
22
  self.strategy_engine = MultiStrategyEngine(data_manager, learning_engine)
23
  self.monte_carlo_analyzer = MonteCarloAnalyzer()
24
  self.pattern_analyzer = ChartPatternAnalyzer()
25
+ self.whale_data_semaphore = asyncio.Semaphore(2)
26
 
 
 
 
 
27
  async def process_and_score_symbol_enhanced(self, raw_data, preloaded_whale_data: dict = None):
28
  """
29
  (معدلة) المعالجة المحسنة للرموز
 
41
  return None
42
 
43
  try:
44
+ # حساب المؤشرات
45
  advanced_indicators = {}
46
  ohlcv_available = raw_data.get('ohlcv', {})
47
  for timeframe, candles in ohlcv_available.items():
 
51
  advanced_indicators[timeframe] = indicators
52
  base_analysis['advanced_indicators'] = advanced_indicators
53
 
54
+ # حساب مونت كارلو
55
  monte_carlo_probability = await self.monte_carlo_analyzer.predict_1h_probability(ohlcv_available)
56
  if monte_carlo_probability is not None:
57
  base_analysis['monte_carlo_probability'] = monte_carlo_probability
58
  base_analysis['monte_carlo_details'] = self.monte_carlo_analyzer.simulation_results
59
 
60
+ # حساب الأنماط
61
  pattern_analysis = await self.pattern_analyzer.detect_chart_patterns(ohlcv_available)
62
  base_analysis['pattern_analysis'] = pattern_analysis
63
 
64
+ # استخدام بيانات الحيتان المحملة مسبقاً
 
65
  if preloaded_whale_data:
66
  base_analysis['whale_data'] = preloaded_whale_data.get(symbol, {'data_available': False, 'reason': 'Not preloaded'})
67
  else:
68
  base_analysis['whale_data'] = {'data_available': False, 'reason': 'Preloading disabled'}
 
69
 
70
+ # حساب الاستراتيجيات والنتيجة النهائية
71
  strategy_scores, base_scores = await self.strategy_engine.evaluate_all_strategies(base_analysis, self.market_context)
72
  base_analysis['strategy_scores'] = strategy_scores
73
  base_analysis['base_strategy_scores'] = base_scores
74
 
75
  if base_scores:
 
76
  best_strategy = max(base_scores.items(), key=lambda x: x[1])
77
  best_strategy_name = best_strategy[0]
78
  best_strategy_score = best_strategy[1]
79
  base_analysis['recommended_strategy'] = best_strategy_name
80
  base_analysis['strategy_confidence'] = best_strategy_score
81
  base_analysis['target_strategy'] = best_strategy_name if best_strategy_score > 0.3 else 'GENERIC'
 
82
 
83
  enhanced_score = self._calculate_enhanced_final_score(base_analysis)
84
  base_analysis['enhanced_final_score'] = enhanced_score
 
93
  print(f"❌ خطأ فادح في المعالجة المحسنة للرمز {raw_data.get('symbol', 'unknown')}: {error}")
94
  return None
95
 
 
 
96
  def _create_dataframe(self, candles):
97
  """إنشاء DataFrame من بيانات الشموع مع DatetimeIndex مرتب"""
98
  try:
 
190
  print(f" 🐋 حيتان: {signal.get('action', 'HOLD')} (ثقة: {signal.get('confidence', 0):.2f})")
191
  return top_candidates
192
 
193
+ # 🔴 --- بدء الإصلاح: نقل الدالة إلى داخل الكلاس --- 🔴
194
+ # تأكد من أن هذه الدالة بنفس مستوى المسافة الب��دئة مثل الدوال الأخرى في الكلاس
195
+ async def process_multiple_symbols_parallel(self, symbols_data_list, preloaded_whale_data: dict, max_concurrent=5):
196
+ """(معدلة) معالجة متعددة للرموز بشكل متوازٍ باستخدام بيانات الحيتان المحملة مسبقًا"""
197
+ semaphore = asyncio.Semaphore(max_concurrent)
198
+ tasks_results = []
199
+
200
+ async def process_symbol_with_semaphore(symbol_data):
201
+ async with semaphore:
202
+ try:
203
+ # تمرير بيانات الحيتان المحملة مسبقًا
204
+ return await self.process_and_score_symbol_enhanced(symbol_data, preloaded_whale_data)
205
+ except Exception as e:
206
+ return e # Return the exception itself
207
+
208
+ try:
209
+ batch_tasks = [asyncio.create_task(process_symbol_with_semaphore(sd)) for sd in symbols_data_list]
210
+ batch_results = await asyncio.gather(*batch_tasks, return_exceptions=False) # Capture results/exceptions manually
211
+
212
+ successful_results = []
213
+ for result in batch_results:
214
+ if isinstance(result, Exception):
215
+ # Propagate the exception to be handled by the caller (process_batch_parallel in app.py)
216
+ # This makes error reporting more consistent.
217
+ raise result
218
+ # Check result is a dict before accessing keys
219
+ if isinstance(result, dict): # Removed score check here, done by caller
220
+ successful_results.append(result)
221
+ # else: # Optional: Handle unexpected return types if necessary
222
+ # print(f"Warning: Unexpected type returned by process_symbol_with_semaphore: {type(result)}")
223
+
224
+ return successful_results # Return list of results (dicts)
225
+
226
+ except Exception as error: # Catch errors raised from gather or process_symbol_with_semaphore
227
+ # Re-raise the error to be caught by process_batch_parallel in app.py
228
+ raise error
229
+ # 🔴 --- نهاية الإصلاح --- 🔴
230
+
231
+ # (safe_json_parse يمكن أن تبقى خارج الكلاس لأنها دالة مساعدة عامة)
232
  def safe_json_parse(json_string):
233
  if not json_string: return None
234
  try:
 
244
  return json.loads(s)
245
  except json.JSONDecodeError: return None
246
 
247
+ print("✅ ML Processor loaded - Non-Blocking Whale Data Integration & Fixed Method Definition")