Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import re | |
| import subprocess | |
| import sys | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any | |
| import yaml | |
# Project root: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]

# Default input locations, all resolved relative to ROOT.
DEFAULT_CONFIG = ROOT.joinpath("self_improve.yaml")
DEFAULT_SCHEMA = ROOT.joinpath("schemas", "self_improve_proposal_v0.json")
DEFAULT_INFERENCE = ROOT.joinpath("inference.yaml")
# Optional input: run_codex_proposal only loads it when the file exists.
DEFAULT_USER_GOVERNANCE = ROOT.joinpath("build", "system", "user_governance.json")
def load_yaml(path: Path) -> dict[str, Any]:
    """Read the UTF-8 YAML file at *path* and return its top-level mapping.

    Raises:
        ValueError: If the document's top-level value is not a mapping.
    """
    text = path.read_text(encoding="utf-8")
    parsed = yaml.safe_load(text)
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"{path} did not decode to a mapping")
def load_json(path: Path) -> dict[str, Any]:
    """Read the UTF-8 JSON file at *path* and return its top-level object.

    Raises:
        ValueError: If the document's top-level value is not a JSON object.
    """
    text = path.read_text(encoding="utf-8")
    parsed = json.loads(text)
    if isinstance(parsed, dict):
        return parsed
    raise ValueError(f"{path} did not decode to an object")
def allowed_path(path: str, roots: list[str]) -> bool:
    """Return True when *path* is covered by one of the allowed *roots*.

    A root ending in ``/`` acts as a directory prefix: any path starting with
    it matches. Any other root must equal *path* exactly.

    Empty or non-string paths, absolute paths, and paths containing a ``..``
    segment are always rejected. Without that, ``docs/../secrets.txt`` would
    pass a plain ``docs/`` prefix check while pointing outside that directory.

    Args:
        path: Relative file path to check.
        roots: Allowed directory prefixes (trailing ``/``) or exact file paths.

    Returns:
        True if the path is permitted, False otherwise.
    """
    if not isinstance(path, str) or not path:
        return False
    if path.startswith(("/", "\\")):
        return False
    # Treat backslashes as separators too so "docs/..\\x" cannot slip through.
    if ".." in path.replace("\\", "/").split("/"):
        return False
    return any(
        path.startswith(root) if root.endswith("/") else path == root
        for root in roots
    )
def backend_timeout_seconds(backend: dict[str, Any]) -> float | None:
    """Return the backend's ``timeout_seconds`` as a float, or None when unset.

    A missing key, ``None``, an empty string, or zero all mean "no timeout".

    Raises:
        ValueError: If the configured value converts to a non-positive number
            (``float()`` may also raise for values it cannot convert).
    """
    configured = backend.get("timeout_seconds")
    is_unset = configured is None or configured == "" or configured == 0
    if is_unset:
        return None

    seconds = float(configured)
    if seconds <= 0:
        raise ValueError("timeout_seconds must be positive when configured")
    return seconds
def sanitize_manifest_id(goal: str) -> str:
    """Derive a ``self-improve-<slug>`` manifest id from free-form goal text.

    The goal is lower-cased, every run of characters outside ``a-z0-9``
    becomes a single hyphen, surrounding hyphens are stripped, and the result
    is cut to 48 characters. An empty slug falls back to ``self-improve``.
    """
    lowered = goal.lower()
    hyphenated = re.sub(r"[^a-z0-9]+", "-", lowered)
    # Strip first, then truncate: the cut itself may still end on a hyphen.
    trimmed = hyphenated.strip("-")[:48]
    if not trimmed:
        trimmed = "self-improve"
    return "self-improve-" + trimmed
def build_prompt(
    *,
    goal: str,
    config: dict[str, Any],
    system_context: dict[str, Any],
    policy: dict[str, Any],
    runtime_contract: dict[str, Any],
    default_benchmark: str,
    user_governance: dict[str, Any] | None,
) -> str:
    """Assemble the newline-joined prompt text for one proposal request.

    The prompt starts with fixed instructions (interpolating
    ``config["max_files"]``, ``config["allowed_roots"]`` and
    *default_benchmark*), followed by four compact JSON sections holding
    selected keys of the system context, policy, runtime contract and user
    governance, and ends with the improvement goal.

    Args:
        goal: Free-form description of the desired improvement.
        config: Must provide ``max_files`` and ``allowed_roots``.
        system_context: Source of the ``current_position``,
            ``latest_runtime_state`` and ``agent_bootstrap`` excerpts.
        policy: Source of ``bits``, ``vectors`` and ``invariants``.
        runtime_contract: Source of ``one_liner``, ``contract`` and
            ``acceptance_bar``.
        default_benchmark: Benchmark command quoted in the instructions.
        user_governance: Optional governance data; a falsy value is rendered
            as an empty JSON object.

    Returns:
        The complete prompt as a single string.
    """

    def compact(payload: dict[str, Any]) -> str:
        # ASCII-only output with no whitespace between JSON tokens.
        return json.dumps(payload, ensure_ascii=True, separators=(",", ":"))

    position = system_context.get("current_position", {})
    bootstrap = system_context.get("agent_bootstrap", {})
    context_view = {
        "current_position": {
            key: position.get(key)
            for key in ("slice_id", "default_profile", "role")
        },
        "latest_runtime_state": system_context.get("latest_runtime_state", {}),
        "agent_bootstrap": {
            "trust_order": bootstrap.get("trust_order", []),
            "first_move": bootstrap.get("first_move"),
        },
    }
    policy_view = {
        key: policy.get(key, []) for key in ("bits", "vectors", "invariants")
    }
    runtime_view = {
        "one_liner": runtime_contract.get("one_liner"),
        "contract": runtime_contract.get("contract"),
        "acceptance_bar": runtime_contract.get("acceptance_bar", []),
    }
    governance_view: dict[str, Any] = {}
    if user_governance:
        governance_view = {
            "governing_rules": user_governance.get("governing_rules", []),
            "motif_rule": user_governance.get("motif_rule", ""),
            "next_moves": user_governance.get("next_moves", []),
            "operator_next_tasks": user_governance.get("operator_next_tasks", []),
        }

    lines = [
        "You are proposing one bounded self-improvement for the bit_vector_tensor_control_policy repo.",
        "Produce only JSON matching the schema.",
        "The proposal must be small, concrete, and safe to execute through the local runtime.",
        f"Maximum touched files: {config['max_files']}.",
        f"Allowed roots: {', '.join(config['allowed_roots'])}.",
        "Only use manifest actions of type `write_file`.",
        "Do not propose shell actions.",
        "Return full replacement content for every touched file.",
        "Prefer docs, configs, and thin runtime/policy glue over large rewrites.",
        f"Use this benchmark command unless a narrower benchmark is clearly better: {default_benchmark}.",
        "The change should improve the product shell itself, not produce an external research artifact.",
        "Prefer the highest-ranked partial or requested next move from user governance when it can be advanced in one bounded change.",
    ]
    sections = (
        ("System context:", context_view),
        ("Policy context:", policy_view),
        ("Runtime contract:", runtime_view),
        ("User governance:", governance_view),
    )
    # Each section is a blank separator line, a heading, then the JSON payload.
    for heading, payload in sections:
        lines.extend(["", heading, compact(payload)])
    lines.extend(["", f"Improvement goal: {goal}"])
    return "\n".join(lines)
def validate_proposal(proposal: dict[str, Any], config: dict[str, Any]) -> None:
    """Check a proposal against the configured file-count and path limits.

    Args:
        proposal: Decoded proposal; ``target_files`` and ``manifest.actions``
            are inspected.
        config: Must provide ``allowed_roots`` and ``max_files``.

    Raises:
        ValueError: If ``target_files`` is empty, too long, or names a path
            outside the allowed roots; if the manifest has no actions, too
            many actions, an action that is not ``write_file``, or an action
            path outside the allowed roots; or if the action paths do not
            match ``target_files`` (compared as sorted lists).
    """
    permitted_roots = config["allowed_roots"]

    declared_files = proposal.get("target_files", [])
    if not declared_files:
        raise ValueError("proposal did not include target_files")
    file_limit = int(config["max_files"])
    if len(declared_files) > file_limit:
        raise ValueError("proposal exceeded max_files")
    for path in declared_files:
        if not allowed_path(path, permitted_roots):
            raise ValueError(f"target file outside allowed roots: {path}")

    actions = proposal.get("manifest", {}).get("actions", [])
    action_count = len(actions)
    if action_count == 0:
        raise ValueError("proposal manifest had no actions")
    if action_count > int(config["max_files"]):
        raise ValueError("proposal manifest exceeded max_files")

    written_paths = []
    for action in actions:
        if action.get("type") != "write_file":
            raise ValueError("proposal manifest included unsupported action type")
        path = action.get("path", "")
        if not allowed_path(path, permitted_roots):
            raise ValueError(f"manifest path outside allowed roots: {path}")
        written_paths.append(path)

    # Order-insensitive comparison: the two lists must name the same paths.
    if sorted(declared_files) != sorted(written_paths):
        raise ValueError("target_files and manifest action paths diverged")
def _resolve_proposal_timeout(
    config: dict[str, Any], backend: dict[str, Any]
) -> float | None:
    """Pick the subprocess timeout: the config override, else the backend's."""
    configured = config.get("proposal_timeout_seconds")
    if configured in (None, "", 0):
        return backend_timeout_seconds(backend)
    timeout = float(configured)
    if timeout <= 0:
        raise ValueError("proposal_timeout_seconds must be positive when configured")
    return timeout


def _build_codex_command(
    backend: dict[str, Any], *, schema_path: Path, output_path: Path
) -> list[str]:
    """Build the ``<command> exec`` argv from the backend settings."""
    command = [backend.get("command", "codex"), "exec"]
    if backend.get("model"):
        command.extend(["-m", str(backend["model"])])
    if backend.get("sandbox"):
        command.extend(["-s", str(backend["sandbox"])])
    if backend.get("ephemeral", False):
        command.append("--ephemeral")
    if backend.get("skip_git_repo_check", False):
        command.append("--skip-git-repo-check")
    command.extend(
        [
            "-C",
            str(ROOT),
            "--output-schema",
            str(schema_path),
            "-o",
            str(output_path),
            "-",  # trailing "-": the prompt is supplied on stdin
        ]
    )
    return command


def run_codex_proposal(
    *,
    goal: str,
    config_path: Path,
    system_context_path: Path,
    output_path: Path,
    schema_path: Path,
) -> dict[str, Any]:
    """Run the configured backend to produce one validated proposal.

    Loads the self-improve config, the default inference backend and the
    context documents, builds the prompt, runs ``<command> exec`` with the
    prompt on stdin, then post-processes and validates the JSON written to
    *output_path* before rewriting it with sorted keys.

    Args:
        goal: Free-form improvement goal; also used to derive the manifest id.
        config_path: YAML config providing limits, ``default_benchmark`` and
            optional ``proposal_model`` / ``proposal_timeout_seconds``.
        system_context_path: JSON file with the system context.
        output_path: Where the backend writes the proposal; its parent
            directory is created if needed.
        schema_path: JSON schema passed to the backend via a temporary copy.

    Returns:
        The validated proposal dictionary.

    Raises:
        RuntimeError: If the subprocess times out or exits non-zero.
        ValueError: If a timeout setting is invalid, the proposal has no
            manifest object, or the proposal fails validation.
    """
    config = load_yaml(config_path)
    inference = load_yaml(DEFAULT_INFERENCE)
    backend_id = inference["default_backend"]
    # Copy so the model override below does not mutate the loaded config.
    backend = dict(inference["backends"][backend_id])
    proposal_model = config.get("proposal_model")
    if proposal_model:
        backend["model"] = proposal_model

    system_context = load_json(system_context_path)
    policy = load_json(ROOT / "policy" / "control_language_v0.json")
    runtime_contract = load_json(ROOT / "runtime" / "work_manifest_v0.json")
    user_governance = (
        load_json(DEFAULT_USER_GOVERNANCE) if DEFAULT_USER_GOVERNANCE.exists() else None
    )
    default_benchmark = config["default_benchmark"]["command"]
    prompt = build_prompt(
        goal=goal,
        config=config,
        system_context=system_context,
        policy=policy,
        runtime_contract=runtime_contract,
        default_benchmark=default_benchmark,
        user_governance=user_governance,
    )

    # Do everything that can fail on bad input BEFORE the temp file exists,
    # so an invalid timeout or unreadable schema cannot leave a stray file.
    timeout = _resolve_proposal_timeout(config, backend)
    schema_text = schema_path.read_text(encoding="utf-8")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    temp_schema = tempfile.NamedTemporaryFile(
        "w", suffix=".json", delete=False, encoding="utf-8"
    )
    temp_schema_path = Path(temp_schema.name)
    try:
        # Close the file before the subprocess starts so the copy is flushed.
        with temp_schema:
            temp_schema.write(schema_text)
        command = _build_codex_command(
            backend, schema_path=temp_schema_path, output_path=output_path
        )
        completed = subprocess.run(
            command,
            input=prompt,
            text=True,
            capture_output=True,
            cwd=ROOT,
            check=False,
            timeout=timeout,
        )
    except subprocess.TimeoutExpired as exc:
        raise RuntimeError(f"codex exec timed out after {exc.timeout} seconds") from exc
    finally:
        # delete=False means cleanup is our job on every exit path.
        temp_schema_path.unlink(missing_ok=True)

    if completed.returncode != 0:
        raise RuntimeError(completed.stderr.strip() or "codex exec failed")

    proposal = load_json(output_path)
    manifest = proposal.get("manifest")
    if not isinstance(manifest, dict):
        # Report a malformed proposal the same way validate_proposal does,
        # instead of failing with a bare KeyError/TypeError below.
        raise ValueError("proposal did not include a manifest object")
    manifest["manifest_id"] = sanitize_manifest_id(goal)
    manifest["goal"] = proposal.get("goal", goal)

    benchmark = proposal.get("benchmark")
    if not isinstance(benchmark, dict) or not benchmark.get("command"):
        proposal["benchmark"] = {"command": default_benchmark}

    validate_proposal(proposal, config)
    output_path.write_text(
        json.dumps(proposal, indent=2, sort_keys=True) + "\n", encoding="utf-8"
    )
    return proposal
def main() -> int:
    """Parse command-line arguments, run one proposal, and print it as JSON.

    ``--goal``, ``--system-context`` and ``--output`` are required;
    ``--config`` and ``--schema`` default to the module-level paths.

    Returns:
        0 once the proposal has been written to stdout.
    """
    parser = argparse.ArgumentParser(
        description="Use Codex CLI to propose one bounded self-improvement manifest."
    )
    argument_specs = (
        ("--goal", {"required": True}),
        ("--config", {"default": str(DEFAULT_CONFIG)}),
        ("--schema", {"default": str(DEFAULT_SCHEMA)}),
        ("--system-context", {"required": True}),
        ("--output", {"required": True}),
    )
    for flag, options in argument_specs:
        parser.add_argument(flag, **options)
    args = parser.parse_args()

    proposal = run_codex_proposal(
        goal=args.goal,
        config_path=Path(args.config),
        system_context_path=Path(args.system_context),
        output_path=Path(args.output),
        schema_path=Path(args.schema),
    )
    json.dump(proposal, sys.stdout, indent=2)
    print(file=sys.stdout)
    return 0
# Script entry point: exit the process with main()'s return code.
if __name__ == "__main__":
    sys.exit(main())