adetuire1's picture
Upload folder using huggingface_hub
fba140f verified
# scripts/guardrails.py
from __future__ import annotations
import json
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Set
def _read_json(path: str) -> Dict[str, Any]:
return json.loads(Path(path).read_text(encoding="utf-8"))
class TargetRegistry:
    """Lookup tables for canonical device, point and MIM names.

    The registry wraps a plain dict (typically parsed from JSON) and reads
    these optional top-level keys:

    * ``canonical``: ``{group: {mg: iterable_of_devices}}``. The devices of
      every group are merged into one allow-list per ``mg``.
    * ``points``: stored as-is on ``self.points``.
    * ``synonyms``: may hold ``"device"``, ``"point"`` and ``"mim"`` alias
      maps (raw label -> replacement label).
    * ``mim_default_by_mg``: ``{mg: mim}`` fallback MIM per ``mg``.

    A section that is missing or explicitly ``None`` (JSON ``null``) is
    treated as empty, the same way a ``None`` ``data`` argument is.
    """

    @staticmethod
    def _section(mapping: Dict[str, Any], key: str) -> Any:
        """Return ``mapping[key]``, or an empty dict when it is absent or None."""
        value = mapping.get(key)
        return {} if value is None else value

    def __init__(self, data: Dict[str, Any]):
        """Build the lookup tables from ``data`` (``None`` means empty)."""
        self.data = data or {}
        # A plain ``.get(key, {})`` default would let an explicit null through
        # and crash on the first ``.get``/``.items`` call below.
        self.canonical = self._section(self.data, "canonical")
        self.points = self._section(self.data, "points")
        self.synonyms = self._section(self.data, "synonyms")
        self.mim_default_by_mg = self._section(self.data, "mim_default_by_mg")

        # Quick lookup of allowed devices per mg (union over all groups).
        self.allowed_by_mg: Dict[str, Set[str]] = {}
        for mg_map in self.canonical.values():
            for mg, devices in (mg_map or {}).items():
                self.allowed_by_mg.setdefault(mg, set()).update(devices or ())

        # Per-kind synonym tables.
        self.dev_syn = self._section(self.synonyms, "device")
        self.point_syn = self._section(self.synonyms, "point")
        self.mim_syn = self._section(self.synonyms, "mim")

    @classmethod
    def from_json(cls, path: str) -> "TargetRegistry":
        """Alternate constructor: build a registry from the JSON file at ``path``."""
        return cls(_read_json(path))

    def allowed_devices(self, mg: Optional[str]) -> Set[str]:
        """Return the allow-listed device names for ``mg``.

        An empty set is returned for a falsy or unknown ``mg``. For a known
        ``mg`` the returned set is the registry's own object, so callers
        should treat it as read-only.
        """
        if not mg:
            return set()
        return self.allowed_by_mg.get(mg, set())

    def canonicalize_mim(self, mim: Optional[str]) -> Optional[str]:
        """Map a raw MIM label to ``"MIM1"``..``"MIM4"``, or ``None``.

        The label is first passed through the ``mim`` synonym table
        (exact-key lookup), then upper-cased and checked against
        ``MIM[1-4]``. ``None``, non-string values and unknown labels all
        yield ``None``.
        """
        # Plans are parsed from JSON, so the value may be a number, list, etc.;
        # those can be neither hashed for the synonym lookup nor upper-cased.
        if not isinstance(mim, str):
            return None
        label = self.mim_syn.get(mim, mim)
        if not isinstance(label, str):
            return None
        label = label.upper()
        return label if re.fullmatch(r"MIM[1-4]", label) else None

    def canonicalize_point(self, point: str) -> str:
        """Return the synonym-table replacement for ``point``, or ``point`` itself."""
        return self.point_syn.get(point, point)

    def canonicalize_device(self, dev: str, mg: Optional[str]) -> str:
        """Resolve ``dev`` to an allow-listed device name for ``mg`` when possible.

        Resolution order:

        1. Apply the device synonym table to ``dev``.
        2. If the result contains a dot (e.g. ``mg1.cap_01``) and ``mg`` is
           given, take the part after the last dot and try ``mg + core``,
           then ``mg + synonym(core)``.
        3. If the result does not start with ``mg``, try ``mg + result``.
        4. Otherwise return the synonym-mapped name unchanged, whether or not
           it is allow-listed (it may still contain dots).
        """
        mapped = self.dev_syn.get(dev, dev)
        if not mg:
            # Every mg-based candidate below needs an mg to build from.
            return mapped

        allowed = self.allowed_devices(mg)

        if "." in mapped:
            core = mapped.split(".")[-1]
            # Try the mg-prefixed core first, then the core's own synonym.
            for tail in (core, self.dev_syn.get(core, core)):
                candidate = f"{mg}{tail}"
                if candidate in allowed:
                    return candidate

        if not mapped.startswith(mg):
            candidate = f"{mg}{mapped}"
            if candidate in allowed:
                return candidate

        return mapped

    def pick_default_mim(self, mg: Optional[str]) -> Optional[str]:
        """Return the configured default MIM for ``mg`` (``None`` if there is none)."""
        if mg is None:
            return None
        return self.mim_default_by_mg.get(mg)
def augment_prompt_with_allowlist(instruction: str, reg: TargetRegistry) -> str:
    """Return ``instruction`` with a compact allow-list hint appended.

    For each of the fixed buckets ``mg1``, ``mg2``, ``mg3``, ``substation``
    and ``unmapped``, up to eight allowed device names (sorted) are listed on
    one line. Buckets with no devices are omitted; when every bucket is
    empty, ``instruction`` is returned unchanged.
    """
    preview_lines = []
    for bucket in ("mg1", "mg2", "mg3", "substation", "unmapped"):
        # Cap each bucket at eight names to keep the hint short.
        names = sorted(reg.allowed_devices(bucket))[:8]
        if names:
            preview_lines.append(f"{bucket}: {', '.join(names)}")

    if not preview_lines:
        return instruction

    return (
        instruction
        + "\nAllowed device names (examples):\n"
        + "\n".join(preview_lines)
        + "\nUse exactly one dot in `name`: [optional MIM].<device>.<point>\n"
    )
def _parse_name(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
# Accepts:
# MIM1.device.point
# device.point
# If multiple dots exist (bad), we capture best-effort pieces.
parts = name.split(".")
if len(parts) >= 3 and re.fullmatch(r"MIM[1-4]", parts[0]):
mim = parts[0]
point = parts[-1]
device = ".".join(parts[1:-1]) # may contain dots → caller will clean
return mim, device, point
elif len(parts) >= 2:
point = parts[-1]
device = ".".join(parts[:-1]) # may contain dots → caller will clean
return None, device, point
return None, None, None
def _build_name(mim: Optional[str], device: str, point: str) -> str:
if mim:
return f"{mim}.{device}.{point}"
return f"{device}.{point}"
def _is_switch_point(point: str) -> bool:
return point in {"status", "switchA", "switchB", "switchC"}
def _normalize_openclose(item: Dict[str, Any]) -> None:
op = item.get("op")
point = item.get("point")
if op in {"open", "close"} or _is_switch_point(point):
# Normalize values to uppercase OPEN/CLOSED
av = item.get("attack_value")
rv = item.get("real_value")
if isinstance(av, str):
item["attack_value"] = av.upper()
if isinstance(rv, str):
item["real_value"] = rv.upper()
# If op=open and attack_value missing → OPEN
if op == "open" and not item.get("attack_value"):
item["attack_value"] = "OPEN"
if op == "close" and not item.get("attack_value"):
item["attack_value"] = "CLOSED"
# Default real_value if absent (reasonable default)
if not item.get("real_value"):
item["real_value"] = "CLOSED" if item["attack_value"] == "OPEN" else "OPEN"
def validate_and_fix_attackplan(
    ap: Dict[str, Any],
    reg: TargetRegistry,
    strict: bool = False,
    autofix: bool = True,
    cutoff: float = 0.92,  # unused, kept for interface stability
) -> Tuple[Dict[str, Any], List[str]]:
    """Canonicalise the items of an attack plan in place.

    For every dict item in ``ap["plan"]`` the function:

    * resolves the item's MIM with the priority: explicit ``scope["mim"]``,
      then a MIM prefix in ``name``, then the registry default for
      ``scope["mg"]``;
    * canonicalises the device and point parsed from ``name`` and rebuilds
      ``name`` from the resolved parts (only when both device and point are
      non-empty);
    * writes the resolved MIM back to ``scope["mim"]``;
    * normalises OPEN/CLOSED values via ``_normalize_openclose``.

    When the first item resolves to a MIM, ``ap["mim"]`` is set to
    ``{"active": True, "selected": [that_mim]}`` (other existing keys of a
    dict-valued ``ap["mim"]`` are kept).

    Malformed input is tolerated rather than raised on: a ``None`` or
    non-dict ``scope`` is treated as empty, a missing or non-string ``name``
    as ``""``, a non-string ``mg`` or ``scope["mim"]`` as absent, and a
    ``None`` ``ap["mim"]`` is replaced by a fresh dict.

    Args:
        ap: The attack plan. Mutated in place and also returned.
        reg: Registry providing the canonicalisation tables.
        strict: When true, items whose device is not allow-listed for their
            ``mg`` are dropped, as are items that are not dicts.
        autofix: Currently not consulted by this function.
        cutoff: Currently not consulted by this function.

    Returns:
        ``(ap, notes)`` where ``notes`` lists human-readable remarks about
        dropped or skipped content.
    """
    notes: List[str] = []
    if not isinstance(ap, dict):
        notes.append("attack plan is not a dict")
        return ap, notes

    plan = ap.get("plan") or []
    if not isinstance(plan, list):
        # Indexing/iterating anything else would fail or walk dict keys.
        notes.append("plan is not a list")
        return ap, notes

    new_plan: List[Any] = []
    for index, it in enumerate(plan):
        if not isinstance(it, dict):
            if strict:
                notes.append(f"dropped non-dict plan item at index {index}")
            else:
                notes.append(f"left non-dict plan item at index {index} unchanged")
                new_plan.append(it)
            continue

        # A scope that is missing, null or of the wrong type counts as empty.
        scope = it.get("scope")
        if not isinstance(scope, dict):
            scope = {}
        mg = scope.get("mg")
        if not isinstance(mg, str):
            mg = None
        scope_mim = scope.get("mim")
        if not isinstance(scope_mim, str):
            scope_mim = None

        # Parse and canonicalise the name parts.
        name = it.get("name")
        mim_in, dev_raw, point_raw = _parse_name(name if isinstance(name, str) else "")
        point = reg.canonicalize_point(point_raw) if point_raw else point_raw

        # MIM priority: explicit in scope, else in name, else the mg default.
        # The default must come last, otherwise it would always shadow a MIM
        # that was written explicitly in the name.
        mim_final = (
            reg.canonicalize_mim(scope_mim)
            or reg.canonicalize_mim(mim_in)
            or reg.pick_default_mim(mg)
        )

        # The first item decides the plan-wide selected MIM, using the same
        # resolution as the item itself so the two cannot disagree.
        if index == 0 and mim_final:
            mim_cfg = ap.get("mim")
            if not isinstance(mim_cfg, dict):
                mim_cfg = {}
                ap["mim"] = mim_cfg
            mim_cfg["active"] = True
            mim_cfg["selected"] = [mim_final]

        # Canonicalise the device using mg + synonyms.
        dev_final = reg.canonicalize_device(dev_raw or "", mg)

        # If still unknown and strict, drop the item.
        if strict and mg and dev_final not in reg.allowed_devices(mg):
            notes.append(f"dropped unknown device for mg={mg}: {dev_raw}")
            continue

        # Rebuild the name from the resolved parts.
        if dev_final and point:
            it["name"] = _build_name(mim_final, dev_final, point)

        # Record the resolved MIM on the item's scope. Assigning ``scope``
        # back also repairs a missing or null scope entry.
        if mim_final:
            scope["mim"] = mim_final
            it["scope"] = scope

        # Normalise switch / OPEN-CLOSED values.
        _normalize_openclose(it)
        new_plan.append(it)

    ap["plan"] = new_plan
    return ap, notes