# scripts/guardrails.py
#
# Guardrails for attack plans: canonicalize device / point / MIM labels
# against a target registry and normalize OPEN/CLOSED switch values.
from __future__ import annotations

import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Set

# Canonical MIM labels are exactly MIM1..MIM4.
_MIM_RE = re.compile(r"MIM[1-4]")

# Point names that are treated as switch points (OPEN/CLOSED valued).
_SWITCH_POINTS = frozenset({"status", "switchA", "switchB", "switchC"})


def _read_json(path: str) -> Dict[str, Any]:
    """Read the UTF-8 file at *path* and return its parsed JSON content."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def _section(mapping: Dict[str, Any], key: str) -> Any:
    """Return ``mapping[key]``, or an empty dict when it is missing or None."""
    value = mapping.get(key)
    return {} if value is None else value


def _scope_of(item: Dict[str, Any]) -> Dict[str, Any]:
    """Return the item's ``scope`` when it is a dict, else an empty dict."""
    scope = item.get("scope")
    return scope if isinstance(scope, dict) else {}


class TargetRegistry:
    """Lookup tables for allowed devices, synonyms and default MIMs.

    The registry is built from a dict with the (all optional) keys
    ``canonical``, ``points``, ``synonyms`` and ``mim_default_by_mg``.
    ``canonical`` maps group -> mg -> iterable of device names;
    ``synonyms`` may hold ``device``, ``point`` and ``mim`` sub-maps.
    """

    def __init__(self, data: Dict[str, Any]):
        """Build the lookup tables; missing or null sections become empty."""
        self.data = data or {}
        # A JSON ``null`` section is treated like an absent one so the
        # ``.get`` / ``.items`` calls below cannot fail on None.
        self.canonical = _section(self.data, "canonical")
        self.points = _section(self.data, "points")
        self.synonyms = _section(self.data, "synonyms")
        self.mim_default_by_mg = _section(self.data, "mim_default_by_mg")

        # Quick lookup of allowed devices per mg (union over all groups).
        self.allowed_by_mg: Dict[str, Set[str]] = {}
        for mg_map in self.canonical.values():
            for mg, devices in (mg_map or {}).items():
                self.allowed_by_mg.setdefault(mg, set()).update(devices or ())

        # Per-kind synonym tables.
        self.dev_syn = _section(self.synonyms, "device")
        self.point_syn = _section(self.synonyms, "point")
        self.mim_syn = _section(self.synonyms, "mim")

    @classmethod
    def from_json(cls, path: str) -> "TargetRegistry":
        """Alternate constructor: load the registry data from a JSON file."""
        return cls(_read_json(path))

    def allowed_devices(self, mg: Optional[str]) -> Set[str]:
        """Return the set of device names allowed for *mg* (empty if unknown)."""
        if not mg:
            return set()
        return self.allowed_by_mg.get(mg, set())

    def canonicalize_mim(self, mim: Optional[str]) -> Optional[str]:
        """Return *mim* as a canonical ``MIM1``..``MIM4`` label, or None.

        The MIM synonym table is consulted first; the result is then
        stripped and upper-cased. Anything that is not a string or does
        not match ``MIM[1-4]`` yields None.
        """
        if not isinstance(mim, str):
            return None
        label = self.mim_syn.get(mim, mim)
        if not isinstance(label, str):
            return None
        label = label.strip().upper()
        return label if _MIM_RE.fullmatch(label) else None

    def canonicalize_point(self, point: str) -> str:
        """Map *point* through the point synonym table (identity if absent)."""
        return self.point_syn.get(point, point)

    def canonicalize_device(self, dev: str, mg: Optional[str]) -> str:
        """Resolve *dev* to an allowed device name for *mg* when possible.

        Resolution order, after applying the device synonym table:
          1. for dotted names (like ``mg1.cap_01``): ``mg + last segment``,
             then ``mg + synonym(last segment)``;
          2. ``mg + dev`` when *dev* does not already start with *mg*;
          3. otherwise the synonym-mapped name unchanged.
        Without an *mg* only the synonym mapping is applied.
        """
        resolved = self.dev_syn.get(dev, dev)
        if not mg:
            return resolved

        allowed = self.allowed_devices(mg)

        # Dotted device: squash to mg + core segment.
        if "." in resolved:
            core = resolved.split(".")[-1]
            squashed = f"{mg}{core}"
            if squashed in allowed:
                return squashed
            squashed_syn = f"{mg}{self.dev_syn.get(core, core)}"
            if squashed_syn in allowed:
                return squashed_syn

        # Device lacks the mg prefix: try adding it.
        if not resolved.startswith(mg):
            prefixed = f"{mg}{resolved}"
            if prefixed in allowed:
                return prefixed

        # Already allowed, or unknown: either way keep the mapped name.
        return resolved

    def pick_default_mim(self, mg: Optional[str]) -> Optional[str]:
        """Return the configured default MIM for *mg*, or None."""
        if mg is None:
            return None
        return self.mim_default_by_mg.get(mg)


def augment_prompt_with_allowlist(instruction: str, reg: TargetRegistry) -> str:
    """Append a compact allow-list hint to *instruction*.

    For each of a fixed set of mg buckets, up to eight allowed device
    names (sorted) are listed. When no bucket has devices the
    instruction is returned unchanged.
    """
    hints: List[str] = []
    for mg in ("mg1", "mg2", "mg3", "substation", "unmapped"):
        items = sorted(reg.allowed_devices(mg))[:8]
        if items:
            hints.append(f"{mg}: {', '.join(items)}")

    if not hints:
        return instruction

    return instruction + (
        "\nAllowed device names (examples):\n"
        + "\n".join(hints)
        # The placeholders were missing from this hint ("[optional MIM]..").
        + "\nUse exactly one dot in `name`: [optional MIM].<device>.<point>\n"
    )


def _parse_name(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split a target name into ``(mim, device, point)``.

    Accepts ``MIM1.device.point`` and ``device.point``. The MIM prefix is
    matched case-insensitively and returned upper-cased. If extra dots
    exist, the device keeps them and the caller is expected to clean it.
    Names without a dot (or non-string names) yield ``(None, None, None)``.
    """
    if not isinstance(name, str):
        return None, None, None

    parts = name.split(".")
    if len(parts) >= 3 and _MIM_RE.fullmatch(parts[0].upper()):
        return parts[0].upper(), ".".join(parts[1:-1]), parts[-1]
    if len(parts) >= 2:
        return None, ".".join(parts[:-1]), parts[-1]
    return None, None, None


def _build_name(mim: Optional[str], device: str, point: str) -> str:
    """Join the parts back into ``[mim.]device.point``."""
    if mim:
        return f"{mim}.{device}.{point}"
    return f"{device}.{point}"


def _is_switch_point(point: str) -> bool:
    """Return True when *point* is one of the known switch point names."""
    return isinstance(point, str) and point in _SWITCH_POINTS


def _normalize_openclose(item: Dict[str, Any]) -> None:
    """Normalize OPEN/CLOSED values of a plan item in place.

    Applies only when ``op`` is ``open``/``close`` or ``point`` is a
    switch point. String values are upper-cased, a missing
    ``attack_value`` is derived from ``op``, and a missing ``real_value``
    defaults to the opposite of an OPEN ``attack_value`` (else ``OPEN``).
    """
    op = item.get("op")
    if op not in ("open", "close") and not _is_switch_point(item.get("point")):
        return

    for key in ("attack_value", "real_value"):
        value = item.get(key)
        if isinstance(value, str):
            item[key] = value.upper()

    if not item.get("attack_value"):
        if op == "open":
            item["attack_value"] = "OPEN"
        elif op == "close":
            item["attack_value"] = "CLOSED"

    # Without any attack_value key there is nothing to derive the real
    # value from (a switch point with no op); leave the item untouched
    # instead of raising KeyError.
    if not item.get("real_value") and "attack_value" in item:
        item["real_value"] = (
            "CLOSED" if item["attack_value"] == "OPEN" else "OPEN"
        )


def validate_and_fix_attackplan(
    ap: Dict[str, Any],
    reg: TargetRegistry,
    strict: bool = False,
    autofix: bool = True,
    cutoff: float = 0.92,  # unused, kept for interface stability
) -> Tuple[Dict[str, Any], List[str]]:
    """Canonicalize an attack plan in place and report what was dropped.

    Args:
        ap: Attack plan; expected to be a dict with a ``plan`` list of
            item dicts (each with optional ``scope`` and ``name``).
        reg: Registry used to canonicalize MIM, device and point names.
        strict: When True, items whose device is not allowed for their
            mg (and non-dict items) are dropped instead of kept.
        autofix: Not read by this implementation; fixes are always applied.
        cutoff: Not read by this implementation.

    Returns:
        ``(ap, notes)`` where *ap* is the same (mutated) object and
        *notes* lists human-readable messages about skipped/dropped items.
    """
    notes: List[str] = []
    if not isinstance(ap, dict):
        notes.append("attack plan is not a dict")
        return ap, notes

    plan = ap.get("plan") or []
    if not isinstance(plan, (list, tuple)):
        notes.append("attack plan 'plan' is not a list")
        return ap, notes

    # Fix top-level mim.selected using the first item's scope if possible.
    if plan and isinstance(plan[0], dict):
        scope0 = _scope_of(plan[0])
        mim0 = (
            reg.canonicalize_mim(scope0.get("mim"))
            or reg.pick_default_mim(scope0.get("mg"))
        )
        # Constrain selected to a single MIM when we have one.
        if mim0:
            # setdefault would hand back an existing None; replace it.
            if not isinstance(ap.get("mim"), dict):
                ap["mim"] = {}
            ap["mim"]["active"] = True
            ap["mim"]["selected"] = [mim0]

    new_plan: List[Dict[str, Any]] = []
    for index, it in enumerate(plan):
        if not isinstance(it, dict):
            notes.append(f"plan item {index} is not a dict")
            if not strict:
                new_plan.append(it)
            continue

        scope = _scope_of(it)
        mg = scope.get("mg")
        mim = reg.canonicalize_mim(scope.get("mim")) or reg.pick_default_mim(mg)

        # Parse and canonicalize the name parts.
        mim_in, dev_raw, point_raw = _parse_name(it.get("name", ""))
        point = reg.canonicalize_point(point_raw) if point_raw else point_raw

        # MIM priority: explicit in scope, then the mg default, then the
        # prefix found in the name.
        # NOTE(review): the mg default outranks the MIM written in the
        # name; confirm that this is the intended order.
        mim_final = mim or reg.canonicalize_mim(mim_in)

        # Canonicalize the device using mg + synonyms.
        dev_final = reg.canonicalize_device(dev_raw or "", mg)

        # If still unknown and strict, drop the item.
        if strict and mg and dev_final not in reg.allowed_devices(mg):
            notes.append(f"dropped unknown device for mg={mg}: {dev_raw}")
            continue

        # Rebuild the name from the canonical parts.
        if dev_final and point:
            it["name"] = _build_name(mim_final, dev_final, point)

        # Update scope.mim to the resolved one.
        if mim_final:
            if not isinstance(it.get("scope"), dict):
                it["scope"] = {}
            it["scope"]["mim"] = mim_final

        # Normalize switch / OPEN-CLOSED values.
        _normalize_openclose(it)
        new_plan.append(it)

    ap["plan"] = new_plan
    return ap, notes