Trad / LLM.py
Riy777's picture
Update LLM.py
bacfebc
raw
history blame
12.3 kB
# LLM.py (V13.4 - The "Heavyweight" Omniscient Brain)
# استعادة كامل التفاصيل في البرومبتات لضمان عدم وجود أي اختصارات.
import os, traceback, json, re, time
from datetime import datetime
from typing import Dict, Any, Optional
from openai import AsyncOpenAI, RateLimitError, APIError
# ==============================================================================
# 🔌 إعدادات الاتصال (مطابقة للأصل تماماً)
# ==============================================================================
LLM_API_URL = os.getenv("LLM_API_URL", "https://integrate.api.nvidia.com/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY")
LLM_MODEL = os.getenv("LLM_MODEL", "nvidia/llama-3.1-nemotron-ultra-253b-v1")
LLM_TEMPERATURE = 0.2
LLM_TOP_P = 0.7
LLM_MAX_TOKENS = 16384
LLM_FREQUENCY_PENALTY = 0.8
LLM_PRESENCE_PENALTY = 0.5
CLIENT_TIMEOUT = 300.0
class LLMService:
def __init__(self):
if not LLM_API_KEY:
raise ValueError("❌ [LLM] LLM_API_KEY is missing!")
self.client = AsyncOpenAI(
base_url=LLM_API_URL,
api_key=LLM_API_KEY,
timeout=CLIENT_TIMEOUT
)
self.r2_service = None
self.learning_hub = None
print(f"🧠 [LLM V13.4] Heavyweight Brain Initialized: {LLM_MODEL}")
async def _call_llm(self, prompt: str) -> Optional[str]:
"""إرسال الطلب مع تفعيل وضع التفكير العميق بدقة"""
# ⚠️ هام: هذا الإعداد دقيق جداً لتفعيل قدرات Nemotron الخاصة
system_prompt = "detailed thinking on"
try:
response = await self.client.chat.completions.create(
model=LLM_MODEL,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
],
temperature=LLM_TEMPERATURE,
top_p=LLM_TOP_P,
max_tokens=LLM_MAX_TOKENS,
frequency_penalty=LLM_FREQUENCY_PENALTY,
presence_penalty=LLM_PRESENCE_PENALTY,
stream=False,
response_format={"type": "json_object"}
)
return response.choices[0].message.content
except Exception as e:
print(f"❌ [LLM Call Error] {e}")
return None
def _parse_json_secure(self, text: str) -> Optional[Dict]:
"""محلل JSON قوي يستخرج البيانات من أي نص"""
try:
match = re.search(r'\{.*\}', text, re.DOTALL)
if match: return json.loads(match.group(0))
except: pass
return None
# ==================================================================
# 🧠 الوظيفة 1: قرار الدخول الاستراتيجي (تحليل شامل ومفصل)
# ==================================================================
async def get_trading_decision(self, candidate_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
symbol = candidate_data.get('symbol', 'UNKNOWN')
try:
# جلب سياق التعلم السابق إن وجد
learning_context = "Playbook: No specific prior learning for this scenario."
if self.learning_hub:
learning_context = await self.learning_hub.get_active_context_for_llm("general", f"{symbol} entry analysis")
# بناء البرومبت المفصل جداً
prompt = self._create_heavyweight_entry_prompt(candidate_data, learning_context)
# استدعاء النموذج
response_text = await self._call_llm(prompt)
decision = self._parse_json_secure(response_text)
# حفظ نسخة طبق الأصل من الطلب والرد للتدقيق
if self.r2_service and response_text:
await self.r2_service.save_llm_prompt_async(symbol, "entry_decision_full", prompt, response_text)
return decision
except Exception as e:
print(f"❌ [LLM Entry Error] {symbol}: {e}")
traceback.print_exc()
return None
# ==================================================================
# 🔄 الوظيفة 2: إعادة التحليل الدوري (مراجعة شاملة للوضع)
# ==================================================================
async def re_analyze_trade_async(self, trade_data: Dict[str, Any], current_data: Dict[str, Any]) -> Optional[Dict[str, Any]]:
symbol = trade_data.get('symbol', 'UNKNOWN')
try:
strategy = trade_data.get('entry_reason', 'GENERIC')
learning_context = "Playbook: Maintain original strategy unless validated invalidation occurs."
if self.learning_hub:
learning_context = await self.learning_hub.get_active_context_for_llm("strategy", f"{symbol} re-eval {strategy}")
prompt = self._create_heavyweight_reanalysis_prompt(trade_data, current_data, learning_context)
response_text = await self._call_llm(prompt)
decision = self._parse_json_secure(response_text)
if self.r2_service and response_text:
await self.r2_service.save_llm_prompt_async(symbol, "re_analysis_full", prompt, response_text)
return decision
except Exception as e:
print(f"❌ [LLM Re-Eval Error] {symbol}: {e}")
return None
# ==================================================================
# 📝 قسم هندسة البرومبتات (تفاصيل كاملة بدون اختصارات)
# ==================================================================
def _create_heavyweight_entry_prompt(self, data: Dict[str, Any], learning_context: str) -> str:
"""
إنشاء برومبت ضخم يحتوي على كل شاردة وواردة من البيانات المتاحة.
"""
symbol = data.get('symbol')
current_price = data.get('current_price')
# 1. تفاصيل الطبقات السابقة (التحليل الفني والكمي)
titan_score = data.get('titan_details', {}).get('score', 0)
titan_trend = "STRONG_UP" if titan_score > 0.7 else "UP" if titan_score > 0.5 else "WEAK"
pat_details = data.get('pattern_details', {})
pat_name = pat_details.get('pattern_detected', 'None')
pat_conf = pat_details.get('pattern_confidence', 0)
mc_score = data.get('components', {}).get('mc_score', 0)
l1_total = data.get('enhanced_final_score', 0)
l2_total = data.get('layer2_score', 0)
# 2. تفاصيل بيانات الحيتان (كاملة)
whale = data.get('whale_data', {})
whale_1h = whale.get('exchange_flows', {})
whale_24h = whale.get('accumulation_analysis_24h', {})
whale_section = f"""
- 1H Net Flow to Exchanges: ${whale_1h.get('net_flow_usd', 0):,.2f}
- 1H Deposits: {whale_1h.get('deposit_count', 0)} | Withdrawals: {whale_1h.get('withdrawal_count', 0)}
- 24H Accumulation Flow: ${whale_24h.get('net_flow_usd', 0):,.2f}
- 24H Whale Transaction Count: {whale_24h.get('whale_transfers_count', 0)}
- Relative Flow Impact (24H): {whale_24h.get('relative_net_flow_percent', 0):.4f}%
"""
# 3. تفاصيل الأخبار (النص الخام الكامل)
news_text = data.get('news_text', 'No specific news available for this asset currently.')
# 4. لقطة السوق (Price Action Snapshot)
ohlcv = data.get('ohlcv_sample', {})
price_section = ""
for tf, candle in ohlcv.items():
if candle:
# [Timestamp, Open, High, Low, Close, Volume]
price_section += f" - {tf.upper()}: Open={candle[1]}, High={candle[2]}, Low={candle[3]}, Close={candle[4]}, Vol={candle[5]}\n"
return f"""
YOU ARE THE OMNISCIENT BRAIN. A skeptical, master-level crypto trading AI.
Your goal is to validate the findings of your sub-systems and make the FINAL GO/NO-GO decision for {symbol}.
Current Price: {current_price}
========== 🧠 PART 1: SUB-SYSTEM REPORTS (PRELIMINARY ANALYSIS) ==========
Your subordinate systems have flagged this asset with the following scores:
* Layer 1 Technical Score: {l1_score:.4f} / 1.0
- Titan ML Trend Model: {titan_score:.4f} ({titan_trend})
- Chart Pattern Recognition: {pat_name} (Confidence: {pat_conf:.2f})
- Monte Carlo Probability (1H): {mc_score:.4f}
* Layer 2 Enhanced Score: {l2_total:.4f} / 1.0 (After initial whale/news weighting)
========== 🔍 PART 2: RAW EVIDENCE FOR VERIFICATION (THE TRUTH) ==========
Do not trust the scores above blindly. Verify them against this raw data:
[A] RAW PRICE ACTION SNAPSHOT (OHLCV Last Closed Candles):
{price_section}
-> TASK: Does this price action confirm the 'Titan Trend' reported above?
[B] RAW WHALE ON-CHAIN ACTIVITY:
{whale_section}
-> TASK: Is there hidden distribution (selling) despite the technical uptrend?
[C] RAW NEWSWIRE FEED (Latest Headlines & Summaries):
\"\"\"{news_text}\"\"\"
-> TASK: Are there any immediate red flags, FUD, or regulatory risks in this text?
========== 📖 PART 3: INSTITUTIONAL MEMORY (LEARNING PLAYBOOK) ==========
{learning_context}
========== 🛑 FINAL DECISION TASK ==========
Perform a deep, step-by-step internal analysis (triggered by your system mode).
Compare PART 1 (Opinions) vs PART 2 (Facts).
If FACTS contradict OPINIONS, you MUST reject the trade.
REQUIRED OUTPUT (Strict JSON format ONLY):
{{
"action": "WATCH" or "IGNORE",
"confidence_level": 0.00 to 1.00,
"reasoning": "A rigorous, professional justification citing specific raw evidence (e.g., 'Whale 1H inflows of $5M contradict Titan trend').",
"strategy_directive": "MOMENTUM_BREAKOUT" or "DIP_ACCUMULATION" or "SCALP_REVERSAL",
"key_risk_factor": "Identify the single biggest risk based on raw evidence."
}}
"""
def _create_heavyweight_reanalysis_prompt(self, trade: Dict, current: Dict, learning_context: str) -> str:
"""
إنشاء برومبت مفصل لإعادة تقييم صفقة مفتوحة بناءً على تغير الظروف.
"""
symbol = trade.get('symbol')
entry_price = trade.get('entry_price')
current_price = current.get('current_price')
pnl_pct = ((current_price - entry_price) / entry_price) * 100
duration_min = (datetime.now() - datetime.fromisoformat(trade.get('entry_time').replace('Z', ''))).total_seconds() / 60
# البيانات الحالية المقارنة
titan_now = current.get('titan_score', 0)
whale_now = current.get('whale_data', {})
whale_1h_net = whale_now.get('exchange_flows', {}).get('net_flow_usd', 0)
news_now = current.get('news_text', 'No new significant news.')
return f"""
ROLE: Omniscient Brain (Trade Guardian Mode).
EVENT: Mandatory periodic re-evaluation of OPEN POSITION.
ASSET: {symbol}
TIME IN TRADE: {duration_min:.1f} minutes
========== 📉 POSITION STATUS ==========
* Entry Price: {entry_price}
* Current Price: {current_price}
* Unrealized PnL: {pnl_pct:+.2f}%
* Original Entry Reason: "{trade.get('entry_reason')}"
========== 🆕 CHANGED MARKET CONDITIONS (RAW DATA) ==========
1. ML Trend Update (Titan): Currently {titan_now:.4f}
2. Fresh Whale Activity (Last 1H): Net Flow ${whale_1h_net:,.0f}
(Positive = potential selling pressure, Negative = accumulation)
3. Latest News Update:
\"\"\"{news_now[:1000]}\"\"\"
========== 📖 PLAYBOOK GUIDELINES ==========
{learning_context}
========== 🛡️ GUARDIAN DECISION ==========
Analyze if the original investment thesis is still valid based on the NEW raw data.
Output Strict JSON ONLY:
{{
"action": "HOLD" or "EMERGENCY_EXIT" or "UPDATE_TARGETS",
"suggested_new_tp": null or float value,
"suggested_new_sl": null or float value,
"reasoning": "Professional assessment of current risk vs original thesis."
}}
"""
print("✅ LLM Service V13.4 (Heavyweight Omniscient Brain) Loaded - NO SHORTCUTS")