Spaces:
Sleeping
Sleeping
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade | """Closed-loop scenario factory for CyberSecurity_OWASP.""" | |
| from __future__ import annotations | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| from typing import Any | |
| from uuid import uuid4 | |
| try: | |
| from ..fixture_generator import visible_workspace_summary | |
| from ..policy_graph import build_invoice_policy | |
| from ..template_renderer import render_fastapi_basic | |
| from .adversarial_designer import BoundedAdversarialDesigner | |
| except ImportError: # pragma: no cover | |
| from fixture_generator import visible_workspace_summary | |
| from policy_graph import build_invoice_policy | |
| from template_renderer import render_fastapi_basic | |
| from server.adversarial_designer import BoundedAdversarialDesigner | |
| def _make_workspace(prefix: str) -> Path: | |
| root = Path(os.getenv("CYBERSECURITY_OWASP_WORKSPACE_ROOT", tempfile.gettempdir())) | |
| root.mkdir(parents=True, exist_ok=True) | |
| for _ in range(100): | |
| workspace = root / f"{prefix}{uuid4().hex[:12]}" | |
| try: | |
| workspace.mkdir() | |
| except FileExistsError: | |
| continue | |
| return workspace | |
| raise RuntimeError("Unable to create isolated scenario workspace") | |
| def _visible_policy_hint(public_hint: dict[str, Any]) -> dict[str, Any]: | |
| """Return partial policy observability without hidden oracle/test labels.""" | |
| return { | |
| "domain": public_hint.get("domain", "invoices"), | |
| "policy_rules": list(public_hint.get("policy_rules", [])), | |
| "fixture_aliases": { | |
| "users": dict(public_hint.get("users", {})), | |
| "resources": dict(public_hint.get("resources", {})), | |
| }, | |
| "public_routes": list(public_hint.get("public_routes", [])), | |
| "observation_contract": { | |
| "visible": [ | |
| "product policy summary", | |
| "fixture aliases needed for local requests", | |
| "route summaries", | |
| "visible test results", | |
| ], | |
| "hidden": [ | |
| "evaluator-only policy tuples", | |
| "withheld invariant checks", | |
| "withheld scenario labels", | |
| "held-out family label", | |
| ], | |
| }, | |
| } | |
| class ScenarioFactory: | |
| """Compiles deterministic local app scenarios from curriculum profiles.""" | |
| def __init__(self, designer: BoundedAdversarialDesigner | None = None): | |
| self.designer = designer or BoundedAdversarialDesigner() | |
| def compile_scenario( | |
| self, | |
| seed: int, | |
| *, | |
| split: str = "train", | |
| difficulty: int = 0, | |
| curriculum_profile: dict[str, Any] | None = None, | |
| ) -> dict[str, Any]: | |
| profile = curriculum_profile or { | |
| "difficulty": difficulty, | |
| "difficulty_tier": "warmup", | |
| "target_weakness": "same_role_cross_object", | |
| } | |
| adversarial_spec = self.designer.design( | |
| seed=seed, split=split, curriculum_profile=profile | |
| ) | |
| compiled = build_invoice_policy(seed) | |
| workspace = _make_workspace(prefix=f"cybersecurity_owasp_{split}_{seed}_") | |
| public_hint = _visible_policy_hint(compiled.public_hint) | |
| editable_files = render_fastapi_basic(workspace, public_hint, compiled.hidden_facts) | |
| workspace_summary = visible_workspace_summary(editable_files, public_hint) | |
| workspace_summary.update( | |
| { | |
| "template_id": adversarial_spec["template_id"], | |
| "target_weakness": adversarial_spec["target_weakness"], | |
| } | |
| ) | |
| hidden = dict(compiled.hidden_facts) | |
| hidden.update( | |
| { | |
| "workspace": str(workspace), | |
| "editable_files": editable_files, | |
| "initial_file_hashes": { | |
| path: (workspace / path).read_text(encoding="utf-8") | |
| for path in editable_files | |
| }, | |
| "adversarial_spec": adversarial_spec, | |
| "scenario_family": adversarial_spec["scenario_family"], | |
| "template_id": adversarial_spec["template_id"], | |
| "target_weakness": adversarial_spec["target_weakness"], | |
| "oracle_hidden_focus": adversarial_spec["hidden_focus"], | |
| } | |
| ) | |
| return { | |
| "task_id": f"{split}-invoices-bola-{seed}", | |
| "workspace": workspace, | |
| "domain": adversarial_spec["domain"], | |
| "bug_family": adversarial_spec["bug_family"], | |
| "scenario_family": adversarial_spec["scenario_family"], | |
| "template_id": adversarial_spec["template_id"], | |
| "target_weakness": adversarial_spec["target_weakness"], | |
| "difficulty": int(profile.get("difficulty", difficulty)), | |
| "difficulty_tier": str(profile.get("difficulty_tier", "warmup")), | |
| "curriculum_snapshot": profile, | |
| "task_brief": ( | |
| "Inspect the generated invoices app and policy. Find the broken " | |
| "authorization behavior, submit a diagnosis with local evidence, patch " | |
| "the app, preserve intended owner/admin/public behavior, then submit." | |
| ), | |
| "public_hint": public_hint, | |
| "workspace_summary": workspace_summary, | |
| "hidden_facts": hidden, | |
| } | |