bit-vector-tensor-control-policy / scripts /propose_self_improvement.py
J94's picture
Initial Space upload
3436bdd verified
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any
import yaml
ROOT = Path(__file__).resolve().parents[1]
DEFAULT_CONFIG = ROOT / "self_improve.yaml"
DEFAULT_SCHEMA = ROOT / "schemas" / "self_improve_proposal_v0.json"
DEFAULT_INFERENCE = ROOT / "inference.yaml"
DEFAULT_USER_GOVERNANCE = ROOT / "build" / "system" / "user_governance.json"
def load_yaml(path: Path) -> dict[str, Any]:
data = yaml.safe_load(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"{path} did not decode to a mapping")
return data
def load_json(path: Path) -> dict[str, Any]:
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, dict):
raise ValueError(f"{path} did not decode to an object")
return data
def allowed_path(path: str, roots: list[str]) -> bool:
for root in roots:
if root.endswith("/"):
if path.startswith(root):
return True
elif path == root:
return True
return False
def backend_timeout_seconds(backend: dict[str, Any]) -> float | None:
raw = backend.get("timeout_seconds")
if raw in (None, "", 0):
return None
timeout = float(raw)
if timeout <= 0:
raise ValueError("timeout_seconds must be positive when configured")
return timeout
def sanitize_manifest_id(goal: str) -> str:
slug = re.sub(r"[^a-z0-9]+", "-", goal.lower()).strip("-")
slug = slug[:48] or "self-improve"
return f"self-improve-{slug}"
def build_prompt(
*,
goal: str,
config: dict[str, Any],
system_context: dict[str, Any],
policy: dict[str, Any],
runtime_contract: dict[str, Any],
default_benchmark: str,
user_governance: dict[str, Any] | None,
) -> str:
compact_context = {
"current_position": {
"slice_id": system_context.get("current_position", {}).get("slice_id"),
"default_profile": system_context.get("current_position", {}).get("default_profile"),
"role": system_context.get("current_position", {}).get("role"),
},
"latest_runtime_state": system_context.get("latest_runtime_state", {}),
"agent_bootstrap": {
"trust_order": system_context.get("agent_bootstrap", {}).get("trust_order", []),
"first_move": system_context.get("agent_bootstrap", {}).get("first_move"),
},
}
compact_policy = {
"bits": policy.get("bits", []),
"vectors": policy.get("vectors", []),
"invariants": policy.get("invariants", []),
}
compact_runtime = {
"one_liner": runtime_contract.get("one_liner"),
"contract": runtime_contract.get("contract"),
"acceptance_bar": runtime_contract.get("acceptance_bar", []),
}
compact_governance = None
if user_governance:
compact_governance = {
"governing_rules": user_governance.get("governing_rules", []),
"motif_rule": user_governance.get("motif_rule", ""),
"next_moves": user_governance.get("next_moves", []),
"operator_next_tasks": user_governance.get("operator_next_tasks", []),
}
return "\n".join(
[
"You are proposing one bounded self-improvement for the bit_vector_tensor_control_policy repo.",
"Produce only JSON matching the schema.",
"The proposal must be small, concrete, and safe to execute through the local runtime.",
f"Maximum touched files: {config['max_files']}.",
f"Allowed roots: {', '.join(config['allowed_roots'])}.",
"Only use manifest actions of type `write_file`.",
"Do not propose shell actions.",
"Return full replacement content for every touched file.",
"Prefer docs, configs, and thin runtime/policy glue over large rewrites.",
f"Use this benchmark command unless a narrower benchmark is clearly better: {default_benchmark}.",
"The change should improve the product shell itself, not produce an external research artifact.",
"Prefer the highest-ranked partial or requested next move from user governance when it can be advanced in one bounded change.",
"",
"System context:",
json.dumps(compact_context, ensure_ascii=True, separators=(",", ":")),
"",
"Policy context:",
json.dumps(compact_policy, ensure_ascii=True, separators=(",", ":")),
"",
"Runtime contract:",
json.dumps(compact_runtime, ensure_ascii=True, separators=(",", ":")),
"",
"User governance:",
json.dumps(compact_governance or {}, ensure_ascii=True, separators=(",", ":")),
"",
f"Improvement goal: {goal}",
]
)
def validate_proposal(proposal: dict[str, Any], config: dict[str, Any]) -> None:
roots = config["allowed_roots"]
target_files = proposal.get("target_files", [])
if not target_files:
raise ValueError("proposal did not include target_files")
if len(target_files) > int(config["max_files"]):
raise ValueError("proposal exceeded max_files")
for path in target_files:
if not allowed_path(path, roots):
raise ValueError(f"target file outside allowed roots: {path}")
manifest = proposal.get("manifest", {})
actions = manifest.get("actions", [])
if len(actions) == 0:
raise ValueError("proposal manifest had no actions")
if len(actions) > int(config["max_files"]):
raise ValueError("proposal manifest exceeded max_files")
action_paths = []
for action in actions:
if action.get("type") != "write_file":
raise ValueError("proposal manifest included unsupported action type")
path = action.get("path", "")
if not allowed_path(path, roots):
raise ValueError(f"manifest path outside allowed roots: {path}")
action_paths.append(path)
if sorted(target_files) != sorted(action_paths):
raise ValueError("target_files and manifest action paths diverged")
def run_codex_proposal(
*,
goal: str,
config_path: Path,
system_context_path: Path,
output_path: Path,
schema_path: Path,
) -> dict[str, Any]:
config = load_yaml(config_path)
inference = load_yaml(DEFAULT_INFERENCE)
backend_id = inference["default_backend"]
backend = dict(inference["backends"][backend_id])
proposal_model = config.get("proposal_model")
if proposal_model:
backend["model"] = proposal_model
system_context = load_json(system_context_path)
policy = load_json(ROOT / "policy" / "control_language_v0.json")
runtime_contract = load_json(ROOT / "runtime" / "work_manifest_v0.json")
user_governance = load_json(DEFAULT_USER_GOVERNANCE) if DEFAULT_USER_GOVERNANCE.exists() else None
default_benchmark = config["default_benchmark"]["command"]
prompt = build_prompt(
goal=goal,
config=config,
system_context=system_context,
policy=policy,
runtime_contract=runtime_contract,
default_benchmark=default_benchmark,
user_governance=user_governance,
)
output_path.parent.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False, encoding="utf-8") as temp_schema:
temp_schema.write(schema_path.read_text(encoding="utf-8"))
temp_schema_path = Path(temp_schema.name)
command = [backend.get("command", "codex"), "exec"]
if backend.get("model"):
command.extend(["-m", str(backend["model"])])
if backend.get("sandbox"):
command.extend(["-s", str(backend["sandbox"])])
if backend.get("ephemeral", False):
command.append("--ephemeral")
if backend.get("skip_git_repo_check", False):
command.append("--skip-git-repo-check")
command.extend(
[
"-C",
str(ROOT),
"--output-schema",
str(temp_schema_path),
"-o",
str(output_path),
"-",
]
)
timeout_seconds = config.get("proposal_timeout_seconds")
if timeout_seconds in (None, "", 0):
timeout = backend_timeout_seconds(backend)
else:
timeout = float(timeout_seconds)
if timeout <= 0:
raise ValueError("proposal_timeout_seconds must be positive when configured")
try:
completed = subprocess.run(
command,
input=prompt,
text=True,
capture_output=True,
cwd=ROOT,
check=False,
timeout=timeout,
)
except subprocess.TimeoutExpired as exc:
raise RuntimeError(f"codex exec timed out after {exc.timeout} seconds") from exc
finally:
temp_schema_path.unlink(missing_ok=True)
if completed.returncode != 0:
raise RuntimeError(completed.stderr.strip() or "codex exec failed")
proposal = load_json(output_path)
proposal["manifest"]["manifest_id"] = sanitize_manifest_id(goal)
proposal["manifest"]["goal"] = proposal.get("goal", goal)
if not proposal.get("benchmark", {}).get("command"):
proposal["benchmark"] = {"command": default_benchmark}
validate_proposal(proposal, config)
output_path.write_text(json.dumps(proposal, indent=2, sort_keys=True) + "\n", encoding="utf-8")
return proposal
def main() -> int:
parser = argparse.ArgumentParser(description="Use Codex CLI to propose one bounded self-improvement manifest.")
parser.add_argument("--goal", required=True)
parser.add_argument("--config", default=str(DEFAULT_CONFIG))
parser.add_argument("--schema", default=str(DEFAULT_SCHEMA))
parser.add_argument("--system-context", required=True)
parser.add_argument("--output", required=True)
args = parser.parse_args()
proposal = run_codex_proposal(
goal=args.goal,
config_path=Path(args.config),
system_context_path=Path(args.system_context),
output_path=Path(args.output),
schema_path=Path(args.schema),
)
json.dump(proposal, sys.stdout, indent=2)
sys.stdout.write("\n")
return 0
if __name__ == "__main__":
raise SystemExit(main())