|
|
import gym |
|
|
import numpy as np |
|
|
import math |
|
|
import sys |
|
|
import os |
|
|
import functools |
|
|
|
|
|
import pandas as pd |
|
|
|
|
|
|
|
|
|
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) |
|
|
from Environment.solar_sys_environment import SolarSys |
|
|
|
|
|
|
|
|
def form_clusters(metrics: dict, size: int) -> list: |
|
|
""" |
|
|
Forms balanced, heterogeneous clusters by categorizing houses based on their |
|
|
energy profile and distributing them evenly in a round-robin fashion. |
|
|
""" |
|
|
house_ids = list(metrics.keys()) |
|
|
if not house_ids: |
|
|
return [] |
|
|
all_consumption = [m['consumption'] for m in metrics.values()] |
|
|
all_solar = [m['solar'] for m in metrics.values()] |
|
|
|
|
|
median_consumption = np.median(all_consumption) if all_consumption else 0 |
|
|
median_solar = np.median(all_solar) if all_solar else 0 |
|
|
|
|
|
|
|
|
producers = [h for h in house_ids if metrics[h]['solar'] >= median_solar and metrics[h]['consumption'] < median_consumption] |
|
|
consumers = [h for h in house_ids if metrics[h]['solar'] < median_solar and metrics[h]['consumption'] >= median_consumption] |
|
|
prosumers = [h for h in house_ids if metrics[h]['solar'] >= median_solar and metrics[h]['consumption'] >= median_consumption] |
|
|
neutrals = [h for h in house_ids if metrics[h]['solar'] < median_solar and metrics[h]['consumption'] < median_consumption] |
|
|
|
|
|
|
|
|
sorted_categorized_houses = producers + consumers + prosumers + neutrals |
|
|
|
|
|
|
|
|
categorized_set = set(sorted_categorized_houses) |
|
|
uncategorized = [h for h in house_ids if h not in categorized_set] |
|
|
final_house_list = sorted_categorized_houses + uncategorized |
|
|
num_houses = len(house_ids) |
|
|
num_clusters = math.ceil(num_houses / size) |
|
|
|
|
|
clusters = [[] for _ in range(num_clusters)] |
|
|
|
|
|
for i, house_id in enumerate(final_house_list): |
|
|
target_cluster_idx = i % num_clusters |
|
|
clusters[target_cluster_idx].append(house_id) |
|
|
|
|
|
return clusters |
|
|
|
|
|
class GlobalPriceVecEnvWrapper(gym.vector.VectorEnvWrapper): |
|
|
def __init__(self, env, clusters: list): |
|
|
super().__init__(env) |
|
|
self.clusters = clusters |
|
|
|
|
|
|
|
|
self.cluster_envs = self.env.envs |
|
|
|
|
|
def step(self, actions: np.ndarray, exports: np.ndarray = None, imports: np.ndarray = None): |
|
|
num_clusters = len(self.cluster_envs) |
|
|
net_transfers = np.zeros(num_clusters) |
|
|
if exports is not None and imports is not None: |
|
|
net_transfers = imports - exports |
|
|
batched_low_level_actions = actions |
|
|
batched_transfers = net_transfers.reshape(-1, 1).astype(np.float32) |
|
|
batched_prices = np.full((num_clusters, 1), -1.0, dtype=np.float32) |
|
|
final_packed_actions_tuple = (batched_low_level_actions, batched_transfers, batched_prices) |
|
|
obs_next, rewards, terminateds, truncateds, infos = self.env.step(final_packed_actions_tuple) |
|
|
dones = terminateds | truncateds |
|
|
done_all = dones.all() |
|
|
|
|
|
|
|
|
|
|
|
if done_all: |
|
|
final_infos = infos['final_info'] |
|
|
keys = final_infos[0].keys() |
|
|
infos = {k: np.stack([info[k] for info in final_infos]) for k in keys} |
|
|
|
|
|
info_agg = { |
|
|
"cluster_dones": dones, |
|
|
"cluster_infos": infos, |
|
|
} |
|
|
|
|
|
return obs_next, rewards, done_all, info_agg |
|
|
|
|
|
def get_export_capacity(self, cluster_idx: int) -> float: |
|
|
"""Returns the total physically exportable energy from a cluster's batteries and solar in kWh.""" |
|
|
cluster_env = self.cluster_envs[cluster_idx] |
|
|
available_from_batt = cluster_env.battery_soc * cluster_env.battery_discharge_efficiency |
|
|
total_exportable = np.sum(available_from_batt) + cluster_env.current_solar |
|
|
return float(total_exportable) |
|
|
|
|
|
def get_import_capacity(self, cluster_idx: int) -> float: |
|
|
"""Returns the total physically importable space in a cluster's batteries in kWh.""" |
|
|
cluster_env = self.cluster_envs[cluster_idx] |
|
|
free_space = cluster_env.battery_max_capacity - cluster_env.battery_soc |
|
|
total_storable = np.sum(free_space) |
|
|
return float(total_storable) |
|
|
|
|
|
def send_energy(self, from_cluster_idx: int, amount: float) -> float: |
|
|
"""Drains 'amount' of energy from the specified cluster (batteries first, then solar).""" |
|
|
cluster_env = self.cluster_envs[from_cluster_idx] |
|
|
return cluster_env.send_energy(amount) |
|
|
|
|
|
def receive_energy(self, to_cluster_idx: int, amount: float) -> float: |
|
|
"""Charges batteries in the specified cluster with 'amount' of energy.""" |
|
|
cluster_env = self.cluster_envs[to_cluster_idx] |
|
|
return cluster_env.receive_energy(amount) |
|
|
|
|
|
|
|
|
def make_vec_env(data_path: str, time_freq: str, cluster_size: int, state: str): |
|
|
print("--- Pre-loading shared dataset for all environments ---") |
|
|
try: |
|
|
shared_df = pd.read_csv(data_path) |
|
|
shared_df["local_15min"] = pd.to_datetime(shared_df["local_15min"], utc=True) |
|
|
shared_df.set_index("local_15min", inplace=True) |
|
|
|
|
|
|
|
|
shared_df = shared_df.resample(time_freq).mean() |
|
|
|
|
|
|
|
|
except Exception as e: |
|
|
raise ValueError(f"Failed to pre-load data in make_vec_env: {e}") |
|
|
|
|
|
base_env_for_metrics = SolarSys( |
|
|
data_path=data_path, |
|
|
time_freq=time_freq, |
|
|
preloaded_data=shared_df, |
|
|
state=state |
|
|
) |
|
|
|
|
|
|
|
|
metrics = {} |
|
|
for hid in base_env_for_metrics.house_ids: |
|
|
total_consumption = float( |
|
|
np.clip(base_env_for_metrics.original_no_p2p_import[hid], 0.0, None).sum() |
|
|
) |
|
|
total_solar = float( |
|
|
base_env_for_metrics.all_data[f"total_solar_{hid}"].clip(lower=0.0).sum() |
|
|
) |
|
|
metrics[hid] = {'consumption': total_consumption, 'solar': total_solar} |
|
|
|
|
|
clusters = form_clusters(metrics, cluster_size) |
|
|
print(f"Formed {len(clusters)} clusters of size up to {cluster_size}.") |
|
|
|
|
|
|
|
|
env_fns = [] |
|
|
for cluster_house_ids in clusters: |
|
|
preset_env_fn = functools.partial( |
|
|
SolarSys, |
|
|
data_path=data_path, |
|
|
time_freq=time_freq, |
|
|
house_ids_in_cluster=cluster_house_ids, |
|
|
preloaded_data=shared_df, |
|
|
state=state |
|
|
) |
|
|
env_fns.append(preset_env_fn) |
|
|
sync_vec_env = gym.vector.SyncVectorEnv(env_fns) |
|
|
wrapped_vec_env = GlobalPriceVecEnvWrapper(sync_vec_env, clusters=clusters) |
|
|
|
|
|
return wrapped_vec_env |