Cyber_analyst-round1 / server /scenario_factory.py
Humanlearning's picture
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade
"""Closed-loop scenario factory for CyberSecurity_OWASP."""
from __future__ import annotations
import os
import tempfile
from pathlib import Path
from typing import Any
from uuid import uuid4
try:
from ..fixture_generator import visible_workspace_summary
from ..policy_graph import build_invoice_policy
from ..template_renderer import render_fastapi_basic
from .adversarial_designer import BoundedAdversarialDesigner
except ImportError: # pragma: no cover
from fixture_generator import visible_workspace_summary
from policy_graph import build_invoice_policy
from template_renderer import render_fastapi_basic
from server.adversarial_designer import BoundedAdversarialDesigner
def _make_workspace(prefix: str) -> Path:
root = Path(os.getenv("CYBERSECURITY_OWASP_WORKSPACE_ROOT", tempfile.gettempdir()))
root.mkdir(parents=True, exist_ok=True)
for _ in range(100):
workspace = root / f"{prefix}{uuid4().hex[:12]}"
try:
workspace.mkdir()
except FileExistsError:
continue
return workspace
raise RuntimeError("Unable to create isolated scenario workspace")
def _visible_policy_hint(public_hint: dict[str, Any]) -> dict[str, Any]:
"""Return partial policy observability without hidden oracle/test labels."""
return {
"domain": public_hint.get("domain", "invoices"),
"policy_rules": list(public_hint.get("policy_rules", [])),
"fixture_aliases": {
"users": dict(public_hint.get("users", {})),
"resources": dict(public_hint.get("resources", {})),
},
"public_routes": list(public_hint.get("public_routes", [])),
"observation_contract": {
"visible": [
"product policy summary",
"fixture aliases needed for local requests",
"route summaries",
"visible test results",
],
"hidden": [
"evaluator-only policy tuples",
"withheld invariant checks",
"withheld scenario labels",
"held-out family label",
],
},
}
class ScenarioFactory:
"""Compiles deterministic local app scenarios from curriculum profiles."""
def __init__(self, designer: BoundedAdversarialDesigner | None = None):
self.designer = designer or BoundedAdversarialDesigner()
def compile_scenario(
self,
seed: int,
*,
split: str = "train",
difficulty: int = 0,
curriculum_profile: dict[str, Any] | None = None,
) -> dict[str, Any]:
profile = curriculum_profile or {
"difficulty": difficulty,
"difficulty_tier": "warmup",
"target_weakness": "same_role_cross_object",
}
adversarial_spec = self.designer.design(
seed=seed, split=split, curriculum_profile=profile
)
compiled = build_invoice_policy(seed)
workspace = _make_workspace(prefix=f"cybersecurity_owasp_{split}_{seed}_")
public_hint = _visible_policy_hint(compiled.public_hint)
editable_files = render_fastapi_basic(workspace, public_hint, compiled.hidden_facts)
workspace_summary = visible_workspace_summary(editable_files, public_hint)
workspace_summary.update(
{
"template_id": adversarial_spec["template_id"],
"target_weakness": adversarial_spec["target_weakness"],
}
)
hidden = dict(compiled.hidden_facts)
hidden.update(
{
"workspace": str(workspace),
"editable_files": editable_files,
"initial_file_hashes": {
path: (workspace / path).read_text(encoding="utf-8")
for path in editable_files
},
"adversarial_spec": adversarial_spec,
"scenario_family": adversarial_spec["scenario_family"],
"template_id": adversarial_spec["template_id"],
"target_weakness": adversarial_spec["target_weakness"],
"oracle_hidden_focus": adversarial_spec["hidden_focus"],
}
)
return {
"task_id": f"{split}-invoices-bola-{seed}",
"workspace": workspace,
"domain": adversarial_spec["domain"],
"bug_family": adversarial_spec["bug_family"],
"scenario_family": adversarial_spec["scenario_family"],
"template_id": adversarial_spec["template_id"],
"target_weakness": adversarial_spec["target_weakness"],
"difficulty": int(profile.get("difficulty", difficulty)),
"difficulty_tier": str(profile.get("difficulty_tier", "warmup")),
"curriculum_snapshot": profile,
"task_brief": (
"Inspect the generated invoices app and policy. Find the broken "
"authorization behavior, submit a diagnosis with local evidence, patch "
"the app, preserve intended owner/admin/public behavior, then submit."
),
"public_hint": public_hint,
"workspace_summary": workspace_summary,
"hidden_facts": hidden,
}