Lars Talian committed on
Commit
228ed67
·
1 Parent(s): a49b769

Make mutation policy weights explicit

docs/mutation_policy.md ADDED
@@ -0,0 +1,107 @@
+ # Mutation Policy Weights
+
+ `PopulationMutationPolicy` is a hand-authored heuristic policy, but its
+ weights and shaping constants are now explicit in
+ `src/open_range/builder/mutation_policy.py` under `MutationPolicySettings`.
+
+ The policy has three jobs:
+
+ 1. Choose which stored snapshot is the best parent to mutate next.
+ 2. Choose which structural mutation op to apply.
+ 3. Choose which security/noise mutation op to apply.
+
+ ## Parent Selection Terms
+
+ These fields live in `MutationPolicySettings.parent`.
+
+ | Field | Default | Why it exists |
+ | --- | ---: | --- |
+ | `frontier_weight` | `0.28` | Prefer snapshots near the current learning frontier instead of trivially solved or impossible ones. |
+ | `replay_weight` | `0.18` | Revisit under-played snapshots so the curriculum does not collapse to a tiny subset. |
+ | `novelty_weight` | `0.16` | Favor rarer vulnerability mixes across the population. |
+ | `weak_overlap_weight` | `0.18` | Bias parent choice toward snapshots that exercise known weak areas. |
+ | `lineage_balance_weight` | `0.08` | Prevent one root lineage from dominating the pool. |
+ | `depth_balance_weight` | `0.04` | Avoid over-sampling very deep descendant chains. |
+ | `recency_weight` | `0.04` | Cool down parents that were used repeatedly in the recent window. |
+ | `complexity_weight` | `0.04` | Slightly prefer richer parents with more structure to mutate from. |
+
+ Shaping constants in the same model explain how those raw signals are formed:
+
+ | Field | Default | Meaning |
+ | --- | ---: | --- |
+ | `minimum_total` | `0.05` | Sampling floor for low-scoring parents. |
+ | `unplayed_frontier_score` | `0.40` | Frontier score used before any play stats exist. |
+ | `empty_vuln_novelty_score` | `0.25` | Novelty fallback for snapshots with no typed vulnerabilities. |
+ | `preferred_generation_depth` | `3.0` | Depth after which descendant chains start being penalized. |
+ | `complexity_vuln_factor` | `0.25` | Complexity contribution per vulnerability. |
+ | `complexity_golden_path_factor` | `0.03` | Complexity contribution per golden-path step. |
+ | `complexity_dependency_edge_factor` | `0.02` | Complexity contribution per dependency edge. |
+ | `complexity_trust_edge_factor` | `0.02` | Complexity contribution per trust edge. |
+ | `complexity_cap` | `1.0` | Cap for the normalized complexity score. |
+
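As a rough illustration of how the parent-selection terms combine, the sketch below reproduces the weighted sum and two of the shaping formulas with the default values from the tables. It is a standalone approximation, not the project's code: the helper names (`depth_balance`, `complexity`, `parent_score`) and the sample inputs are hypothetical, and plain dicts stand in for the real snapshot and settings objects.

```python
# Default weights copied from the parent-selection table.
PARENT_WEIGHTS = {
    "frontier": 0.28,
    "replay": 0.18,
    "novelty": 0.16,
    "weak_overlap": 0.18,
    "lineage_balance": 0.08,
    "depth_balance": 0.04,
    "recency": 0.04,
    "complexity": 0.04,
}
MINIMUM_TOTAL = 0.05
PREFERRED_GENERATION_DEPTH = 3.0
COMPLEXITY_CAP = 1.0


def depth_balance(depth: float) -> float:
    """Penalize only the depth beyond the preferred generation depth."""
    return 1.0 / (1.0 + max(depth - PREFERRED_GENERATION_DEPTH, 0.0))


def complexity(vulns: int, golden_steps: int, dep_edges: int, trust_edges: int) -> float:
    """Sum the per-item factors, then clamp to the cap."""
    raw = vulns * 0.25 + golden_steps * 0.03 + dep_edges * 0.02 + trust_edges * 0.02
    return min(raw, COMPLEXITY_CAP)


def parent_score(signals: dict[str, float]) -> tuple[float, dict[str, float]]:
    """Hypothetical helper: weighted sum with the sampling floor applied."""
    contributions = {
        name: signals[name] * weight for name, weight in PARENT_WEIGHTS.items()
    }
    total = round(max(sum(contributions.values()), MINIMUM_TOTAL), 4)
    return total, contributions


# Illustrative inputs: a never-played snapshot with no typed vulnerabilities.
signals = {
    "frontier": 0.40,                    # unplayed_frontier_score
    "replay": 1.0 / (0 + 1.0),           # plays == 0
    "novelty": 0.25,                     # empty_vuln_novelty_score
    "weak_overlap": 0.0,                 # no overlap with weak areas
    "lineage_balance": 1.0,              # only snapshot in its root lineage
    "depth_balance": depth_balance(5),   # two generations past the preferred depth
    "recency": 1.0,                      # no recent plays
    "complexity": complexity(0, 10, 5, 5),
}
total, contributions = parent_score(signals)
print(total, contributions)
```

Because `weak_overlap` is a raw count rather than a value clamped to 1.0, a snapshot that matches several weak areas can outweigh every other term at the default weights, which is worth keeping in mind when retuning.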
+ ## Mutation Selection Terms
+
+ These fields live in `MutationPolicySettings.mutation`.
+
+ | Field | Default | Why it exists |
+ | --- | ---: | --- |
+ | `curriculum_weight` | `0.38` | Prefer ops that target the agent's current weakness. |
+ | `novelty_weight` | `0.24` | Prefer ops that open new surfaces or vary episode shape. |
+ | `structural_gain_weight` | `0.28` | Prefer ops that materially expand the scenario graph. |
+ | `lineage_weight` | `0.10` | Slight bias toward shallower lineage when all else is equal. |
+ | `minimum_total` | `0.05` | Sampling floor for low-scoring mutation ops. |
+
+ Raw novelty bonuses in `MutationPolicySettings.novelty`:
+
+ | Field | Default | Meaning |
+ | --- | ---: | --- |
+ | `base_bonus` | `0.40` | Baseline novelty for every op. |
+ | `new_vuln_class_bonus` | `1.0` | Extra novelty for a vulnerability class not seen recently. |
+ | `new_noise_surface_bonus` | `0.50` | Extra novelty for noise on a new attack surface. |
+ | `structural_op_bonus` | `0.40` | Extra novelty for non-security ops that change the graph. |
+
+ Raw curriculum bonuses in `MutationPolicySettings.curriculum`:
+
+ | Field | Default | Meaning |
+ | --- | ---: | --- |
+ | `base_bonus` | `0.35` | Baseline curriculum value for every op. |
+ | `weak_area_bonus` | `1.50` | Reward seeding a vulnerability in a known weak area. |
+ | `new_vuln_bonus` | `0.40` | Reward introducing a vulnerability class not present in the parent. |
+ | `chain_length_bonus` | `0.60` | Reward edges that help satisfy multi-hop chain requirements. |
+ | `focus_identity_bonus` | `0.50` | Reward identity-layer ops when curriculum focus is identity. |
+ | `focus_infra_bonus` | `0.50` | Reward infra-layer ops when curriculum focus is infra. |
+ | `focus_process_bonus` | `0.40` | Reward benign noise when focus is process realism. |
+
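To make the interplay of the mutation-selection tables concrete, here is a minimal sketch of how a single candidate op could be scored at the default values. It follows the bonus rules visible in this commit's diff, but it is an approximation under stated assumptions: ops are modeled as plain dicts (a hypothetical stand-in for `MutationOp`), the context fields are passed as ordinary arguments, and the function names are illustrative.

```python
MUTATION_WEIGHTS = {
    "curriculum": 0.38,
    "novelty": 0.24,
    "structural_gain": 0.28,
    "lineage": 0.10,
}
MINIMUM_TOTAL = 0.05


def curriculum_bonus(op, weak_areas, existing_vulns, require_chain_length=1, focus_layer=""):
    """Raw curriculum signal for one op (op is a dict stand-in for MutationOp)."""
    bonus = 0.35  # base_bonus
    op_type = op["op_type"]
    if op_type == "seed_vuln":
        vuln_type = op.get("vuln_type", "")
        if vuln_type in weak_areas:
            bonus += 1.50  # weak_area_bonus
        if vuln_type and vuln_type not in existing_vulns:
            bonus += 0.40  # new_vuln_bonus
    if op_type in {"add_dependency_edge", "add_trust_edge"} and require_chain_length > 1:
        bonus += 0.60  # chain_length_bonus
    if focus_layer == "identity" and op_type in {"add_user", "add_trust_edge"}:
        bonus += 0.50  # focus_identity_bonus
    if focus_layer == "infra" and op_type in {"add_service", "add_dependency_edge"}:
        bonus += 0.50  # focus_infra_bonus
    if focus_layer == "process" and op_type == "add_benign_noise":
        bonus += 0.40  # focus_process_bonus
    return bonus


def novelty_bonus(op, previous_vuln_classes, recent_attack_surfaces):
    """Raw novelty signal for one op."""
    bonus = 0.40  # base_bonus
    op_type = op["op_type"]
    if op_type == "seed_vuln":
        vuln_type = op.get("vuln_type", "")
        if vuln_type and vuln_type not in previous_vuln_classes:
            bonus += 1.0  # new_vuln_class_bonus
    if op_type == "add_benign_noise":
        location = op.get("location", "")
        if location and location not in recent_attack_surfaces:
            bonus += 0.50  # new_noise_surface_bonus
    if op_type not in {"seed_vuln", "add_benign_noise"}:
        bonus += 0.40  # structural_op_bonus
    return bonus


def mutation_score(signals):
    """Weighted sum of the four raw signals with the sampling floor applied."""
    total = sum(signals[name] * weight for name, weight in MUTATION_WEIGHTS.items())
    return round(max(total, MINIMUM_TOTAL), 4)


# Illustrative candidate: seed an SQL injection that is a known weak area,
# is new to the parent, and has not been seen recently.
op = {"op_type": "seed_vuln", "vuln_type": "sqli"}
signals = {
    "curriculum": curriculum_bonus(op, weak_areas={"sqli"}, existing_vulns=set()),
    "novelty": novelty_bonus(op, previous_vuln_classes=set(), recent_attack_surfaces=set()),
    "structural_gain": 0.70,       # seed_vuln default gain
    "lineage": 1.0 / (1.0 + 1),    # parent at generation depth 1
}
score = mutation_score(signals)
print(signals, score)
```

The raw bonuses are not normalized to the range 0 to 1, so a single `weak_area_bonus` hit can contribute more than the maximum possible structural-gain term at the default weights.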
+ ## Structural Gain Table
+
+ These fields live in `MutationPolicySettings.structural_gains`.
+
+ | Op Type | Default |
+ | --- | ---: |
+ | `add_service` | `1.00` |
+ | `add_dependency_edge` | `0.90` |
+ | `add_trust_edge` | `0.85` |
+ | `add_user` | `0.80` |
+ | `seed_vuln` | `0.70` |
+ | `add_benign_noise` | `0.30` |
+ | `default_gain` | `0.20` |
+
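The gain table behaves like a lookup with a fallback: `default_gain` is not an op type but the value used for any op type that has no row of its own. The sketch below shows that lookup. The multiplication by `max(magnitude, 1)` mirrors the `_structural_gain` helper removed in this diff; whether the replacement keeps that multiplier is an assumption, since the new method body is not visible here. The function name and the `rename_host` op type are hypothetical.

```python
# Defaults copied from the structural gain table.
STRUCTURAL_GAINS = {
    "add_service": 1.00,
    "add_dependency_edge": 0.90,
    "add_trust_edge": 0.85,
    "add_user": 0.80,
    "seed_vuln": 0.70,
    "add_benign_noise": 0.30,
}
DEFAULT_GAIN = 0.20


def structural_gain(op_type: str, magnitude: int = 1) -> float:
    """Look up the per-op gain, fall back for unknown ops, scale by magnitude."""
    return STRUCTURAL_GAINS.get(op_type, DEFAULT_GAIN) * max(magnitude, 1)


print(structural_gain("add_service"))   # known op type
print(structural_gain("rename_host"))   # unknown op type falls back to DEFAULT_GAIN
print(structural_gain("add_user", 2))   # magnitude scales the gain
```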
+ ## Tuning Path
+
+ You can swap weights without touching policy code:
+
+ 1. Write a JSON or YAML file matching `MutationPolicySettings`.
+ 2. Load it with `load_mutation_policy_settings(path)` or pass it into `PopulationMutationPolicy(settings=...)`.
+ 3. Compare it against the default policy with:
+
+ ```bash
+ PYTHONPATH=src .venv/bin/python scripts/calibrate_mutation_policy.py \
+   --store-dir snapshots \
+   --stats path/to/snapshot_stats.json \
+   --context path/to/build_context.json \
+   --settings tuned=path/to/policy_settings.yaml
+ ```
+
+ The calibration output is JSON so it can be diffed, archived, or fed into
+ notebooks. Parent-selection logs and `MutationPlan.score_breakdown` now expose
+ weighted contributions instead of only raw feature values.
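Step 1 of the tuning path can be sketched as follows. The example writes a partial JSON override containing only the fields being changed. It assumes omitted fields fall back to their defaults, which is how the Pydantic models in this commit appear to be declared (every field has a default, and `extra="forbid"` suggests a misspelled key is rejected instead of being ignored). The profile name and the tuned values are hypothetical.

```python
import json
import tempfile
from pathlib import Path

# Partial override: only the fields being tuned. Keys mirror the documented
# settings sections; the values are illustrative, not recommendations.
overrides = {
    "profile_name": "frontier_heavy_v1",  # hypothetical profile name
    "parent": {
        "frontier_weight": 0.35,  # raised from 0.28
        "replay_weight": 0.11,    # lowered from 0.18 so the weights still sum to 1.0
    },
    "structural_gains": {"add_benign_noise": 0.40},
}

settings_path = Path(tempfile.mkdtemp()) / "policy_settings.json"
settings_path.write_text(
    json.dumps(overrides, indent=2, sort_keys=True), encoding="utf-8"
)

# With the project importable, the file would then be loaded as described
# in the tuning steps:
#   from open_range.builder.mutation_policy import (
#       PopulationMutationPolicy,
#       load_mutation_policy_settings,
#   )
#   policy = PopulationMutationPolicy(
#       settings=load_mutation_policy_settings(settings_path)
#   )

round_trip = json.loads(settings_path.read_text(encoding="utf-8"))
print(round_trip["parent"])
```

Nothing in the code shown in this commit enforces that the weights sum to 1.0, so keeping them normalized is a convention for comparing profiles, not a requirement.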
scripts/calibrate_mutation_policy.py ADDED
@@ -0,0 +1,131 @@
+ #!/usr/bin/env python3
+ """Offline calibration harness for PopulationMutationPolicy."""
+
+ from __future__ import annotations
+
+ import argparse
+ import asyncio
+ import json
+ from pathlib import Path
+ from typing import Any
+
+ import yaml
+
+ from open_range.builder.mutation_policy import (
+     PopulationMutationPolicy,
+     load_mutation_policy_settings,
+ )
+ from open_range.builder.snapshot_store import SnapshotStore
+ from open_range.protocols import BuildContext
+
+
+ def _load_object(path: str | None) -> dict[str, Any]:
+     if not path:
+         return {}
+     payload = Path(path).read_text(encoding="utf-8")
+     suffix = Path(path).suffix.lower()
+     if suffix in {".yaml", ".yml"}:
+         data = yaml.safe_load(payload) or {}
+     else:
+         data = json.loads(payload)
+     if not isinstance(data, dict):
+         raise ValueError(f"expected an object in {path}")
+     return data
+
+
+ def _parse_settings_arg(value: str) -> tuple[str, Path]:
+     if "=" in value:
+         label, raw_path = value.split("=", 1)
+         return label.strip(), Path(raw_path).resolve()
+     path = Path(value).resolve()
+     return path.stem, path
+
+
+ def main(argv: list[str] | None = None) -> int:
+     parser = argparse.ArgumentParser(
+         description=(
+             "Compare parent-selection scores across one or more "
+             "PopulationMutationPolicy settings files."
+         )
+     )
+     parser.add_argument(
+         "--store-dir",
+         default="snapshots",
+         help="Snapshot store directory containing <snapshot_id>/spec.json entries.",
+     )
+     parser.add_argument(
+         "--stats",
+         help=(
+             "Optional JSON/YAML file mapping snapshot_id to runtime stats such as "
+             "plays, plays_recent, red_solve_rate, and blue_detect_rate."
+         ),
+     )
+     parser.add_argument(
+         "--context",
+         help="Optional JSON/YAML file describing the BuildContext to score against.",
+     )
+     parser.add_argument(
+         "--settings",
+         action="append",
+         default=[],
+         help=(
+             "Optional policy settings file to compare. Repeatable. Accepts "
+             "'label=path' or just 'path'."
+         ),
+     )
+     parser.add_argument(
+         "--limit",
+         type=int,
+         default=5,
+         help="How many top-ranked parents to include per policy.",
+     )
+     args = parser.parse_args(argv)
+
+     entries = asyncio.run(SnapshotStore(args.store_dir).list_entries())
+     if not entries:
+         raise SystemExit(f"No stored snapshots found under {args.store_dir}")
+
+     context = BuildContext.model_validate(_load_object(args.context))
+     snapshot_stats = _load_object(args.stats)
+
+     policies: list[tuple[str, PopulationMutationPolicy]] = [
+         ("default", PopulationMutationPolicy()),
+     ]
+     for item in args.settings:
+         label, path = _parse_settings_arg(item)
+         policies.append(
+             (label, PopulationMutationPolicy(settings=load_mutation_policy_settings(path)))
+         )
+
+     report = {
+         "store_dir": str(Path(args.store_dir).resolve()),
+         "snapshot_count": len(entries),
+         "context": context.model_dump(mode="json"),
+         "policies": [],
+     }
+
+     for label, policy in policies:
+         ranked = sorted(
+             policy.score_parents(
+                 entries,
+                 context=context,
+                 snapshot_stats=snapshot_stats,
+             ),
+             key=lambda score: score.total,
+             reverse=True,
+         )[: max(args.limit, 1)]
+         report["policies"].append(
+             {
+                 "label": label,
+                 "profile_name": policy.name,
+                 "settings": policy.settings_dict(),
+                 "top_parents": [score.log_payload() for score in ranked],
+             }
+         )
+
+     print(json.dumps(report, indent=2, sort_keys=True))
+     return 0
+
+
+ if __name__ == "__main__":
+     raise SystemExit(main())
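Since the harness prints one JSON report with a `top_parents` list per policy, one plausible way to use it is to check how a tuned profile reorders parents compared with the default. The sketch below does that on a hand-written sample shaped like the report built in `main()`. The snapshot ids and totals are invented for illustration, `rank_shift` is a hypothetical helper, and only the `label`, `top_parents`, `snapshot_id`, and `total` keys are taken from the script.

```python
def rank_shift(report, baseline="default"):
    """Map each non-baseline label to {snapshot_id: (baseline_rank, new_rank)}.

    Ranks are 1-based positions in each policy's top_parents list; a snapshot
    missing from the baseline list gets a baseline rank of None.
    """
    ranks = {
        policy["label"]: {
            score["snapshot_id"]: position
            for position, score in enumerate(policy["top_parents"], start=1)
        }
        for policy in report["policies"]
    }
    base = ranks[baseline]
    return {
        label: {sid: (base.get(sid), rank) for sid, rank in by_id.items()}
        for label, by_id in ranks.items()
        if label != baseline
    }


# Invented sample in the shape the harness emits (values are illustrative).
sample_report = {
    "policies": [
        {
            "label": "default",
            "top_parents": [
                {"snapshot_id": "snap_a", "total": 0.52},
                {"snapshot_id": "snap_b", "total": 0.47},
                {"snapshot_id": "snap_c", "total": 0.31},
            ],
        },
        {
            "label": "tuned",
            "top_parents": [
                {"snapshot_id": "snap_b", "total": 0.55},
                {"snapshot_id": "snap_a", "total": 0.50},
                {"snapshot_id": "snap_d", "total": 0.33},
            ],
        },
    ]
}

shifts = rank_shift(sample_report)
for snapshot_id, (old_rank, new_rank) in shifts["tuned"].items():
    print(f"{snapshot_id}: {old_rank} -> {new_rank}")
```

Because each policy's list is truncated by `--limit`, a `None` baseline rank only means the snapshot fell outside the default policy's top entries, not that it was unscored.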
src/open_range/builder/mutation_policy.py CHANGED
@@ -1,46 +1,329 @@
1
- """Population-aware parent and mutation selection policy."""
 
 
 
 
 
2
 
3
  from __future__ import annotations
4
 
 
5
  import random
6
  from collections import Counter
7
  from dataclasses import dataclass
 
8
  from typing import Any
9
 
 
 
 
10
  from open_range.protocols import BuildContext, MutationOp, SnapshotSpec
11
  from open_range.validator.graphs import compile_snapshot_graphs
12
 
13
14
  @dataclass(frozen=True, slots=True)
15
  class ParentPolicyScore:
16
  snapshot_id: str
17
  total: float
18
- components: dict[str, float]
 
 
 
 
 
 
 
 
 
 
 
19
 
20
 
21
  @dataclass(frozen=True, slots=True)
22
  class MutationChoice:
23
  op: MutationOp
24
  total: float
25
- components: dict[str, float]
 
 
 
 
 
 
 
 
 
 
 
 
26
 
27
 
28
  class PopulationMutationPolicy:
29
- """Simple population-guided policy for parent and op selection.
30
 
31
- This is intentionally heuristic rather than learned. It gives the runtime
32
- an explicit place to score parents and mutation candidates using curriculum,
33
- replay, novelty, and lineage signals instead of relying on raw RNG.
34
- """
35
 
36
- name = "population_guided_v1"
37
- _STRUCTURAL_OPS = {
38
- "add_service",
39
- "add_user",
40
- "add_dependency_edge",
41
- "add_trust_edge",
42
- }
43
- _SECURITY_OPS = {"seed_vuln", "add_benign_noise"}
44
 
45
  def select_parent(
46
  self,
@@ -59,7 +342,7 @@ class PopulationMutationPolicy:
59
  raise ValueError("No parent candidates available")
60
  ordered = sorted(scores, key=lambda score: score.total, reverse=True)
61
  top = ordered[: min(3, len(ordered))]
62
- weights = [max(score.total, 0.05) for score in top]
63
  chosen_score = rng.choices(top, weights=weights, k=1)[0]
64
  chosen_entry = next(
65
  entry for entry in entries if entry.snapshot_id == chosen_score.snapshot_id
@@ -76,6 +359,8 @@ class PopulationMutationPolicy:
76
  if not entries:
77
  return []
78
 
 
 
79
  root_counts = Counter(
80
  entry.snapshot.lineage.root_snapshot_id or entry.snapshot_id
81
  for entry in entries
@@ -95,7 +380,7 @@ class PopulationMutationPolicy:
95
  red_rate = float(stat.get("red_solve_rate", 0.0))
96
  blue_rate = float(stat.get("blue_detect_rate", 0.0))
97
  frontier = (
98
- 0.4
99
  if plays == 0
100
  else (
101
  self._frontier_score(red_rate)
@@ -104,26 +389,32 @@ class PopulationMutationPolicy:
104
  / 2.0
105
  )
106
  replay = 1.0 / (plays + 1.0)
107
- novelty = 1.0 / (
108
- 1.0 + sum(vuln_frequency[vuln] for vuln in vuln_types)
109
- ) if vuln_types else 0.25
 
 
110
  weak_overlap = float(len(vuln_types.intersection(context.weak_areas)))
111
  root_id = snapshot.lineage.root_snapshot_id or entry.snapshot_id
112
  lineage_balance = 1.0 / max(root_counts[root_id], 1)
113
  depth = float(snapshot.lineage.generation_depth)
114
- depth_balance = 1.0 / (1.0 + max(depth - 3.0, 0.0))
 
 
115
  recency = 1.0 / (1.0 + float(stat.get("plays_recent", 0)))
116
  complexity = min(
117
  (
118
- len(snapshot.truth_graph.vulns) * 0.25
119
- + len(snapshot.golden_path) * 0.03
120
- + len(compiled.dependency_edges) * 0.02
121
- + len(compiled.trust_edges) * 0.02
 
 
122
  ),
123
- 1.0,
124
  )
125
 
126
- components = {
127
  "frontier": frontier,
128
  "replay": replay,
129
  "novelty": novelty,
@@ -133,21 +424,18 @@ class PopulationMutationPolicy:
133
  "recency": recency,
134
  "complexity": complexity,
135
  }
136
- total = (
137
- frontier * 0.28
138
- + replay * 0.18
139
- + novelty * 0.16
140
- + weak_overlap * 0.18
141
- + lineage_balance * 0.08
142
- + depth_balance * 0.04
143
- + recency * 0.04
144
- + complexity * 0.04
145
  )
146
  scores.append(
147
  ParentPolicyScore(
148
  snapshot_id=entry.snapshot_id,
149
- total=round(max(total, 0.05), 4),
150
- components={key: round(value, 4) for key, value in components.items()},
 
 
151
  )
152
  )
153
  return scores
@@ -181,7 +469,6 @@ class PopulationMutationPolicy:
181
  if security is not None:
182
  selected.append(security)
183
 
184
- # Best-effort deterministic fallbacks when only one category exists.
185
  if not selected and structural_candidates:
186
  fallback = self._select_candidate(
187
  structural_candidates,
@@ -208,10 +495,10 @@ class PopulationMutationPolicy:
208
  return [], 0.0, {}
209
 
210
  breakdown = {
211
- "curriculum": round(sum(c.components["curriculum"] for c in selected), 4),
212
- "novelty": round(sum(c.components["novelty"] for c in selected), 4),
213
- "structural_gain": round(sum(c.components["structural_gain"] for c in selected), 4),
214
- "lineage": round(sum(c.components["lineage"] for c in selected), 4),
215
  }
216
  total = round(sum(choice.total for choice in selected), 4)
217
  return ops, total, breakdown
@@ -235,7 +522,7 @@ class PopulationMutationPolicy:
235
  if deterministic or len(ranked) == 1:
236
  return ranked[0]
237
  top = ranked[: min(3, len(ranked))]
238
- weights = [max(choice.total, 0.05) for choice in top]
239
  return rng.choices(top, weights=weights, k=1)[0]
240
 
241
  def _rank_candidates(
@@ -247,28 +534,30 @@ class PopulationMutationPolicy:
247
  ) -> list[MutationChoice]:
248
  ranked: list[MutationChoice] = []
249
  existing_vulns = {v.type for v in snapshot.truth_graph.vulns if v.type}
 
250
  for candidate in candidates:
251
  curriculum = self._curriculum_bonus(candidate, context, existing_vulns)
252
  novelty = self._novelty_bonus(candidate, context)
253
  structural_gain = self._structural_gain(candidate)
254
  lineage = 1.0 / (1.0 + snapshot.lineage.generation_depth)
255
- components = {
256
  "curriculum": curriculum,
257
  "novelty": novelty,
258
  "structural_gain": structural_gain,
259
  "lineage": lineage,
260
  }
261
- total = (
262
- curriculum * 0.38
263
- + novelty * 0.24
264
- + structural_gain * 0.28
265
- + lineage * 0.10
266
  )
267
  ranked.append(
268
  MutationChoice(
269
  op=candidate,
270
- total=round(max(total, 0.05), 4),
271
- components={key: round(value, 4) for key, value in components.items()},
 
 
272
  )
273
  )
274
  ranked.sort(key=lambda choice: choice.total, reverse=True)
@@ -278,52 +567,58 @@ class PopulationMutationPolicy:
278
  def _frontier_score(rate: float) -> float:
279
  return max(0.0, 1.0 - abs(rate - 0.5) * 2.0)
280
 
281
- @staticmethod
282
- def _structural_gain(op: MutationOp) -> float:
283
- mapping = {
284
- "add_service": 1.0,
285
- "add_dependency_edge": 0.9,
286
- "add_trust_edge": 0.85,
287
- "add_user": 0.8,
288
- "seed_vuln": 0.7,
289
- "add_benign_noise": 0.3,
290
- }
291
- return mapping.get(op.op_type, 0.2) * max(op.magnitude, 1)
292
 
293
- @staticmethod
294
- def _novelty_bonus(op: MutationOp, context: BuildContext) -> float:
295
- bonus = 0.4
296
  if op.op_type == "seed_vuln":
297
  vuln_type = str(op.params.get("vuln_type", "")).strip()
298
  if vuln_type and vuln_type not in context.previous_vuln_classes:
299
- bonus += 1.0
300
  if op.op_type == "add_benign_noise":
301
  location = str(op.params.get("location", "")).strip()
302
  if location and location not in context.recent_attack_surfaces:
303
- bonus += 0.5
304
  if op.op_type not in {"seed_vuln", "add_benign_noise"}:
305
- bonus += 0.4
306
  return bonus
307
 
308
- @staticmethod
309
  def _curriculum_bonus(
 
310
  op: MutationOp,
311
  context: BuildContext,
312
  existing_vulns: set[str],
313
  ) -> float:
314
- bonus = 0.35
 
315
  if op.op_type == "seed_vuln":
316
  vuln_type = str(op.params.get("vuln_type", "")).strip()
317
  if vuln_type in context.weak_areas:
318
- bonus += 1.5
319
  if vuln_type and vuln_type not in existing_vulns:
320
- bonus += 0.4
321
  if op.op_type in {"add_dependency_edge", "add_trust_edge"} and context.require_chain_length > 1:
322
- bonus += 0.6
323
  if context.focus_layer == "identity" and op.op_type in {"add_user", "add_trust_edge"}:
324
- bonus += 0.5
325
  if context.focus_layer == "infra" and op.op_type in {"add_service", "add_dependency_edge"}:
326
- bonus += 0.5
327
  if context.focus_layer == "process" and op.op_type == "add_benign_noise":
328
- bonus += 0.4
329
  return bonus
+ """Population-aware parent and mutation selection policy.
+
+ The scoring settings live in :class:`MutationPolicySettings` so the runtime can
+ audit, tune, and swap heuristic weight sets without rewriting policy logic.
+ See ``docs/mutation_policy.md`` and ``scripts/calibrate_mutation_policy.py``.
+ """

  from __future__ import annotations

+ import json
  import random
  from collections import Counter
  from dataclasses import dataclass
+ from pathlib import Path
  from typing import Any

+ import yaml
+ from pydantic import BaseModel, ConfigDict, Field
+
  from open_range.protocols import BuildContext, MutationOp, SnapshotSpec
  from open_range.validator.graphs import compile_snapshot_graphs


+ class ParentScoreSettings(BaseModel):
+     """Weights and shaping constants for parent selection.
+
+     Each ``*_weight`` field controls how much that signal contributes to the
+     final parent score. The remaining fields shape the raw signals before the
+     weighted sum is applied.
+     """
+
+     model_config = ConfigDict(extra="forbid")
+
+     frontier_weight: float = Field(
+         default=0.28,
+         description="Prefer snapshots near the current red/blue frontier.",
+     )
+     replay_weight: float = Field(
+         default=0.18,
+         description="Prefer under-played snapshots so the curriculum keeps exploring.",
+     )
+     novelty_weight: float = Field(
+         default=0.16,
+         description="Prefer rarer vulnerability mixes in the stored population.",
+     )
+     weak_overlap_weight: float = Field(
+         default=0.18,
+         description="Prefer parents that overlap the curriculum's known weak areas.",
+     )
+     lineage_balance_weight: float = Field(
+         default=0.08,
+         description="Avoid over-sampling a single root lineage.",
+     )
+     depth_balance_weight: float = Field(
+         default=0.04,
+         description="Prevent deep descendant chains from dominating parent choice.",
+     )
+     recency_weight: float = Field(
+         default=0.04,
+         description="De-prioritize parents used repeatedly in the recent window.",
+     )
+     complexity_weight: float = Field(
+         default=0.04,
+         description="Slightly prefer parents with richer structure to mutate from.",
+     )
+     minimum_total: float = Field(
+         default=0.05,
+         description="Lower bound used when sampling among low-scoring parents.",
+     )
+     unplayed_frontier_score: float = Field(
+         default=0.40,
+         description="Frontier score used before any play statistics exist.",
+     )
+     empty_vuln_novelty_score: float = Field(
+         default=0.25,
+         description="Novelty fallback for snapshots with no typed vulnerabilities.",
+     )
+     preferred_generation_depth: float = Field(
+         default=3.0,
+         description="Depth after which descendants start incurring a balance penalty.",
+     )
+     complexity_vuln_factor: float = Field(
+         default=0.25,
+         description="Complexity contribution per planted vulnerability.",
+     )
+     complexity_golden_path_factor: float = Field(
+         default=0.03,
+         description="Complexity contribution per golden-path step.",
+     )
+     complexity_dependency_edge_factor: float = Field(
+         default=0.02,
+         description="Complexity contribution per dependency edge.",
+     )
+     complexity_trust_edge_factor: float = Field(
+         default=0.02,
+         description="Complexity contribution per trust edge.",
+     )
+     complexity_cap: float = Field(
+         default=1.0,
+         description="Upper bound for the normalized complexity signal.",
+     )
+
+     def weights(self) -> dict[str, float]:
+         return {
+             "frontier": self.frontier_weight,
+             "replay": self.replay_weight,
+             "novelty": self.novelty_weight,
+             "weak_overlap": self.weak_overlap_weight,
+             "lineage_balance": self.lineage_balance_weight,
+             "depth_balance": self.depth_balance_weight,
+             "recency": self.recency_weight,
+             "complexity": self.complexity_weight,
+         }
+
+
+ class MutationScoreSettings(BaseModel):
+     """Weights and sampling floor for mutation-op choice."""
+
+     model_config = ConfigDict(extra="forbid")
+
+     curriculum_weight: float = Field(
+         default=0.38,
+         description="Bias toward ops that target the current curriculum weakness.",
+     )
+     novelty_weight: float = Field(
+         default=0.24,
+         description="Bias toward ops that open new exploit surfaces.",
+     )
+     structural_gain_weight: float = Field(
+         default=0.28,
+         description="Bias toward ops that materially expand the scenario graph.",
+     )
+     lineage_weight: float = Field(
+         default=0.10,
+         description="Slightly favor mutations closer to the root lineage.",
+     )
+     minimum_total: float = Field(
+         default=0.05,
+         description="Lower bound used when sampling among low-scoring ops.",
+     )
+
+     def weights(self) -> dict[str, float]:
+         return {
+             "curriculum": self.curriculum_weight,
+             "novelty": self.novelty_weight,
+             "structural_gain": self.structural_gain_weight,
+             "lineage": self.lineage_weight,
+         }
+
+
+ class NoveltyBonusSettings(BaseModel):
+     """Raw novelty bonuses applied before mutation weighting."""
+
+     model_config = ConfigDict(extra="forbid")
+
+     base_bonus: float = Field(
+         default=0.40,
+         description="Baseline novelty score for every candidate mutation.",
+     )
+     new_vuln_class_bonus: float = Field(
+         default=1.0,
+         description="Bonus when seeding a vulnerability class not seen recently.",
+     )
+     new_noise_surface_bonus: float = Field(
+         default=0.50,
+         description="Bonus when benign noise targets a new recent surface.",
+     )
+     structural_op_bonus: float = Field(
+         default=0.40,
+         description="Bonus for non-security ops that expand the topology or process graph.",
+     )
+
+
+ class CurriculumBonusSettings(BaseModel):
+     """Raw curriculum bonuses applied before mutation weighting."""
+
+     model_config = ConfigDict(extra="forbid")
+
+     base_bonus: float = Field(
+         default=0.35,
+         description="Baseline curriculum score for every candidate mutation.",
+     )
+     weak_area_bonus: float = Field(
+         default=1.50,
+         description="Bonus when a seeded vulnerability matches a weak area.",
+     )
+     new_vuln_bonus: float = Field(
+         default=0.40,
+         description="Bonus when a seeded vulnerability is new to this parent snapshot.",
+     )
+     chain_length_bonus: float = Field(
+         default=0.60,
+         description="Bonus for dependency/trust edges when longer exploit chains are required.",
+     )
+     focus_identity_bonus: float = Field(
+         default=0.50,
+         description="Bonus for identity-layer ops when curriculum focus is identity.",
+     )
+     focus_infra_bonus: float = Field(
+         default=0.50,
+         description="Bonus for infra-layer ops when curriculum focus is infra.",
+     )
+     focus_process_bonus: float = Field(
+         default=0.40,
+         description="Bonus for benign-noise ops when curriculum focus is process realism.",
+     )
+
+
+ class StructuralGainSettings(BaseModel):
+     """Normalized gain assigned to each mutation op type before weighting."""
+
+     model_config = ConfigDict(extra="forbid")
+
+     add_service: float = Field(
+         default=1.0,
+         description="Largest structural gain: introduces a new service node.",
+     )
+     add_dependency_edge: float = Field(
+         default=0.90,
+         description="High structural gain: adds an application/service dependency edge.",
+     )
+     add_trust_edge: float = Field(
+         default=0.85,
+         description="High structural gain: adds an identity or trust relationship.",
+     )
+     add_user: float = Field(
+         default=0.80,
+         description="Moderate structural gain: introduces a new principal into the graph.",
+     )
+     seed_vuln: float = Field(
+         default=0.70,
+         description="Security gain without changing topology shape dramatically.",
+     )
+     add_benign_noise: float = Field(
+         default=0.30,
+         description="Low structural gain: improves realism and observability noise.",
+     )
+     default_gain: float = Field(
+         default=0.20,
+         description="Fallback gain for unknown mutation op types.",
+     )
+
+     def gain_for(self, op_type: str) -> float:
+         mapping = self.model_dump(exclude={"default_gain"})
+         return float(mapping.get(op_type, self.default_gain))
+
+
+ class MutationPolicySettings(BaseModel):
+     """Complete settings model for :class:`PopulationMutationPolicy`."""
+
+     model_config = ConfigDict(extra="forbid")
+
+     profile_name: str = Field(
+         default="population_guided_v1",
+         description="Human-readable policy profile name used in logs and metadata.",
+     )
+     parent: ParentScoreSettings = Field(default_factory=ParentScoreSettings)
+     mutation: MutationScoreSettings = Field(default_factory=MutationScoreSettings)
+     novelty: NoveltyBonusSettings = Field(default_factory=NoveltyBonusSettings)
+     curriculum: CurriculumBonusSettings = Field(default_factory=CurriculumBonusSettings)
+     structural_gains: StructuralGainSettings = Field(default_factory=StructuralGainSettings)
+
+
+ def load_mutation_policy_settings(path: str | Path) -> MutationPolicySettings:
+     """Load policy settings from JSON or YAML."""
+     settings_path = Path(path)
+     raw_text = settings_path.read_text(encoding="utf-8")
+     if settings_path.suffix.lower() in {".yaml", ".yml"}:
+         payload = yaml.safe_load(raw_text) or {}
+     else:
+         payload = json.loads(raw_text)
+     if not isinstance(payload, dict):
+         raise ValueError(f"settings file must decode to an object: {settings_path}")
+     return MutationPolicySettings.model_validate(payload)
+
+
  @dataclass(frozen=True, slots=True)
  class ParentPolicyScore:
      snapshot_id: str
      total: float
+     signals: dict[str, float]
+     weights: dict[str, float]
+     contributions: dict[str, float]
+
+     def log_payload(self) -> dict[str, Any]:
+         return {
+             "snapshot_id": self.snapshot_id,
+             "total": self.total,
+             "signals": self.signals,
+             "weights": self.weights,
+             "contributions": self.contributions,
+         }


  @dataclass(frozen=True, slots=True)
  class MutationChoice:
      op: MutationOp
      total: float
+     signals: dict[str, float]
+     weights: dict[str, float]
+     contributions: dict[str, float]
+
+     def log_payload(self) -> dict[str, Any]:
+         return {
+             "mutation_id": self.op.mutation_id,
+             "op_type": self.op.op_type,
+             "total": self.total,
+             "signals": self.signals,
+             "weights": self.weights,
+             "contributions": self.contributions,
+         }


  class PopulationMutationPolicy:
+     """Population-guided policy with explicit, swappable scoring settings."""

+     def __init__(self, settings: MutationPolicySettings | None = None) -> None:
+         self.settings = settings or MutationPolicySettings()

+     @property
+     def name(self) -> str:
+         return self.settings.profile_name
+
+     def settings_dict(self) -> dict[str, Any]:
+         """Return the active settings as a plain dict for logging or serialization."""
+         return self.settings.model_dump(mode="json")

      def select_parent(
          self,
 
342
  raise ValueError("No parent candidates available")
343
  ordered = sorted(scores, key=lambda score: score.total, reverse=True)
344
  top = ordered[: min(3, len(ordered))]
345
+ weights = [max(score.total, self.settings.parent.minimum_total) for score in top]
346
  chosen_score = rng.choices(top, weights=weights, k=1)[0]
347
  chosen_entry = next(
348
  entry for entry in entries if entry.snapshot_id == chosen_score.snapshot_id
 
359
  if not entries:
360
  return []
361
 
362
+ parent_settings = self.settings.parent
363
+ parent_weights = parent_settings.weights()
364
  root_counts = Counter(
365
  entry.snapshot.lineage.root_snapshot_id or entry.snapshot_id
366
  for entry in entries
 
380
  red_rate = float(stat.get("red_solve_rate", 0.0))
381
  blue_rate = float(stat.get("blue_detect_rate", 0.0))
382
  frontier = (
383
+ parent_settings.unplayed_frontier_score
384
  if plays == 0
385
  else (
386
  self._frontier_score(red_rate)
 
389
  / 2.0
390
  )
391
  replay = 1.0 / (plays + 1.0)
392
+ novelty = (
393
+ 1.0 / (1.0 + sum(vuln_frequency[vuln] for vuln in vuln_types))
394
+ if vuln_types
395
+ else parent_settings.empty_vuln_novelty_score
396
+ )
397
  weak_overlap = float(len(vuln_types.intersection(context.weak_areas)))
398
  root_id = snapshot.lineage.root_snapshot_id or entry.snapshot_id
399
  lineage_balance = 1.0 / max(root_counts[root_id], 1)
400
  depth = float(snapshot.lineage.generation_depth)
401
+ depth_balance = 1.0 / (
402
+ 1.0 + max(depth - parent_settings.preferred_generation_depth, 0.0)
403
+ )
404
  recency = 1.0 / (1.0 + float(stat.get("plays_recent", 0)))
405
  complexity = min(
406
  (
407
+ len(snapshot.truth_graph.vulns) * parent_settings.complexity_vuln_factor
408
+ + len(snapshot.golden_path) * parent_settings.complexity_golden_path_factor
409
+ + len(compiled.dependency_edges)
410
+ * parent_settings.complexity_dependency_edge_factor
411
+ + len(compiled.trust_edges)
412
+ * parent_settings.complexity_trust_edge_factor
413
  ),
414
+ parent_settings.complexity_cap,
415
  )
416
 
417
+ signals = {
418
  "frontier": frontier,
419
  "replay": replay,
420
  "novelty": novelty,
 
424
  "recency": recency,
425
  "complexity": complexity,
426
  }
427
+ contributions = self._weighted_contributions(signals, parent_weights)
428
+ total = round(
429
+ max(sum(contributions.values()), parent_settings.minimum_total),
430
+ 4,
431
  )
432
  scores.append(
433
  ParentPolicyScore(
434
  snapshot_id=entry.snapshot_id,
435
+ total=total,
436
+ signals=self._round_dict(signals),
437
+ weights=self._round_dict(parent_weights),
438
+ contributions=self._round_dict(contributions),
439
  )
440
  )
441
  return scores
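The parent signals computed in the hunk above can be sketched in isolation as follows. This is a minimal, stdlib-only illustration rather than the repository's code: the function names are hypothetical, and the default values for `empty_score` and `preferred_depth` are placeholders, not the project's real `MutationPolicySettings` defaults.

```python
from collections import Counter


def frontier_score(rate: float) -> float:
    """Peak at a 50% rate, falling linearly to 0 at 0% and 100%."""
    return max(0.0, 1.0 - abs(rate - 0.5) * 2.0)


def replay_signal(plays: int) -> float:
    """Shrink as a snapshot accumulates plays, so unplayed ones score highest."""
    return 1.0 / (plays + 1.0)


def novelty_signal(
    vuln_types: set[str],
    vuln_frequency: Counter,
    empty_score: float = 0.5,  # placeholder default, an assumption
) -> float:
    """Score rarer vulnerability mixes higher; fall back when there are none."""
    if not vuln_types:
        return empty_score
    return 1.0 / (1.0 + sum(vuln_frequency[v] for v in vuln_types))


def depth_balance_signal(
    depth: float,
    preferred_depth: float = 2.0,  # placeholder default, an assumption
) -> float:
    """Penalize only the generations beyond the preferred depth."""
    return 1.0 / (1.0 + max(depth - preferred_depth, 0.0))


if __name__ == "__main__":
    frequency = Counter({"sqli": 3, "xss": 1})
    print(frontier_score(0.75))
    print(novelty_signal({"sqli"}, frequency))
    print(depth_balance_signal(4.0))
```

Each signal is bounded above by 1.0, which keeps the per-term weights comparable before they are multiplied in.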
 
469
  if security is not None:
470
  selected.append(security)
471
 
 
472
  if not selected and structural_candidates:
473
  fallback = self._select_candidate(
474
  structural_candidates,
 
495
  return [], 0.0, {}
496
 
497
  breakdown = {
498
+ "curriculum": round(sum(c.contributions["curriculum"] for c in selected), 4),
499
+ "novelty": round(sum(c.contributions["novelty"] for c in selected), 4),
500
+ "structural_gain": round(sum(c.contributions["structural_gain"] for c in selected), 4),
501
+ "lineage": round(sum(c.contributions["lineage"] for c in selected), 4),
502
  }
503
  total = round(sum(choice.total for choice in selected), 4)
504
  return ops, total, breakdown
 
522
  if deterministic or len(ranked) == 1:
523
  return ranked[0]
524
  top = ranked[: min(3, len(ranked))]
525
+ weights = [max(choice.total, self.settings.mutation.minimum_total) for choice in top]
526
  return rng.choices(top, weights=weights, k=1)[0]
527
 
528
  def _rank_candidates(
 
534
  ) -> list[MutationChoice]:
535
  ranked: list[MutationChoice] = []
536
  existing_vulns = {v.type for v in snapshot.truth_graph.vulns if v.type}
537
+ mutation_weights = self.settings.mutation.weights()
538
  for candidate in candidates:
539
  curriculum = self._curriculum_bonus(candidate, context, existing_vulns)
540
  novelty = self._novelty_bonus(candidate, context)
541
  structural_gain = self._structural_gain(candidate)
542
  lineage = 1.0 / (1.0 + snapshot.lineage.generation_depth)
543
+ signals = {
544
  "curriculum": curriculum,
545
  "novelty": novelty,
546
  "structural_gain": structural_gain,
547
  "lineage": lineage,
548
  }
549
+ contributions = self._weighted_contributions(signals, mutation_weights)
550
+ total = round(
551
+ max(sum(contributions.values()), self.settings.mutation.minimum_total),
552
+ 4,
 
553
  )
554
  ranked.append(
555
  MutationChoice(
556
  op=candidate,
557
+ total=total,
558
+ signals=self._round_dict(signals),
559
+ weights=self._round_dict(mutation_weights),
560
+ contributions=self._round_dict(contributions),
561
  )
562
  )
563
  ranked.sort(key=lambda choice: choice.total, reverse=True)
 
567
  def _frontier_score(rate: float) -> float:
568
  return max(0.0, 1.0 - abs(rate - 0.5) * 2.0)
569
 
570
+ def _structural_gain(self, op: MutationOp) -> float:
571
+ return self.settings.structural_gains.gain_for(op.op_type) * max(op.magnitude, 1)
572
 
573
+ def _novelty_bonus(self, op: MutationOp, context: BuildContext) -> float:
574
+ novelty = self.settings.novelty
575
+ bonus = novelty.base_bonus
576
  if op.op_type == "seed_vuln":
577
  vuln_type = str(op.params.get("vuln_type", "")).strip()
578
  if vuln_type and vuln_type not in context.previous_vuln_classes:
579
+ bonus += novelty.new_vuln_class_bonus
580
  if op.op_type == "add_benign_noise":
581
  location = str(op.params.get("location", "")).strip()
582
  if location and location not in context.recent_attack_surfaces:
583
+ bonus += novelty.new_noise_surface_bonus
584
  if op.op_type not in {"seed_vuln", "add_benign_noise"}:
585
+ bonus += novelty.structural_op_bonus
586
  return bonus
587
 
 
588
  def _curriculum_bonus(
589
+ self,
590
  op: MutationOp,
591
  context: BuildContext,
592
  existing_vulns: set[str],
593
  ) -> float:
594
+ curriculum = self.settings.curriculum
595
+ bonus = curriculum.base_bonus
596
  if op.op_type == "seed_vuln":
597
  vuln_type = str(op.params.get("vuln_type", "")).strip()
598
  if vuln_type in context.weak_areas:
599
+ bonus += curriculum.weak_area_bonus
600
  if vuln_type and vuln_type not in existing_vulns:
601
+ bonus += curriculum.new_vuln_bonus
602
  if op.op_type in {"add_dependency_edge", "add_trust_edge"} and context.require_chain_length > 1:
603
+ bonus += curriculum.chain_length_bonus
604
  if context.focus_layer == "identity" and op.op_type in {"add_user", "add_trust_edge"}:
605
+ bonus += curriculum.focus_identity_bonus
606
  if context.focus_layer == "infra" and op.op_type in {"add_service", "add_dependency_edge"}:
607
+ bonus += curriculum.focus_infra_bonus
608
  if context.focus_layer == "process" and op.op_type == "add_benign_noise":
609
+ bonus += curriculum.focus_process_bonus
610
  return bonus
611
+
612
+ @staticmethod
613
+ def _weighted_contributions(
614
+ signals: dict[str, float],
615
+ weights: dict[str, float],
616
+ ) -> dict[str, float]:
617
+ return {
618
+ name: float(signals.get(name, 0.0)) * float(weight)
619
+ for name, weight in weights.items()
620
+ }
621
+
622
+ @staticmethod
623
+ def _round_dict(values: dict[str, float]) -> dict[str, float]:
624
+ return {key: round(float(value), 4) for key, value in values.items()}
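Both `score_parents` and `_rank_candidates` follow the same pattern: multiply each signal by its weight, floor the sum at `minimum_total`, then sample from the top three with the totals as sampling weights. A minimal sketch of that pattern, assuming plain dicts and `(label, total)` tuples in place of the real `ParentPolicyScore` and `MutationChoice` models; the helper names and the `0.01` floor are hypothetical:

```python
import random


def weighted_contributions(signals: dict, weights: dict) -> dict:
    """One contribution per configured weight; signals without a weight are ignored."""
    return {
        name: float(signals.get(name, 0.0)) * float(weight)
        for name, weight in weights.items()
    }


def total_score(contributions: dict, minimum_total: float) -> float:
    """Sum the contributions, floor at minimum_total, round to 4 places."""
    return round(max(sum(contributions.values()), minimum_total), 4)


def pick_from_top(scored, rng: random.Random, minimum_total: float, top_k: int = 3):
    """Sample one (label, total) pair from the top_k entries, weighted by total."""
    ordered = sorted(scored, key=lambda item: item[1], reverse=True)
    top = ordered[:top_k]
    # The floor keeps every weight positive; random.choices raises
    # ValueError when the weights sum to zero.
    sampling_weights = [max(total, minimum_total) for _, total in top]
    return rng.choices(top, weights=sampling_weights, k=1)[0]


if __name__ == "__main__":
    signals = {"frontier": 1.0, "replay": 0.25, "unweighted": 9.0}
    weights = {"frontier": 0.28, "replay": 0.18, "novelty": 0.16}
    contributions = weighted_contributions(signals, weights)
    print(contributions, total_score(contributions, minimum_total=0.01))

    scored = [("a", 0.6), ("b", 0.3), ("c", 0.0), ("d", 0.0)]
    print(pick_from_top(scored, random.Random(7), minimum_total=0.01))
```

Iterating over `weights` rather than `signals` means a signal with no configured weight contributes nothing, and a weight with no matching signal contributes `0.0` rather than raising.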
src/open_range/builder/mutator.py CHANGED
@@ -344,6 +344,14 @@ class Mutator:
             context=context,
             rng=rng,
         )
+        if ops:
+            logger.info(
+                "Mutator policy %s chose ops=%s score=%.3f breakdown=%s",
+                self.policy.name,
+                [op.mutation_id for op in ops],
+                policy_score,
+                score_breakdown,
+            )
 
         if not ops:
             fallback = self._candidate_add_benign_noise(snapshot, rng)
src/open_range/server/runtime.py CHANGED
@@ -1139,11 +1139,10 @@ class ManagedSnapshotRuntime:
             rng=rng,
         )
         logger.info(
-            "ManagedSnapshotRuntime selected parent %s via %s (score=%.3f components=%s)",
+            "ManagedSnapshotRuntime selected parent %s via %s %s",
             selected.snapshot_id,
             self.mutation_policy.name,
-            score.total,
-            score.components,
+            json.dumps(score.log_payload(), sort_keys=True),
         )
         return selected
 
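The runtime change above replaces two formatted fields with a single JSON payload. The body of `log_payload()` is not part of this hunk, so the sketch below only assumes what it might return: the `ScoreSketch` class and its field set are hypothetical stand-ins for `ParentPolicyScore`.

```python
import json
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("runtime_sketch")


@dataclass
class ScoreSketch:
    """Hypothetical stand-in for the real score model."""

    snapshot_id: str
    total: float
    signals: dict = field(default_factory=dict)
    weights: dict = field(default_factory=dict)
    contributions: dict = field(default_factory=dict)

    def log_payload(self) -> dict:
        # Assumed shape: everything needed to explain the total.
        return {
            "total": self.total,
            "signals": self.signals,
            "weights": self.weights,
            "contributions": self.contributions,
        }


def format_selection(score: ScoreSketch) -> str:
    """Serialize with sorted keys so equal payloads give identical log lines."""
    return json.dumps(score.log_payload(), sort_keys=True)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    score = ScoreSketch(
        snapshot_id="snap_a",
        total=0.28,
        signals={"frontier": 1.0},
        weights={"frontier": 0.28},
        contributions={"frontier": 0.28},
    )
    logger.info("selected parent %s via %s %s", score.snapshot_id, "policy", format_selection(score))
```

Sorting keys makes log lines stable across runs, which helps when diffing or grepping selection logs during calibration.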
tests/test_mutation_policy.py CHANGED
@@ -1,8 +1,24 @@
 """Tests for population-guided mutation selection policy."""
 
+from __future__ import annotations
+
+import asyncio
+import json
+import os
 import random
+import subprocess
+import sys
+from pathlib import Path
+from types import SimpleNamespace
+
+import pytest
 
-from open_range.builder.mutation_policy import PopulationMutationPolicy
+from open_range.builder.mutation_policy import (
+    MutationPolicySettings,
+    PopulationMutationPolicy,
+    load_mutation_policy_settings,
+)
+from open_range.builder.snapshot_store import SnapshotStore
 from open_range.protocols import BuildContext, MutationOp
 
 
@@ -100,3 +116,163 @@ def test_policy_best_effort_when_only_structural_available(sample_snapshot_spec)
 
     assert len(ops) == 1
     assert ops[0].op_type in {"add_trust_edge", "add_dependency_edge"}
+
+
+def test_load_policy_settings_from_yaml(tmp_path: Path):
+    settings_path = tmp_path / "policy.yaml"
+    settings_path.write_text(
+        "\n".join(
+            [
+                "profile_name: tuned_policy",
+                "parent:",
+                " frontier_weight: 0.5",
+                "mutation:",
+                " structural_gain_weight: 0.6",
+            ]
+        ),
+        encoding="utf-8",
+    )
+
+    settings = load_mutation_policy_settings(settings_path)
+
+    assert settings.profile_name == "tuned_policy"
+    assert settings.parent.frontier_weight == 0.5
+    assert settings.mutation.structural_gain_weight == 0.6
+    assert settings.structural_gains.add_service == 1.0
+
+
+def test_parent_scores_expose_weighted_contributions(sample_snapshot_spec):
+    policy = PopulationMutationPolicy()
+    snapshot = sample_snapshot_spec.model_copy(deep=True)
+    snapshot.lineage.root_snapshot_id = "root_a"
+    entry = SimpleNamespace(snapshot_id="snap_a", snapshot=snapshot)
+
+    score = policy.score_parents(
+        [entry],
+        context=BuildContext(seed=1, tier=1, weak_areas=["sqli"]),
+        snapshot_stats={
+            "snap_a": {
+                "plays": 2,
+                "plays_recent": 1,
+                "red_solve_rate": 0.5,
+                "blue_detect_rate": 0.25,
+            }
+        },
+    )[0]
+
+    assert score.weights["frontier"] == pytest.approx(
+        policy.settings.parent.frontier_weight
+    )
+    assert score.contributions["frontier"] == pytest.approx(
+        score.signals["frontier"] * score.weights["frontier"],
+        rel=1e-3,
+    )
+    assert score.total == pytest.approx(sum(score.contributions.values()), rel=1e-3)
+
+
+def test_custom_settings_change_candidate_ranking(sample_snapshot_spec):
+    settings = MutationPolicySettings(
+        profile_name="structural_gain_only",
+        mutation={
+            "curriculum_weight": 0.0,
+            "novelty_weight": 0.0,
+            "structural_gain_weight": 1.0,
+            "lineage_weight": 0.0,
+        },
+        structural_gains={
+            "add_service": 0.2,
+            "add_dependency_edge": 0.2,
+            "add_trust_edge": 0.2,
+            "add_user": 0.2,
+            "seed_vuln": 0.1,
+            "add_benign_noise": 2.5,
+            "default_gain": 0.0,
+        },
+    )
+    policy = PopulationMutationPolicy(settings=settings)
+    ranked = policy._rank_candidates(
+        [
+            MutationOp(
+                mutation_id="seed_sqli",
+                op_type="seed_vuln",
+                target_selector={"host": "web"},
+                params={"vuln_type": "sqli"},
+            ),
+            MutationOp(
+                mutation_id="noise_1",
+                op_type="add_benign_noise",
+                target_selector={"location": "siem:noise.log"},
+                params={"location": "siem:noise.log"},
+            ),
+        ],
+        snapshot=sample_snapshot_spec,
+        context=BuildContext(seed=1, tier=1),
+    )
+
+    assert ranked[0].op.op_type == "add_benign_noise"
+    assert ranked[0].contributions["structural_gain"] == pytest.approx(
+        ranked[0].total,
+        rel=1e-3,
+    )
+
+
+def test_calibration_script_compares_default_and_custom_settings(
+    tmp_path: Path,
+    sample_snapshot_spec,
+):
+    store_dir = tmp_path / "snapshots"
+    asyncio.run(SnapshotStore(str(store_dir)).store(sample_snapshot_spec, "snap_demo"))
+
+    stats_path = tmp_path / "snapshot_stats.json"
+    stats_path.write_text(
+        json.dumps(
+            {
+                "snap_demo": {
+                    "plays": 3,
+                    "plays_recent": 1,
+                    "red_solve_rate": 0.5,
+                    "blue_detect_rate": 0.0,
+                }
+            }
+        ),
+        encoding="utf-8",
+    )
+    context_path = tmp_path / "context.json"
+    context_path.write_text(
+        BuildContext(seed=7, tier=2, weak_areas=["sqli"]).model_dump_json(indent=2),
+        encoding="utf-8",
+    )
+    settings_path = tmp_path / "tuned.json"
+    settings_path.write_text(
+        MutationPolicySettings(
+            profile_name="tuned",
+            parent={"frontier_weight": 0.5},
+        ).model_dump_json(indent=2),
+        encoding="utf-8",
+    )
+
+    result = subprocess.run(
+        [
+            sys.executable,
+            "scripts/calibrate_mutation_policy.py",
+            "--store-dir",
+            str(store_dir),
+            "--stats",
+            str(stats_path),
+            "--context",
+            str(context_path),
+            "--settings",
+            f"tuned={settings_path}",
+        ],
+        capture_output=True,
+        check=False,
+        cwd=Path(__file__).resolve().parents[1],
+        env={**os.environ, "PYTHONPATH": "src"},
+        text=True,
+    )
+
+    assert result.returncode == 0, result.stderr
+    payload = json.loads(result.stdout)
+    assert payload["snapshot_count"] == 1
+    assert [policy["label"] for policy in payload["policies"]] == ["default", "tuned"]
+    assert payload["policies"][0]["top_parents"][0]["snapshot_id"] == "snap_demo"
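The YAML test above relies on partial overrides: only `frontier_weight` and `structural_gain_weight` are set, while `structural_gains.add_service` keeps its default of `1.0`. The project appears to get this from its settings models (`model_dump` and `model_copy` suggest pydantic). The sketch below reproduces the same behaviour with stdlib dataclasses instead; every class and helper name in it is hypothetical, and only the `0.28`, `0.18`, and `1.0` defaults come from the docs and tests in this commit.

```python
from dataclasses import dataclass, field, is_dataclass


@dataclass
class ParentSketch:
    frontier_weight: float = 0.28
    replay_weight: float = 0.18


@dataclass
class GainsSketch:
    add_service: float = 1.0


@dataclass
class SettingsSketch:
    profile_name: str = "default"  # assumed default name
    parent: ParentSketch = field(default_factory=ParentSketch)
    structural_gains: GainsSketch = field(default_factory=GainsSketch)


def from_overrides(cls, overrides: dict):
    """Start from defaults and apply only the keys present in overrides."""
    instance = cls()
    for key, value in overrides.items():
        if not hasattr(instance, key):
            raise KeyError(f"unknown setting: {key}")
        current = getattr(instance, key)
        if is_dataclass(current) and isinstance(value, dict):
            # Recurse so a nested section can also be partially overridden.
            value = from_overrides(type(current), value)
        setattr(instance, key, value)
    return instance


if __name__ == "__main__":
    tuned = from_overrides(
        SettingsSketch,
        {"profile_name": "tuned_policy", "parent": {"frontier_weight": 0.5}},
    )
    print(tuned)
```

Rejecting unknown keys is a design choice in this sketch: a misspelled weight name fails loudly instead of silently leaving the default in place.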