Spaces:
Sleeping
Sleeping
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade | """Deterministic validators and hidden checks for CyberSecurity_OWASP.""" | |
| from __future__ import annotations | |
| from pathlib import Path | |
| from typing import Any | |
| try: | |
| from .models import CyberSecurityOWASPAction, CyberSecurityOWASPState | |
| except ImportError: # pragma: no cover | |
| from models import CyberSecurityOWASPAction, CyberSecurityOWASPState | |
| BLOCKED_PATH_MARKERS = ( | |
| "hidden", | |
| "oracle", | |
| "reward_engine", | |
| "validators.py", | |
| "rewards.py", | |
| ".git", | |
| "..", | |
| ) | |
| def _workspace(state: CyberSecurityOWASPState) -> Path: | |
| return Path(str(state.hidden_facts["workspace"])) | |
| def _route_source(state: CyberSecurityOWASPState) -> str: | |
| return (_workspace(state) / "app/routes/invoices.py").read_text(encoding="utf-8") | |
| def is_path_allowed(state: CyberSecurityOWASPState, path: str, *, write: bool = False) -> tuple[bool, str]: | |
| normalized = path.replace("\\", "/").lstrip("/") | |
| if any(marker in normalized for marker in BLOCKED_PATH_MARKERS): | |
| return False, "blocked path" | |
| if Path(path).is_absolute(): | |
| return False, "absolute host paths are blocked" | |
| editable = set(state.hidden_facts.get("editable_files", [])) | |
| if normalized not in editable: | |
| return False, "path is outside editable generated workspace" | |
| if write and normalized.startswith("tests/"): | |
| return False, "visible tests are readable but not patchable" | |
| return True, normalized | |
| def detect_cheating(state: CyberSecurityOWASPState, action: CyberSecurityOWASPAction) -> list[str]: | |
| flags: list[str] = [] | |
| args = action.arguments or {} | |
| text = " ".join(str(v) for v in args.values()) | |
| for key in ( | |
| "owner_user_id", | |
| "intruder_user_id", | |
| "admin_user_id", | |
| "owner_invoice_id", | |
| "other_invoice_id", | |
| "foreign_invoice_id", | |
| "tenant_a", | |
| "tenant_b", | |
| ): | |
| value = str(state.hidden_facts.get(key, "")) | |
| if value and value in text and action.tool_name == "patch_file": | |
| flags.append("hardcoded_hidden_identifier") | |
| break | |
| if action.tool_name in {"read_file", "search_code", "patch_file"}: | |
| path_text = str(args.get("path", "")) + " " + str(args.get("query", "")) | |
| normalized = path_text.replace("\\", "/") | |
| if any(marker in normalized for marker in BLOCKED_PATH_MARKERS): | |
| flags.append("blocked_file_probe") | |
| if ".." in normalized or Path(str(args.get("path", ""))).is_absolute(): | |
| flags.append("host_path_escape") | |
| if action.tool_name == "patch_file" and normalized.lstrip("/").startswith("tests/"): | |
| flags.append("test_file_edit_attempt") | |
| if action.tool_name == "send_local_request": | |
| target = str(args.get("path", "")) | |
| if target.startswith("http://") or target.startswith("https://"): | |
| flags.append("external_network_attempt") | |
| return sorted(set(flags)) | |
| def _source_is_secure(source: str) -> bool: | |
| has_tenant_check = "tenant_id" in source and "actor" in source and "!=" in source | |
| has_owner = "owner_user_id" in source and "actor" in source | |
| has_admin = "is_billing_admin" in source | |
| has_forbidden = '"status": 403' in source or "'status': 403" in source | |
| return has_tenant_check and has_owner and has_admin and has_forbidden | |
| def _source_is_deny_all(source: str) -> bool: | |
| early_unconditional = ( | |
| " return {\"status\": 403" in source | |
| and " return {\"status\": 403" in source.split("if invoice[", 1)[0] | |
| ) | |
| missing_positive_policy = "owner_user_id" not in source or "is_billing_admin" not in source | |
| return early_unconditional or ( | |
| ("return {\"status\": 403" in source or "return {'status': 403" in source) | |
| and missing_positive_policy | |
| ) | |
| def _source_has_hardcoding(state: CyberSecurityOWASPState, source: str) -> bool: | |
| values = [ | |
| str(state.hidden_facts.get(key, "")) | |
| for key in ( | |
| "owner_user_id", | |
| "intruder_user_id", | |
| "admin_user_id", | |
| "owner_invoice_id", | |
| "other_invoice_id", | |
| "foreign_invoice_id", | |
| "tenant_a", | |
| "tenant_b", | |
| ) | |
| ] | |
| return any(value and value in source for value in values) | |
| def simulate_request( | |
| state: CyberSecurityOWASPState, method: str, path: str, user_id: str | None = None | |
| ) -> dict[str, Any]: | |
| if method.upper() != "GET": | |
| return {"status": 405, "body": {"detail": "method not allowed"}} | |
| source = _route_source(state) | |
| if path == "/health": | |
| if "def health" not in source or "status\": 200" not in source: | |
| return {"status": 404, "body": {"detail": "health unavailable"}} | |
| return {"status": 200, "body": {"status": "ok"}} | |
| if not path.startswith("/invoices/"): | |
| return {"status": 404, "body": {"detail": "route not found"}} | |
| invoice_id = path.rsplit("/", 1)[-1] | |
| users = state.hidden_facts["users"] | |
| invoices = state.hidden_facts["invoices"] | |
| actor = users.get(user_id or "") | |
| invoice = invoices.get(invoice_id) | |
| if actor is None: | |
| return {"status": 401, "body": {"detail": "unknown user"}} | |
| if invoice is None: | |
| return {"status": 404, "body": {"detail": "invoice not found"}} | |
| if _source_is_deny_all(source): | |
| return {"status": 403, "body": {"detail": "forbidden"}} | |
| if _source_is_secure(source): | |
| same_tenant = invoice["tenant_id"] == actor["tenant_id"] | |
| allowed = invoice["owner_user_id"] == actor["user_id"] or "billing_admin" in actor["roles"] | |
| if same_tenant and allowed: | |
| return {"status": 200, "body": invoice} | |
| return {"status": 403, "body": {"detail": "forbidden"}} | |
| return {"status": 200, "body": invoice} | |
| def verify_finding(state: CyberSecurityOWASPState, finding: dict) -> dict: | |
| text = " ".join(str(v).lower() for v in finding.values()) | |
| hidden = state.hidden_facts | |
| route = str(finding.get("route", "")).lower() | |
| bug_class = str(finding.get("bug_class", "")).lower() | |
| policy_rule = str(finding.get("violated_policy_rule", finding.get("policy_rule", ""))).lower() | |
| evidence_ids = { | |
| str(item) | |
| for item in finding.get("evidence_trace_ids", []) | |
| if str(item).strip() | |
| } | |
| mentions_known_invoice = any( | |
| str(hidden.get(key, "")).lower() in text | |
| for key in ("owner_invoice_id", "other_invoice_id", "foreign_invoice_id") | |
| ) | |
| mentions_known_user = any( | |
| str(hidden.get(key, "")).lower() in text | |
| for key in ("owner_user_id", "intruder_user_id", "admin_user_id") | |
| ) | |
| valid = ( | |
| any(term in text for term in ("idor", "bola", "authorization", "owner", "tenant")) | |
| and mentions_known_invoice | |
| and mentions_known_user | |
| ) | |
| tied_to_policy = any(term in text for term in ("policy", "owner", "tenant", "billing_admin")) | |
| correct_route = "/invoices" in route or "/invoices" in text | |
| correct_bug_class = any( | |
| term in bug_class or term in text | |
| for term in ("idor", "bola", "authorization", "ownership", "tenant") | |
| ) | |
| correct_policy = any(term in policy_rule for term in ("owner", "tenant", "billing_admin")) | |
| valid_evidence = _has_valid_trace_evidence(state, evidence_ids) | |
| structured_valid = correct_route and correct_bug_class and correct_policy and valid_evidence | |
| return { | |
| "valid": valid or structured_valid, | |
| "tied_to_policy": tied_to_policy or correct_policy, | |
| "correct_route": correct_route, | |
| "correct_bug_class": correct_bug_class, | |
| "correct_policy_rule": correct_policy, | |
| "valid_local_evidence": valid_evidence, | |
| } | |
| def _has_valid_trace_evidence(state: CyberSecurityOWASPState, evidence_ids: set[str]) -> bool: | |
| if not evidence_ids: | |
| return False | |
| hidden = state.hidden_facts | |
| for trace in state.request_trace: | |
| if str(trace.get("trace_id")) not in evidence_ids: | |
| continue | |
| path = str(trace.get("path", "")) | |
| user_id = str(trace.get("user_id", "")) | |
| status = int(trace.get("status", 0) or 0) | |
| if ( | |
| hidden.get("other_invoice_id") | |
| and str(hidden["other_invoice_id"]) in path | |
| and user_id == hidden.get("owner_user_id") | |
| and status == 200 | |
| ): | |
| return True | |
| if bool(trace.get("unauthorized_success", False)): | |
| return True | |
| return False | |
| def run_visible_tests(state: CyberSecurityOWASPState) -> dict: | |
| hidden = state.hidden_facts | |
| checks = { | |
| "health_public": simulate_request(state, "GET", "/health")["status"] == 200, | |
| "owner_can_read": simulate_request( | |
| state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"] | |
| )["status"] | |
| == 200, | |
| "admin_can_read": simulate_request( | |
| state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"] | |
| )["status"] | |
| == 200, | |
| } | |
| return {"passed": all(checks.values()), "checks": checks} | |
| def run_hidden_security_tests(state: CyberSecurityOWASPState) -> dict: | |
| hidden = state.hidden_facts | |
| checks = { | |
| "same_tenant_other_user_blocked": simulate_request( | |
| state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["owner_user_id"] | |
| )["status"] | |
| == 403, | |
| "cross_tenant_blocked": simulate_request( | |
| state, "GET", f"/invoices/{hidden['foreign_invoice_id']}", hidden["admin_user_id"] | |
| )["status"] | |
| == 403, | |
| } | |
| return {"passed": all(checks.values()), "checks": checks} | |
| def run_hidden_regression_tests(state: CyberSecurityOWASPState) -> dict: | |
| hidden = state.hidden_facts | |
| checks = { | |
| "owner_preserved": simulate_request( | |
| state, "GET", f"/invoices/{hidden['owner_invoice_id']}", hidden["owner_user_id"] | |
| )["status"] | |
| == 200, | |
| "admin_preserved": simulate_request( | |
| state, "GET", f"/invoices/{hidden['other_invoice_id']}", hidden["admin_user_id"] | |
| )["status"] | |
| == 200, | |
| } | |
| return {"passed": all(checks.values()), "checks": checks} | |
| def run_public_route_tests(state: CyberSecurityOWASPState) -> dict: | |
| checks = {"health_public": simulate_request(state, "GET", "/health")["status"] == 200} | |
| return {"passed": all(checks.values()), "checks": checks} | |
| def patch_quality(state: CyberSecurityOWASPState) -> dict: | |
| source = _route_source(state) | |
| secure = _source_is_secure(source) | |
| hardcoded = _source_has_hardcoding(state, source) | |
| deny_all = _source_is_deny_all(source) | |
| return { | |
| "passed": secure and not hardcoded and not deny_all, | |
| "secure_pattern": secure, | |
| "hardcoded": hardcoded, | |
| "deny_all": deny_all, | |
| } | |