SolarSys2025's picture
Upload 30 files
55da406 verified
import gym
import numpy as np
import math
import sys
import os
import functools
import pandas as pd
# Ensure SolarSys Environement is on the Python path
# Please ensure you follow proper directory structure for running this code
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from Environment.solar_sys_environment import SolarSys
def form_clusters(metrics: dict, size: int) -> list:
"""
Forms balanced, heterogeneous clusters by categorizing houses based on their
energy profile and distributing them evenly in a round-robin fashion.
"""
house_ids = list(metrics.keys())
if not house_ids:
return []
all_consumption = [m['consumption'] for m in metrics.values()]
all_solar = [m['solar'] for m in metrics.values()]
median_consumption = np.median(all_consumption) if all_consumption else 0
median_solar = np.median(all_solar) if all_solar else 0
#Categorize each house based on its profile relative to the median
producers = [h for h in house_ids if metrics[h]['solar'] >= median_solar and metrics[h]['consumption'] < median_consumption]
consumers = [h for h in house_ids if metrics[h]['solar'] < median_solar and metrics[h]['consumption'] >= median_consumption]
prosumers = [h for h in house_ids if metrics[h]['solar'] >= median_solar and metrics[h]['consumption'] >= median_consumption]
neutrals = [h for h in house_ids if metrics[h]['solar'] < median_solar and metrics[h]['consumption'] < median_consumption]
# Create a master list ordered by category
sorted_categorized_houses = producers + consumers + prosumers + neutrals
# Add any houses that weren't categorized to ensure none are missed
categorized_set = set(sorted_categorized_houses)
uncategorized = [h for h in house_ids if h not in categorized_set]
final_house_list = sorted_categorized_houses + uncategorized
num_houses = len(house_ids)
num_clusters = math.ceil(num_houses / size)
clusters = [[] for _ in range(num_clusters)]
for i, house_id in enumerate(final_house_list):
target_cluster_idx = i % num_clusters
clusters[target_cluster_idx].append(house_id)
return clusters
class GlobalPriceVecEnvWrapper(gym.vector.VectorEnvWrapper):
def __init__(self, env, clusters: list):
super().__init__(env)
self.clusters = clusters
# Expose the underlying SolarSys environments for inspection by the coordinator
# self.env.envs gets the list of individual envs from the SyncVectorEnv
self.cluster_envs = self.env.envs
def step(self, actions: np.ndarray, exports: np.ndarray = None, imports: np.ndarray = None):
num_clusters = len(self.cluster_envs)
net_transfers = np.zeros(num_clusters)
if exports is not None and imports is not None:
net_transfers = imports - exports
batched_low_level_actions = actions
batched_transfers = net_transfers.reshape(-1, 1).astype(np.float32)
batched_prices = np.full((num_clusters, 1), -1.0, dtype=np.float32)
final_packed_actions_tuple = (batched_low_level_actions, batched_transfers, batched_prices)
obs_next, rewards, terminateds, truncateds, infos = self.env.step(final_packed_actions_tuple)
dones = terminateds | truncateds
done_all = dones.all()
if done_all:
final_infos = infos['final_info']
keys = final_infos[0].keys()
infos = {k: np.stack([info[k] for info in final_infos]) for k in keys}
info_agg = {
"cluster_dones": dones,
"cluster_infos": infos,
}
return obs_next, rewards, done_all, info_agg
def get_export_capacity(self, cluster_idx: int) -> float:
"""Returns the total physically exportable energy from a cluster's batteries and solar in kWh."""
cluster_env = self.cluster_envs[cluster_idx]
available_from_batt = cluster_env.battery_soc * cluster_env.battery_discharge_efficiency
total_exportable = np.sum(available_from_batt) + cluster_env.current_solar
return float(total_exportable)
def get_import_capacity(self, cluster_idx: int) -> float:
"""Returns the total physically importable space in a cluster's batteries in kWh."""
cluster_env = self.cluster_envs[cluster_idx]
free_space = cluster_env.battery_max_capacity - cluster_env.battery_soc
total_storable = np.sum(free_space)
return float(total_storable)
def send_energy(self, from_cluster_idx: int, amount: float) -> float:
"""Drains 'amount' of energy from the specified cluster (batteries first, then solar)."""
cluster_env = self.cluster_envs[from_cluster_idx]
return cluster_env.send_energy(amount)
def receive_energy(self, to_cluster_idx: int, amount: float) -> float:
"""Charges batteries in the specified cluster with 'amount' of energy."""
cluster_env = self.cluster_envs[to_cluster_idx]
return cluster_env.receive_energy(amount)
def make_vec_env(data_path: str, time_freq: str, cluster_size: int, state: str):
print("--- Pre-loading shared dataset for all environments ---")
try:
shared_df = pd.read_csv(data_path)
shared_df["local_15min"] = pd.to_datetime(shared_df["local_15min"], utc=True)
shared_df.set_index("local_15min", inplace=True)
# ADD THIS LINE
shared_df = shared_df.resample(time_freq).mean()
# ADD THIS LINE
except Exception as e:
raise ValueError(f"Failed to pre-load data in make_vec_env: {e}")
base_env_for_metrics = SolarSys(
data_path=data_path,
time_freq=time_freq,
preloaded_data=shared_df, # Pass the shared DataFrame here
state=state
)
# This part for calculating metrics and forming clusters
metrics = {}
for hid in base_env_for_metrics.house_ids:
total_consumption = float(
np.clip(base_env_for_metrics.original_no_p2p_import[hid], 0.0, None).sum()
)
total_solar = float(
base_env_for_metrics.all_data[f"total_solar_{hid}"].clip(lower=0.0).sum()
)
metrics[hid] = {'consumption': total_consumption, 'solar': total_solar}
clusters = form_clusters(metrics, cluster_size)
print(f"Formed {len(clusters)} clusters of size up to {cluster_size}.")
# functools.partial to create environment
env_fns = []
for cluster_house_ids in clusters:
preset_env_fn = functools.partial(
SolarSys,
data_path=data_path,
time_freq=time_freq,
house_ids_in_cluster=cluster_house_ids,
preloaded_data=shared_df,
state=state
)
env_fns.append(preset_env_fn)
sync_vec_env = gym.vector.SyncVectorEnv(env_fns)
wrapped_vec_env = GlobalPriceVecEnvWrapper(sync_vec_env, clusters=clusters)
return wrapped_vec_env