""" Command Safety Layer — Whitelist/blocklist enforcement for sandbox commands. Validates commands before execution to prevent destructive operations. """ from __future__ import annotations import re import shlex from dataclasses import dataclass from typing import List, Tuple # Commands that are allowed to execute in the sandbox COMMAND_WHITELIST: List[str] = [ "pip", "pip3", "python", "python3", "apt-get", "npm", "kill", "pkill", "export", "source", "unset", "systemctl", "flask", "uvicorn", "cat", "ls", "echo", "mkdir", "rm", "cp", "mv", "sed", "grep", "awk", "head", "tail", "wc", "ps", "lsof", "curl", "wget", "chmod", "chown", "touch", "tee", "bash", "sh", "cd", "pwd", "which", "env", "printenv", "true", "false", "test", "xargs", ] # Patterns that are absolutely forbidden (destructive commands) BLOCKLIST_PATTERNS: List[str] = [ r"rm\s+-rf\s+/\s*$", # rm -rf / r"rm\s+-rf\s+/\*", # rm -rf /* r"rm\s+--no-preserve-root", # rm --no-preserve-root r":\(\)\s*\{\s*:\|:\s*&\s*\}\s*;\s*:", # fork bomb r"dd\s+if=", # dd (disk destroyer) r"mkfs\.", # mkfs (format disk) r"chmod\s+777\s+/\s*$", # chmod 777 / r"chmod\s+-R\s+777\s+/", # chmod -R 777 / r">\s*/dev/sda", # write to raw disk r"mv\s+/\s+", # mv / somewhere r"wget.*\|\s*sh", # pipe download to shell r"curl.*\|\s*sh", # pipe download to shell r"curl.*\|\s*bash", # pipe download to bash r"(?:^|&&|\|\||;)\s*(?:/sbin/)?shutdown\b", # shutdown invocation r"(?:^|&&|\|\||;)\s*(?:/sbin/)?reboot\b", # reboot invocation r"(?:^|&&|\|\||;)\s*(?:/sbin/)?init\s+0\b", # init 0 halt invocation r"(?:^|&&|\|\||;)\s*(?:/sbin/)?halt\b", # halt invocation ] # Patterns involving sudo + destructive operations SUDO_DANGEROUS_PATTERNS: List[str] = [ r"sudo\s+rm", r"sudo\s+dd", r"sudo\s+mkfs", r"sudo\s+chmod\s+777", r"sudo\s+shutdown", r"sudo\s+reboot", r"sudo\s+halt", r"sudo\s+init", ] @dataclass class SafetyCheckResult: """Result of a command safety check. Attributes: is_safe: Whether the command passed safety checks. is_whitelisted: Whether the base command is in the whitelist. is_blocked: Whether the command matches a blocklist pattern. reason: Human-readable reason if the command was rejected. matched_pattern: The blocklist pattern that matched, if any. """ is_safe: bool is_whitelisted: bool is_blocked: bool reason: str = "" matched_pattern: str = "" class CommandSafetyChecker: """Validates commands against whitelist and blocklist rules. Usage: checker = CommandSafetyChecker() result = checker.check("pip install flask") if result.is_safe: # execute command """ def __init__( self, extra_whitelist: List[str] | None = None, extra_blocklist: List[str] | None = None, ) -> None: """Initialize the safety checker. Args: extra_whitelist: Additional commands to allow. extra_blocklist: Additional regex patterns to block. """ self.whitelist = set(COMMAND_WHITELIST) if extra_whitelist: self.whitelist.update(extra_whitelist) self.blocklist = list(BLOCKLIST_PATTERNS) if extra_blocklist: self.blocklist.extend(extra_blocklist) self.sudo_patterns = list(SUDO_DANGEROUS_PATTERNS) def check(self, command: str) -> SafetyCheckResult: """Check if a command is safe to execute. Args: command: The shell command string to validate. Returns: SafetyCheckResult with safety determination and reason. """ command = command.strip() if not command: return SafetyCheckResult( is_safe=False, is_whitelisted=False, is_blocked=False, reason="Empty command", ) # Check blocklist first (highest priority) blocked, pattern = self._check_blocklist(command) if blocked: return SafetyCheckResult( is_safe=False, is_whitelisted=False, is_blocked=True, reason=f"Command matches dangerous pattern: {pattern}", matched_pattern=pattern, ) # Check sudo + destructive combos sudo_blocked, sudo_pattern = self._check_sudo_dangerous(command) if sudo_blocked: return SafetyCheckResult( is_safe=False, is_whitelisted=False, is_blocked=True, reason=f"Dangerous sudo command: {sudo_pattern}", matched_pattern=sudo_pattern, ) # Check whitelist base_cmd = self._extract_base_command(command) is_whitelisted = base_cmd in self.whitelist if not is_whitelisted: return SafetyCheckResult( is_safe=False, is_whitelisted=False, is_blocked=False, reason=f"Command '{base_cmd}' is not in the whitelist", ) return SafetyCheckResult( is_safe=True, is_whitelisted=True, is_blocked=False, ) def _check_blocklist(self, command: str) -> Tuple[bool, str]: """Check command against blocklist patterns.""" for pattern in self.blocklist: if re.search(pattern, command, re.IGNORECASE): return True, pattern return False, "" def _check_sudo_dangerous(self, command: str) -> Tuple[bool, str]: """Check for sudo combined with destructive operations.""" for pattern in self.sudo_patterns: if re.search(pattern, command, re.IGNORECASE): return True, pattern return False, "" def _extract_base_command(self, command: str) -> str: """Extract the base command from a shell command string. Handles pipes, redirections, env vars, and command chains. """ # Strip leading env variable assignments cmd = command.strip() while re.match(r'^[A-Za-z_][A-Za-z0-9_]*=\S+\s+', cmd): cmd = re.sub(r'^[A-Za-z_][A-Za-z0-9_]*=\S+\s+', '', cmd, count=1) # Handle command chains (&&, ||, ;) — check each segment for sep in ['&&', '||', ';']: if sep in cmd: first_part = cmd.split(sep)[0].strip() return self._extract_base_command(first_part) # Handle pipes — check the first command if '|' in cmd: first_part = cmd.split('|')[0].strip() return self._extract_base_command(first_part) # Handle subshell $(...) cmd = re.sub(r'\$\([^)]*\)', '', cmd).strip() # Get the first token try: tokens = shlex.split(cmd) except ValueError: tokens = cmd.split() if not tokens: return "" base = tokens[0] # Strip path (e.g., /usr/bin/pip -> pip) if '/' in base: base = base.rsplit('/', 1)[-1] return base