Resource_Analytics / stability_index.py
pvanand's picture
Upload 18 files
cee8b9e verified
"""
Stability Index Engine
======================
Computes a deterministic composite risk score SI ∈ [0, 10] per minute
from the generated power and water 1MIN CSV files.
Formula (v1.0 — matches stability_index_spec.docx):
SI(t) = SI_raw(t) × V_dep(t)
SI_raw = 10 × (w_bat·C_bat + w_fresh·C_fresh + w_waste·C_waste
+ w_energy·C_energy + w_stable·C_stable)
V_dep = clamp(ttc / horizon_hr, 0.5, 1.0)
where ttc = min(time_to_crit_battery, time_to_crit_fresh) [hours]
Output CSV columns:
Time, SI, SI_raw, C_bat, C_fresh, C_waste, C_grey, C_black,
C_energy, C_stable, V_dep, ttc_hr, band, warm_up,
Battery_Level_Pct, FreshTank_Level_L, GreyTank_Level_L, BlackTank_Level_L
Usage:
python stability_index.py # uses ./output/
python stability_index.py --power_dir data/power --water_dir data/water
python stability_index.py --config si_config.json --out si_scores.csv
python stability_index.py --summary
"""
import csv
import json
import math
import argparse
import statistics
from collections import deque
from pathlib import Path
# ─────────────────────────────────────────────────────────────────────────────
# DEFAULT CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────
# Baseline settings. load_config() copies this dict and overlays any JSON
# overrides; main() then overlays the CLI flags it exposes.
DEFAULT_CONFIG = {
# --- Component weights (must sum to 1.0) ---
# Checked by load_config() (tolerance 1e-6) and again by main() after CLI
# overrides (tolerance 1e-4).
"w_bat": 0.35,
"w_fresh": 0.25,
"w_waste": 0.12,
"w_energy": 0.18,
"w_stable": 0.10,
# --- Battery thresholds (%) ---
# Passed to c_bat() as its crit / warn breakpoints.
"bat_crit_pct": 20.0,
"bat_warn_pct": 50.0,
# --- Fresh water thresholds (%) ---
# Passed to c_fresh() as its crit / warn breakpoints (percent of capacity).
"fresh_crit_pct": 15.0,
"fresh_warn_pct": 40.0,
# --- Tank capacities (litres) ---
"fresh_cap_L": 378.541, # 100 gal
"grey_cap_L": 189.271, # 50 gal
"black_cap_L": 170.344, # 45 gal
# --- Waste tank penalty start (fill fraction 0–1) ---
# Passed through c_waste() to headroom_score() as penalty_start.
"grey_penalty_start": 0.70,
"black_penalty_start": 0.60,
# --- Waste composite weights ---
# Multipliers applied to the grey / black headroom scores inside c_waste().
"grey_weight_in_waste": 0.60,
"black_weight_in_waste": 0.40,
# --- Energy balance (C_energy) ---
# energy_window_min is the rolling-window length in rows (one row per minute).
"energy_window_min": 60,
"solar_cap_kW": 5.25,
# --- Consumption stability (C_stable) ---
# stable_k and stable_mu_floor are the k and mu_floor arguments of c_stable().
"stable_window_min": 30,
"stable_k": 2.0,
"stable_mu_floor": 0.05,
# --- Depletion velocity modifier (V_dep) ---
# velocity_horizon_hr and velocity_min_factor are v_dep()'s horizon_hr and
# v_floor arguments.
"velocity_window_min": 60,
"velocity_horizon_hr": 4.0,
"velocity_min_factor": 0.5,
# --- Score band thresholds ---
# band_label() scans this list in order with both bounds inclusive, so a score
# on a shared boundary (e.g. 8.0) resolves to the entry listed first.
"bands": [
{"min": 8.0, "max": 10.0, "label": "Excellent"},
{"min": 6.0, "max": 8.0, "label": "Good"},
{"min": 4.0, "max": 6.0, "label": "Fair"},
{"min": 2.0, "max": 4.0, "label": "Poor"},
{"min": 0.0, "max": 2.0, "label": "Critical"},
],
}
# Power-CSV columns that calculate() sums per row to obtain the total load [kW].
LOAD_COLS = [
"HVAC_Flow_kW", "Lighting_Flow_kW", "Devices_Flow_kW", "Fridge_Flow_kW",
"WaterPump_Flow_kW", "Cooking_Flow_kW", "Inverter_Flow_kW", "Unmetered_Flow_kW",
]
# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def clamp(val, lo, hi):
    """Limit val to the range [lo, hi]; the lower bound wins if lo > hi."""
    capped = val if val < hi else hi
    return capped if capped > lo else lo
def band_label(si, bands):
    """
    Return the label of the first band whose [min, max] range contains si.

    Both bounds are inclusive, so a score that sits on a boundary shared by
    two bands gets the one listed first. If no band matches, "Critical" is
    returned.
    """
    matching_labels = (
        band["label"] for band in bands if band["min"] <= si <= band["max"]
    )
    return next(matching_labels, "Critical")
def load_config(path):
    """
    Build the run configuration.

    Starts from a shallow copy of DEFAULT_CONFIG and, when `path` is truthy,
    overlays the JSON content of that file on top of it.

    Raises ValueError if the five component weights do not sum to 1.0
    (tolerance 1e-6).
    """
    cfg = DEFAULT_CONFIG.copy()
    if path:
        with open(path) as handle:
            overrides = json.load(handle)
        cfg.update(overrides)

    total_w = (
        cfg["w_bat"] + cfg["w_fresh"] + cfg["w_waste"]
        + cfg["w_energy"] + cfg["w_stable"]
    )
    if abs(total_w - 1.0) > 1e-6:
        raise ValueError(f"Component weights must sum to 1.0, got {total_w:.6f}")
    return cfg
def load_csv(path):
    """Read the CSV at `path` and return its data rows as a list of dicts keyed by header."""
    with open(path, newline="") as handle:
        reader = csv.DictReader(handle)
        return [record for record in reader]
def merge_rows(power_rows, water_rows):
    """
    Combine power and water rows that share the same "Time" value.

    When both inputs have the same length they are paired positionally and
    every pair must carry identical timestamps, otherwise ValueError is
    raised. When the lengths differ, each power row is matched to the water
    row with the same timestamp and unmatched power rows are dropped.
    On a column-name clash the water value wins.
    """
    if len(power_rows) != len(water_rows):
        # Different lengths: look water rows up by timestamp instead.
        water_by_time = {}
        for water_row in water_rows:
            water_by_time[water_row["Time"]] = water_row
        joined = []
        for power_row in power_rows:
            stamp = power_row["Time"]
            if stamp in water_by_time:
                combined = dict(power_row)
                combined.update(water_by_time[stamp])
                joined.append(combined)
        return joined

    paired = []
    for position, pair in enumerate(zip(power_rows, water_rows)):
        power_row, water_row = pair
        if power_row["Time"] != water_row["Time"]:
            raise ValueError(
                f"Timestamp mismatch at row {position}: power={power_row['Time']} water={water_row['Time']}"
            )
        combined = dict(power_row)
        combined.update(water_row)
        paired.append(combined)
    return paired
# ─────────────────────────────────────────────────────────────────────────────
# COMPONENT SCORE FUNCTIONS
# ─────────────────────────────────────────────────────────────────────────────
def c_bat(bat_pct, crit, warn):
    """
    Piecewise linear Battery SOC score → [0, 1].

    bat_pct is limited to [0, 100] before scoring. crit and warn are the
    zone breakpoints, both in percent.

    Zones (configurable via crit/warn thresholds):
        [0, crit)    → [0.00, 0.30)   steep penalty
        [crit, warn) → [0.30, 0.70)   moderate penalty
        [warn, 100]  → [0.70, 1.00]   comfort zone

    When warn is 100 the comfort zone has zero width; a completely full
    level then scores 1.0 instead of dividing by zero.
    """
    p = max(0.0, min(100.0, bat_pct)) / 100.0
    c = crit / 100.0
    w = warn / 100.0

    if p < c:
        # p < c implies c > 0, so the division is safe.
        return 0.30 * (p / c)
    if p < w:
        # Here c <= p < w, so w - c > 0.
        return 0.30 + 0.40 * ((p - c) / (w - c))
    if w >= 1.0:
        # Only reachable with p == w == 1.0: the comfort zone collapsed to
        # a single point, which is the top of the scale.
        return 1.0
    return 0.70 + 0.30 * ((p - w) / (1.0 - w))
def c_fresh(fresh_L, cap_L, crit_pct, warn_pct):
    """
    Fresh Water score → [0, 1].

    Converts the tank level (litres) to a fill percentage of cap_L, limits
    it to [0, 100] and scores it with the same piecewise curve as c_bat.
    """
    fill_pct = fresh_L / cap_L * 100.0
    fill_pct = clamp(fill_pct, 0.0, 100.0)
    return c_bat(fill_pct, crit_pct, warn_pct)
def headroom_score(fill, penalty_start):
    """
    Headroom score for one waste tank: 1.0 when empty, 0.0 when full.

    fill          : used fraction of the tank, limited to [0, 1]
    penalty_start : fill fraction above which the extra penalty kicks in

    Up to penalty_start the score is simply the free fraction. Beyond it the
    free fraction is scaled by a factor that falls from 1 towards -2 across
    the remaining range, and the product is limited to [0, 1].
    """
    used = clamp(fill, 0.0, 1.0)
    free = 1.0 - used
    if used <= penalty_start:
        return free

    over = used - penalty_start
    span = 1.0 - penalty_start
    scaled = free * (1.0 - 3.0 * over / span)
    return clamp(scaled, 0.0, 1.0)
def c_waste(grey_L, black_L, grey_cap, black_cap,
            grey_penalty, black_penalty, grey_w, black_w):
    """
    Composite waste headroom = grey_w × C_grey + black_w × C_black.

    Each tank level (litres) is divided by its capacity and scored with
    headroom_score using that tank's penalty start.

    Returns the tuple (C_waste, C_grey, C_black).
    """
    grey_score = headroom_score(grey_L / grey_cap, grey_penalty)
    black_score = headroom_score(black_L / black_cap, black_penalty)
    combined = grey_w * grey_score + black_w * black_score
    return combined, grey_score, black_score
def c_energy(gen_window, load_window, solar_cap_kw):
    """
    Rolling-window energy balance score.

        net_norm = clamp((mean_gen - mean_load) / solar_cap, -1, +1)
        C_energy = 0.5 + 0.5 × net_norm

    The divisor is never smaller than 0.01. An empty generation window
    yields the neutral score 0.5.
    """
    if not gen_window:
        return 0.5

    avg_gen = sum(gen_window) / len(gen_window)
    avg_load = sum(load_window) / len(load_window)
    divisor = max(solar_cap_kw, 0.01)
    surplus = (avg_gen - avg_load) / divisor
    return 0.5 + 0.5 * clamp(surplus, -1.0, 1.0)
def c_stable(load_window, k, mu_floor):
    """
    Consumption stability via the Coefficient of Variation.

        CV       = σ / max(μ, mu_floor)
        C_stable = 1 / (1 + k × CV)

    σ is the sample standard deviation (n - 1 divisor). Windows with fewer
    than two samples, or with a mean below 1e-6, score 1.0.
    """
    sample_count = len(load_window)
    if sample_count < 2:
        return 1.0

    mean_load = sum(load_window) / sample_count
    if mean_load < 1e-6:
        return 1.0

    squared_dev = sum((value - mean_load) ** 2 for value in load_window)
    std_dev = math.sqrt(squared_dev / (sample_count - 1))
    coeff_var = std_dev / max(mean_load, mu_floor)
    return 1.0 / (1.0 + k * coeff_var)
def _time_to_critical(window, level_now, critical_level, min_rate):
    """Hours until level_now reaches critical_level at the window's average rate (inf if not falling)."""
    sample_count = len(window)
    if sample_count < 2:
        return float("inf")
    # N one-minute samples (oldest … current) span N - 1 minutes.
    elapsed_hr = (sample_count - 1) / 60.0
    rate = (level_now - window[0]) / elapsed_hr
    if not rate < 0:
        return float("inf")
    gap = level_now - critical_level
    if gap <= 0:
        # Already at or below the critical level while still falling.
        return 0.0
    return gap / max(abs(rate), min_rate)


def v_dep(bat_window, fresh_window,
          bat_pct_now, fresh_L_now,
          fresh_cap_L, bat_crit_pct,
          horizon_hr, v_floor,
          fresh_crit_pct=15.0):
    """
    Depletion velocity modifier → (V_dep, ttc_hr).

    Computes the average rate of change of battery (% / hr) and fresh water
    (L / hr) across their rolling windows, estimates the time to reach the
    critical threshold for each, takes the minimum, then maps it to a
    [v_floor, 1.0] suppression factor over horizon_hr.

    bat_window, fresh_window : rolling windows of one-minute samples whose
                               last element is the current reading
    bat_pct_now, fresh_L_now : current battery % and fresh-water litres
    fresh_cap_L              : fresh tank capacity in litres
    bat_crit_pct             : battery critical threshold in percent
    horizon_hr               : time-to-critical (hours) at which V_dep
                               reaches 1.0; must be non-zero
    v_floor                  : smallest V_dep that can be returned
    fresh_crit_pct           : fresh-water critical threshold as a percent
                               of fresh_cap_L (default 15.0, the value that
                               used to be hard-coded)

    Returns (1.0, 9999.0) when neither resource is depleting; 9999 is the
    "no depletion risk" marker. Otherwise ttc_hr is rounded to 4 decimals.
    """
    inf = float("inf")

    # Rate floors (0.01 %/hr, 0.1 L/hr) keep very slow drains from producing
    # huge time-to-critical values.
    ttc_bat = _time_to_critical(bat_window, bat_pct_now, bat_crit_pct, 0.01)
    crit_fresh_L = fresh_crit_pct / 100.0 * fresh_cap_L
    ttc_fresh = _time_to_critical(fresh_window, fresh_L_now, crit_fresh_L, 0.1)

    ttc = min(ttc_bat, ttc_fresh)
    if ttc == inf:
        return 1.0, 9999.0

    vd = max(v_floor, min(1.0, ttc / horizon_hr))
    return vd, round(ttc, 4)
# ─────────────────────────────────────────────────────────────────────────────
# MAIN CALCULATION LOOP
# ─────────────────────────────────────────────────────────────────────────────
def calculate(power_path, water_path, cfg):
    """
    Compute the Stability Index for every merged 1-minute row.

    Loads and merges the power and water CSV files, then walks the rows in
    order while maintaining rolling windows for the multi-row components
    (energy balance, consumption stability, depletion velocity).

    power_path, water_path : paths of the power and water 1MIN CSV files
    cfg                    : configuration dict with the keys of DEFAULT_CONFIG

    Returns a list with one output dict per row. A row is flagged
    warm_up="true" while its index is below the longest of the three
    rolling-window lengths.

    Raises KeyError when a required column is missing and ValueError when a
    cell cannot be parsed as a float.
    """
    rows = merge_rows(load_csv(power_path), load_csv(water_path))

    N_energy = cfg["energy_window_min"]
    N_stable = cfg["stable_window_min"]
    N_velocity = cfg["velocity_window_min"]
    # Longest window = warm-up horizon. The stability window is included so
    # that a long stable_window_min is not silently ignored.
    warm_up = max(N_energy, N_stable, N_velocity)

    gen_win = deque(maxlen=N_energy)      # Solar + Shore [kW]
    load_win_e = deque(maxlen=N_energy)   # total load [kW] — energy window
    load_win_s = deque(maxlen=N_stable)   # total load [kW] — stability window
    bat_win = deque(maxlen=N_velocity)    # battery % — velocity window
    fresh_win = deque(maxlen=N_velocity)  # fresh water L — velocity window

    results = []
    for i, row in enumerate(rows):
        # ── Parse ──────────────────────────────────────────────────────────
        bat_pct = float(row["Battery_Level_Pct"])
        fresh_L = float(row["FreshTank_Level_L"])
        grey_L = float(row["GreyTank_Level_L"])
        black_L = float(row["BlackTank_Level_L"])
        solar_kw = float(row["Solar_Flow_kW"])
        shore_kw = float(row["Shore_Flow_kW"])
        total_load = sum(float(row[c]) for c in LOAD_COLS)

        # ── Feed rolling windows ───────────────────────────────────────────
        # The current sample is appended before scoring, so every window
        # already contains this row when the component functions read it.
        gen_win.append(solar_kw + shore_kw)
        load_win_e.append(total_load)
        load_win_s.append(total_load)
        bat_win.append(bat_pct)
        fresh_win.append(fresh_L)

        # ── Component scores ───────────────────────────────────────────────
        sc_bat = c_bat(bat_pct, cfg["bat_crit_pct"], cfg["bat_warn_pct"])
        sc_fresh = c_fresh(
            fresh_L, cfg["fresh_cap_L"],
            cfg["fresh_crit_pct"], cfg["fresh_warn_pct"]
        )
        sc_waste, sc_grey, sc_black = c_waste(
            grey_L, black_L,
            cfg["grey_cap_L"], cfg["black_cap_L"],
            cfg["grey_penalty_start"], cfg["black_penalty_start"],
            cfg["grey_weight_in_waste"], cfg["black_weight_in_waste"]
        )
        sc_energy = c_energy(gen_win, load_win_e, cfg["solar_cap_kW"])
        sc_stable = c_stable(load_win_s, cfg["stable_k"], cfg["stable_mu_floor"])

        # ── Weighted sum ───────────────────────────────────────────────────
        si_raw = 10.0 * (
            cfg["w_bat"] * sc_bat +
            cfg["w_fresh"] * sc_fresh +
            cfg["w_waste"] * sc_waste +
            cfg["w_energy"] * sc_energy +
            cfg["w_stable"] * sc_stable
        )
        si_raw = clamp(si_raw, 0.0, 10.0)

        # ── Depletion velocity modifier ────────────────────────────────────
        vd, ttc_hr = v_dep(
            bat_win, fresh_win,
            bat_pct, fresh_L,
            cfg["fresh_cap_L"], cfg["bat_crit_pct"],
            cfg["velocity_horizon_hr"], cfg["velocity_min_factor"]
        )
        si_final = clamp(si_raw * vd, 0.0, 10.0)

        results.append({
            "Time": row["Time"],
            "SI": round(si_final, 4),
            "SI_raw": round(si_raw, 4),
            "C_bat": round(sc_bat, 4),
            "C_fresh": round(sc_fresh, 4),
            "C_waste": round(sc_waste, 4),
            "C_grey": round(sc_grey, 4),
            "C_black": round(sc_black, 4),
            "C_energy": round(sc_energy, 4),
            "C_stable": round(sc_stable, 4),
            "V_dep": round(vd, 4),
            "ttc_hr": ttc_hr,
            # The band is derived from the unrounded final score.
            "band": band_label(si_final, cfg["bands"]),
            "warm_up": "true" if i < warm_up else "false",
            # Passthrough context columns
            "Battery_Level_Pct": round(bat_pct, 2),
            "FreshTank_Level_L": round(fresh_L, 2),
            "GreyTank_Level_L": round(grey_L, 2),
            "BlackTank_Level_L": round(black_L, 2),
        })
    return results
# ─────────────────────────────────────────────────────────────────────────────
# SUMMARY REPORT
# ─────────────────────────────────────────────────────────────────────────────
def print_summary(results, cfg):
    """
    Print a human-readable summary of the computed scores to stdout.

    results : list of row dicts as produced by calculate()
    cfg     : configuration dict (weights and "bands" are read)

    Statistics are computed over the rows whose warm_up flag is "false";
    if there are none, every row is used instead. Runs of consecutive
    Poor / Critical rows are detected by row position, so repeated
    timestamps cannot confuse the grouping. Prints a short notice and
    returns when results is empty.
    """
    if not results:
        print(" No rows to summarise.")
        return

    # Positions (in results) of the rows that count towards the statistics.
    scored_idx = [i for i, r in enumerate(results) if r["warm_up"] == "false"]
    if not scored_idx:
        scored_idx = list(range(len(results)))
    warm = [results[i] for i in scored_idx]

    si_vals = [r["SI"] for r in warm]
    si_mean = statistics.mean(si_vals)
    si_min = min(si_vals)
    si_max = max(si_vals)
    si_std = statistics.stdev(si_vals) if len(si_vals) > 1 else 0.0

    band_counts = {}
    for r in warm:
        band_counts[r["band"]] = band_counts.get(r["band"], 0) + 1

    worst = min(warm, key=lambda r: r["SI"])
    best = max(warm, key=lambda r: r["SI"])

    def cmean(key):
        return round(statistics.mean(r[key] for r in warm), 3)

    W = 62
    sep = "─" * W
    print(f"\n{'═' * W}")
    print(" STABILITY INDEX — SUMMARY REPORT")
    print(f" Period : {results[0]['Time']} → {results[-1]['Time']}")
    print(f" Rows : {len(results):,} | Warm-up excluded: {len(results)-len(warm):,}")
    print(f"{'═' * W}")

    print("\n OVERALL SI")
    print(f" {sep}")
    print(f" Mean : {si_mean:6.3f} ({band_label(si_mean, cfg['bands'])})")
    print(f" Min : {si_min:6.3f} @ {worst['Time']} [{worst['band']}]")
    print(f" Max : {si_max:6.3f} @ {best['Time']} [{best['band']}]")
    print(f" Std : {si_std:6.3f}")

    print("\n BAND DISTRIBUTION")
    print(f" {sep}")
    total = len(warm)
    for b in ["Excellent", "Good", "Fair", "Poor", "Critical"]:
        n = band_counts.get(b, 0)
        pct = n / total * 100 if total else 0
        bar = "█" * int(pct / 2)  # one block per 2 %
        print(f" {b:<12} {n:>5} min ({pct:5.1f}%) {bar}")

    print("\n COMPONENT AVERAGES (0–1, higher = more stable)")
    print(f" {sep}")
    rows = [
        ("C_bat", "Battery SOC ", cfg["w_bat"]),
        ("C_fresh", "Fresh Water ", cfg["w_fresh"]),
        ("C_waste", "Waste Headroom ", cfg["w_waste"]),
        ("C_grey", " ↳ Grey tank ", cfg["grey_weight_in_waste"] * cfg["w_waste"]),
        ("C_black", " ↳ Black tank ", cfg["black_weight_in_waste"] * cfg["w_waste"]),
        ("C_energy", "Energy Balance ", cfg["w_energy"]),
        ("C_stable", "Cons. Stability", cfg["w_stable"]),
        ("V_dep", "Depl. Velocity ", None),
    ]
    for key, label, weight in rows:
        val = cmean(key)
        bar = "▓" * int(val * 20)
        wstr = f"w={weight:.2f}" if weight is not None else "modifier"
        print(f" {label} {val:.3f} {bar:<22} {wstr}")

    # Identify consecutive critical/poor runs by position in results.
    flagged = [i for i in scored_idx if results[i]["band"] in ("Critical", "Poor")]
    if flagged:
        runs = []  # (first_index, last_index), both inclusive
        run_first = run_last = flagged[0]
        for i in flagged[1:]:
            if i == run_last + 1:
                run_last = i
            else:
                runs.append((run_first, run_last))
                run_first = run_last = i
        runs.append((run_first, run_last))

        print(f"\n ⚠ POOR / CRITICAL EVENTS ({len(flagged)} minutes across {len(runs)} run(s))")
        print(f" {sep}")
        # Only the first ten runs are listed; the rest are counted below.
        for first, last in runs[:10]:
            start = results[first]
            dur = last - first + 1
            print(f" {start['Time']} SI={start['SI']:.2f} "
                  f"[{start['band']}] duration={dur} min "
                  f"bat={start['Battery_Level_Pct']}% "
                  f"fresh={start['FreshTank_Level_L']:.0f}L")
        if len(runs) > 10:
            print(f" … and {len(runs)-10} more run(s)")
    else:
        print("\n ✓ No Critical or Poor events detected.")

    print(f"\n{'═' * W}\n")
# ─────────────────────────────────────────────────────────────────────────────
# CSV WRITER
# ─────────────────────────────────────────────────────────────────────────────
def write_csv(path, rows):
    """
    Write a list of row dicts to a CSV file and report the row count.

    path : destination file; missing parent directories are created
    rows : list of dicts; the keys of the first row become the header

    Prints a notice and writes nothing when rows is empty.
    """
    if not rows:
        print(" No rows to write.")
        return

    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    print(f" Wrote {len(rows):,} rows → {path}")
# ─────────────────────────────────────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """
    Command-line entry point.

    Parses the arguments, builds the configuration (defaults, then the
    optional JSON file, then inline CLI overrides), re-checks that the
    component weights sum to 1.0, runs the calculation on
    <power_dir>/1MIN.csv and <water_dir>/1MIN.csv, writes the output CSV and
    optionally prints the summary report.

    Exits via SystemExit when the weights do not sum to 1.0 (tolerance 1e-4).
    """
    cli = argparse.ArgumentParser(
        description="EV Camper Stability Index Engine",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python stability_index.py --summary
python stability_index.py --power_dir data/power --water_dir data/water
python stability_index.py --config si_config.json --out scores/si.csv --summary
python stability_index.py --w_bat 0.40 --w_fresh 0.20 --w_energy 0.20 --w_waste 0.10 --w_stable 0.10
"""
    )

    # Paths
    path_options = (
        ("--power_dir", "output/power",
         "Directory containing 1MIN.csv (default: output/power)"),
        ("--water_dir", "output/water",
         "Directory containing 1MIN.csv (default: output/water)"),
        ("--out", "output/stability_index.csv",
         "Output CSV path (default: output/stability_index.csv)"),
        ("--config", None,
         "JSON config file to override any DEFAULT_CONFIG value"),
    )
    for flag, fallback, help_text in path_options:
        cli.add_argument(flag, default=fallback, help=help_text)

    # Inline weight overrides
    cli.add_argument("--w_bat", type=float, default=None, metavar="W",
                     help="Battery SOC weight (overrides config)")
    for flag in ("--w_fresh", "--w_waste", "--w_energy", "--w_stable"):
        cli.add_argument(flag, type=float, default=None, metavar="W")

    # Other inline overrides
    typed_options = (
        ("--solar_cap_kW", float),
        ("--energy_window_min", int),
        ("--stable_window_min", int),
        ("--velocity_horizon_hr", float),
        ("--fresh_cap_L", float),
        ("--grey_cap_L", float),
        ("--black_cap_L", float),
    )
    for flag, caster in typed_options:
        cli.add_argument(flag, type=caster, default=None)

    cli.add_argument("--summary", action="store_true",
                     help="Print a human-readable summary report to stdout")
    args = cli.parse_args()

    # Build config with optional file + CLI overrides
    cfg = load_config(args.config)
    supplied = vars(args)
    override_keys = (
        "w_bat", "w_fresh", "w_waste", "w_energy", "w_stable",
        "solar_cap_kW", "energy_window_min", "stable_window_min",
        "velocity_horizon_hr", "fresh_cap_L", "grey_cap_L", "black_cap_L",
    )
    for key in override_keys:
        if supplied.get(key) is not None:
            cfg[key] = supplied[key]

    # CLI overrides may have changed the weights, so check the sum again.
    total_w = (
        cfg["w_bat"] + cfg["w_fresh"] + cfg["w_waste"]
        + cfg["w_energy"] + cfg["w_stable"]
    )
    if abs(total_w - 1.0) > 1e-4:
        raise SystemExit(
            f"ERROR: weights must sum to 1.0 (got {total_w:.4f})\n"
            f" bat={cfg['w_bat']} fresh={cfg['w_fresh']} waste={cfg['w_waste']} "
            f"energy={cfg['w_energy']} stable={cfg['w_stable']}"
        )

    power_path = str(Path(args.power_dir) / "1MIN.csv")
    water_path = str(Path(args.water_dir) / "1MIN.csv")

    rule = "=" * 62
    print(f"\n{rule}")
    print(" Stability Index Engine v1.0")
    print(rule)
    print(f" Power : {power_path}")
    print(f" Water : {water_path}")
    print(f" Weights: bat={cfg['w_bat']} fresh={cfg['w_fresh']} "
          f"waste={cfg['w_waste']} energy={cfg['w_energy']} stable={cfg['w_stable']}")
    print(f" Windows: energy={cfg['energy_window_min']}min "
          f"stable={cfg['stable_window_min']}min "
          f"velocity={cfg['velocity_window_min']}min")
    print()
    print(" Calculating...")

    results = calculate(power_path, water_path, cfg)
    write_csv(args.out, results)
    if args.summary:
        print_summary(results, cfg)
    print(" Done.\n")
if __name__ == "__main__":
main()