| |
| from __future__ import annotations |
| import os |
| import json |
| from typing import Any, Dict, Tuple |
| import numpy as np |
| import torch |
| import torch.nn.functional as F |
| import requests |
| import numpy as np |
| import json |
|
|
| import requests |
| import numpy as np |
|
|
class RemoteHTTPPolicy:
    """Policy that delegates action selection to a remote HTTP inference server.

    Protocol (both endpoints accept/return JSON):
      * ``POST {server_url}/predict`` with ``{"step": int, "obs": [float], "info": {}}``
        must return ``{"action": [float, ...]}``.
      * ``POST {server_url}/reset`` clears any server-side episode state.

    Any network or decoding failure is swallowed (best-effort design): a fixed
    fallback setpoint vector is returned so the simulation loop never crashes.
    """

    def __init__(self, server_url: str = "http://host.docker.internal:8000",
                 timeout: float = 10.0):
        """
        Args:
            server_url: Base URL of the inference server.
            timeout: Per-request timeout in seconds. Without an explicit
                timeout, a hung server would block the control loop forever.
        """
        self.server_url = server_url
        self.predict_endpoint = f"{server_url}/predict"
        self.reset_endpoint = f"{server_url}/reset"
        self.timeout = float(timeout)
        print(f"[RemotePolicy] Connecting to {self.server_url}...")

    def reset(self):
        """Best-effort reset of the remote episode buffer; never raises."""
        try:
            requests.post(self.reset_endpoint, json={"message": "reset"},
                          timeout=self.timeout)
            print("[RemotePolicy] Remote buffer reset.")
        except Exception as e:
            print(f"[RemotePolicy] Reset failed: {e}")

    def act(self, obs, info, step):
        """Query the remote server for an action.

        Returns:
            ``(action, {}, {})`` where ``action`` is a float32 numpy array.
            On any failure (network error, bad status, malformed JSON) a
            default ``[21.0, 24.0] * 5`` setpoint vector is returned instead.
        """
        obs_list = np.array(obs, dtype=np.float32).tolist()
        payload = {"step": int(step), "obs": obs_list, "info": {}}
        try:
            resp = requests.post(self.predict_endpoint, json=payload,
                                 timeout=self.timeout)
            resp.raise_for_status()
            action = np.array(resp.json()["action"], dtype=np.float32)
            return action, {}, {}
        except Exception as e:
            print(f"[RemotePolicy] Error: {e}")
            return np.array([21.0, 24.0] * 5, dtype=np.float32), {}, {}
|
|
|
|
| def _get_int_env(name: str, default: int) -> int: |
| try: |
| v = int(os.environ.get(name, str(default))) |
| return v |
| except Exception: |
| return default |
|
|
|
|
| def _get_bool_env(name: str, default: bool) -> bool: |
| v = os.environ.get(name, None) |
| if v is None: |
| return default |
| return v.strip().lower() in ("1", "true", "yes", "y", "on") |
|
|
|
|
| |
| |
| |
class ConstantSetpointPolicy5Zone:
    """Fixed-schedule rule-based controller for a 5-zone building.

    Every call to :meth:`act` returns the same action vector,
    ``[heating_sp, cooling_sp]`` repeated once per zone (5 zones).
    """

    def __init__(self, heating_sp: float = 21.0, cooling_sp: float = 24.0):
        self.heating_sp = float(heating_sp)
        self.cooling_sp = float(cooling_sp)
        # Build the constant action template once up front.
        pair = [self.heating_sp, self.cooling_sp]
        self.action = np.array(pair * 5, dtype=np.float32)

    def reset(self):
        # Stateless controller: nothing to clear between episodes.
        return

    def act(self, obs, info, step):
        # Hand out a copy so callers cannot mutate our cached template.
        return self.action.copy(), {}, {}
|
|
|
|
class DecisionTransformerPolicy5Zone:
    """
    CPU-safe Decision Transformer policy with robust observation mapping and
    heating/cooling deadband protection.

    Per step it: maps the flat environment observation onto the model's state
    keys, normalizes it, appends a (state + previous-action) token row plus a
    return-to-go value into a ring buffer of length ``L``, runs the DT model
    over the buffered context, samples one bin per action head, decodes bins
    to setpoints, then clips and enforces a minimum heat/cool gap per zone.
    """

    def __init__(
        self,
        ckpt_path: str,
        model_config_path: str,
        norm_stats_path: str,
        context_len: int,
        max_tokens_per_step: int,
        device: str = "cpu",
        temperature: float = 0.5,
    ):
        """
        Args:
            ckpt_path: Torch checkpoint; its ``"model"`` entry holds the state dict.
            model_config_path: JSON model config; ``CONTEXT_LEN`` is overridden below.
            norm_stats_path: ``.npz`` with obs/act mean/std (and optional ``max_return``).
            context_len: History length L of the token ring buffer.
            max_tokens_per_step: Token budget K per timestep (states + actions).
            device: Accepted for interface compatibility; CPU is always used.
            temperature: Softmax sampling temperature for action bins.
        """
        # Project-local modules (model definition + feature/bin metadata).
        import dataloader as dl
        from embeddings import GeneralistComfortDT

        # Inference-only CPU setup: no autograd, enable attention fastpath and
        # (optionally, via DT_MKLDNN) MKL-DNN; thread counts are env-tunable.
        torch.set_grad_enabled(False)
        torch.backends.mha.set_fastpath_enabled(True)
        torch.backends.mkldnn.enabled = _get_bool_env("DT_MKLDNN", True)
        import multiprocessing
        avail = multiprocessing.cpu_count()
        dt_threads = _get_int_env("DT_NUM_THREADS", min(18, avail))
        torch.set_num_threads(dt_threads)
        torch.set_num_interop_threads(1)

        self.dl = dl
        # NOTE(review): the `device` argument is ignored — the policy is
        # pinned to CPU regardless of what the caller passes.
        self.device = torch.device("cpu")
        self.temperature = float(temperature)
        with open(model_config_path, "r") as f:
            cfg = json.load(f)

        # Force the config's context length to match this policy's buffer.
        cfg["CONTEXT_LEN"] = int(context_len)
        self.L = int(context_len)  # history length (timesteps)
        self.K = int(max_tokens_per_step)  # tokens per timestep

        self.model = GeneralistComfortDT(cfg).to(self.device)
        ckpt = torch.load(ckpt_path, map_location="cpu")
        self.model.load_state_dict(ckpt["model"], strict=True)
        self.model.eval()

        # Normalization statistics gathered at training time.
        z = np.load(norm_stats_path)
        self.obs_mean = z["obs_mean"].astype(np.float32)
        self.obs_std = z["obs_std"].astype(np.float32)
        self.act_mean = z["act_mean"].astype(np.float32)
        self.act_std = z["act_std"].astype(np.float32)
        self.max_return = float(z["max_return"][0]) if "max_return" in z else 1.0

        # Return-to-go conditioning: a constant raw target, scaled by
        # max_return (see _scale_rtg). Negative target reflects a cost-style
        # (negative-reward) return convention — TODO confirm against training.
        self.rtg_scale_mode = "max_return"
        self.rtg_constant_div = 1.0
        self.desired_rtg_raw = -0.5

        # Previous action fed back as input tokens; starts at a neutral
        # 21/24 degC setpoint pair per zone.
        self.prev_action = np.array([21.0, 24.0] * 5, dtype=np.float32)

        # Order in which the environment emits its flat observation vector.
        # assumes the env obs really arrives in this order — TODO confirm
        # against the wrapper that produces `obs`.
        self.env_keys_order = [
            'month', 'day_of_month', 'hour',
            'outdoor_temp', 'core_temp', 'perim1_temp', 'perim2_temp', 'perim3_temp', 'perim4_temp',
            'elec_power',
            'core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count',
            'outdoor_dewpoint', 'outdoor_wetbulb',
            'core_rh', 'perim1_rh', 'perim2_rh', 'perim3_rh', 'perim4_rh',
            'core_ash55_notcomfortable_summer', 'core_ash55_notcomfortable_winter', 'core_ash55_notcomfortable_any',
            'p1_ash55_notcomfortable_any', 'p2_ash55_notcomfortable_any', 'p3_ash55_notcomfortable_any', 'p4_ash55_notcomfortable_any',
            'total_electricity_HVAC'
        ]

        # Subset/order of keys the model was trained on (note: drops
        # day_of_month and total_electricity_HVAC; moves month/hour last).
        self.model_state_keys = [
            'outdoor_temp', 'core_temp', 'perim1_temp', 'perim2_temp', 'perim3_temp', 'perim4_temp',
            'elec_power',
            'core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count',
            'outdoor_dewpoint', 'outdoor_wetbulb',
            'core_rh', 'perim1_rh', 'perim2_rh', 'perim3_rh', 'perim4_rh',
            'core_ash55_notcomfortable_summer', 'core_ash55_notcomfortable_winter', 'core_ash55_notcomfortable_any',
            'p1_ash55_notcomfortable_any', 'p2_ash55_notcomfortable_any', 'p3_ash55_notcomfortable_any', 'p4_ash55_notcomfortable_any',
            'month', 'hour'
        ]

        # Index of each model key within the env observation; a missing key
        # falls back to index 0 (with a warning) rather than crashing.
        self.obs_indices = []
        for k in self.model_state_keys:
            try:
                self.obs_indices.append(self.env_keys_order.index(k))
            except ValueError:
                print(f"Key {k} missing")
                self.obs_indices.append(0)
        self.obs_indices = np.array(self.obs_indices, dtype=np.int64)
        # NOTE(review): obs_indices is built here but act() remaps via
        # env_map/dict lookups instead — this array appears unused.

        # One (htg, clg) setpoint pair per zone: core + perimeters 1-4.
        self.action_keys = [
            "htg_core", "clg_core", "htg_p1", "clg_p1", "htg_p2", "clg_p2",
            "htg_p3", "clg_p3", "htg_p4", "clg_p4",
        ]

        # (feature_id, zone_id) metadata per state/action token, resolved by
        # the project dataloader.
        self.s_meta = [self.dl.parse_feature_identity(k, is_action=False) for k in self.model_state_keys]
        self.a_meta = [self.dl.parse_feature_identity(k, is_action=True) for k in self.action_keys]

        # Fit tokens into the K budget: actions take priority, states fill
        # whatever room remains.
        self.num_act = min(len(self.a_meta), self.K)
        self.num_state = min(len(self.s_meta), self.K - self.num_act)

        # Static per-timestep token row layout: [states..., actions..., pad].
        self.row_feat_ids = np.zeros((self.K,), dtype=np.int64)
        self.row_zone_ids = np.zeros((self.K,), dtype=np.int64)
        self.row_attn = np.zeros((self.K,), dtype=np.int64)
        self.row_feat_vals = np.zeros((self.K,), dtype=np.float32)

        if self.num_state > 0:
            s_meta = self.s_meta[:self.num_state]
            self.row_feat_ids[:self.num_state] = np.array([m[0] for m in s_meta], dtype=np.int64)
            self.row_zone_ids[:self.num_state] = np.array([m[1] for m in s_meta], dtype=np.int64)
            self.row_attn[:self.num_state] = 1

        if self.num_act > 0:
            start = self.num_state
            end = start + self.num_act
            a_meta = self.a_meta[:self.num_act]
            self.row_feat_ids[start:end] = np.array([m[0] for m in a_meta], dtype=np.int64)
            self.row_zone_ids[start:end] = np.array([m[1] for m in a_meta], dtype=np.int64)
            self.row_attn[start:end] = 1

        # Width of the auxiliary context vector fed to the model each step.
        self.context_dim = cfg.get("CONTEXT_DIM", 10)

        # Ring buffers over the last L timesteps (oldest overwritten first).
        self.buf_feature_ids = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device)
        self.buf_feature_vals = torch.zeros((self.L, self.K), dtype=torch.float32, device=self.device)
        self.buf_zone_ids = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device)
        self.buf_attn = torch.zeros((self.L, self.K), dtype=torch.long, device=self.device)
        self.buf_rtg = torch.zeros((self.L,), dtype=torch.float32, device=self.device)

        # Pre-allocated batched (batch=1) model inputs, refilled each step
        # from the ring buffers to avoid per-step allocation.
        self.t_feature_ids = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device)
        self.t_feature_vals = torch.zeros((1, self.L, self.K), dtype=torch.float32, device=self.device)
        self.t_zone_ids = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device)
        self.t_attn = torch.zeros((1, self.L, self.K), dtype=torch.long, device=self.device)
        self.t_rtg = torch.zeros((1, self.L), dtype=torch.float32, device=self.device)

        # ptr: next write slot in the ring; filled: valid entries (<= L).
        self.ptr = 0
        self.filled = 0

        # Auxiliary per-step context vector (weather/time/occupancy features).
        self.t_context = torch.zeros((1, self.context_dim), dtype=torch.float32, device=self.device)

    def reset(self):
        """Clear all episode state: ring buffers, staging tensors, previous
        action, and ring pointers. Call at the start of every episode."""
        self.buf_feature_ids.zero_()
        self.buf_feature_vals.zero_()
        self.buf_zone_ids.zero_()
        self.buf_attn.zero_()
        self.buf_rtg.zero_()
        self.t_feature_ids.zero_()
        self.t_feature_vals.zero_()
        self.t_zone_ids.zero_()
        self.t_attn.zero_()
        self.t_rtg.zero_()
        self.prev_action = np.array([21.0, 24.0] * 5, dtype=np.float32)
        self.ptr = 0
        self.filled = 0

    def _decode_bin_to_setpoint(self, bin_id: int, key: str) -> float:
        """Map a discrete action bin to a setpoint via linear interpolation
        over the dataloader's [LOW, HIGH] range (cooling vs heating is chosen
        from the key name)."""
        if "clg" in key.lower() or "cool" in key.lower():
            lo, hi = self.dl.CLG_LOW, self.dl.CLG_HIGH
        else:
            lo, hi = self.dl.HTG_LOW, self.dl.HTG_HIGH
        # bin 0 -> lo, bin NUM_ACTION_BINS-1 -> hi.
        x = float(bin_id) / float(self.dl.NUM_ACTION_BINS - 1)
        return lo + x * (hi - lo)

    def _scale_rtg(self, rtg_raw: float) -> float:
        """Scale a raw return-to-go into model units, either by the training
        max_return (default mode, guarded against divide-by-zero) or by a
        fixed constant divisor."""
        if self.rtg_scale_mode == "max_return":
            scale = max(self.max_return, 1e-6)
            return float(rtg_raw) / scale
        return float(rtg_raw) / float(self.rtg_constant_div)

    def _write_model_inputs_from_ring(self):
        """Copy the ring buffers into the batched staging tensors in
        chronological order (oldest first, newest last)."""
        if self.filled < self.L:
            # Partially filled: left-pad with zeros, place the `filled`
            # entries (which start at buffer index 0) at the right edge.
            start = self.L - self.filled
            self.t_feature_ids.zero_(); self.t_feature_vals.zero_()
            self.t_zone_ids.zero_(); self.t_attn.zero_(); self.t_rtg.zero_()
            self.t_feature_ids[0, start:].copy_(self.buf_feature_ids[: self.filled])
            self.t_feature_vals[0, start:].copy_(self.buf_feature_vals[: self.filled])
            self.t_zone_ids[0, start:].copy_(self.buf_zone_ids[: self.filled])
            self.t_attn[0, start:].copy_(self.buf_attn[: self.filled])
            self.t_rtg[0, start:].copy_(self.buf_rtg[: self.filled])
            return

        # Full ring: `ptr` is the oldest entry, so unroll as
        # [ptr..L-1] followed by [0..ptr-1].
        p = self.ptr
        n1 = self.L - p
        self.t_feature_ids[0, :n1].copy_(self.buf_feature_ids[p:])
        self.t_feature_vals[0, :n1].copy_(self.buf_feature_vals[p:])
        self.t_zone_ids[0, :n1].copy_(self.buf_zone_ids[p:])
        self.t_attn[0, :n1].copy_(self.buf_attn[p:])
        self.t_rtg[0, :n1].copy_(self.buf_rtg[p:])
        self.t_feature_ids[0, n1:].copy_(self.buf_feature_ids[:p])
        self.t_feature_vals[0, n1:].copy_(self.buf_feature_vals[:p])
        self.t_zone_ids[0, n1:].copy_(self.buf_zone_ids[:p])
        self.t_attn[0, n1:].copy_(self.buf_attn[:p])
        self.t_rtg[0, n1:].copy_(self.buf_rtg[:p])

    def act(self, obs: Any, info: Dict[str, Any], step: int) -> Tuple[np.ndarray, Dict, Dict]:
        """Produce one action for the current observation.

        Returns:
            ``(action, {}, {})`` — a float32 numpy array of 10 setpoints
            ([htg, clg] per zone), clipped to the dataloader's ranges and with
            a >= 3 degC heat/cool deadband enforced per zone where possible.
        """
        # Re-map the flat env observation into the model's key order via a
        # name->value dict; missing keys default to 0.0. If the env vector is
        # shorter than env_keys_order, zip silently truncates.
        obs_raw = np.asarray(obs, dtype=np.float32)
        env_map = dict(zip(self.env_keys_order, obs_raw))
        obs_ordered = np.array([env_map.get(k, 0.0) for k in self.model_state_keys], dtype=np.float32)

        # Standardize with training stats (eps guards zero std).
        obs_norm = obs_ordered.copy()
        D = min(len(self.obs_mean), obs_norm.shape[0])
        eps = 1e-6
        obs_norm[:D] = (obs_norm[:D] - self.obs_mean[:D]) / (self.obs_std[:D] + eps)

        # --- Build the auxiliary context vector -------------------------
        out_temp = env_map.get('outdoor_temp', 0.0)
        out_dew = env_map.get('outdoor_dewpoint', 0.0)
        hour = env_map.get('hour', 0.0)
        month = env_map.get('month', 1.0)
        # Fraction of zones that are occupied (count > 0.5 treated as occupied).
        occ_total = 0.0
        occ_keys = ['core_occ_count', 'perim1_occ_count', 'perim2_occ_count', 'perim3_occ_count', 'perim4_occ_count']
        for k in occ_keys:
            if env_map.get(k, 0.0) > 0.5:
                occ_total += 1.0
        occ_frac = occ_total / 5.0

        # Cyclic (sin/cos) encodings of hour-of-day and month-of-year.
        hr_sin = np.sin(2 * np.pi * hour / 24.0)
        hr_cos = np.cos(2 * np.pi * hour / 24.0)
        mth_norm = month - 1.0
        mth_sin = np.sin(2 * np.pi * mth_norm / 12.0)
        mth_cos = np.cos(2 * np.pi * mth_norm / 12.0)
        # 10-dim context; the zero slots are presumably features unavailable
        # at deployment time — TODO confirm layout against training code.
        ctx_vec = np.array([
            out_temp, 0.0,
            out_dew,
            occ_frac,
            hr_sin, hr_cos,
            mth_sin, mth_cos,
            0.0, 0.0
        ], dtype=np.float32)
        self.t_context[0].copy_(torch.from_numpy(ctx_vec))
        # Normalize the previous action with training stats.
        # NOTE(review): unlike obs_norm above, no eps is added to act_std —
        # a zero std entry would divide by zero; verify stats are non-zero.
        act_norm = self.prev_action.copy()
        A = min(len(self.act_mean), act_norm.shape[0])
        act_norm[:A] = (act_norm[:A] - self.act_mean[:A]) / self.act_std[:A]

        # --- Fill this timestep's token row: [states..., actions...] ----
        self.row_feat_vals.fill(0.0)
        if self.num_state > 0:
            self.row_feat_vals[: self.num_state] = obs_norm[: self.num_state]
        if self.num_act > 0:
            s, e = self.num_state, self.num_state + self.num_act
            if step < 5:
                # Warm-up: prime the context with a fixed "good" 22/25 degC
                # action instead of prev_action to stabilize early sampling.
                good_action = np.array([22.0, 25.0] * 5, dtype=np.float32)
                good_norm = good_action.copy()
                A_len = min(len(self.act_mean), good_norm.shape[0])
                good_norm[:A_len] = (good_norm[:A_len] - self.act_mean[:A_len]) / self.act_std[:A_len]
                self.row_feat_vals[s:e] = good_norm[: self.num_act]
            else:
                self.row_feat_vals[s:e] = act_norm[: self.num_act]

        # Write the row (plus the constant scaled RTG target) into the ring.
        i = self.ptr
        self.buf_feature_ids[i].copy_(torch.as_tensor(self.row_feat_ids, dtype=torch.long))
        self.buf_zone_ids[i].copy_(torch.as_tensor(self.row_zone_ids, dtype=torch.long))
        self.buf_attn[i].copy_(torch.as_tensor(self.row_attn, dtype=torch.long))
        self.buf_feature_vals[i].copy_(torch.as_tensor(self.row_feat_vals, dtype=torch.float32))
        self.buf_rtg[i] = float(self._scale_rtg(self.desired_rtg_raw))

        self.ptr = (self.ptr + 1) % self.L
        self.filled = min(self.filled + 1, self.L)

        # Forward pass over the full context, in bf16 autocast on CPU.
        self._write_model_inputs_from_ring()
        with torch.inference_mode():
            with torch.amp.autocast(device_type="cpu", dtype=torch.bfloat16):
                out = self.model(self.t_feature_ids, self.t_feature_vals, self.t_zone_ids, self.t_attn, rtg=self.t_rtg, context=self.t_context)

        # Take the newest timestep's logits and slice out the action-token
        # positions. assumes action_logits is (batch, L, K, num_bins) so the
        # slice yields (num_act, num_bins) — TODO confirm in embeddings.py.
        logits = out["action_logits"]
        last = logits[0, -1]
        s, e = self.num_state, self.num_state + self.num_act
        temp = max(self.temperature, 1e-4)
        raw_logits = last[s:e]
        # Sanitize non-finite logits before softmax.
        if torch.isnan(raw_logits).any() or torch.isinf(raw_logits).any():
            raw_logits = torch.nan_to_num(raw_logits, nan=0.0, posinf=10.0, neginf=-10.0)

        # Temperature-scaled softmax over bins.
        action_logits = raw_logits / temp
        action_probs = F.softmax(action_logits, dim=-1)
        # Degenerate distribution -> fall back to uniform.
        if torch.isnan(action_probs).any() or (action_probs < 0).any():
            action_probs = torch.ones_like(action_probs) / action_probs.size(-1)

        # Sample one bin per action head; on sampling failure fall back to
        # the greedy argmax. (`err` is intentionally unused.)
        try:
            pred_bins = torch.multinomial(action_probs, num_samples=1).flatten().cpu().numpy().astype(np.int64)
        except RuntimeError as err:
            pred_bins = torch.argmax(action_probs, dim=-1).cpu().numpy().astype(np.int64)

        # Decode bins to setpoints (unpredicted slots keep prev_action).
        action = self.prev_action.copy()
        for j in range(self.num_act):
            action[j] = self._decode_bin_to_setpoint(int(pred_bins[j]), self.action_keys[j])
        # Clip each setpoint to its legal heating/cooling range.
        for j, k in enumerate(self.action_keys):
            if "clg" in k.lower():
                action[j] = float(np.clip(action[j], self.dl.CLG_LOW, self.dl.CLG_HIGH))
            else:
                action[j] = float(np.clip(action[j], self.dl.HTG_LOW, self.dl.HTG_HIGH))
        # Enforce a minimum heat/cool gap per zone so the HVAC never receives
        # overlapping setpoints: first raise cooling; if it hit the ceiling
        # and the gap is still violated, lower heating instead.
        DEADBAND_GAP = 3.0
        for z in range(5):
            h_idx = 2 * z
            c_idx = 2 * z + 1
            if action[c_idx] < action[h_idx] + DEADBAND_GAP:
                action[c_idx] = min(self.dl.CLG_HIGH, action[h_idx] + DEADBAND_GAP)
            if action[c_idx] < action[h_idx] + DEADBAND_GAP:
                action[h_idx] = max(self.dl.HTG_LOW, action[c_idx] - DEADBAND_GAP)

        # Periodic debug trace of the raw core-zone bins and their decoding.
        if step < 5 or step % 1000 == 0:
            print(f"[DT] Step {step} Raw Bins: {pred_bins}")
            h_val = self._decode_bin_to_setpoint(int(pred_bins[0]), "htg_core")
            c_val = self._decode_bin_to_setpoint(int(pred_bins[1]), "clg_core")
            print(f"[DT] Step {step} Decoded Core: Heat {h_val:.2f} | Cool {c_val:.2f}")

        # Feed this action back as next step's "previous action" tokens.
        self.prev_action = action
        return action, {}, {}
|
|
def make_policy(policy_type: str, **kwargs):
    """Factory for the control policies defined in this module.

    Args:
        policy_type: One of ``"dt"``, ``"constant"`` (alias ``"rbc"``), or
            ``"remote"``. Matching is case-insensitive and whitespace/None
            tolerant.
        **kwargs: Forwarded to the selected policy's constructor. For "dt",
            ``ckpt_path``, ``model_config_path``, ``norm_stats_path``,
            ``context_len`` and ``max_tokens_per_step`` are required.

    Returns:
        A policy object exposing ``reset()`` and ``act(obs, info, step)``.

    Raises:
        ValueError: If *policy_type* is not recognized.
        KeyError: If a required "dt" kwarg is missing.
    """
    policy_type = (policy_type or "").lower().strip()
    if policy_type == "dt":
        return DecisionTransformerPolicy5Zone(
            ckpt_path=kwargs["ckpt_path"],
            model_config_path=kwargs["model_config_path"],
            norm_stats_path=kwargs["norm_stats_path"],
            context_len=kwargs["context_len"],
            max_tokens_per_step=kwargs["max_tokens_per_step"],
            device=kwargs.get("device", "cpu"),
            temperature=kwargs.get("temperature", 0.8),
        )
    if policy_type in ("constant", "rbc"):
        # Fixed-setpoint rule-based baseline defined in this module.
        return ConstantSetpointPolicy5Zone(
            heating_sp=kwargs.get("heating_sp", 21.0),
            cooling_sp=kwargs.get("cooling_sp", 24.0),
        )
    if policy_type == "remote":
        # Defer decisions to an external HTTP inference server.
        return RemoteHTTPPolicy(
            server_url=kwargs.get("server_url", "http://host.docker.internal:8000"),
        )
    raise ValueError(f"Unknown policy_type={policy_type}.")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|