Rohan03 commited on
Commit
9a443e7
·
verified ·
1 Parent(s): 9d76fd3

v0.2.0: Add purpose_agent/hitl.py

Browse files
Files changed (1) hide show
  1. purpose_agent/hitl.py +462 -0
purpose_agent/hitl.py ADDED
@@ -0,0 +1,462 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Human-in-the-Loop — Checkpoint, interrupt, resume, and Φ score overrides.
3
+
4
+ Allows humans to:
5
+ 1. Pause the agent at any step and inspect state
6
+ 2. Override the Purpose Function's Φ score (teach the agent what "good" means)
7
+ 3. Edit the agent's planned action before execution
8
+ 4. Resume from any checkpoint after review
9
+ 5. Inject heuristics directly ("when you see X, always do Y")
10
+
11
+ Φ score overrides are the killer feature: humans can correct the critic,
12
+ and the corrected scores flow into experience replay → the agent permanently
13
+ learns the human's preference. No fine-tuning needed.
14
+ """
15
+
16
+ from __future__ import annotations
17
+
18
+ import json
19
+ import logging
20
+ import os
21
+ import time
22
+ from dataclasses import dataclass, field
23
+ from enum import Enum
24
+ from pathlib import Path
25
+ from typing import Any, Callable
26
+
27
+ from purpose_agent.types import (
28
+ Action, Heuristic, MemoryTier, PurposeScore, State,
29
+ Trajectory, TrajectoryStep,
30
+ )
31
+ from purpose_agent.llm_backend import LLMBackend
32
+ from purpose_agent.orchestrator import Environment, Orchestrator, TaskResult
33
+
34
+ logger = logging.getLogger(__name__)
35
+
36
+
37
+ # ---------------------------------------------------------------------------
38
+ # Checkpoint — serializable snapshot of agent state
39
+ # ---------------------------------------------------------------------------
40
+
41
+ @dataclass
42
+ class Checkpoint:
43
+ """
44
+ A serializable snapshot of the entire agent state at a point in time.
45
+
46
+ Can be saved to disk and restored later to resume execution.
47
+ """
48
+ step_index: int
49
+ current_state: dict[str, Any] # State.data
50
+ state_summary: str
51
+ trajectory_steps: list[dict[str, Any]]
52
+ purpose: str
53
+ task_description: str
54
+ history: list[dict[str, Any]]
55
+ heuristics: list[dict[str, Any]]
56
+ timestamp: float = field(default_factory=time.time)
57
+ checkpoint_id: str = ""
58
+
59
+ def save(self, path: str) -> None:
60
+ """Save checkpoint to disk."""
61
+ Path(path).parent.mkdir(parents=True, exist_ok=True)
62
+ with open(path, "w") as f:
63
+ json.dump(self.__dict__, f, indent=2, default=str)
64
+ logger.info(f"Checkpoint saved: {path}")
65
+
66
+ @classmethod
67
+ def load(cls, path: str) -> "Checkpoint":
68
+ """Load checkpoint from disk."""
69
+ with open(path) as f:
70
+ data = json.load(f)
71
+ return cls(**data)
72
+
73
+
74
+ # ---------------------------------------------------------------------------
75
+ # Human Input Types
76
+ # ---------------------------------------------------------------------------
77
+
78
+ class InterruptType(Enum):
79
+ """Types of human-in-the-loop interrupts."""
80
+ APPROVE_ACTION = "approve_action" # Human must approve before execution
81
+ OVERRIDE_SCORE = "override_score" # Human overrides Φ score
82
+ EDIT_ACTION = "edit_action" # Human edits the action
83
+ INJECT_HEURISTIC = "inject_heuristic" # Human teaches a new heuristic
84
+ PAUSE = "pause" # Just pause for inspection
85
+ ABORT = "abort" # Stop the task
86
+
87
+
88
+ @dataclass
89
+ class HumanInput:
90
+ """Input received from a human during an interrupt."""
91
+ interrupt_type: InterruptType
92
+ approved: bool = True
93
+ edited_action: Action | None = None
94
+ override_score: PurposeScore | None = None
95
+ injected_heuristic: Heuristic | None = None
96
+ message: str = ""
97
+ timestamp: float = field(default_factory=time.time)
98
+
99
+
100
+ # ---------------------------------------------------------------------------
101
+ # Input Handler Protocol
102
+ # ---------------------------------------------------------------------------
103
+
104
+ class HumanInputHandler:
105
+ """
106
+ Handler that collects human input during interrupts.
107
+
108
+ Override this for your UI:
109
+ - CLIInputHandler: Terminal prompts (default)
110
+ - WebInputHandler: Web-based approval UI
111
+ - SlackInputHandler: Slack bot approvals
112
+ - AutoApproveHandler: Approve everything (for testing)
113
+ """
114
+
115
+ def request_input(
116
+ self,
117
+ interrupt_type: InterruptType,
118
+ context: dict[str, Any],
119
+ ) -> HumanInput:
120
+ """
121
+ Request input from a human.
122
+
123
+ Args:
124
+ interrupt_type: What kind of input is needed
125
+ context: Current state, proposed action, scores, etc.
126
+
127
+ Returns:
128
+ HumanInput with the human's decision
129
+ """
130
+ raise NotImplementedError("Override in subclass")
131
+
132
+
133
+ class AutoApproveHandler(HumanInputHandler):
134
+ """Automatically approve everything. For testing / autonomous mode."""
135
+
136
+ def request_input(self, interrupt_type, context):
137
+ return HumanInput(interrupt_type=interrupt_type, approved=True)
138
+
139
+
140
+ class CLIInputHandler(HumanInputHandler):
141
+ """Command-line human-in-the-loop handler."""
142
+
143
+ def request_input(
144
+ self,
145
+ interrupt_type: InterruptType,
146
+ context: dict[str, Any],
147
+ ) -> HumanInput:
148
+
149
+ print("\n" + "=" * 60)
150
+ print(f" 🧑 HUMAN INPUT REQUESTED: {interrupt_type.value}")
151
+ print("=" * 60)
152
+
153
+ if interrupt_type == InterruptType.APPROVE_ACTION:
154
+ print(f"\n Proposed action: {context.get('action_name', '?')}")
155
+ print(f" Thought: {context.get('thought', '')[:200]}")
156
+ print(f" Expected delta: {context.get('expected_delta', '')[:200]}")
157
+ response = input("\n Approve? (y/n/edit): ").strip().lower()
158
+
159
+ if response == "n":
160
+ return HumanInput(interrupt_type=interrupt_type, approved=False)
161
+ elif response.startswith("edit"):
162
+ new_action = input(" New action name: ").strip()
163
+ return HumanInput(
164
+ interrupt_type=InterruptType.EDIT_ACTION,
165
+ approved=True,
166
+ edited_action=Action(name=new_action, params={}, thought="Human override"),
167
+ )
168
+ return HumanInput(interrupt_type=interrupt_type, approved=True)
169
+
170
+ elif interrupt_type == InterruptType.OVERRIDE_SCORE:
171
+ print(f"\n Current Φ_before: {context.get('phi_before', '?')}")
172
+ print(f" Current Φ_after: {context.get('phi_after', '?')}")
173
+ print(f" Current Δ: {context.get('delta', '?')}")
174
+ print(f" Evidence: {context.get('evidence', '')[:200]}")
175
+
176
+ override = input("\n Override Φ_after? (number or 'keep'): ").strip()
177
+ if override.lower() == "keep" or not override:
178
+ return HumanInput(interrupt_type=interrupt_type, approved=True)
179
+
180
+ try:
181
+ new_phi = float(override)
182
+ return HumanInput(
183
+ interrupt_type=interrupt_type,
184
+ approved=True,
185
+ override_score=PurposeScore(
186
+ phi_before=context.get("phi_before", 0),
187
+ phi_after=new_phi,
188
+ delta=new_phi - context.get("phi_before", 0),
189
+ reasoning="Human override",
190
+ evidence="Human judgment",
191
+ confidence=1.0,
192
+ ),
193
+ )
194
+ except ValueError:
195
+ return HumanInput(interrupt_type=interrupt_type, approved=True)
196
+
197
+ elif interrupt_type == InterruptType.INJECT_HEURISTIC:
198
+ print("\n Teach the agent a new heuristic:")
199
+ pattern = input(" When (pattern): ").strip()
200
+ strategy = input(" Do (strategy): ").strip()
201
+
202
+ if pattern and strategy:
203
+ return HumanInput(
204
+ interrupt_type=interrupt_type,
205
+ approved=True,
206
+ injected_heuristic=Heuristic(
207
+ pattern=pattern,
208
+ strategy=strategy,
209
+ steps=[],
210
+ tier=MemoryTier.STRATEGIC,
211
+ q_value=1.0, # Human-taught = high confidence
212
+ ),
213
+ )
214
+ return HumanInput(interrupt_type=interrupt_type, approved=True)
215
+
216
+ else:
217
+ response = input("\n Continue? (y/n): ").strip().lower()
218
+ return HumanInput(
219
+ interrupt_type=interrupt_type,
220
+ approved=(response != "n"),
221
+ )
222
+
223
+
224
+ # ---------------------------------------------------------------------------
225
+ # HITL Orchestrator — wraps Orchestrator with human interrupts
226
+ # ---------------------------------------------------------------------------
227
+
228
+ class HITLOrchestrator:
229
+ """
230
+ Human-in-the-Loop Orchestrator.
231
+
232
+ Wraps the base Orchestrator with interrupt points where humans can:
233
+ - Approve/reject actions before execution
234
+ - Override Φ scores (teaches the Purpose Function)
235
+ - Inject heuristics directly into memory
236
+ - Checkpoint and resume later
237
+
238
+ The Φ override feature is unique to Purpose Agent: when a human corrects
239
+ a score, that correction flows through experience replay into the heuristic
240
+ library. The agent permanently learns the human's preference.
241
+
242
+ Usage:
243
+ hitl = HITLOrchestrator(
244
+ orchestrator=orch,
245
+ input_handler=CLIInputHandler(),
246
+ approve_actions=True, # Require approval for each action
247
+ review_scores=True, # Let human override Φ scores
248
+ checkpoint_dir="./checkpoints",
249
+ )
250
+ result = hitl.run_task(purpose="Do something important")
251
+ """
252
+
253
+ def __init__(
254
+ self,
255
+ orchestrator: Orchestrator,
256
+ input_handler: HumanInputHandler | None = None,
257
+ approve_actions: bool = False,
258
+ review_scores: bool = False,
259
+ checkpoint_dir: str | None = None,
260
+ interrupt_every_n_steps: int = 0, # 0 = only on explicit triggers
261
+ ):
262
+ self.orch = orchestrator
263
+ self.handler = input_handler or AutoApproveHandler()
264
+ self.approve_actions = approve_actions
265
+ self.review_scores = review_scores
266
+ self.checkpoint_dir = checkpoint_dir
267
+ self.interrupt_every_n = interrupt_every_n_steps
268
+ self._checkpoints: list[Checkpoint] = []
269
+
270
+ def run_task(
271
+ self,
272
+ purpose: str,
273
+ initial_state: State | None = None,
274
+ max_steps: int = 20,
275
+ early_stop_phi: float = 9.0,
276
+ resume_from: str | Checkpoint | None = None,
277
+ ) -> TaskResult:
278
+ """
279
+ Run a task with human-in-the-loop interrupts.
280
+
281
+ If resume_from is provided, resumes from that checkpoint.
282
+ """
283
+ # Resume from checkpoint if provided
284
+ start_step = 0
285
+ history = []
286
+ trajectory = Trajectory(task_description=purpose, purpose=purpose)
287
+
288
+ if resume_from:
289
+ cp = resume_from if isinstance(resume_from, Checkpoint) else Checkpoint.load(resume_from)
290
+ current_state = State(data=cp.current_state, summary=cp.state_summary)
291
+ history = cp.history
292
+ start_step = cp.step_index
293
+ logger.info(f"Resuming from checkpoint at step {start_step}")
294
+ else:
295
+ current_state = initial_state or self.orch.environment.reset()
296
+
297
+ self.orch.purpose_fn.reset_trajectory_stats()
298
+
299
+ for step_idx in range(start_step, max_steps):
300
+ # Periodic interrupt
301
+ if self.interrupt_every_n and step_idx > 0 and step_idx % self.interrupt_every_n == 0:
302
+ human = self.handler.request_input(
303
+ InterruptType.PAUSE,
304
+ {"step": step_idx, "state": current_state.describe()[:500]},
305
+ )
306
+ if not human.approved:
307
+ logger.info("Human aborted task")
308
+ break
309
+
310
+ # Actor decides
311
+ action = self.orch.actor.decide(purpose, current_state, history)
312
+
313
+ # Human approval gate
314
+ if self.approve_actions:
315
+ human = self.handler.request_input(
316
+ InterruptType.APPROVE_ACTION,
317
+ {
318
+ "action_name": action.name,
319
+ "action_params": action.params,
320
+ "thought": action.thought,
321
+ "expected_delta": action.expected_delta,
322
+ "step": step_idx + 1,
323
+ },
324
+ )
325
+
326
+ if not human.approved:
327
+ logger.info(f"Human rejected action '{action.name}', skipping step")
328
+ continue
329
+
330
+ if human.edited_action:
331
+ action = human.edited_action
332
+ logger.info(f"Human edited action to '{action.name}'")
333
+
334
+ if action.name.upper() == "DONE":
335
+ break
336
+
337
+ # Execute
338
+ try:
339
+ new_state = self.orch.environment.execute(action, current_state)
340
+ except Exception as e:
341
+ new_state = State(data={**current_state.data, "_error": str(e)})
342
+
343
+ # Purpose Function scores
344
+ score = self.orch.purpose_fn.evaluate(current_state, action, new_state, purpose)
345
+
346
+ # Human score review
347
+ if self.review_scores:
348
+ human = self.handler.request_input(
349
+ InterruptType.OVERRIDE_SCORE,
350
+ {
351
+ "phi_before": score.phi_before,
352
+ "phi_after": score.phi_after,
353
+ "delta": score.delta,
354
+ "evidence": score.evidence,
355
+ "confidence": score.confidence,
356
+ "state_before": current_state.describe()[:200],
357
+ "state_after": new_state.describe()[:200],
358
+ },
359
+ )
360
+
361
+ if human.override_score:
362
+ logger.info(
363
+ f"Human overrode score: Φ {score.phi_after:.1f} → {human.override_score.phi_after:.1f}"
364
+ )
365
+ score = human.override_score
366
+
367
+ # Record step
368
+ step = TrajectoryStep(
369
+ state_before=current_state, action=action, state_after=new_state,
370
+ score=score, step_index=step_idx + 1,
371
+ )
372
+ trajectory.steps.append(step)
373
+ history.append({
374
+ "action": f"{action.name}({json.dumps(action.params, default=str)})",
375
+ "result": new_state.describe()[:200],
376
+ "score": f"Δ={score.delta:+.2f}",
377
+ })
378
+
379
+ # Save checkpoint
380
+ if self.checkpoint_dir:
381
+ self._save_checkpoint(step_idx + 1, new_state, trajectory, history, purpose)
382
+
383
+ # Check termination
384
+ if score.phi_after >= early_stop_phi:
385
+ break
386
+ if self.orch.environment.is_terminal(new_state):
387
+ break
388
+
389
+ current_state = new_state
390
+
391
+ # Post-task
392
+ result = TaskResult(trajectory=trajectory, final_state=current_state)
393
+ self.orch._post_task(trajectory, [])
394
+ return result
395
+
396
+ def inject_heuristic(self, pattern: str, strategy: str, tier: str = "strategic") -> None:
397
+ """Directly inject a human-taught heuristic into the agent's memory."""
398
+ h = Heuristic(
399
+ pattern=pattern,
400
+ strategy=strategy,
401
+ steps=[],
402
+ tier=MemoryTier(tier),
403
+ q_value=1.0, # Human-taught = maximum confidence
404
+ )
405
+ self.orch.optimizer.heuristic_library.append(h)
406
+ self.orch._sync_memory()
407
+ logger.info(f"Injected heuristic: '{pattern}' → '{strategy}'")
408
+
409
+ def _save_checkpoint(
410
+ self, step: int, state: State, trajectory: Trajectory,
411
+ history: list, purpose: str,
412
+ ) -> Checkpoint:
413
+ cp = Checkpoint(
414
+ step_index=step,
415
+ current_state=state.data,
416
+ state_summary=state.describe()[:500],
417
+ trajectory_steps=[
418
+ {
419
+ "action": s.action.name,
420
+ "delta": s.score.delta if s.score else 0,
421
+ "phi_after": s.score.phi_after if s.score else 0,
422
+ }
423
+ for s in trajectory.steps
424
+ ],
425
+ purpose=purpose,
426
+ task_description=trajectory.task_description,
427
+ history=history,
428
+ heuristics=[
429
+ {"pattern": h.pattern, "strategy": h.strategy, "tier": h.tier.value}
430
+ for h in self.orch.optimizer.heuristic_library
431
+ ],
432
+ checkpoint_id=f"cp_{step}_{int(time.time())}",
433
+ )
434
+
435
+ if self.checkpoint_dir:
436
+ path = f"{self.checkpoint_dir}/{cp.checkpoint_id}.json"
437
+ cp.save(path)
438
+
439
+ self._checkpoints.append(cp)
440
+ return cp
441
+
442
+ def list_checkpoints(self) -> list[dict]:
443
+ """List all saved checkpoints."""
444
+ if not self.checkpoint_dir:
445
+ return [{"step": cp.step_index, "id": cp.checkpoint_id} for cp in self._checkpoints]
446
+
447
+ checkpoints = []
448
+ cp_dir = Path(self.checkpoint_dir)
449
+ if cp_dir.exists():
450
+ for f in sorted(cp_dir.glob("cp_*.json")):
451
+ try:
452
+ with open(f) as fh:
453
+ data = json.load(fh)
454
+ checkpoints.append({
455
+ "file": str(f),
456
+ "step": data.get("step_index"),
457
+ "id": data.get("checkpoint_id"),
458
+ "timestamp": data.get("timestamp"),
459
+ })
460
+ except Exception:
461
+ pass
462
+ return checkpoints