File size: 7,988 Bytes
fba140f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
# scripts/guardrails.py
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Set
def _read_json(path: str) -> Dict[str, Any]:
    """Parse the UTF-8 encoded JSON file at *path* and return the decoded value."""
    with Path(path).open("r", encoding="utf-8") as handle:
        return json.load(handle)
class TargetRegistry:
    """Lookup tables used to canonicalize device, point and MIM names.

    The registry is built from a plain dict (typically decoded JSON) with
    these optional sections:

    * ``canonical``: ``{group: {mg: [device, ...]}}``; the device lists of all
      groups are merged per ``mg`` key into ``allowed_by_mg``.
    * ``points``: stored as-is on ``self.points``.
    * ``synonyms``: ``{"device": {...}, "point": {...}, "mim": {...}}`` alias
      maps from a raw spelling to its replacement.
    * ``mim_default_by_mg``: ``{mg: mim_label}`` fallback MIM per ``mg`` key.

    A section that is missing or explicitly null is treated as empty.
    """

    def __init__(self, data: Dict[str, Any]):
        self.data = data or {}
        # ``or {}`` (rather than a ``get`` default) also covers sections that
        # are present in the input but set to null.
        self.canonical = self.data.get("canonical") or {}
        self.points = self.data.get("points") or {}
        self.synonyms = self.data.get("synonyms") or {}
        self.mim_default_by_mg = self.data.get("mim_default_by_mg") or {}

        # Build quick lookup of allowed devices per mg (union of all groups)
        self.allowed_by_mg: Dict[str, Set[str]] = {}
        for mg_map in self.canonical.values():
            for mg, devices in (mg_map or {}).items():
                self.allowed_by_mg.setdefault(mg, set()).update(devices or ())

        # Flatten the per-kind synonym maps for direct access
        self.dev_syn = self.synonyms.get("device") or {}
        self.point_syn = self.synonyms.get("point") or {}
        self.mim_syn = self.synonyms.get("mim") or {}

    @classmethod
    def from_json(cls, path: str) -> "TargetRegistry":
        """Build a registry from the JSON file at *path*."""
        return cls(_read_json(path))

    def allowed_devices(self, mg: Optional[str]) -> Set[str]:
        """Return the set of device names allowed for *mg*.

        Returns an empty set when *mg* is falsy or unknown. For a known *mg*
        the returned set is the registry's own object, so callers should
        treat it as read-only.
        """
        if not mg:
            return set()
        return self.allowed_by_mg.get(mg, set())

    def canonicalize_mim(self, mim: Optional[str]) -> Optional[str]:
        """Return *mim* as an upper-case ``MIM1``..``MIM4`` label, or ``None``.

        The MIM synonym map is consulted first (exact, case-sensitive key
        match). ``None`` is returned for a missing value, for a non-string
        value, and for any label that is not one of ``MIM1``..``MIM4``.
        """
        # Values come from external plan data, so reject anything that is not
        # a string instead of failing on ``.upper()`` or an unhashable key.
        if not isinstance(mim, str):
            return None
        label = self.mim_syn.get(mim, mim)
        if not isinstance(label, str):
            return None
        label = label.upper()
        if re.fullmatch(r"MIM[1-4]", label):
            return label
        return None  # unknown MIM label

    def canonicalize_point(self, point: str) -> str:
        """Return the synonym-mapped spelling of *point*, or *point* itself."""
        return self.point_syn.get(point, point)

    def canonicalize_device(self, dev: str, mg: Optional[str]) -> str:
        """Resolve *dev* to an allowed device name for *mg* where possible.

        Steps, in order:

        1. Apply the device synonym map to *dev*.
        2. If the result contains a dot (like ``mg1.cap_01``), take the text
           after the last dot and try ``mg + core``, then
           ``mg + synonym(core)``; the first allowed candidate wins.
        3. If the result does not start with *mg*, try ``mg + result``.
        4. Otherwise return the synonym-mapped name unchanged, whether or not
           it is allowed (callers decide what to do with unknown devices).

        Steps 2 and 3 are skipped when *mg* is falsy.
        """
        # direct synonym
        dev2 = self.dev_syn.get(dev, dev)
        if not mg:
            return dev2

        allowed = self.allowed_devices(mg)

        # If device has a dot (like mg1.cap_01), squash using mg+core
        if "." in dev2:
            core = dev2.split(".")[-1]
            candidates = (
                f"{mg}{core}",  # mg-prefixed core first
                f"{mg}{self.dev_syn.get(core, core)}",  # then the core's synonym
            )
            for candidate in candidates:
                if candidate in allowed:
                    return candidate

        # If device lacks mg prefix and mg+dev is allowed, use it
        if not dev2.startswith(mg):
            prefixed = f"{mg}{dev2}"
            if prefixed in allowed:
                return prefixed

        # Already allowed, or unknown: either way keep the synonym-mapped name
        return dev2

    def pick_default_mim(self, mg: Optional[str]) -> Optional[str]:
        """Return the configured default MIM for *mg*, or ``None`` if there is none."""
        if mg is None:
            return None
        return self.mim_default_by_mg.get(mg)
def augment_prompt_with_allowlist(instruction: str, reg: TargetRegistry) -> str:
    """Return *instruction* with a compact allow-list hint appended.

    For each of the fixed buckets ``mg1``, ``mg2``, ``mg3``, ``substation``
    and ``unmapped``, up to eight allowed device names (in sorted order) are
    listed on one line. Buckets without devices are omitted, and when no
    bucket has any device the instruction is returned unchanged.
    """
    preview_lines = []
    for bucket in ("mg1", "mg2", "mg3", "substation", "unmapped"):
        # Keep the hint short: only the first eight names per bucket.
        sample = sorted(reg.allowed_devices(bucket))[:8]
        if sample:
            preview_lines.append(f"{bucket}: {', '.join(sample)}")

    if not preview_lines:
        return instruction

    listing = "\n".join(preview_lines)
    return (
        instruction
        + "\nAllowed device names (examples):\n"
        + listing
        + "\nUse exactly one dot in `name`: [optional MIM].<device>.<point>\n"
    )
def _parse_name(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split a dotted target name into ``(mim, device, point)``.

    Accepted shapes:

    * ``MIM1.device.point`` -> ``("MIM1", "device", "point")``
    * ``device.point``      -> ``(None, "device", "point")``

    The MIM prefix is only recognized when it is exactly ``MIM1``..``MIM4``
    and at least three dot-separated parts are present. The point is always
    the last part; if extra dots exist, everything in between is returned as
    the device (dots included) so the caller can clean it up.

    Returns ``(None, None, None)`` when *name* contains no dot or is not a
    string (for example a null ``name`` field in decoded JSON).
    """
    if not isinstance(name, str):
        return None, None, None

    parts = name.split(".")
    if len(parts) >= 3 and re.fullmatch(r"MIM[1-4]", parts[0]):
        # device may contain dots -> caller will clean
        return parts[0], ".".join(parts[1:-1]), parts[-1]
    if len(parts) >= 2:
        # device may contain dots -> caller will clean
        return None, ".".join(parts[:-1]), parts[-1]
    return None, None, None
def _build_name(mim: Optional[str], device: str, point: str) -> str:
    """Join the parts into ``[mim.]device.point``; a falsy *mim* is left out."""
    prefix = f"{mim}." if mim else ""
    return f"{prefix}{device}.{point}"
def _is_switch_point(point: str) -> bool:
    """Tell whether *point* is ``status`` or one of ``switchA``/``switchB``/``switchC``."""
    switch_points = frozenset(("status", "switchA", "switchB", "switchC"))
    return point in switch_points
def _normalize_openclose(item: Dict[str, Any]) -> None:
    """Normalize OPEN/CLOSED style values of a plan item in place.

    The item is only touched when its ``op`` is ``"open"`` or ``"close"``, or
    when its ``point`` is a switch point. In that case:

    * string ``attack_value`` / ``real_value`` are upper-cased;
    * a missing or empty ``attack_value`` is filled from ``op``
      (``open`` -> ``OPEN``, ``close`` -> ``CLOSED``);
    * a missing or empty ``real_value`` defaults to the opposite state:
      ``CLOSED`` when the attack value is ``OPEN``, otherwise ``OPEN``.

    If no ``attack_value`` key exists and ``op`` does not imply one,
    ``real_value`` is left untouched because there is nothing to derive it
    from.
    """
    op = item.get("op")
    point = item.get("point")
    if not (op in {"open", "close"} or _is_switch_point(point)):
        return

    # Normalize values to uppercase OPEN/CLOSED
    for key in ("attack_value", "real_value"):
        value = item.get(key)
        if isinstance(value, str):
            item[key] = value.upper()

    # Derive a missing attack value from the operation itself
    if not item.get("attack_value"):
        if op == "open":
            item["attack_value"] = "OPEN"
        elif op == "close":
            item["attack_value"] = "CLOSED"

    # Default real_value if absent (reasonable default). Guard on the key's
    # presence: a switch-point item with another op and no attack_value would
    # otherwise raise KeyError here.
    if not item.get("real_value") and "attack_value" in item:
        item["real_value"] = "CLOSED" if item["attack_value"] == "OPEN" else "OPEN"
def validate_and_fix_attackplan(
    ap: Dict[str, Any],
    reg: TargetRegistry,
    strict: bool = False,
    autofix: bool = True,
    cutoff: float = 0.92,  # unused, kept for interface stability
) -> Tuple[Dict[str, Any], List[str]]:
    """Canonicalize the names, MIMs and switch values of an attack plan.

    The plan dict is modified in place and also returned.

    Args:
        ap: Attack plan; its ``"plan"`` entry is read as a list of item dicts
            with optional ``scope`` (``mg``, ``mim``), ``name``, ``op``,
            ``point``, ``attack_value`` and ``real_value`` fields.
        reg: Registry used to canonicalize MIM, device and point names.
        strict: When true, items that have an ``mg`` but whose resolved
            device is not allowed for that ``mg`` are dropped.
        autofix: Accepted for interface compatibility; not consulted here.
        cutoff: Accepted for interface compatibility; not consulted here.

    Returns:
        ``(ap, notes)`` where *notes* lists human-readable messages about
        anything that was dropped or could not be processed. If *ap* is not a
        dict it is returned untouched with a single note.
    """
    notes: List[str] = []
    if not isinstance(ap, dict):
        notes.append("attack plan is not a dict")
        return ap, notes

    raw_plan = ap.get("plan") or []
    if not isinstance(raw_plan, list):
        # Iterating a dict or string here would yield keys/characters.
        notes.append("plan is not a list")
        raw_plan = []
    plan: List[Dict[str, Any]] = raw_plan

    # Fix top-level mim.selected using first item's scope if needed
    if plan and isinstance(plan[0], dict):
        scope0 = plan[0].get("scope")
        if not isinstance(scope0, dict):
            scope0 = {}  # a null scope is treated like a missing one
        mg0 = scope0.get("mg")
        mim0 = reg.canonicalize_mim(scope0.get("mim")) or reg.pick_default_mim(mg0)
        # Constrain selected to a single MIM when we have one
        if mim0:
            if not isinstance(ap.get("mim"), dict):
                # setdefault would keep a present-but-null "mim" entry
                ap["mim"] = {}
            ap["mim"]["active"] = True
            ap["mim"]["selected"] = [mim0]

    new_plan: List[Dict[str, Any]] = []
    for it in plan:
        if not isinstance(it, dict):
            notes.append("dropped non-dict plan item")
            continue

        scope = it.get("scope")
        if not isinstance(scope, dict):
            scope = {}
        mg = scope.get("mg")
        mim = reg.canonicalize_mim(scope.get("mim")) or reg.pick_default_mim(mg)

        # Parse and canonicalize name parts
        name = it.get("name")
        if not isinstance(name, str):
            name = ""  # null or malformed name: nothing to parse
        mim_in, dev_raw, point_raw = _parse_name(name)
        point = reg.canonicalize_point(point_raw) if point_raw else point_raw

        # MIM priority: explicit in scope, then the default for mg, and only
        # then the prefix found in the name.
        # NOTE(review): the mg default outranks the MIM written in the name;
        # confirm this ordering is intended.
        mim_final = mim or reg.canonicalize_mim(mim_in)

        # Canonicalize device using mg + synonyms
        dev_final = reg.canonicalize_device(dev_raw or "", mg)

        # If still unknown and strict, drop
        if strict and mg and dev_final not in reg.allowed_devices(mg):
            notes.append(f"dropped unknown device for mg={mg}: {dev_raw}")
            continue

        # Rebuild name from the resolved parts
        if dev_final and point:
            it["name"] = _build_name(mim_final, dev_final, point)

        # Update scope.mim to the resolved one
        if mim_final:
            if not isinstance(it.get("scope"), dict):
                it["scope"] = {}
            it["scope"]["mim"] = mim_final

        # Normalize switch/OPEN-CLOSED values
        _normalize_openclose(it)
        new_plan.append(it)

    ap["plan"] = new_plan
    return ap, notes
|