sonicoder / code /policy /__init__.py
R-Kentaren's picture
fix: consolidate to code/ only, fix bugs, add missing UI options
e3e7994 verified
Raw
History Blame Contribute Delete
8.85 kB
"""Policy engine β€” rule-based tool approval system.
Inspired by Gemini CLI's PolicyEngine with priority-sorted rules
and pattern matching. Also incorporates Claude Code's allow/ask/deny model.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from enum import Enum
from typing import Any
class ApprovalDecision(Enum):
    """The three possible verdicts of a policy check.

    Each member's value is the lowercase string used in the ``approval``
    field of a rule dict, so ``ApprovalDecision("deny")`` looks a member
    up from a rule's raw string.
    """

    # Run the tool call without prompting.
    ALLOW = "allow"
    # Defer to the user for confirmation.
    ASK = "ask"
    # Refuse the tool call.
    DENY = "deny"
@dataclass
class PolicyCheckResult:
    """Outcome of evaluating a single tool call against the policy rules.

    Attributes:
        decision: The verdict for the tool call.
        reason: Human-readable explanation of the verdict; empty by default.
        matched_rule: Identifier of the rule that produced the verdict, or
            ``None`` (the default) when none was recorded.
    """

    decision: ApprovalDecision
    reason: str = ""
    matched_rule: str | None = None
class PolicyEngine:
    """Rule-based policy engine with priority ordering.

    A rule is a plain dict with these keys:

    * ``tool_name`` -- exact tool name, or a glob where ``*`` matches any
      run of characters (required).
    * ``args_pattern`` -- regular expression searched (case-insensitively)
      in ``str(args)``; ``None``/empty means "any arguments".
    * ``approval`` -- ``"allow"``, ``"ask"`` or ``"deny"`` (default ``"ask"``).
    * ``priority`` -- integer, higher is evaluated first (default ``0``).
    * ``description`` -- label used as the reason and for ``remove_rule``.

    Rules are evaluated in priority order (highest first).
    First matching rule wins.
    Default: ASK for interactive, DENY for non-interactive.
    """

    # Built-in rules. The hard safety denials sit at priority 900-1000; the
    # permissive defaults for workspace tools sit at negative priorities so
    # that custom rules (default priority 0) are evaluated before them.
    BUILTIN_RULES = [
        {
            "tool_name": "bash",
            "args_pattern": r"rm\s+-rf\s+(/|~|\*)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block recursive delete of root/home",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(shutdown|reboot|halt|poweroff)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block system shutdown commands",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(mkfs|fdisk|parted|dd\s+if=/dev/zero)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block disk destruction commands",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(chmod\s+-R\s+777|chown\s+-R)",
            "approval": "deny",
            "priority": 900,
            "description": "Block dangerous permission changes",
        },
        {
            "tool_name": "write_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow file writes in sandboxed workspace",
        },
        {
            "tool_name": "edit_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow file edits in sandboxed workspace",
        },
        {
            "tool_name": "multi_edit",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow multi-file edits in sandboxed workspace",
        },
        {
            "tool_name": "read_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow file reads",
        },
        {
            "tool_name": "glob",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow glob searches",
        },
        {
            "tool_name": "grep",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow grep searches",
        },
        {
            "tool_name": "list_dir",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow directory listing",
        },
        {
            "tool_name": "web_search",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow web search",
        },
        {
            "tool_name": "web_fetch",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow web fetch",
        },
        {
            "tool_name": "todo_read",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo reads",
        },
        {
            "tool_name": "todo_write",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo writes",
        },
        {
            "tool_name": "todo_update",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo updates",
        },
        {
            "tool_name": "snapshot_workspace",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow workspace snapshot",
        },
    ]

    # Accepted values for ``approval_mode``.
    _APPROVAL_MODES = ("default", "auto_edit", "yolo")
    # Tools whose "ask" verdict is auto-approved in ``auto_edit`` mode.
    _EDIT_TOOLS = ("write_file", "edit_file", "multi_edit")

    def __init__(self, interactive: bool = True):
        """Create an engine pre-loaded with the built-in rules.

        Args:
            interactive: Selects the fallback verdict when no rule matches:
                ASK when true, DENY when false.
        """
        self._rules: list[dict] = self._builtin_rules()
        self._interactive = interactive
        self._approval_mode: str = "default"  # default | auto_edit | yolo

    @classmethod
    def _builtin_rules(cls) -> list[dict]:
        """Return per-engine copies of the built-in rules, highest priority first."""
        # Copy each dict so edits made through one engine never leak into the
        # class-level table shared by all engines. ``sorted`` is stable, so
        # rules of equal priority keep their declaration order.
        return sorted(
            (dict(rule) for rule in cls.BUILTIN_RULES),
            key=lambda r: r["priority"],
            reverse=True,
        )

    @property
    def approval_mode(self) -> str:
        """Current approval mode: ``"default"``, ``"auto_edit"`` or ``"yolo"``."""
        return self._approval_mode

    @approval_mode.setter
    def approval_mode(self, mode: str) -> None:
        # Unknown modes are ignored on purpose: the previous mode stays active.
        if mode in self._APPROVAL_MODES:
            self._approval_mode = mode

    def add_rule(self, rule: dict) -> None:
        """Add a custom policy rule.

        The rule is copied, so the caller's dict is left untouched. Missing
        ``priority`` defaults to ``0`` and missing ``approval`` to ``"ask"``.

        Raises:
            ValueError: If the rule has no ``tool_name``; such a rule could
                never be evaluated and would make every later ``check`` fail.
        """
        if not rule.get("tool_name"):
            raise ValueError("policy rule requires a non-empty 'tool_name'")
        entry = dict(rule)
        entry.setdefault("priority", 0)
        entry.setdefault("approval", "ask")
        self._rules.append(entry)
        # Stable sort: among equal priorities, earlier-added rules stay first.
        self._rules.sort(key=lambda r: r["priority"], reverse=True)

    def remove_rule(self, description: str) -> bool:
        """Remove the first rule whose ``description`` equals *description*.

        Returns:
            ``True`` if a rule was removed, ``False`` if none matched.
        """
        for index, candidate in enumerate(self._rules):
            if candidate.get("description") == description:
                del self._rules[index]
                return True
        return False

    def set_rules(self, rules: list[dict]) -> None:
        """Replace all custom rules (keeps built-in rules).

        The rule set is reset to the built-ins and each entry of *rules* is
        added via ``add_rule``. If any entry is rejected, the previous rule
        set is restored before the error propagates.
        """
        previous = self._rules
        self._rules = self._builtin_rules()
        try:
            for rule in rules:
                self.add_rule(rule)
        except Exception:
            # Never leave the engine with a half-applied rule set.
            self._rules = previous
            raise

    def check(
        self, tool_name: str, args: dict[str, Any] | None = None
    ) -> PolicyCheckResult:
        """Check if a tool call is allowed.

        Args:
            tool_name: Name of the tool being invoked.
            args: Tool arguments; matched against ``args_pattern`` through
                their ``str()`` form. ``None``/empty is treated as ``""``.

        Returns:
            The decision and reason from the highest-priority matching rule,
            or the interactive/non-interactive default when nothing matches.
        """
        args_str = str(args) if args else ""
        for rule in self._rules:
            # Check tool name match (supports wildcards)
            if not self._match_tool_name(tool_name, rule["tool_name"]):
                continue
            # Check args pattern if specified
            pattern = rule.get("args_pattern")
            if pattern:
                try:
                    if not re.search(pattern, args_str, re.IGNORECASE):
                        continue
                except re.error:
                    # A malformed pattern is treated as "does not match".
                    continue
            # Match found — return this rule's decision
            approval = rule["approval"]
            # YOLO mode: soften DENY to ASK, except for rules at priority
            # 900 and above (the built-in hard safety blocks live there).
            if (
                self._approval_mode == "yolo"
                and rule["priority"] < 900
                and approval == "deny"
            ):
                approval = "ask"
            # Auto-edit mode: skip the confirmation prompt for file edits.
            # Only "ask" is upgraded; an explicit "deny" must stay a denial.
            if (
                self._approval_mode == "auto_edit"
                and tool_name in self._EDIT_TOOLS
                and approval == "ask"
            ):
                approval = "allow"
            return PolicyCheckResult(
                decision=ApprovalDecision(approval),
                reason=rule.get(
                    "description", f"Rule matched: {rule['tool_name']}"
                ),
                matched_rule=rule.get("description"),
            )
        # Default: ASK for interactive, DENY for non-interactive
        default = ApprovalDecision.ASK if self._interactive else ApprovalDecision.DENY
        return PolicyCheckResult(
            decision=default,
            reason=f"No matching rule for {tool_name} (default: {default.value})",
        )

    def _match_tool_name(self, name: str, pattern: str) -> bool:
        """Match tool name against pattern (supports * wildcards).

        Only ``*`` is special; every other character in *pattern* is matched
        literally, so names containing ``.``, ``(``, ``+`` etc. are safe.
        """
        if pattern == "*":
            return True
        if "*" in pattern:
            # Escape the literal pieces and join them with ".*" so regex
            # metacharacters in the glob cannot change or break the match.
            regex = ".*".join(re.escape(piece) for piece in pattern.split("*"))
            return re.fullmatch(regex, name) is not None
        return name == pattern

    def get_rules(self) -> list[dict]:
        """Get all rules for display, in evaluation order.

        Returns a new list; the dicts in it are this engine's own rule
        objects (never the class-level ``BUILTIN_RULES`` entries).
        """
        return list(self._rules)
# ── Global Engine ───────────────────────────────────────────────────────
# Process-wide engine instance; stays None until first requested.
_global_engine: PolicyEngine | None = None


def get_policy_engine() -> PolicyEngine:
    """Return the shared policy engine, building it on the first call."""
    global _global_engine
    engine = _global_engine
    if engine is None:
        # First use: construct with PolicyEngine's defaults and remember it.
        engine = PolicyEngine()
        _global_engine = engine
    return engine
# Names exported by ``from <this package> import *``.
__all__ = [
"ApprovalDecision",
"PolicyCheckResult",
"PolicyEngine",
"get_policy_engine",
]