# learning_hub/curator.py
import json
import asyncio
from typing import List, Dict, Any, TYPE_CHECKING

from .schemas import Delta

if TYPE_CHECKING:
    from LLM import LLMService
    from .memory_store import MemoryStore


class Curator:
    def __init__(self, llm_service: 'LLMService', memory_store: 'MemoryStore'):
        self.llm_service = llm_service
        self.memory_store = memory_store
        # (This is a configuration parameter from Point 6, not a placeholder)
        self.distill_threshold: int = 50
        self.distilled_rules_key: str = "learning_distilled_rules.json"
        print("✅ Learning Hub Module: Curator (Distiller) loaded")
    async def check_and_distill_domain(self, domain: str):
        """
        Checks if a domain needs distillation and runs it if the threshold is met.
        (Implements Point 6 - distillation trigger.)
        """
        try:
            deltas_list = await self.memory_store._load_deltas_from_r2(domain)
            # 1. Filter for approved Deltas only; only those feed distillation.
            approved_deltas = [d for d in deltas_list if d.get('approved', False)]
            if len(approved_deltas) >= self.distill_threshold:
                print(f"ℹ️ [Curator] Distillation threshold reached for {domain} ({len(approved_deltas)} approved deltas). Starting...")
                await self.distill_deltas(domain, approved_deltas)
            else:
                print(f"ℹ️ [Curator] {domain} has {len(approved_deltas)}/{self.distill_threshold} approved deltas. Distillation not yet required.")
        except Exception as e:
            print(f"❌ [Curator] Failed to check distillation for {domain}: {e}")
    async def distill_deltas(self, domain: str, deltas_to_distill: List[Dict]):
        """
        Runs the LLM distillation process to merge and summarize Deltas.
        (Implements Point 4 - Curator (distillation job).)
        """
        try:
            # 1. Create the (English-only) distillation prompt
            prompt = self._create_distillation_prompt(domain, deltas_to_distill)
            # 2. Call the LLM
            response_text = await self.llm_service._call_llm(prompt)
            if not response_text:
                raise ValueError("Distiller LLM call returned no response.")
            # 3. Parse the response
            distilled_json = self.llm_service._parse_llm_response_enhanced(
                response_text,
                fallback_strategy="distillation",
                symbol=domain
            )
            if not distilled_json or "distilled_rules" not in distilled_json:
                raise ValueError(f"Failed to parse Distiller LLM response: {response_text}")
            distilled_rules_text_list = distilled_json.get("distilled_rules", [])
            if not isinstance(distilled_rules_text_list, list):
                raise ValueError("Distiller LLM returned 'distilled_rules' that is not a list.")
            # 4. Save the new distilled rules
            await self._save_distilled_rules(domain, distilled_rules_text_list, deltas_to_distill)
            # 5. Archive (delete) the old approved deltas that were just distilled
            all_deltas = await self.memory_store._load_deltas_from_r2(domain)
            approved_ids_to_archive = {d.get('id') for d in deltas_to_distill}
            # Keep only non-approved (in-review) deltas, or deltas that weren't part of this batch
            remaining_deltas = [
                d for d in all_deltas
                if not (d.get('approved', False) and d.get('id') in approved_ids_to_archive)
            ]
            await self.memory_store._save_deltas_to_r2(domain, remaining_deltas)
            print(f"✅ [Curator] Distillation complete for {domain}. Created {len(distilled_rules_text_list)} new rules. Archived {len(approved_ids_to_archive)} old deltas.")
        except Exception as e:
            print(f"❌ [Curator] Distillation process failed for {domain}: {e}")
    async def _save_distilled_rules(self, domain: str, new_rules_text: List[str], evidence_deltas: List[Dict]):
        """Saves the new distilled rules as high-priority Deltas."""
        # We save them back into the main delta file as high-priority deltas,
        # so they get picked up by the get_active_context() function.
        deltas_list = await self.memory_store._load_deltas_from_r2(domain)
        evidence_ids = [d.get('id', 'N/A') for d in evidence_deltas]
        for rule_text in new_rules_text:
            if not rule_text:
                continue  # Skip empty strings
            distilled_delta = Delta(
                text=rule_text,
                domain=domain,
                priority="high",              # Distilled rules get high priority
                score=0.95,                   # High confidence score
                evidence_refs=evidence_ids,   # References all the deltas it summarized
                created_by="curator_v1 (distilled)",
                approved=True,                # Automatically approved
                usage_count=0
            )
            deltas_list.append(distilled_delta.model_dump())
        await self.memory_store._save_deltas_to_r2(domain, deltas_list)
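    # Assuming the Delta schema defines exactly the fields set above (plus any
    # defaults of its own, e.g. an id), each appended entry would serialize via
    # model_dump() to something like (illustrative values only):
    #
    #     {
    #         "text": "Always use ATR trailing stops for breakout strategies.",
    #         "domain": "btc_breakouts",
    #         "priority": "high",
    #         "score": 0.95,
    #         "evidence_refs": ["delta_001", "delta_002"],
    #         "created_by": "curator_v1 (distilled)",
    #         "approved": True,
    #         "usage_count": 0
    #     }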
    def _create_distillation_prompt(self, domain: str, deltas: List[Dict]) -> str:
        """
        Creates the (English-only) prompt for the LLM to act as a Distiller/Curator.
        (Implements Point 4 - Curator prompt.)
        """
        deltas_text = "\n".join([f"- {d.get('text')} (Score: {d.get('score', 0.5):.2f})" for d in deltas])
        prompt = f"""
SYSTEM: You are an expert "Curator" AI. Your job is to read a list of "Deltas" (learning rules) for crypto trading, identify recurring patterns, and merge them into 3-5 concise, powerful "Golden Rules".

DOMAIN: {domain}

RAW DELTAS TO ANALYZE ({len(deltas)} rules):
{deltas_text}
--- END OF DELTAS ---

TASK:
1. Analyze the "RAW DELTAS" above.
2. Find overlaps, repetitions, and contradictions.
3. Generate 3 to 5 new "Distilled Rules" that summarize the core wisdom of these deltas.
4. Each new rule must be concise (max 25 words) and actionable.

OUTPUT FORMAT (JSON only):
{{
    "justification": "A brief explanation of the patterns you found and how you merged them.",
    "distilled_rules": [
        "The first golden rule (e.g., 'Always use ATR trailing stops for breakout strategies.')",
        "The second golden rule (e.g., 'If RSI is overbought on 1H, avoid breakout entries.')",
        "..."
    ]
}}
"""
        return prompt
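

# --- Minimal smoke-test sketch (not part of the production path) ---
# The stubs below are hypothetical stand-ins for LLMService and MemoryStore;
# only the method names the Curator actually calls are implemented here.
# Running this requires the real Delta schema (relative import above), e.g.
# `python -m learning_hub.curator`, and assumes Delta accepts exactly the
# keyword arguments used in _save_distilled_rules.
if __name__ == "__main__":
    class _StubLLM:
        async def _call_llm(self, prompt: str) -> str:
            # Canned response in the OUTPUT FORMAT the prompt requests.
            return '{"justification": "stub", "distilled_rules": ["Always size positions before entry."]}'

        def _parse_llm_response_enhanced(self, text, fallback_strategy=None, symbol=None):
            return json.loads(text)

    class _StubStore:
        def __init__(self):
            # 50 pre-approved deltas so the distillation threshold is met.
            self.deltas = [
                {"id": f"d{i}", "text": f"rule {i}", "approved": True, "score": 0.6}
                for i in range(50)
            ]

        async def _load_deltas_from_r2(self, domain):
            return list(self.deltas)

        async def _save_deltas_to_r2(self, domain, deltas):
            self.deltas = deltas

    async def _demo():
        curator = Curator(_StubLLM(), _StubStore())
        await curator.check_and_distill_domain("demo_domain")

    asyncio.run(_demo())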