Spaces:
Sleeping
Sleeping
| """ | |
| Command Safety Layer — Whitelist/blocklist enforcement for sandbox commands. | |
| Validates commands before execution to prevent destructive operations. | |
| """ | |
| from __future__ import annotations | |
| import re | |
| import shlex | |
| from dataclasses import dataclass | |
| from typing import List, Tuple | |
# Base command names that the sandbox is permitted to run.
# Membership is tested against the first token of a command (path stripped).
COMMAND_WHITELIST: List[str] = [
    # package installers and interpreters
    "pip",
    "pip3",
    "python",
    "python3",
    "apt-get",
    "npm",
    # process signalling
    "kill",
    "pkill",
    # shell environment builtins
    "export",
    "source",
    "unset",
    # service control and app servers
    "systemctl",
    "flask",
    "uvicorn",
    # file manipulation
    "cat",
    "ls",
    "echo",
    "mkdir",
    "rm",
    "cp",
    "mv",
    # text processing
    "sed",
    "grep",
    "awk",
    "head",
    "tail",
    "wc",
    # inspection and network fetch
    "ps",
    "lsof",
    "curl",
    "wget",
    # permissions and ownership
    "chmod",
    "chown",
    # file creation / stream duplication
    "touch",
    "tee",
    # shells
    "bash",
    "sh",
    # navigation and environment lookup
    "cd",
    "pwd",
    "which",
    "env",
    "printenv",
    # trivial status commands
    "true",
    "false",
    "test",
    # argument fan-out
    "xargs",
]
# Regular expressions for commands that are always rejected, regardless of
# the whitelist. Each pattern is applied with re.search (case-insensitive)
# against the whole command string, so it may match anywhere in a chain.
BLOCKLIST_PATTERNS: List[str] = [
    r"rm\s+-rf\s+/\s*$",  # rm -rf /
    r"rm\s+-rf\s+/\*",  # rm -rf /*
    r"rm\s+--no-preserve-root",  # rm --no-preserve-root
    r":\(\)\s*\{\s*:\|:\s*&\s*\}\s*;\s*:",  # fork bomb
    r"dd\s+if=",  # dd (disk destroyer)
    r"mkfs\.",  # mkfs (format disk)
    r"chmod\s+777\s+/\s*$",  # chmod 777 /
    r"chmod\s+-R\s+777\s+/",  # chmod -R 777 /
    r">\s*/dev/sda",  # write to raw disk
    r"mv\s+/\s+",  # mv / somewhere
    r"wget.*\|\s*sh",  # pipe download to shell
    # "| bash" is not covered by the "| sh" pattern above because the pipe
    # must be followed directly by "sh"; without this entry, piping a wget
    # download into bash slipped through while the curl equivalent did not.
    r"wget.*\|\s*bash",  # pipe download to bash
    r"curl.*\|\s*sh",  # pipe download to shell
    r"curl.*\|\s*bash",  # pipe download to bash
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?shutdown\b",  # shutdown invocation
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?reboot\b",  # reboot invocation
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?init\s+0\b",  # init 0 halt invocation
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?halt\b",  # halt invocation
]
# Regular expressions that flag `sudo` immediately followed by a destructive
# command. Matched with re.search, so they apply anywhere in the command.
SUDO_DANGEROUS_PATTERNS: List[str] = [
    r"sudo\s+rm",  # sudo rm ...
    r"sudo\s+dd",  # sudo dd ...
    r"sudo\s+mkfs",  # sudo mkfs...
    r"sudo\s+chmod\s+777",  # sudo chmod 777 ...
    r"sudo\s+shutdown",  # sudo shutdown
    r"sudo\s+reboot",  # sudo reboot
    r"sudo\s+halt",  # sudo halt
    r"sudo\s+init",  # sudo init ...
]
@dataclass
class SafetyCheckResult:
    """Result of a command safety check.

    Declared as a dataclass so that the annotated fields below become
    constructor parameters; without the decorator the class accepts no
    arguments and every ``SafetyCheckResult(is_safe=..., ...)`` call raises
    ``TypeError``.

    Attributes:
        is_safe: Whether the command passed safety checks.
        is_whitelisted: Whether the base command is in the whitelist.
        is_blocked: Whether the command matches a blocklist pattern.
        reason: Human-readable reason if the command was rejected.
        matched_pattern: The blocklist pattern that matched, if any.
    """

    is_safe: bool
    is_whitelisted: bool
    is_blocked: bool
    reason: str = ""
    matched_pattern: str = ""
class CommandSafetyChecker:
    """Validates commands against whitelist and blocklist rules.

    A command is reported safe only when it matches no blocklist pattern,
    matches no dangerous-sudo pattern, and the base command of *every*
    chained or piped segment is in the whitelist.

    Usage:
        checker = CommandSafetyChecker()
        result = checker.check("pip install flask")
        if result.is_safe:
            # execute command
    """

    def __init__(
        self,
        extra_whitelist: List[str] | None = None,
        extra_blocklist: List[str] | None = None,
    ) -> None:
        """Initialize the safety checker.

        Args:
            extra_whitelist: Additional commands to allow.
            extra_blocklist: Additional regex patterns to block.
        """
        # Copies are taken so per-instance additions never touch the
        # module-level defaults.
        self.whitelist = set(COMMAND_WHITELIST)
        if extra_whitelist:
            self.whitelist.update(extra_whitelist)
        self.blocklist = list(BLOCKLIST_PATTERNS)
        if extra_blocklist:
            self.blocklist.extend(extra_blocklist)
        self.sudo_patterns = list(SUDO_DANGEROUS_PATTERNS)

    def check(self, command: str) -> SafetyCheckResult:
        """Check if a command is safe to execute.

        Checks run in priority order: empty input, blocklist patterns,
        dangerous sudo patterns, then the whitelist. The whitelist is applied
        to every segment of a chain or pipeline, so a whitelisted first
        command cannot be used to smuggle in a non-whitelisted one.

        Args:
            command: The shell command string to validate.

        Returns:
            SafetyCheckResult with safety determination and reason.
        """
        command = command.strip()
        if not command:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=False,
                reason="Empty command",
            )

        # Check blocklist first (highest priority)
        blocked, pattern = self._check_blocklist(command)
        if blocked:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=True,
                reason=f"Command matches dangerous pattern: {pattern}",
                matched_pattern=pattern,
            )

        # Check sudo + destructive combos
        sudo_blocked, sudo_pattern = self._check_sudo_dangerous(command)
        if sudo_blocked:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=True,
                reason=f"Dangerous sudo command: {sudo_pattern}",
                matched_pattern=sudo_pattern,
            )

        # Check whitelist for every segment, not just the first one
        base_cmd = self._find_disallowed_command(command)
        if base_cmd is not None:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=False,
                reason=f"Command '{base_cmd}' is not in the whitelist",
            )

        return SafetyCheckResult(
            is_safe=True, is_whitelisted=True, is_blocked=False,
        )

    @staticmethod
    def _first_matching_pattern(
        patterns: List[str], command: str
    ) -> Tuple[bool, str]:
        """Return ``(True, pattern)`` for the first regex found in *command*.

        Patterns are tried in list order with a case-insensitive search;
        ``(False, "")`` is returned when none match.
        """
        for pattern in patterns:
            if re.search(pattern, command, re.IGNORECASE):
                return True, pattern
        return False, ""

    def _check_blocklist(self, command: str) -> Tuple[bool, str]:
        """Check command against blocklist patterns."""
        return self._first_matching_pattern(self.blocklist, command)

    def _check_sudo_dangerous(self, command: str) -> Tuple[bool, str]:
        """Check for sudo combined with destructive operations."""
        return self._first_matching_pattern(self.sudo_patterns, command)

    def _find_disallowed_command(self, command: str) -> str | None:
        """Return the first base command that is not whitelisted.

        The command is split into its chained/piped segments and the base
        command of each is looked up in ``self.whitelist``.

        Returns:
            The offending base command (possibly ``""`` when a segment has
            no command token), or ``None`` when every segment is allowed.
        """
        # A command made only of operators yields no segments; fall back to
        # the raw string so it is still evaluated (and rejected).
        segments = self._split_command_segments(command) or [command]
        for segment in segments:
            base_cmd = self._extract_base_command(segment)
            if base_cmd not in self.whitelist:
                return base_cmd
        return None

    @staticmethod
    def _split_command_segments(command: str) -> List[str]:
        """Split *command* on ``&&``, ``||``, ``|&``, ``;`` and ``|``.

        Operators are ignored inside single quotes, double quotes, backticks
        and ``$( ... )`` substitutions, and when backslash-escaped, so that
        arguments such as ``"a; b"`` or ``'x|y'`` do not produce bogus
        segments. Empty segments are dropped.

        Not handled: a single ``&`` and newlines are not treated as
        separators, and the contents of ``$( ... )`` are not split out.
        """
        # Two-character operators come first so "||" is not read as two "|".
        operators = ("&&", "||", "|&", ";", "|")
        segments: List[str] = []
        current: List[str] = []
        quote = ""  # active quote character, "" when unquoted
        depth = 0  # nesting depth of $( ... ) substitutions
        i = 0
        length = len(command)
        while i < length:
            ch = command[i]
            # A backslash protects the next character everywhere except
            # inside single quotes, where it is literal.
            if ch == "\\" and quote != "'" and i + 1 < length:
                current.append(command[i:i + 2])
                i += 2
                continue
            if quote:
                if ch == quote:
                    quote = ""
            elif ch in "\"'`":
                quote = ch
            elif command.startswith("$(", i):
                depth += 1
                current.append("$(")
                i += 2
                continue
            elif depth and ch == "(":
                depth += 1
            elif depth and ch == ")":
                depth -= 1
            elif depth == 0:
                operator = next(
                    (op for op in operators if command.startswith(op, i)), ""
                )
                if operator:
                    segments.append("".join(current))
                    current = []
                    i += len(operator)
                    continue
            current.append(ch)
            i += 1
        segments.append("".join(current))
        return [part for part in (seg.strip() for seg in segments) if part]

    def _extract_base_command(self, command: str) -> str:
        """Extract the base command from a shell command string.

        Strips leading ``NAME=value`` assignments, keeps only the text before
        the first chain or pipe operator, removes ``$(...)`` substitutions,
        and returns the first remaining token with any directory prefix
        removed. Only the FIRST command is returned; ``check`` is responsible
        for walking every segment.

        Returns:
            The base command name, or ``""`` if no token remains.
        """
        # Strip leading env variable assignments
        cmd = command.strip()
        while re.match(r'^[A-Za-z_][A-Za-z0-9_]*=\S+\s+', cmd):
            cmd = re.sub(r'^[A-Za-z_][A-Za-z0-9_]*=\S+\s+', '', cmd, count=1)

        # Command chains (&&, ||, ;) and then pipes: keep the first part only
        for sep in ('&&', '||', ';', '|'):
            if sep in cmd:
                first_part = cmd.split(sep)[0].strip()
                return self._extract_base_command(first_part)

        # Drop subshell $(...) substitutions
        cmd = re.sub(r'\$\([^)]*\)', '', cmd).strip()

        # Get the first token; fall back to whitespace splitting when the
        # quoting is unbalanced and shlex refuses the input.
        try:
            tokens = shlex.split(cmd)
        except ValueError:
            tokens = cmd.split()
        if not tokens:
            return ""

        base = tokens[0]
        # Strip path (e.g., /usr/bin/pip -> pip)
        if '/' in base:
            base = base.rsplit('/', 1)[-1]
        return base