# learning_hub/curator.py
import json
import asyncio
from typing import List, Dict, TYPE_CHECKING
from .schemas import Delta

if TYPE_CHECKING:
    from LLM import LLMService
    from .memory_store import MemoryStore
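
# NOTE (assumed shape): `Delta` is expected to be a Pydantic model whose
# fields include at least those populated in `_save_distilled_rules` below
# (text, domain, priority, score, evidence_refs, created_by, approved,
# usage_count) plus an auto-generated `id`, since instances are serialized
# via `model_dump()` and later matched by `d.get('id')`. The authoritative
# definition lives in learning_hub/schemas.py.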

class Curator:
    def __init__(self, llm_service: 'LLMService', memory_store: 'MemoryStore'):
        self.llm_service = llm_service
        self.memory_store = memory_store
        
        # Configuration parameter (Point 6): the minimum number of approved
        # deltas a domain must accumulate before distillation runs.
        self.distill_threshold: int = 50
        self.distilled_rules_key: str = "learning_distilled_rules.json"
        print("✅ Learning Hub Module: Curator (Distiller) loaded")

    async def check_and_distill_domain(self, domain: str):
        """
        Checks if a domain needs distillation and runs it if the threshold is met.
        (Implements Point 6 - Distillation trigger)
        """
        try:
            deltas_list = await self.memory_store._load_deltas_from_r2(domain)
            
            # 1. Filter for approved Deltas only for distillation
            approved_deltas = [d for d in deltas_list if d.get('approved', False)]
            
            if len(approved_deltas) >= self.distill_threshold:
                print(f"ℹ️ [Curator] Distillation threshold reached for {domain} ({len(approved_deltas)} approved deltas). Starting...")
                await self.distill_deltas(domain, approved_deltas)
            else:
                print(f"ℹ️ [Curator] {domain} has {len(approved_deltas)}/{self.distill_threshold} approved deltas. Distillation not yet required.")
                
        except Exception as e:
            print(f"❌ [Curator] Failed to check distillation for {domain}: {e}")

    async def distill_deltas(self, domain: str, deltas_to_distill: List[Dict]):
        """
        Runs the LLM distillation process to merge and summarize Deltas.
        (Implements Point 4 - Curator (distillation job))
        """
        try:
            # 1. Build the distillation prompt
            prompt = self._create_distillation_prompt(domain, deltas_to_distill)
            
            # 2. Call the LLM
            response_text = await self.llm_service._call_llm(prompt)
            
            if not response_text:
                raise ValueError("Distiller LLM call returned no response.")

            # 3. Parse the response
            distilled_json = self.llm_service._parse_llm_response_enhanced(
                response_text, 
                fallback_strategy="distillation", 
                symbol=domain
            )
            
            if not distilled_json or "distilled_rules" not in distilled_json:
                raise ValueError(f"Failed to parse Distiller LLM response: {response_text}")

            distilled_rules_text_list = distilled_json.get("distilled_rules", [])
            if not isinstance(distilled_rules_text_list, list):
                raise ValueError("Distiller LLM returned 'distilled_rules' as a non-list value.")

            # 4. Save the new distilled rules
            await self._save_distilled_rules(domain, distilled_rules_text_list, deltas_to_distill)
            
            # 5. Archive (delete) the old approved deltas that were just distilled
            all_deltas = await self.memory_store._load_deltas_from_r2(domain)
            approved_ids_to_archive = {d.get('id') for d in deltas_to_distill}
            
            # Keep only non-approved (in-review) deltas, or deltas that weren't part of this batch
            remaining_deltas = [
                d for d in all_deltas 
                if not (d.get('approved', False) and d.get('id') in approved_ids_to_archive)
            ]
            
            await self.memory_store._save_deltas_to_r2(domain, remaining_deltas)
            
            print(f"✅ [Curator] Distillation complete for {domain}. Created {len(distilled_rules_text_list)} new rules. Archived {len(approved_ids_to_archive)} old deltas.")

        except Exception as e:
            print(f"❌ [Curator] Distillation process failed for {domain}: {e}")
            
    async def _save_distilled_rules(self, domain: str, new_rules_text: List[str], evidence_deltas: List[Dict]):
        """Saves the new distilled rules as high-priority Deltas."""
        
        # We save them back into the main delta file as high-priority,
        # so they get picked up by the get_active_context() function.
        
        deltas_list = await self.memory_store._load_deltas_from_r2(domain)
        evidence_ids = [d.get('id', 'N/A') for d in evidence_deltas]
        
        for rule_text in new_rules_text:
            if not rule_text:
                continue  # Skip empty strings
            
            distilled_delta = Delta(
                text=rule_text,
                domain=domain,
                priority="high", # Distilled rules get high priority
                score=0.95,      # High confidence score
                evidence_refs=evidence_ids, # References all the deltas it summarized
                created_by="curator_v1 (distilled)",
                approved=True,   # Automatically approved
                usage_count=0
            )
            deltas_list.append(distilled_delta.model_dump())
            
        await self.memory_store._save_deltas_to_r2(domain, deltas_list)

    def _create_distillation_prompt(self, domain: str, deltas: List[Dict]) -> str:
        """
        Creates the (English-only) prompt for the LLM to act as a Distiller/Curator.
        (Implements Point 4 - Curator prompt)
        """
        
        deltas_text = "\n".join([f"- {d.get('text')} (Score: {d.get('score', 0.5):.2f})" for d in deltas])
        
        prompt = f"""
SYSTEM: You are an expert "Curator" AI. Your job is to read a list of "Deltas" (learning rules) for crypto trading, identify recurring patterns, and merge them into 3-5 concise, powerful "Golden Rules".

DOMAIN: {domain}

RAW DELTAS TO ANALYZE ({len(deltas)} rules):
{deltas_text}
--- END OF DELTAS ---

TASK:
1.  Analyze the "RAW DELTAS" above.
2.  Find overlaps, repetitions, and contradictions.
3.  Generate 3 to 5 new "Distilled Rules" that summarize the core wisdom of these deltas.
4.  Each new rule must be concise (max 25 words) and actionable.

OUTPUT FORMAT (JSON Only):
{{
  "justification": "A brief explanation of the patterns you found and how you merged them.",
  "distilled_rules": [
    "The first golden rule (e.g., 'Always use ATR trailing stops for breakout strategies.')",
    "The second golden rule (e.g., 'If RSI is overbought on 1H, avoid breakout entries.')",
    "..."
  ]
}}
"""
        return prompt
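

# ---------------------------------------------------------------------------
# Minimal offline smoke test -- a sketch under stated assumptions, not part of
# the production module. `_StubLLMService` and `_StubMemoryStore` are
# hypothetical stand-ins implementing only the private hooks this file calls,
# and the canned JSON reply is illustrative. Run with
# `python -m learning_hub.curator` so the relative `schemas.Delta` import
# resolves.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    class _StubLLMService:
        async def _call_llm(self, prompt: str) -> str:
            # Canned reply matching the OUTPUT FORMAT requested by the prompt.
            return json.dumps({
                "justification": "Merged overlapping stop-loss and RSI advice.",
                "distilled_rules": [
                    "Always use ATR trailing stops for breakout strategies.",
                    "Avoid breakout entries while the 1H RSI is overbought.",
                ],
            })

        def _parse_llm_response_enhanced(
            self, text: str, fallback_strategy: str, symbol: str
        ) -> Dict:
            return json.loads(text)

    class _StubMemoryStore:
        def __init__(self) -> None:
            # Seed exactly `distill_threshold` approved deltas so the
            # distillation trigger fires on the first check.
            self._deltas: List[Dict] = [
                {"id": f"d{i}", "text": f"Rule {i}", "domain": "BTC",
                 "approved": True, "score": 0.6}
                for i in range(50)
            ]

        async def _load_deltas_from_r2(self, domain: str) -> List[Dict]:
            return list(self._deltas)

        async def _save_deltas_to_r2(self, domain: str, deltas: List[Dict]) -> None:
            self._deltas = deltas

    async def _demo() -> None:
        store = _StubMemoryStore()
        curator = Curator(_StubLLMService(), store)
        await curator.check_and_distill_domain("BTC")
        # After distillation the 50 archived deltas should be replaced by the
        # distilled high-priority rules.
        print(f"Deltas remaining after distillation: {len(store._deltas)}")

    asyncio.run(_demo())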