Riy777 commited on
Commit
f58d016
·
verified ·
1 Parent(s): e3d52e4

Update data_manager.py

Browse files
Files changed (1) hide show
  1. data_manager.py +31 -34
data_manager.py CHANGED
@@ -1,5 +1,5 @@
1
  # ml_engine/data_manager.py
2
- # (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix)
3
 
4
  import os
5
  import asyncio
@@ -37,7 +37,6 @@ class DataManager:
37
  # ==================================================================
38
  # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds)
39
  # ==================================================================
40
- # العتبة الرئيسية للنظام الهجين (Titan + Patterns + MC)
41
  self.HYBRID_ENTRY_THRESHOLD = 0.60
42
  # ==================================================================
43
 
@@ -45,7 +44,6 @@ class DataManager:
45
  self.whale_monitor = whale_monitor
46
  self.r2_service = r2_service
47
 
48
- # تهيئة منصة التبادل (KuCoin كمثال)
49
  self.exchange = ccxt.kucoin({
50
  'enableRateLimit': True,
51
  'timeout': 30000,
@@ -62,41 +60,48 @@ class DataManager:
62
 
63
  async def initialize(self):
64
  """تهيئة مدير البيانات والاتصالات"""
 
65
  self.http_client = httpx.AsyncClient(timeout=30.0)
66
  await self._load_markets()
67
 
68
- print(" > [DataManager] إنشاء النماذج المساندة (Lazy Load)...") # (تغيير الرسالة)
69
  try:
70
  self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
71
- # [ 🔴🔴🔴 LAZY LOAD 🔴🔴🔴 ]
72
- # await self.layer1_ranker.initialize() # (تم الإلغاء - سيتم تحميله عند الحاجة)
73
- # [ 🔴🔴🔴 END LAZY LOAD 🔴🔴🔴 ]
74
-
75
  if ChartPatternAnalyzer:
76
  self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
77
- # [ 🔴🔴🔴 LAZY LOAD 🔴🔴🔴 ]
78
- # await self.pattern_analyzer.initialize() # (تم الإلغاء - سيتم تحميله عند الحاجة)
79
- # [ 🔴🔴🔴 END LAZY LOAD 🔴🔴🔴 ]
80
 
81
  except Exception as e:
82
  print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}")
83
 
84
  print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
 
 
85
 
86
  async def _load_markets(self):
87
  """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
 
88
  try:
89
  if self.exchange:
 
90
  await self.exchange.load_markets()
91
  self.market_cache = self.exchange.markets
 
 
 
 
 
 
 
 
92
  except Exception as e:
93
- print(f" [DataManager] فشل تحميل الأسواق: {e}")
 
94
 
95
  async def close(self):
96
  """إغلاق جميع الاتصالات بأمان"""
 
97
  if self.http_client: await self.http_client.aclose()
98
  if self.exchange: await self.exchange.close()
99
- # تنظيف ذاكرة النماذج الفرعية إذا لزم الأمر
100
  if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
101
  self.pattern_analyzer.clear_memory()
102
  if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
@@ -108,7 +113,6 @@ class DataManager:
108
  async def load_contracts_from_r2(self):
109
  """
110
  [جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل.
111
- (مطلوب بواسطة app.py V15.6)
112
  """
113
  print(" > [DataManager] Loading contracts database from R2...")
114
  if not self.r2_service:
@@ -126,7 +130,6 @@ class DataManager:
126
  def get_contracts_db(self) -> Dict[str, Any]:
127
  """
128
  [جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها.
129
- (مطلوب بواسطة app.py V15.6)
130
  """
131
  return self.contracts_db
132
 
@@ -136,7 +139,6 @@ class DataManager:
136
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
137
  """
138
  الغربلة الأولية السريعة جداً بناءً على الحجم فقط.
139
- تختار أعلى 150 عملة سيولةً لتمريرها للتحليل الهجين المعمق.
140
  """
141
  print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...")
142
  volume_data = await self._get_volume_data_live()
@@ -145,7 +147,6 @@ class DataManager:
145
  print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.")
146
  return []
147
 
148
- # اختيار أعلى 150 عملة من حيث حجم التداول الدولاري
149
  candidates = volume_data[:150]
150
  print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
151
  return candidates
@@ -156,14 +157,12 @@ class DataManager:
156
  tickers = await self.exchange.fetch_tickers()
157
  data = []
158
  for symbol, ticker in tickers.items():
159
- # تصفية الأزواج: USDT فقط، وحجم تداول معقول (> 100k$) لتجنب العملات الميتة
160
  if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
161
  data.append({
162
  'symbol': symbol,
163
  'dollar_volume': ticker['quoteVolume'],
164
  'current_price': ticker['last']
165
  })
166
- # الترتيب التنازلي حسب الحجم
167
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
168
  return data
169
  except Exception as e:
@@ -176,14 +175,12 @@ class DataManager:
176
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
177
  """
178
  مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV
179
- لعدة إطارات زمنية لكل عملة، ويضع النتائج في طابور المعالجة.
180
  """
181
  timeframes = ['5m', '15m', '1h', '4h', '1d']
182
- limit = 500 # نحتاج تاريخ كافي للمؤشرات المعقدة
183
 
184
  for sym_data in symbols:
185
  symbol = sym_data['symbol']
186
- # جلب جميع الإطارات الزمنية بالتوازي للسرعة القصوى
187
  tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes]
188
  results = await asyncio.gather(*tasks, return_exceptions=False)
189
 
@@ -191,22 +188,16 @@ class DataManager:
191
  valid_packet = True
192
  for i, res in enumerate(results):
193
  tf = timeframes[i]
194
- # التحقق من جودة البيانات (على الأقل 200 شمعة للتحليل الدقيق)
195
  if res and isinstance(res, list) and len(res) >= 200:
196
  ohlcv_packet[tf] = res
197
  else:
198
- # إذا فقدنا إطاراً زمنياً حيوياً (مثل 5m أو 1h)، قد نعتبر الحزمة غير صالحة
199
  if tf in ['5m', '1h']: valid_packet = False
200
 
201
- # إذا كانت الحزمة مكتملة بما يكفي، نرسلها للمعالجة
202
  if valid_packet and len(ohlcv_packet) >= 4:
203
  sym_data['ohlcv'] = ohlcv_packet
204
- await queue.put([sym_data]) # نرسل كقائمة لتوافق المعالج
205
 
206
- # فاصل زمني صغير جداً لتجنب تجاوز حدود الـ API بعنف
207
  await asyncio.sleep(0.05)
208
-
209
- # علامة نهاية التدفق
210
  await queue.put(None)
211
 
212
  async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
@@ -214,7 +205,6 @@ class DataManager:
214
  try:
215
  return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
216
  except Exception:
217
- # نتجاهل الأخطاء الفردية لعدم إيقاف التدفق الكامل
218
  return None
219
 
220
  # ==================================================================
@@ -229,17 +219,24 @@ class DataManager:
229
  print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}")
230
  return 0.0
231
 
232
- # [NEW FIX V12.4] الدالة التي كانت مفقودة وتمت إضافتها
233
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
234
  """
235
  جلب عدد محدود من الشموع الأخيرة بسرعة.
236
- يستخدمها 'القناص' (Sniper) للمراقبة اللحظية الخفيفة.
237
  """
 
 
238
  try:
239
  candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
 
240
  if candles and len(candles) > 0:
 
241
  return candles
242
- return []
 
 
 
 
243
  except Exception as e:
244
- # print(f"⚠️ [DataManager] Failed rapid OHLCV fetch for {symbol}: {e}")
 
245
  return []
 
1
  # ml_engine/data_manager.py
2
+ # (V12.5 - Lazy Loading Fix + V15.6 App-Compat Fix + Detailed Logging)
3
 
4
  import os
5
  import asyncio
 
37
  # ==================================================================
38
  # ⚙️ إعدادات التحكم المركزية (V12.3 Hybrid Thresholds)
39
  # ==================================================================
 
40
  self.HYBRID_ENTRY_THRESHOLD = 0.60
41
  # ==================================================================
42
 
 
44
  self.whale_monitor = whale_monitor
45
  self.r2_service = r2_service
46
 
 
47
  self.exchange = ccxt.kucoin({
48
  'enableRateLimit': True,
49
  'timeout': 30000,
 
60
 
61
  async def initialize(self):
62
  """تهيئة مدير البيانات والاتصالات"""
63
+ print(" > [DM Log] 0. بدء تهيئة DataManager...")
64
  self.http_client = httpx.AsyncClient(timeout=30.0)
65
  await self._load_markets()
66
 
67
+ print(" > [DataManager] إنشاء النماذج المساندة (Lazy Load)...")
68
  try:
69
  self.layer1_ranker = Layer1Ranker(model_path="ml_models/layer1_ranker.lgbm")
 
 
 
 
70
  if ChartPatternAnalyzer:
71
  self.pattern_analyzer = ChartPatternAnalyzer(r2_service=self.r2_service)
 
 
 
72
 
73
  except Exception as e:
74
  print(f"⚠️ [DataManager] تحذير أثناء إنشاء النماذج المساندة: {e}")
75
 
76
  print(f"✅ DataManager V12.5 initialized (Hybrid Threshold: {self.HYBRID_ENTRY_THRESHOLD})")
77
+ print(" > [DM Log] 4. اكتملت تهيئة DataManager.")
78
+
79
 
80
  async def _load_markets(self):
81
  """تحميل بيانات الأسواق وتخزينها مؤقتاً"""
82
+ print(" > [DM Log] 1. بدء _load_markets...")
83
  try:
84
  if self.exchange:
85
+ print(" > [DM Log] 2. استدعاء exchange.load_markets()... (قد يستغرق وقتاً)")
86
  await self.exchange.load_markets()
87
  self.market_cache = self.exchange.markets
88
+
89
+ if self.market_cache and len(self.market_cache) > 0:
90
+ print(f" > [DM Log] 3. ✅ نجاح! تم تحميل {len(self.market_cache)} سوق.")
91
+ else:
92
+ print(" > [DM Log] 3. ⚠️ تحذير: load_markets() نجح ولكن لم يتم إرجاع أسواق.")
93
+ else:
94
+ print(" > [DM Log] 2. ❌ خطأ: self.exchange هو None.")
95
+
96
  except Exception as e:
97
+ print(f" > [DM Log] 3. ❌❌❌ فشل فادح في _load_markets: {e}")
98
+ traceback.print_exc()
99
 
100
  async def close(self):
101
  """إغلاق جميع الاتصالات بأمان"""
102
+ print(" > [DM Log] 7. إغلاق اتصالات DataManager...")
103
  if self.http_client: await self.http_client.aclose()
104
  if self.exchange: await self.exchange.close()
 
105
  if self.pattern_analyzer and hasattr(self.pattern_analyzer, 'clear_memory'):
106
  self.pattern_analyzer.clear_memory()
107
  if self.layer1_ranker and hasattr(self.layer1_ranker, 'clear_memory'):
 
113
  async def load_contracts_from_r2(self):
114
  """
115
  [جديد] يقوم بتحميل قاعدة بيانات العقود من R2 عند بدء التشغيل.
 
116
  """
117
  print(" > [DataManager] Loading contracts database from R2...")
118
  if not self.r2_service:
 
130
  def get_contracts_db(self) -> Dict[str, Any]:
131
  """
132
  [جديد] إرجاع قاعدة بيانات العقود التي تم تحميلها.
 
133
  """
134
  return self.contracts_db
135
 
 
139
  async def layer1_rapid_screening(self) -> List[Dict[str, Any]]:
140
  """
141
  الغربلة الأولية السريعة جداً بناءً على الحجم فقط.
 
142
  """
143
  print(f"🔍 [Layer 1] بدء الغربلة السريعة (Top Liquid Assets)...")
144
  volume_data = await self._get_volume_data_live()
 
147
  print("⚠️ [Layer 1 Warning] لم يتم العثور على بيانات حجم تداول.")
148
  return []
149
 
 
150
  candidates = volume_data[:150]
151
  print(f"✅ [Layer 1] تم تمرير {len(candidates)} عملة للتحليل الهجين.")
152
  return candidates
 
157
  tickers = await self.exchange.fetch_tickers()
158
  data = []
159
  for symbol, ticker in tickers.items():
 
160
  if symbol.endswith('/USDT') and ticker.get('quoteVolume') and ticker['quoteVolume'] > 100000:
161
  data.append({
162
  'symbol': symbol,
163
  'dollar_volume': ticker['quoteVolume'],
164
  'current_price': ticker['last']
165
  })
 
166
  data.sort(key=lambda x: x['dollar_volume'], reverse=True)
167
  return data
168
  except Exception as e:
 
175
  async def stream_ohlcv_data(self, symbols: List[Dict], queue: asyncio.Queue):
176
  """
177
  مولد بيانات متدفق (Streaming Generator) يجلب شموع OHLCV
 
178
  """
179
  timeframes = ['5m', '15m', '1h', '4h', '1d']
180
+ limit = 500
181
 
182
  for sym_data in symbols:
183
  symbol = sym_data['symbol']
 
184
  tasks = [self._fetch_ohlcv_live(symbol, tf, limit) for tf in timeframes]
185
  results = await asyncio.gather(*tasks, return_exceptions=False)
186
 
 
188
  valid_packet = True
189
  for i, res in enumerate(results):
190
  tf = timeframes[i]
 
191
  if res and isinstance(res, list) and len(res) >= 200:
192
  ohlcv_packet[tf] = res
193
  else:
 
194
  if tf in ['5m', '1h']: valid_packet = False
195
 
 
196
  if valid_packet and len(ohlcv_packet) >= 4:
197
  sym_data['ohlcv'] = ohlcv_packet
198
+ await queue.put([sym_data])
199
 
 
200
  await asyncio.sleep(0.05)
 
 
201
  await queue.put(None)
202
 
203
  async def _fetch_ohlcv_live(self, symbol, timeframe, limit):
 
205
  try:
206
  return await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
207
  except Exception:
 
208
  return None
209
 
210
  # ==================================================================
 
219
  print(f"⚠️ [DataManager] Failed to fetch price for {symbol}: {e}")
220
  return 0.0
221
 
 
222
  async def get_latest_ohlcv(self, symbol: str, timeframe: str = '5m', limit: int = 100) -> List[List[float]]:
223
  """
224
  جلب عدد محدود من الشموع الأخيرة بسرعة.
 
225
  """
226
+ # (إضافة طابع خفيف لتجنب إغراق السجلات)
227
+ # print(f" > [DM Log] 5. [get_latest_ohlcv] طلب {symbol} {timeframe}...")
228
  try:
229
  candles = await self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
230
+
231
  if candles and len(candles) > 0:
232
+ # (لا نطبع هذا لأنه سينجح)
233
  return candles
234
+ else:
235
+ # (هذا هو الطابع المهم الذي يكشف الفشل الصامت)
236
+ print(f" > [DM Log] 6. ⚠️ [get_latest_ohlcv] فشل صامت لـ {symbol} {timeframe}. (أرجع قائمة فارغة).")
237
+ return []
238
+
239
  except Exception as e:
240
+ # (هذا هو الطابع المهم الذي يكشف الفشل الفادح)
241
+ print(f" > [DM Log] 6. ❌ [get_latest_ohlcv] فشل فادح لـ {symbol} {timeframe}: {e}")
242
  return []