# SolarSys2025's picture
# Upload 30 files
# 55da406 verified
import os
import sys
import numpy as np
import torch
# Ensure project root is on the Python path
# This expects the file to sit one directory below the project root (the directory containing `Environment/`)
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from Environment.solar_sys_environment import SolarSys
from Environment.cluster_env_wrapper import GlobalPriceVecEnvWrapper
from Environment.cluster_env_wrapper import make_vec_env
class InterClusterLedger:
    """
    Ledger of pairwise inter-cluster transfers.

    ``balances[a][b]`` holds cluster ``a``'s running signed balance with
    cluster ``b``. A transfer of ``amount`` from ``a`` to ``b`` subtracts
    ``amount`` from ``balances[a][b]`` and adds it to ``balances[b][a]``,
    so the two entries mirror each other.
    """

    def __init__(self):
        # cluster id -> {counterparty id -> signed running balance}
        self.balances = {}

    def record_transfer(self, from_id: str, to_id: str, amount: float):
        """Book ``amount`` flowing from ``from_id`` to ``to_id``.

        Transfers from a cluster to itself are ignored.
        """
        if from_id == to_id:
            return
        payer = self.balances.setdefault(from_id, {})
        payee = self.balances.setdefault(to_id, {})
        payer[to_id] = payer.get(to_id, 0.0) - amount
        payee[from_id] = payee.get(from_id, 0.0) + amount

    def get_balance(self, a_id: str, b_id: str) -> float:
        """Return ``a_id``'s balance with ``b_id``, or 0.0 if they never transacted."""
        try:
            return self.balances[a_id][b_id]
        except KeyError:
            return 0.0

    def net_balances(self) -> dict:
        """Return the live nested balance mapping (the internal dict, not a copy)."""
        return self.balances
class InterClusterCoordinator:
    """Centralized market maker coordinating energy transfers between clusters.

    ``build_transfers`` turns the high-level agent's per-cluster export/import
    preferences plus live capacity reports into matched transfer volumes.
    ``compute_inter_cluster_reward`` scores each cluster individually.
    ``get_cluster_state`` builds a per-cluster feature vector.
    """

    def __init__(
        self,
        cluster_env,
        high_level_agent,
        ledger,
        max_transfer_kwh: float = 1000000.0,
        w_cost_savings: float = 2.0,
        w_grid_penalty: float = 0.3,
        w_p2p_bonus: float = 0.3,
    ):
        """Store collaborators and market/reward parameters.

        Args:
            cluster_env: Vectorized cluster environment. The reward
                computation reads its ``cluster_envs`` sequence.
            high_level_agent: High-level policy, stored as ``self.agent``.
            ledger: Inter-cluster ledger, stored as ``self.ledger``.
            max_transfer_kwh: Scale applied to each softmax preference before
                it is capped by the cluster's reported capacity.
            w_cost_savings: Weight on cost saved versus the no-P2P baseline.
            w_grid_penalty: Weight on the cluster's grid imports (subtracted).
            w_p2p_bonus: Weight on the cluster's matched P2P volume (added).
        """
        self.cluster_env = cluster_env
        self.agent = high_level_agent
        self.ledger = ledger
        self.max_transfer_kwh = max_transfer_kwh
        self.w_cost_savings = w_cost_savings
        self.w_grid_penalty = w_grid_penalty
        self.w_p2p_bonus = w_p2p_bonus

    @staticmethod
    def _clamp_step(env, step_count: int) -> int:
        """Clamp ``step_count`` to the last valid step index of ``env``."""
        return min(step_count, env.num_steps - 1)

    def get_cluster_state(self, env, step_count: int) -> np.ndarray:
        """Summarize one cluster's state as a float32 feature vector.

        Args:
            env: One of the per-cluster SolarSys environments. Its
                ``battery_soc``, ``battery_max_capacity``, ``demands_day``,
                ``solars_day``, ``num_steps``, ``steps_per_day`` and
                ``get_grid_price`` are read.
            step_count: Current step. It is clamped to ``env.num_steps - 1``,
                so a call after the final step reads the last valid row.

        Returns:
            ``[total_soc, total_capacity, soc_fraction, total_demand,
            total_solar, grid_price, t_norm]`` as ``np.float32``.
        """
        idx = self._clamp_step(env, step_count)
        agg_soc = np.sum(env.battery_soc)
        agg_max_capacity = np.sum(env.battery_max_capacity)
        # Guard against clusters with no storage capacity at all.
        agg_soc_fraction = agg_soc / agg_max_capacity if agg_max_capacity > 0 else 0.0
        agg_demand = np.sum(env.demands_day[idx])
        agg_solar = np.sum(env.solars_day[idx])
        price = env.get_grid_price(idx)
        # NOTE(review): exceeds 1.0 when num_steps > steps_per_day. Confirm
        # whether a time-of-day fraction (idx % steps_per_day) was intended.
        t_norm = idx / float(env.steps_per_day)
        return np.array(
            [agg_soc, agg_max_capacity, agg_soc_fraction,
             agg_demand, agg_solar, price, t_norm],
            dtype=np.float32,
        )

    def _capped_volumes(self, prefs: np.ndarray, reports, capacity_key: str) -> np.ndarray:
        """Scale each preference by ``max_transfer_kwh`` and cap it by the reported capacity.

        A negative reported capacity is treated as zero. Otherwise it would
        shrink the market-wide total and could cancel every other cluster's trade.
        """
        volumes = np.zeros(len(prefs))
        for i, pref in enumerate(prefs):
            capacity = max(0.0, reports[i][capacity_key])
            volumes[i] = min(float(pref) * self.max_transfer_kwh, capacity)
        return volumes

    def build_transfers(self, agent_action_vector: np.ndarray, reports: dict) -> tuple[np.ndarray, np.ndarray]:
        """Match cluster export offers against import requests using LIVE capacity reports.

        Column 0 of ``agent_action_vector`` holds raw export preferences and
        column 1 raw import preferences. Each column is softmax-normalized
        across clusters. A preference is scaled by ``max_transfer_kwh`` and
        capped by that cluster's reported capacity. The smaller of total supply
        and total demand is then shared out pro rata on both sides.

        Args:
            agent_action_vector: 2-D array with one row per cluster and at
                least two columns.
            reports: Indexable by cluster index ``0..n-1``. Each entry provides
                ``'export_capacity'`` and ``'import_capacity'``.

        Returns:
            ``(actual_exports, actual_imports)``, each a float array of length
            ``n``. Both are all zero when no more than 1e-6 can be matched.
        """
        raw_export_prefs = agent_action_vector[:, 0]
        raw_import_prefs = agent_action_vector[:, 1]
        export_prefs = torch.softmax(torch.tensor(raw_export_prefs), dim=-1).numpy()
        import_prefs = torch.softmax(torch.tensor(raw_import_prefs), dim=-1).numpy()
        # The action vector defines the market size. This avoids depending on
        # a wrapper attribute that the reward code names differently.
        n = len(export_prefs)

        potential_exports = self._capped_volumes(export_prefs, reports, 'export_capacity')
        potential_imports = self._capped_volumes(import_prefs, reports, 'import_capacity')
        total_available_for_export = float(np.sum(potential_exports))
        total_requested_for_import = float(np.sum(potential_imports))

        total_matched_energy = min(total_available_for_export, total_requested_for_import)
        # Written as "not >" so a NaN total also yields no transfers.
        if not total_matched_energy > 1e-6:
            return np.zeros(n), np.zeros(n)
        # Both totals are >= the matched volume (> 1e-6), so the divisions are safe.
        actual_exports = (potential_exports / total_available_for_export) * total_matched_energy
        actual_imports = (potential_imports / total_requested_for_import) * total_matched_energy
        return actual_exports, actual_imports

    def compute_inter_cluster_reward(self, all_cluster_infos: dict, actual_transfers: tuple, step_count: int) -> np.ndarray:
        """Compute an INDIVIDUAL reward for each cluster agent (per-agent credit assignment).

        reward_i = w_cost_savings * (baseline_import_i * grid_price - cost_i)
                   + w_p2p_bonus * (export_i + import_i)
                   - w_grid_penalty * grid_import_with_p2p_i

        Args:
            all_cluster_infos: Batched info mapping with per-cluster entries
                under ``'costs'``, ``'grid_import_no_p2p'`` and
                ``'grid_import_with_p2p'``. Each entry is summed.
            actual_transfers: ``(actual_exports, actual_imports)`` as returned
                by ``build_transfers``.
            step_count: Current step. It is clamped to the first cluster env's
                last valid step before the price lookup, as in
                ``get_cluster_state``.

        Returns:
            ``np.float32`` array with one reward per entry of
            ``cluster_env.cluster_envs``.
        """
        actual_exports, actual_imports = actual_transfers
        cluster_envs = self.cluster_env.cluster_envs
        num_clusters = len(cluster_envs)
        cluster_rewards = np.zeros(num_clusters, dtype=np.float32)
        if num_clusters == 0:
            return cluster_rewards

        costs_per_cluster = [np.sum(c) for c in all_cluster_infos['costs']]
        baseline_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_no_p2p']]
        actual_imports_per_cluster = [np.sum(imp) for imp in all_cluster_infos['grid_import_with_p2p']]

        # A single grid price applies to every cluster at this step.
        # NOTE(review): assumes all clusters share cluster_envs[0]'s price series. Confirm.
        first_env = cluster_envs[0]
        grid_price = first_env.get_grid_price(self._clamp_step(first_env, step_count))

        for i in range(num_clusters):
            cost_saved = baseline_imports_per_cluster[i] * grid_price - costs_per_cluster[i]
            r_savings = self.w_cost_savings * cost_saved
            r_grid = self.w_grid_penalty * actual_imports_per_cluster[i]
            r_p2p = self.w_p2p_bonus * (actual_exports[i] + actual_imports[i])
            cluster_rewards[i] = r_savings + r_p2p - r_grid
        return cluster_rewards