printf-sourav's picture
Initial commit
27cdb3e
Raw
History Blame Contribute Delete
7.24 kB
"""
Command Safety Layer — Whitelist/blocklist enforcement for sandbox commands.
Validates commands before execution to prevent destructive operations.
"""
from __future__ import annotations
import re
import shlex
from dataclasses import dataclass
from typing import List, Tuple
# Base commands the sandbox may run. A command is looked up here by its first
# token with any leading directory stripped (e.g. /usr/bin/pip -> pip).
COMMAND_WHITELIST: List[str] = [
    # Python tooling and package managers
    "pip", "pip3", "python", "python3",
    "apt-get", "npm",
    # Process signalling
    "kill", "pkill",
    # Shell environment handling
    "export", "source", "unset",
    # Service management and app servers
    "systemctl",
    "flask", "uvicorn",
    # File manipulation
    "cat", "ls", "echo", "mkdir", "rm", "cp", "mv",
    # Text processing
    "sed", "grep", "awk", "head", "tail", "wc",
    # Process / network inspection and downloads
    "ps", "lsof", "curl", "wget",
    # Permissions and ownership
    "chmod", "chown",
    # File creation and stream duplication
    "touch", "tee",
    # Shell interpreters
    "bash", "sh",
    # Navigation and environment inspection
    "cd", "pwd", "which", "env", "printenv",
    # Shell builtins used in conditionals
    "true", "false", "test",
    # Argument fan-out
    "xargs",
]
# Regexes for commands that must never run, whatever the whitelist says.
# Each one is applied to the whole command string.
BLOCKLIST_PATTERNS: List[str] = [
    # rm -rf /
    r"rm\s+-rf\s+/\s*$",
    # rm -rf /*
    r"rm\s+-rf\s+/\*",
    # rm --no-preserve-root
    r"rm\s+--no-preserve-root",
    # Classic shell fork bomb
    r":\(\)\s*\{\s*:\|:\s*&\s*\}\s*;\s*:",
    # dd with an input file ("disk destroyer")
    r"dd\s+if=",
    # mkfs.<fstype> (formats a disk)
    r"mkfs\.",
    # chmod 777 /
    r"chmod\s+777\s+/\s*$",
    # chmod -R 777 /...
    r"chmod\s+-R\s+777\s+/",
    # Redirecting output onto the raw disk device
    r">\s*/dev/sda",
    # mv / <somewhere>
    r"mv\s+/\s+",
    # Piping a download into a shell
    r"wget.*\|\s*sh",
    r"curl.*\|\s*sh",
    r"curl.*\|\s*bash",
    # Power-state commands at the start of the string or after &&, || or ;
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?shutdown\b",
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?reboot\b",
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?init\s+0\b",
    r"(?:^|&&|\|\||;)\s*(?:/sbin/)?halt\b",
]
# Regexes for destructive operations invoked through sudo. Every entry has the
# form ``sudo\s+<target>``; the targets are listed once and expanded below.
SUDO_DANGEROUS_PATTERNS: List[str] = [
    rf"sudo\s+{target}"
    for target in (
        "rm",
        "dd",
        "mkfs",
        r"chmod\s+777",
        "shutdown",
        "reboot",
        "halt",
        "init",
    )
]
@dataclass
class SafetyCheckResult:
    """Outcome of validating one command string.

    The three flags are always supplied by the caller; ``reason`` and
    ``matched_pattern`` default to the empty string and are only filled in
    when there is something to report.
    """

    # True only when the command passed every safety check.
    is_safe: bool
    # Whether the base command was found in the whitelist.
    is_whitelisted: bool
    # Whether a blocklist pattern matched the command.
    is_blocked: bool
    # Human-readable explanation when the command was rejected.
    reason: str = ""
    # The blocklist regex that matched, if any.
    matched_pattern: str = ""
class CommandSafetyChecker:
    """Validate shell command strings against whitelist and blocklist rules.

    A command is rejected when it is empty, when it matches a blocklist or
    sudo-danger regex, or when any command in an ``&&`` / ``||`` / ``;``
    chain has a base command that is not whitelisted.

    NOTE(review): only the first command of a pipeline (``a | b``) is
    whitelist-checked, and the contents of ``$(...)`` / backtick command
    substitutions, a lone ``&`` and newlines are not inspected. For those
    cases the regex blocklist is the only guard.

    Usage:
        checker = CommandSafetyChecker()
        result = checker.check("pip install flask")
        if result.is_safe:
            ...  # execute command
    """

    # A leading ``NAME=value `` environment assignment in front of a command.
    _ENV_ASSIGNMENT = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*=\S+\s+')
    # A non-nested ``$(...)`` command substitution.
    _COMMAND_SUBSTITUTION = re.compile(r'\$\([^)]*\)')

    def __init__(
        self,
        extra_whitelist: List[str] | None = None,
        extra_blocklist: List[str] | None = None,
    ) -> None:
        """Initialize the safety checker.

        Args:
            extra_whitelist: Additional commands to allow.
            extra_blocklist: Additional regex patterns to block.
        """
        self.whitelist = set(COMMAND_WHITELIST)
        if extra_whitelist:
            self.whitelist.update(extra_whitelist)
        self.blocklist = list(BLOCKLIST_PATTERNS)
        if extra_blocklist:
            self.blocklist.extend(extra_blocklist)
        self.sudo_patterns = list(SUDO_DANGEROUS_PATTERNS)

    def check(self, command: str) -> SafetyCheckResult:
        """Check if a command is safe to execute.

        Checks run in priority order: empty command, blocklist patterns,
        sudo-danger patterns, then the whitelist. Blocked results report
        ``is_whitelisted=False`` without consulting the whitelist.

        Args:
            command: The shell command string to validate.

        Returns:
            SafetyCheckResult with safety determination and reason.
        """
        command = command.strip()
        if not command:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=False,
                reason="Empty command",
            )

        # Blocklist first (highest priority).
        blocked, pattern = self._check_blocklist(command)
        if blocked:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=True,
                reason=f"Command matches dangerous pattern: {pattern}",
                matched_pattern=pattern,
            )

        # sudo combined with a destructive operation.
        sudo_blocked, sudo_pattern = self._check_sudo_dangerous(command)
        if sudo_blocked:
            return SafetyCheckResult(
                is_safe=False, is_whitelisted=False, is_blocked=True,
                reason=f"Dangerous sudo command: {sudo_pattern}",
                matched_pattern=sudo_pattern,
            )

        # Every command in the chain must be whitelisted, not just the first;
        # otherwise "ls; <anything>" would pass. A string with no extractable
        # command is treated as the empty base command and rejected.
        for base_cmd in self._extract_all_base_commands(command) or [""]:
            if base_cmd not in self.whitelist:
                return SafetyCheckResult(
                    is_safe=False, is_whitelisted=False, is_blocked=False,
                    reason=f"Command '{base_cmd}' is not in the whitelist",
                )

        return SafetyCheckResult(
            is_safe=True, is_whitelisted=True, is_blocked=False,
        )

    @staticmethod
    def _first_match(patterns: List[str], command: str) -> Tuple[bool, str]:
        """Return ``(True, pattern)`` for the first regex in *patterns* that
        matches *command* case-insensitively, else ``(False, "")``."""
        for pattern in patterns:
            if re.search(pattern, command, re.IGNORECASE):
                return True, pattern
        return False, ""

    def _check_blocklist(self, command: str) -> Tuple[bool, str]:
        """Check command against blocklist patterns."""
        return self._first_match(self.blocklist, command)

    def _check_sudo_dangerous(self, command: str) -> Tuple[bool, str]:
        """Check for sudo combined with destructive operations."""
        return self._first_match(self.sudo_patterns, command)

    @staticmethod
    def _split_chain(command: str) -> List[str]:
        """Split *command* on unquoted ``&&``, ``||`` and ``;`` operators.

        Separators inside single quotes, double quotes, backticks or a
        ``$(...)`` substitution, and separators escaped with a backslash, do
        not split. Empty segments (e.g. from a trailing ``;``) are dropped.

        Returns:
            The stripped, non-empty segments in their original order.
        """
        segments: List[str] = []
        current: List[str] = []
        quote = ""   # active quoting character (' " or `), empty when unquoted
        depth = 0    # nesting depth of $( ... ) substitutions
        i, n = 0, len(command)
        while i < n:
            ch = command[i]
            nxt = command[i + 1] if i + 1 < n else ""
            # A backslash escapes the next character, except in single quotes.
            if ch == "\\" and quote != "'" and nxt:
                current.append(ch + nxt)
                i += 2
                continue
            if quote:
                if ch == quote:
                    quote = ""
                current.append(ch)
                i += 1
                continue
            if ch in ("'", '"', "`"):
                quote = ch
                current.append(ch)
                i += 1
                continue
            if ch == "$" and nxt == "(":
                depth += 1
                current.append("$(")
                i += 2
                continue
            if depth:
                # Inside $( ... ): track parentheses only, never split.
                if ch == "(":
                    depth += 1
                elif ch == ")":
                    depth -= 1
                current.append(ch)
                i += 1
                continue
            if ch + nxt in ("&&", "||"):
                segments.append("".join(current))
                current = []
                i += 2
                continue
            if ch == ";":
                segments.append("".join(current))
                current = []
                i += 1
                continue
            current.append(ch)
            i += 1
        segments.append("".join(current))
        return [part.strip() for part in segments if part.strip()]

    def _extract_all_base_commands(self, command: str) -> List[str]:
        """Return the base command of every segment of a command chain.

        Args:
            command: The shell command string, possibly chained with
                ``&&``, ``||`` or ``;``.

        Returns:
            One base command per non-empty segment, in order. Empty when the
            string contains no segments at all.
        """
        return [
            self._extract_base_command(segment)
            for segment in self._split_chain(command)
        ]

    def _extract_base_command(self, command: str) -> str:
        """Extract the base command of the first command in *command*.

        Leading ``NAME=value`` assignments are skipped, anything after the
        first ``&&`` / ``||`` / ``;`` / ``|`` is ignored, ``$(...)``
        substitutions are removed, and a leading directory is stripped from
        the resulting first token.

        Returns:
            The bare command name, or "" when no token remains.
        """
        cmd = command.strip()

        # Strip leading env variable assignments, one at a time.
        match = self._ENV_ASSIGNMENT.match(cmd)
        while match:
            cmd = cmd[match.end():]
            match = self._ENV_ASSIGNMENT.match(cmd)

        # Keep only the first command of a chain, then of a pipeline. This is
        # a plain substring split: a separator inside quotes also cuts here,
        # which is why the tokenizer below falls back to str.split().
        for sep in ('&&', '||', ';', '|'):
            if sep in cmd:
                first_part = cmd.split(sep)[0].strip()
                return self._extract_base_command(first_part)

        # Drop $(...) substitutions so they are not mistaken for the command.
        cmd = self._COMMAND_SUBSTITUTION.sub('', cmd).strip()

        try:
            tokens = shlex.split(cmd)
        except ValueError:
            # Unbalanced quotes or a dangling escape: use whitespace splitting.
            tokens = cmd.split()
        if not tokens:
            return ""

        # Strip path (e.g., /usr/bin/pip -> pip).
        return tokens[0].rsplit('/', 1)[-1]