"""Policy engine — rule-based tool approval system. Inspired by Gemini CLI's PolicyEngine with priority-sorted rules and pattern matching. Also incorporates Claude Code's allow/ask/deny model. """ from __future__ import annotations import re from dataclasses import dataclass from enum import Enum from typing import Any class ApprovalDecision(Enum): ALLOW = "allow" ASK = "ask" DENY = "deny" @dataclass class PolicyCheckResult: """Result of a policy check.""" decision: ApprovalDecision reason: str = "" matched_rule: str | None = None class PolicyEngine: """Rule-based policy engine with priority ordering. Rules are evaluated in priority order (highest first). First matching rule wins. Default: ASK for interactive, DENY for non-interactive. """ # Built-in safety rules (always evaluated, highest priority) BUILTIN_RULES = [ { "tool_name": "bash", "args_pattern": r"rm\s+-rf\s+(/|~|\*)", "approval": "deny", "priority": 1000, "description": "Block recursive delete of root/home", }, { "tool_name": "bash", "args_pattern": r"(shutdown|reboot|halt|poweroff)", "approval": "deny", "priority": 1000, "description": "Block system shutdown commands", }, { "tool_name": "bash", "args_pattern": r"(mkfs|fdisk|parted|dd\s+if=/dev/zero)", "approval": "deny", "priority": 1000, "description": "Block disk destruction commands", }, { "tool_name": "bash", "args_pattern": r"(chmod\s+-R\s+777|chown\s+-R)", "approval": "deny", "priority": 900, "description": "Block dangerous permission changes", }, { "tool_name": "write_file", "args_pattern": None, "approval": "allow", "priority": -100, "description": "Allow file writes in sandboxed workspace", }, { "tool_name": "edit_file", "args_pattern": None, "approval": "allow", "priority": -100, "description": "Allow file edits in sandboxed workspace", }, { "tool_name": "multi_edit", "args_pattern": None, "approval": "allow", "priority": -100, "description": "Allow multi-file edits in sandboxed workspace", }, { "tool_name": "read_file", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow file reads", }, { "tool_name": "glob", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow glob searches", }, { "tool_name": "grep", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow grep searches", }, { "tool_name": "list_dir", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow directory listing", }, { "tool_name": "web_search", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow web search", }, { "tool_name": "web_fetch", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow web fetch", }, { "tool_name": "todo_read", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow todo reads", }, { "tool_name": "todo_write", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow todo writes", }, { "tool_name": "todo_update", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow todo updates", }, { "tool_name": "snapshot_workspace", "args_pattern": None, "approval": "allow", "priority": -200, "description": "Allow workspace snapshot", }, ] def __init__(self, interactive: bool = True): self._rules: list[dict] = list(self.BUILTIN_RULES) self._interactive = interactive self._approval_mode: str = "default" # default | auto_edit | yolo @property def approval_mode(self) -> str: return self._approval_mode @approval_mode.setter def approval_mode(self, mode: str) -> None: if mode in ("default", "auto_edit", "yolo"): self._approval_mode = mode def add_rule(self, rule: dict) -> None: """Add a custom policy rule.""" rule.setdefault("priority", 0) rule.setdefault("approval", "ask") self._rules.append(rule) self._rules.sort(key=lambda r: r["priority"], reverse=True) def remove_rule(self, description: str) -> bool: """Remove a rule by description.""" for i, r in enumerate(self._rules): if r.get("description") == description: self._rules.pop(i) return True return False def set_rules(self, rules: list[dict]) -> None: """Replace all custom rules (keeps built-in rules).""" self._rules = list(self.BUILTIN_RULES) for r in rules: self.add_rule(r) def check( self, tool_name: str, args: dict[str, Any] | None = None ) -> PolicyCheckResult: """Check if a tool call is allowed. Returns the decision and reason from the highest-priority matching rule. """ args_str = str(args) if args else "" for rule in self._rules: # Check tool name match (supports wildcards) if not self._match_tool_name(tool_name, rule["tool_name"]): continue # Check args pattern if specified if rule.get("args_pattern"): try: if not re.search(rule["args_pattern"], args_str, re.IGNORECASE): continue except re.error: continue # Match found — return this rule's decision approval = rule["approval"] # YOLO mode: upgrade DENY to ASK (except built-in high-priority) if self._approval_mode == "yolo" and rule["priority"] < 900: if approval == "deny": approval = "ask" # Auto-edit mode: auto-approve file writes/edits if ( self._approval_mode == "auto_edit" and tool_name in ("write_file", "edit_file", "multi_edit") ): approval = "allow" return PolicyCheckResult( decision=ApprovalDecision(approval), reason=rule.get( "description", f"Rule matched: {rule['tool_name']}" ), matched_rule=rule.get("description"), ) # Default: ASK for interactive, DENY for non-interactive default = ApprovalDecision.ASK if self._interactive else ApprovalDecision.DENY return PolicyCheckResult( decision=default, reason=f"No matching rule for {tool_name} (default: {default.value})", ) def _match_tool_name(self, name: str, pattern: str) -> bool: """Match tool name against pattern (supports * wildcards).""" if pattern == "*": return True if "*" in pattern: # Convert glob to regex regex = pattern.replace("*", ".*") return bool(re.match(f"^{regex}$", name)) return name == pattern def get_rules(self) -> list[dict]: """Get all rules for display.""" return list(self._rules) # ── Global Engine ─────────────────────────────────────────────────────── _global_engine: PolicyEngine | None = None def get_policy_engine() -> PolicyEngine: """Get or create the global policy engine.""" global _global_engine if _global_engine is None: _global_engine = PolicyEngine() return _global_engine __all__ = [ "ApprovalDecision", "PolicyCheckResult", "PolicyEngine", "get_policy_engine", ]