Spaces:
Sleeping
Sleeping
feat: enhance scenario authoring and caching mechanisms, update action submission terminology, and improve reward configuration for CyberSecurity_OWASP environment
be8eade | """Configuration for scenario authoring, curriculum, and cache-backed reset.""" | |
| from __future__ import annotations | |
| import json | |
| import os | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| from typing import Any, Literal | |
# Closed set of cache modes accepted by ScenarioRuntimeConfig.cache_mode
# (enforced in _validate_settings).
ScenarioCacheMode = Literal["fallback", "require", "disabled"]

# Packaged default config file, used by load_scenario_authoring_config when
# neither an explicit path nor the CYBERSECURITY_OWASP_SCENARIO_CONFIG env
# var is supplied.
DEFAULT_SCENARIO_CONFIG_PATH = (
    Path(__file__).resolve().parent / "configs" / "scenario_authoring.small.json"
)
@dataclass
class ScenarioAuthorConfig:
    """Model and sampling settings for the scenario-authoring LLM.

    Must be a dataclass: it is instantiated with keyword arguments parsed
    from JSON in load_scenario_authoring_config, so without the decorator
    ``ScenarioAuthorConfig(**raw)`` would raise TypeError.
    """

    provider: str = "huggingface"  # inference backend identifier
    model_id: str = "deepseek-ai/DeepSeek-V4-Pro"
    thinking_mode: str = "thinking"
    reasoning_effort: str = "high"
    temperature: float = 1.0  # must be > 0 (see _validate_settings)
    top_p: float = 1.0  # must be > 0 (see _validate_settings)
    max_context_tokens: int = 131072  # must be >= 4096 (see _validate_settings)
@dataclass
class CurriculumCacheConfig:
    """Curriculum sizing, difficulty calibration, and cache targets.

    Must be a dataclass: it uses ``field(default_factory=...)`` for mutable
    defaults and is built with keyword arguments in _curriculum_from_raw;
    without the decorator the ``field(...)`` objects would be left as raw
    class attributes and keyword construction would raise TypeError.
    """

    difficulty_bucket_count: int = 4
    # One label per bucket; validated to cover every bucket.
    difficulty_labels: list[str] = field(default_factory=lambda: ["D0", "D1", "D2", "D3"])
    train_scenarios_per_bucket: int = 25
    validation_scenarios_per_bucket: int = 10
    heldout_eval_scenarios_per_bucket: int = 10
    target_cache_hit_rate: float = 0.95  # must be in (0, 1]
    target_reset_latency_ms: int = 200
    scenario_refresh_rate_per_epoch: float = 0.05  # must be in [0, 1]
    difficulty_calibration_strategy: str = "baseline_agent_pass_rate"
    # Per-label (low, high) baseline pass-rate bands used for calibration.
    pass_rate_thresholds: dict[str, tuple[float, float]] = field(
        default_factory=lambda: {
            "D0": (0.8, 1.0),
            "D1": (0.6, 0.8),
            "D2": (0.4, 0.6),
            "D3": (0.2, 0.4),
        }
    )

    def minimum_for_split(self, split: str) -> int:
        """Return the per-bucket scenario minimum for *split*.

        Unknown split names fall back to the training minimum.
        """
        if split == "hidden_eval":
            return self.heldout_eval_scenarios_per_bucket
        if split == "validation":
            return self.validation_scenarios_per_bucket
        return self.train_scenarios_per_bucket
@dataclass
class ScenarioRuntimeConfig:
    """Cache-backed reset behavior and generator/verifier artifact versions.

    Must be a dataclass: it is instantiated with keyword arguments parsed
    from JSON in load_scenario_authoring_config.
    """

    cache_mode: ScenarioCacheMode = "fallback"  # validated in _validate_settings
    cache_dir: str = "scenario_cache"
    generator_version: str = "scenario_generator_v1"
    verifier_version: str = "verifier_v1"
@dataclass
class ScenarioAuthoringSettings:
    """Top-level bundle of author, curriculum, and runtime configuration.

    Must be a dataclass: it uses ``field(default_factory=...)`` and is
    built with keyword arguments in load_scenario_authoring_config.
    """

    scenario_author: ScenarioAuthorConfig = field(default_factory=ScenarioAuthorConfig)
    curriculum: CurriculumCacheConfig = field(default_factory=CurriculumCacheConfig)
    runtime: ScenarioRuntimeConfig = field(default_factory=ScenarioRuntimeConfig)
    # Path the settings were loaded from; "" when constructed programmatically.
    source_path: str = ""
def load_scenario_authoring_config(path: str | Path | None = None) -> ScenarioAuthoringSettings:
    """Load and validate the small scenario-authoring config with env overrides.

    Resolution order for the config file: explicit *path* argument, then the
    CYBERSECURITY_OWASP_SCENARIO_CONFIG environment variable, then the
    packaged default. Raises ValueError if validation fails.
    """
    candidate = (
        path
        or os.getenv("CYBERSECURITY_OWASP_SCENARIO_CONFIG", "")
        or DEFAULT_SCENARIO_CONFIG_PATH
    )
    config_path = Path(candidate)
    payload = _apply_env_overrides(json.loads(config_path.read_text(encoding="utf-8")))
    settings = ScenarioAuthoringSettings(
        scenario_author=ScenarioAuthorConfig(**payload.get("scenario_author", {})),
        curriculum=_curriculum_from_raw(payload.get("curriculum", {})),
        runtime=ScenarioRuntimeConfig(**payload.get("runtime", {})),
        source_path=str(config_path),
    )
    _validate_settings(settings)
    return settings
def _apply_env_overrides(raw: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of *raw* with CYBERSECURITY_OWASP_* env overrides applied.

    Unset environment variables are skipped; set ones replace (or create)
    the corresponding key in the named config section.
    """
    # Round-trip through JSON to deep-copy without mutating the caller's dict.
    data = json.loads(json.dumps(raw))
    sections = {
        name: data.setdefault(name, {})
        for name in ("scenario_author", "curriculum", "runtime")
    }
    # (section, key, env var, optional caster applied to the raw string)
    env_spec = (
        ("scenario_author", "model_id", "CYBERSECURITY_OWASP_SCENARIO_AUTHOR_MODEL", None),
        ("scenario_author", "provider", "CYBERSECURITY_OWASP_SCENARIO_AUTHOR_PROVIDER", None),
        ("scenario_author", "thinking_mode", "CYBERSECURITY_OWASP_SCENARIO_THINKING_MODE", None),
        ("scenario_author", "reasoning_effort", "CYBERSECURITY_OWASP_SCENARIO_REASONING_EFFORT", None),
        ("scenario_author", "temperature", "CYBERSECURITY_OWASP_SCENARIO_TEMPERATURE", float),
        ("scenario_author", "top_p", "CYBERSECURITY_OWASP_SCENARIO_TOP_P", float),
        ("scenario_author", "max_context_tokens", "CYBERSECURITY_OWASP_SCENARIO_MAX_CONTEXT", int),
        ("curriculum", "difficulty_bucket_count", "CYBERSECURITY_OWASP_DIFFICULTY_BUCKETS", int),
        ("curriculum", "train_scenarios_per_bucket", "CYBERSECURITY_OWASP_TRAIN_SCENARIOS_PER_BUCKET", int),
        ("curriculum", "validation_scenarios_per_bucket", "CYBERSECURITY_OWASP_VALIDATION_SCENARIOS_PER_BUCKET", int),
        ("curriculum", "heldout_eval_scenarios_per_bucket", "CYBERSECURITY_OWASP_HELDOUT_SCENARIOS_PER_BUCKET", int),
        ("curriculum", "target_cache_hit_rate", "CYBERSECURITY_OWASP_TARGET_CACHE_HIT_RATE", float),
        ("curriculum", "target_reset_latency_ms", "CYBERSECURITY_OWASP_TARGET_RESET_LATENCY_MS", int),
        ("curriculum", "scenario_refresh_rate_per_epoch", "CYBERSECURITY_OWASP_SCENARIO_REFRESH_RATE", float),
        ("curriculum", "difficulty_calibration_strategy", "CYBERSECURITY_OWASP_DIFFICULTY_CALIBRATION", None),
        ("runtime", "cache_dir", "CYBERSECURITY_OWASP_SCENARIO_CACHE_DIR", None),
        ("runtime", "cache_mode", "CYBERSECURITY_OWASP_SCENARIO_CACHE_MODE", None),
        ("runtime", "generator_version", "CYBERSECURITY_OWASP_SCENARIO_GENERATOR_VERSION", None),
        ("runtime", "verifier_version", "CYBERSECURITY_OWASP_SCENARIO_VERIFIER_VERSION", None),
    )
    for section, key, env_name, caster in env_spec:
        value = os.getenv(env_name)
        if value is not None:
            sections[section][key] = caster(value) if caster else value
    return data
def _set_if_present(
    target: dict[str, Any],
    key: str,
    env_name: str,
    caster: type | None = None,
) -> None:
    """Copy env var *env_name* into ``target[key]`` when it is set.

    Leaves *target* untouched when the variable is unset; applies *caster*
    to the raw string value when one is provided.
    """
    raw_value = os.getenv(env_name)
    if raw_value is not None:
        target[key] = raw_value if caster is None else caster(raw_value)
def _curriculum_from_raw(raw: dict[str, Any]) -> CurriculumCacheConfig:
    """Build a CurriculumCacheConfig from a raw JSON section.

    Pads missing difficulty labels with ``D<index>`` up to the bucket count
    and normalizes pass-rate thresholds to ``{str: tuple[float, float]}``.
    """
    values = dict(raw)
    bucket_count = int(values.get("difficulty_bucket_count", 4))
    labels = list(values.get("difficulty_labels") or [])
    while len(labels) < bucket_count:
        labels.append(f"D{len(labels)}")
    values["difficulty_labels"] = labels
    normalized: dict[str, tuple[float, ...]] = {}
    for name, bounds in (values.get("pass_rate_thresholds") or {}).items():
        normalized[str(name)] = tuple(float(bound) for bound in bounds)
    values["pass_rate_thresholds"] = normalized
    return CurriculumCacheConfig(**values)
def _validate_settings(settings: ScenarioAuthoringSettings) -> None:
    """Raise ValueError for any missing or out-of-range configuration value.

    Checks run in a fixed order, so the first violation determines which
    message surfaces.
    """
    author_cfg = settings.scenario_author
    curriculum_cfg = settings.curriculum
    runtime_cfg = settings.runtime

    if not author_cfg.model_id:
        raise ValueError("scenario_author.model_id is required")
    if author_cfg.temperature <= 0.0 or author_cfg.top_p <= 0.0:
        raise ValueError("scenario author sampling values must be positive")
    if author_cfg.max_context_tokens < 4096:
        raise ValueError("scenario author max_context_tokens is too small")

    if curriculum_cfg.difficulty_bucket_count <= 0:
        raise ValueError("difficulty_bucket_count must be positive")
    if len(curriculum_cfg.difficulty_labels) < curriculum_cfg.difficulty_bucket_count:
        raise ValueError("difficulty_labels must cover every configured bucket")
    positive_fields = (
        "train_scenarios_per_bucket",
        "validation_scenarios_per_bucket",
        "heldout_eval_scenarios_per_bucket",
        "target_reset_latency_ms",
    )
    for attr in positive_fields:
        if int(getattr(curriculum_cfg, attr)) <= 0:
            raise ValueError(f"{attr} must be positive")
    if not 0.0 < curriculum_cfg.target_cache_hit_rate <= 1.0:
        raise ValueError("target_cache_hit_rate must be in (0, 1]")
    if not 0.0 <= curriculum_cfg.scenario_refresh_rate_per_epoch <= 1.0:
        raise ValueError("scenario_refresh_rate_per_epoch must be in [0, 1]")

    if runtime_cfg.cache_mode not in ("fallback", "require", "disabled"):
        raise ValueError("runtime.cache_mode must be fallback, require, or disabled")