|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple, Set
|
|
|
|
|
|
def _read_json(path: str) -> Dict[str, Any]:
    """Decode the UTF-8 JSON file at *path* and return the parsed value.

    Errors from opening the file or from JSON decoding propagate to the caller.
    """
    with Path(path).open("r", encoding="utf-8") as handle:
        return json.load(handle)
|
|
|
|
|
|
class TargetRegistry:
    """Lookup helper built from a registry dictionary.

    The constructor reads these top-level keys from ``data``:

    * ``"canonical"``: mapping of group -> mapping of mg -> iterable of
      device names. Devices are merged per mg across all groups.
    * ``"points"``: stored as-is on ``self.points``.
    * ``"synonyms"``: mapping with optional ``"device"``, ``"point"`` and
      ``"mim"`` sub-mappings (alias -> replacement).
    * ``"mim_default_by_mg"``: mapping of mg -> default MIM.

    Sections that are missing *or* present with a null value are treated as
    empty, so a partially filled registry file does not crash construction.
    """

    def __init__(self, data: Dict[str, Any]):
        self.data = data or {}
        # ``or {}`` (rather than a ``.get`` default) also covers keys that
        # exist but hold ``None``, e.g. ``"synonyms": null`` in the JSON.
        self.canonical = self.data.get("canonical") or {}
        self.points = self.data.get("points", {})
        self.synonyms = self.data.get("synonyms") or {}
        self.mim_default_by_mg = self.data.get("mim_default_by_mg") or {}

        # mg -> union of device names listed for that mg in every group.
        self.allowed_by_mg: Dict[str, Set[str]] = {}
        for mg_map in self.canonical.values():
            for mg, devices in (mg_map or {}).items():
                self.allowed_by_mg.setdefault(mg, set()).update(devices or ())

        self.dev_syn = self.synonyms.get("device") or {}
        self.point_syn = self.synonyms.get("point") or {}
        self.mim_syn = self.synonyms.get("mim") or {}

    @classmethod
    def from_json(cls, path: str) -> "TargetRegistry":
        """Build a registry from the JSON file at *path*."""
        return cls(_read_json(path))

    def allowed_devices(self, mg: Optional[str]) -> Set[str]:
        """Return the device names allowed for *mg* (empty set if unknown).

        For a known mg this is the registry's own set object, not a copy, so
        callers should treat it as read-only.
        """
        if not mg:
            return set()
        return self.allowed_by_mg.get(mg, set())

    def canonicalize_mim(self, mim: Optional[str]) -> Optional[str]:
        """Return *mim* as ``MIM1``..``MIM4`` or ``None`` if it is not one.

        The MIM synonym table is consulted first, then the value is
        upper-cased and matched. Values that are not strings (after the
        synonym lookup) yield ``None`` instead of raising.
        """
        if mim is None:
            return None
        try:
            resolved = self.mim_syn.get(mim, mim)
        except TypeError:
            # Unhashable input (e.g. a list) cannot be a synonym key.
            return None
        if not isinstance(resolved, str):
            return None
        upper = resolved.upper()
        return upper if re.fullmatch(r"MIM[1-4]", upper) else None

    def canonicalize_point(self, point: str) -> str:
        """Return the point synonym for *point*, or *point* itself."""
        return self.point_syn.get(point, point)

    def canonicalize_device(self, dev: str, mg: Optional[str]) -> str:
        """Map *dev* to an allowed device name for *mg* when possible.

        Resolution order (first hit wins):

        1. Apply the device synonym table to *dev*.
        2. If the result contains a dot, take the text after the last dot
           and try ``mg + core``, then ``mg + synonym(core)``.
        3. If the result does not already start with *mg*, try
           ``mg + result``.
        4. Otherwise return the synonym-resolved name unchanged, whether or
           not it is in the allow-list.

        Without an *mg* only step 1 applies.
        """
        resolved = self.dev_syn.get(dev, dev)
        if not mg:
            return resolved

        # Looked up once; every candidate below is checked against it.
        allowed = self.allowed_devices(mg)

        if "." in resolved:
            core = resolved.split(".")[-1]
            candidate = f"{mg}{core}"
            if candidate in allowed:
                return candidate

            # The bare core may itself be an alias.
            candidate = f"{mg}{self.dev_syn.get(core, core)}"
            if candidate in allowed:
                return candidate

        if not resolved.startswith(mg):
            candidate = f"{mg}{resolved}"
            if candidate in allowed:
                return candidate

        return resolved

    def pick_default_mim(self, mg: Optional[str]) -> Optional[str]:
        """Return the registry's default MIM for *mg*, or ``None``."""
        if mg is None:
            return None
        return self.mim_default_by_mg.get(mg)
|
|
|
|
|
|
def augment_prompt_with_allowlist(
    instruction: str,
    reg: "TargetRegistry",
    buckets: Tuple[str, ...] = ("mg1", "mg2", "mg3", "substation", "unmapped"),
    max_items: int = 8,
) -> str:
    """Append a short preview of allowed device names to *instruction*.

    Args:
        instruction: Prompt text to extend.
        reg: Registry queried through ``reg.allowed_devices(bucket)``.
        buckets: Bucket (mg) names to preview, in output order. Defaults to
            the five buckets this function has always listed.
        max_items: Maximum number of device names shown per bucket; names
            are sorted first, so the preview is deterministic.

    Returns:
        *instruction* unchanged when no bucket has any devices; otherwise
        *instruction* followed by one ``"<bucket>: a, b, c"`` line per
        non-empty bucket and a closing format hint.
    """
    hints = []
    for mg in buckets:
        names = sorted(reg.allowed_devices(mg))[:max_items]
        if names:
            hints.append(f"{mg}: {', '.join(names)}")

    if not hints:
        return instruction

    # NOTE(review): the hint below says "exactly one dot" while the pattern it
    # shows contains two dots when a MIM prefix is present. The text is kept
    # verbatim because it is part of the prompt; confirm the intended format
    # before rewording it.
    return (
        instruction
        + "\nAllowed device names (examples):\n"
        + "\n".join(hints)
        + "\nUse exactly one dot in `name`: [optional MIM].<device>.<point>\n"
    )
|
|
|
|
|
|
def _parse_name(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Split a dotted target name into ``(mim, device, point)``.

    * ``MIMn.<device>.<point>`` (n in 1-4, any letter case, at least three
      dot-separated parts) -> ``("MIMn", device, point)``; the MIM part is
      returned upper-cased.
    * ``<device>.<point>`` -> ``(None, device, point)``.
    * Anything without a dot, or a value that is not a string ->
      ``(None, None, None)``.

    The point is always the text after the last dot; the device is
    everything in between and may itself contain dots.
    """
    if not isinstance(name, str):
        return None, None, None

    parts = name.split(".")
    # Upper-case before matching so "mim1.x.y" is recognised the same way
    # TargetRegistry.canonicalize_mim recognises "mim1".
    if len(parts) >= 3 and re.fullmatch(r"MIM[1-4]", parts[0].upper()):
        return parts[0].upper(), ".".join(parts[1:-1]), parts[-1]
    if len(parts) >= 2:
        return None, ".".join(parts[:-1]), parts[-1]
    return None, None, None
|
|
|
|
|
|
def _build_name(mim: Optional[str], device: str, point: str) -> str:
    """Join the parts into ``[<mim>.]<device>.<point>``.

    The MIM segment and its dot are emitted only when *mim* is truthy.
    """
    prefix = f"{mim}." if mim else ""
    return f"{prefix}{device}.{point}"
|
|
|
|
|
|
def _is_switch_point(point: str) -> bool:
    """Tell whether *point* is ``status`` or one of ``switchA``/``B``/``C``."""
    switch_points = frozenset(("status", "switchA", "switchB", "switchC"))
    return point in switch_points
|
|
|
|
|
|
def _normalize_openclose(item: Dict[str, Any]) -> None:
    """Normalise OPEN/CLOSED values on a plan item, in place.

    Applies only when ``item["op"]`` is ``"open"``/``"close"`` or
    ``item["point"]`` is a switch point; other items are left untouched.

    Steps:
        1. Upper-case ``attack_value`` and ``real_value`` when they are
           strings.
        2. If ``attack_value`` is empty, derive it from ``op``
           (``open`` -> ``"OPEN"``, ``close`` -> ``"CLOSED"``).
        3. If ``real_value`` is empty, set it to ``"CLOSED"`` when
           ``attack_value`` is ``"OPEN"`` and to ``"OPEN"`` otherwise.

    Returns:
        None; *item* is mutated.
    """
    op = item.get("op")
    if op not in {"open", "close"} and not _is_switch_point(item.get("point")):
        return

    for key in ("attack_value", "real_value"):
        value = item.get(key)
        if isinstance(value, str):
            item[key] = value.upper()

    if not item.get("attack_value"):
        if op == "open":
            item["attack_value"] = "OPEN"
        elif op == "close":
            item["attack_value"] = "CLOSED"

    if not item.get("real_value"):
        # ``.get`` rather than indexing: a switch-point item whose op is not
        # open/close may have no "attack_value" key at all, which used to
        # raise KeyError here. A missing value is treated like a null one.
        item["real_value"] = "CLOSED" if item.get("attack_value") == "OPEN" else "OPEN"
|
|
|
|
|
|
def validate_and_fix_attackplan(
    ap: Dict[str, Any],
    reg: "TargetRegistry",
    strict: bool = False,
    autofix: bool = True,
    cutoff: float = 0.92,
) -> Tuple[Dict[str, Any], List[str]]:
    """Canonicalise the entries of an attack plan against *reg*, in place.

    Args:
        ap: Attack-plan dictionary; its ``"plan"`` entry is expected to be a
            list of item dictionaries with optional ``"scope"``
            (``"mg"``/``"mim"``) and ``"name"`` keys.
        reg: Registry used to canonicalise MIM, device and point names.
        strict: When true, items whose canonical device is not in the
            allow-list of their mg are dropped (only for items that have an
            mg).
        autofix: Accepted for interface compatibility; not used by this
            function.
        cutoff: Accepted for interface compatibility; not used by this
            function.

    Returns:
        ``(ap, notes)``. *ap* is the same object that was passed in, with
        ``ap["plan"]`` replaced by the list of kept items and, when a MIM can
        be determined from the first item, ``ap["mim"]`` updated with
        ``active=True`` and ``selected=[mim]``. *notes* lists every problem
        that was skipped or dropped.

    Malformed input (non-dict *ap*, non-list plan, non-dict items, null
    ``scope``/``mim`` containers, non-string names) is reported or repaired
    rather than raising.
    """
    notes: List[str] = []

    if not isinstance(ap, dict):
        notes.append("attack plan is not a dict")
        return ap, notes

    plan = ap.get("plan") or []
    if not isinstance(plan, (list, tuple)):
        notes.append("attack plan 'plan' is not a list")
        return ap, notes

    # Plan-level MIM selection is derived from the first entry only.
    first = plan[0] if plan else None
    if isinstance(first, dict):
        scope0 = first.get("scope")
        if not isinstance(scope0, dict):
            scope0 = {}
        mim0 = reg.canonicalize_mim(scope0.get("mim")) or reg.pick_default_mim(
            scope0.get("mg")
        )
        if mim0:
            # Replace a null / non-dict "mim" entry instead of failing on it.
            if not isinstance(ap.get("mim"), dict):
                ap["mim"] = {}
            ap["mim"]["active"] = True
            ap["mim"]["selected"] = [mim0]

    new_plan: List[Dict[str, Any]] = []

    for index, it in enumerate(plan):
        if not isinstance(it, dict):
            notes.append(f"dropped non-dict plan item at index {index}")
            continue

        scope = it.get("scope")
        if not isinstance(scope, dict):
            scope = {}
        mg = scope.get("mg")
        mim = reg.canonicalize_mim(scope.get("mim")) or reg.pick_default_mim(mg)

        name = it.get("name")
        if isinstance(name, str):
            mim_in, dev_raw, point_raw = _parse_name(name)
        else:
            # Missing or null name: nothing to parse.
            mim_in, dev_raw, point_raw = None, None, None
        point = reg.canonicalize_point(point_raw) if point_raw else point_raw

        # The scope MIM (or the mg default) wins over a MIM embedded in name.
        mim_final = mim or reg.canonicalize_mim(mim_in)

        dev_final = reg.canonicalize_device(dev_raw or "", mg)

        if strict and mg and dev_final not in reg.allowed_devices(mg):
            notes.append(f"dropped unknown device for mg={mg}: {dev_raw}")
            continue

        if dev_final and point:
            it["name"] = _build_name(mim_final, dev_final, point)

        if mim_final:
            # The item may carry "scope": null; make sure there is a dict to
            # write into.
            if not isinstance(it.get("scope"), dict):
                it["scope"] = {}
            it["scope"]["mim"] = mim_final

        _normalize_openclose(it)

        new_plan.append(it)

    ap["plan"] = new_plan
    return ap, notes
|
|
|