Ken Huang
Initial deployment: Security-Aware AI Agent Demo
e856398
"""Zero-Trust Permission Validation System"""
import json
import re
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
def load_permission_matrix() -> Dict[str, Any]:
    """
    Load permission matrix from JSON.

    The file is read from ``data/permission_matrix.json``, where ``data`` sits
    beside the directory that contains this module. The file is re-read on
    every call; nothing is cached.

    Returns:
        The parsed JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    matrix_path = Path(__file__).parent.parent / "data" / "permission_matrix.json"
    # Decode explicitly as UTF-8: without an encoding argument open() falls
    # back to the platform locale, which can mis-decode non-ASCII JSON text.
    with open(matrix_path, 'r', encoding='utf-8') as f:
        return json.load(f)
def get_agent_role(agent_id: str) -> Optional[str]:
    """
    Derive the role name from an agent identifier.

    Identifiers are expected to look like ``role-name-01``: a role name
    followed by a dash and a numeric instance suffix. The suffix is removed
    when present; an identifier without such a suffix is returned unchanged.
    A missing or empty identifier maps to ``"guest-agent"``.
    """
    if not agent_id:
        return "guest-agent"

    # Split on the last dash only, e.g. "data-processor-01" -> "data-processor"
    role_name, dash, suffix = agent_id.rpartition('-')
    if dash and suffix.isdigit():
        return role_name
    return agent_id
def check_pattern_match(resource: str, patterns: List[str]) -> bool:
    """
    Check if resource matches any of the allowed patterns.

    Supports wildcards like database:*:read or filesystem:/tmp/*:write.
    ``*`` matches any run of characters (including ``:`` and ``/``, but not a
    newline); every other character in a pattern is matched literally, and
    the whole resource string must match.

    Args:
        resource: The resource string being requested.
        patterns: Allowed wildcard patterns.

    Returns:
        True if at least one pattern matches the entire resource, else False.
    """
    for pattern in patterns:
        # Escape the literal segments so that only '*' acts as a wildcard;
        # otherwise characters such as '.', '|' or '(' in a pattern would be
        # interpreted as regex operators and could widen (or break) the match.
        regex_pattern = '.*'.join(re.escape(part) for part in pattern.split('*'))
        # fullmatch (unlike '^...$' with re.match) rejects a trailing newline
        if re.fullmatch(regex_pattern, resource):
            return True
    return False
def check_always_deny(action: str, resource: str) -> tuple[bool, Optional[str]]:
    """
    Check if action is in always_deny list.

    The list is read from ``default_policies.always_deny`` in the permission
    matrix. An entry containing ``*`` is a wildcard pattern in which ``*``
    matches any run of characters and everything else is literal; an entry
    without ``*`` must equal the action exactly.

    Args:
        action: The action being attempted.
        resource: The target resource. Not consulted by this check.

    Returns:
        ``(True, reason)`` if the action is globally denied, otherwise
        ``(False, None)``.
    """
    matrix = load_permission_matrix()
    always_deny = matrix['default_policies']['always_deny']
    for denied_pattern in always_deny:
        # Check if action matches denied pattern
        if '*' in denied_pattern:
            # Escape the literal segments so only '*' is a wildcard; regex
            # metacharacters in a policy entry must not change what is denied
            # or make the pattern fail to compile.
            pattern = '.*'.join(re.escape(part) for part in denied_pattern.split('*'))
            # '$' also tolerates a single trailing newline, which errs on the
            # side of denying.
            if re.match(f"^{pattern}$", action):
                return True, f"Action '{action}' is globally denied"
        elif action == denied_pattern:
            return True, f"Action '{action}' is globally denied"
    return False, None
def check_requires_approval(action: str, resource: str) -> bool:
    """
    Check if action requires human approval.

    Approval is required when either:
      * the action matches an entry of ``default_policies.require_approval_for``
        in the permission matrix (``*`` in an entry matches any run of
        characters, everything else is literal; entries without ``*`` must
        equal the action exactly), or
      * the resource, compared case-insensitively, contains one of the
        sensitive keywords listed below as a substring.

    Args:
        action: The action being attempted.
        resource: The target resource.

    Returns:
        True if human approval is required, else False.
    """
    matrix = load_permission_matrix()
    require_approval = matrix['default_policies']['require_approval_for']
    for approval_pattern in require_approval:
        if '*' in approval_pattern:
            # Escape the literal segments so only '*' is a wildcard; regex
            # metacharacters in a policy entry must not alter the match or
            # make the pattern fail to compile.
            pattern = '.*'.join(re.escape(part) for part in approval_pattern.split('*'))
            if re.match(f"^{pattern}$", action):
                return True
        elif action == approval_pattern:
            return True
    # Check resource patterns for sensitive data. This is a plain substring
    # test, so e.g. 'key' also flags resources that merely contain those letters.
    sensitive_keywords = ['secret', 'credential', 'password', 'token', 'key', 'payment']
    resource_lower = resource.lower()
    return any(keyword in resource_lower for keyword in sensitive_keywords)
def _permission_result(
    allowed: bool,
    decision: str,
    reason: Optional[str],
    agent_role: Optional[str],
    required_permissions: List[str],
    current_permissions: List[str],
    permission_gap: List[str],
    recommendations: List[str],
    escalation_path: Optional[str],
) -> Dict[str, Any]:
    """Build one validation result dict and stamp it with a new audit id."""
    # Imported at call time, as the per-branch imports it replaces were.
    # NOTE(review): generate_audit_id lives in the sibling audit module, which
    # is not visible here; it is only assumed to return the id for this record.
    from .audit import generate_audit_id

    return {
        "allowed": allowed,
        "decision": decision,
        "reason": reason,
        "agent_role": agent_role,
        "required_permissions": required_permissions,
        "current_permissions": current_permissions,
        "permission_gap": permission_gap,
        "recommendations": recommendations,
        "escalation_path": escalation_path,
        "audit_id": generate_audit_id("perm"),
    }


def validate_permissions(
    agent_id: str,
    action: str,
    resource: str,
    current_permissions: Optional[List[str]] = None,
    request_context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Zero-trust permission validation.

    Checks run in this order, and the first failing one decides the result:
      1. the action is on the global always-deny list            -> DENY
      2. the agent's role is not present in the matrix            -> DENY
      3. the role lists the action under ``denied_actions``       -> DENY
      4. the action is not in the role's ``allowed_actions``      -> DENY
      5. the resource matches none of the role's
         ``resource_patterns``                                    -> DENY
      6. the action/resource requires human approval              -> REQUIRES_APPROVAL
    Only if all of them pass is the request allowed.

    Args:
        agent_id: Unique identifier for the agent
        action: The action being attempted (e.g., "read_file", "execute_code")
        resource: The target resource (e.g., "/etc/passwd", "database:users")
        current_permissions: Agent's current permission set (optional). Only
            echoed back in some DENY results; it never grants access.
        request_context: Additional context (IP, session_id, timestamp).
            Accepted for interface compatibility; not used by the checks.

    Returns:
        Validation result with decision and recommendations. Keys: allowed,
        decision, reason, agent_role, required_permissions,
        current_permissions, permission_gap, recommendations,
        escalation_path, audit_id — plus approval_required when the decision
        is REQUIRES_APPROVAL.
    """
    matrix = load_permission_matrix()

    # Check if action is globally denied
    is_denied, deny_reason = check_always_deny(action, resource)
    if is_denied:
        return _permission_result(
            allowed=False,
            decision="DENY",
            reason=deny_reason,
            # The role is not resolved yet at this point in the pipeline
            agent_role="unknown",
            required_permissions=[],
            current_permissions=current_permissions or [],
            permission_gap=[],
            recommendations=["This action is prohibited by security policy"],
            escalation_path="Contact security-admin@company.com",
        )

    # Get agent role
    role = get_agent_role(agent_id)

    # Check if role exists in matrix
    if role not in matrix['roles']:
        return _permission_result(
            allowed=False,
            decision="DENY",
            reason=f"Unknown agent role: '{role}'",
            agent_role=role,
            required_permissions=[],
            current_permissions=current_permissions or [],
            permission_gap=[],
            recommendations=["Register agent with valid role in permission matrix"],
            escalation_path="Contact admin to configure agent permissions",
        )

    role_config = matrix['roles'][role]
    # Fail closed: a role entry that lacks these keys grants nothing, instead
    # of aborting validation with a KeyError (same treatment as denied_actions).
    allowed_actions = role_config.get('allowed_actions', [])
    resource_patterns = role_config.get('resource_patterns', [])

    # Check if action is explicitly denied for this role
    if action in role_config.get('denied_actions', []):
        return _permission_result(
            allowed=False,
            decision="DENY",
            reason=f"Agent role '{role}' explicitly denies action '{action}'",
            agent_role=role,
            required_permissions=[],
            current_permissions=current_permissions or [],
            permission_gap=[f"{action} on {resource}"],
            recommendations=[
                "This action is not permitted for your role",
                "Request role change if elevated access is needed",
            ],
            escalation_path="Contact security-admin@company.com",
        )

    # Check if action is in allowed_actions
    if action not in allowed_actions:
        return _permission_result(
            allowed=False,
            decision="DENY",
            reason=f"Action '{action}' not in allowed actions for role '{role}'",
            agent_role=role,
            required_permissions=[f"{action}:{resource}"],
            current_permissions=allowed_actions,
            permission_gap=[action],
            recommendations=[
                "Request permission addition from administrator",
                "Use alternative action within your current permissions",
            ],
            escalation_path="Submit permission request at /admin/permissions",
        )

    # Check if resource matches allowed patterns
    if not check_pattern_match(resource, resource_patterns):
        return _permission_result(
            allowed=False,
            decision="DENY",
            reason=f"Resource '{resource}' does not match allowed patterns for role '{role}'",
            agent_role=role,
            required_permissions=[f"{action}:{resource}"],
            current_permissions=resource_patterns,
            permission_gap=[f"access to {resource}"],
            recommendations=[
                "Verify resource path is correct",
                "Request access to this resource pattern",
            ],
            escalation_path="Contact security-admin@company.com",
        )

    # Check if action requires approval
    if check_requires_approval(action, resource):
        result = _permission_result(
            allowed=False,
            decision="REQUIRES_APPROVAL",
            reason=f"Action '{action}' on '{resource}' requires human approval",
            agent_role=role,
            required_permissions=[f"{action}:{resource}"],
            current_permissions=allowed_actions,
            permission_gap=["human approval"],
            recommendations=[
                "Submit approval request with justification",
                "Approval required due to sensitive action/resource",
            ],
            escalation_path="Submit at /admin/approval-requests",
        )
        result["approval_required"] = True
        return result

    # Permission granted
    return _permission_result(
        allowed=True,
        decision="ALLOW",
        reason=f"Agent '{agent_id}' has valid permissions for '{action}' on '{resource}'",
        agent_role=role,
        required_permissions=[f"{action}:{resource}"],
        current_permissions=allowed_actions,
        permission_gap=[],
        recommendations=[],
        escalation_path=None,
    )