| """Tests for the Coordinator: multi-rate stepping order, dc_buffer accumulation.""" |
|
|
| from __future__ import annotations |
|
|
| import warnings |
| from collections.abc import Callable |
| from fractions import Fraction |
|
|
| import numpy as np |
|
|
| from openg2g.clock import SimulationClock |
| from openg2g.common import ThreePhase |
| from openg2g.controller.base import Controller |
| from openg2g.coordinator import Coordinator, _gcd_fraction |
| from openg2g.datacenter.base import DatacenterBackend, DatacenterState |
| from openg2g.datacenter.command import DatacenterCommand, SetBatchSize |
| from openg2g.events import EventEmitter |
| from openg2g.grid.base import BusVoltages, GridBackend, GridState, PhaseVoltages |
| from openg2g.grid.command import GridCommand |
| from openg2g.grid.opendss import OpenDSSGrid |
|
|
|
|
def test_gcd_fraction():
    """Check ``_gcd_fraction`` against three hand-picked (a, b, expected) triples."""
    cases = (
        (Fraction(1, 10), Fraction(1), Fraction(1, 10)),
        (Fraction(1, 2), Fraction(3, 2), Fraction(1, 2)),
        (Fraction(60), Fraction(1), Fraction(1)),
    )
    for left, right, expected in cases:
        assert _gcd_fraction(left, right) == expected
|
|
|
|
class _StubDC(DatacenterBackend[DatacenterState]):
    """Datacenter stub: counts steps, records commands, reports constant power."""

    def __init__(self, dt_s: Fraction = Fraction(1, 10)) -> None:
        super().__init__()
        self._dt_s = dt_s
        # Bookkeeping inspected by the tests after a run.
        self.step_count = 0
        self.apply_control_calls: list[DatacenterCommand] = []

    @property
    def dt_s(self) -> Fraction:
        """Step period handed to the constructor."""
        return self._dt_s

    def reset(self) -> None:
        """Zero the step counter and start a fresh command record."""
        self.step_count, self.apply_control_calls = 0, []

    def step(self, clock: SimulationClock, events: EventEmitter) -> DatacenterState:
        """Count the call and return a state with 100.0 on each phase."""
        self.step_count += 1
        now = clock.time_s
        flat_load = ThreePhase(a=100.0, b=100.0, c=100.0)
        return DatacenterState(time_s=now, power_w=flat_load)

    def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
        """Remember the command so tests can assert on what was delivered."""
        self.apply_control_calls.append(command)
|
|
|
|
class _StubGrid(GridBackend[GridState]):
    """Grid stub: logs each step call and reports unit voltages on bus "671"."""

    def __init__(self, dt_s: Fraction = Fraction(1)) -> None:
        super().__init__()
        self._dt_s = dt_s
        # Bookkeeping inspected by the tests after a run.
        self.step_count = 0
        self.step_calls: list[tuple[SimulationClock, list[ThreePhase]]] = []

    @property
    def dt_s(self) -> Fraction:
        """Step period handed to the constructor."""
        return self._dt_s

    def reset(self) -> None:
        """Zero the step counter and start a fresh call log."""
        self.step_count, self.step_calls = 0, []

    def step(
        self,
        clock: SimulationClock,
        power_samples_w: list[ThreePhase],
        events: EventEmitter,
    ) -> GridState:
        """Log the call (with a copy of the samples) and return 1.0 on every phase."""
        self.step_count += 1
        # Copy the samples so later changes to the caller's list cannot alter the record.
        snapshot = [*power_samples_w]
        self.step_calls.append((clock, snapshot))
        unit_phases = PhaseVoltages(a=1.0, b=1.0, c=1.0)
        return GridState(
            time_s=clock.time_s,
            voltages=BusVoltages({"671": unit_phases}),
        )

    @property
    def v_index(self) -> list[tuple[str, int]]:
        """(bus, phase index) pairs for the three entries of bus "671"."""
        return [("671", phase) for phase in range(3)]

    def voltages_vector(self) -> np.ndarray:
        """Three-element vector of 1.0 values."""
        unit_values = [1.0, 1.0, 1.0]
        return np.array(unit_values)

    def estimate_sensitivity(self, perturbation_kw: float = 100.0) -> tuple[np.ndarray, np.ndarray]:
        """Return a 3x3 zero matrix and a length-3 vector of ones; the argument is ignored."""
        zero_matrix = np.zeros((3, 3))
        ones_vector = np.ones(3)
        return zero_matrix, ones_vector

    def apply_control(self, command: GridCommand, events: EventEmitter) -> None:
        """Grid commands are accepted and discarded."""
        return None
|
|
|
|
class _StubController(Controller[DatacenterBackend, GridBackend]):
    """Controller stub that either calls a supplied callback or returns a fixed command list."""

    def __init__(
        self,
        dt_s: Fraction = Fraction(1),
        commands: list[DatacenterCommand | GridCommand] | None = None,
        on_step: Callable[
            [SimulationClock, DatacenterBackend, GridBackend, EventEmitter],
            list[DatacenterCommand | GridCommand],
        ]
        | None = None,
    ):
        self._dt_s = dt_s
        self._commands = [] if commands is None else commands
        self._on_step = on_step
        self.call_count = 0

    def reset(self) -> None:
        """Zero the call counter."""
        self.call_count = 0

    @property
    def dt_s(self) -> Fraction:
        """Step period handed to the constructor."""
        return self._dt_s

    def step(
        self,
        clock: SimulationClock,
        datacenter: DatacenterBackend,
        grid: GridBackend,
        events: EventEmitter,
    ) -> list[DatacenterCommand | GridCommand]:
        """Count the call; the callback, when present, takes precedence over the fixed list."""
        self.call_count += 1
        callback = self._on_step
        if callback is None:
            return self._commands
        return callback(clock, datacenter, grid, events)
|
|
|
|
def test_coordinator_dc_fires_every_tick():
    """With DC dt=1/10 and grid dt=1, a 1 s run steps the DC 10 times and the grid once."""
    fast_dc = _StubDC(dt_s=Fraction(1, 10))
    slow_grid = _StubGrid(dt_s=Fraction(1))
    controller = _StubController(dt_s=Fraction(1))

    coordinator = Coordinator(
        fast_dc,
        slow_grid,
        [controller],
        total_duration_s=1,
        dc_bus="671",
    )
    coordinator.run()

    # Ten DC steps of 1/10 s fit in the one-second run.
    assert fast_dc.step_count == 10
    # A single grid step of 1 s fits in the same run.
    assert slow_grid.step_count == 1
|
|
|
|
def test_coordinator_dc_buffer_flush():
    """Each grid step is handed the DC samples accumulated since the previous one."""
    fast_dc = _StubDC(dt_s=Fraction(1, 10))
    half_second_grid = _StubGrid(dt_s=Fraction(1, 2))
    controller = _StubController(dt_s=Fraction(1, 2))

    coordinator = Coordinator(
        fast_dc,
        half_second_grid,
        [controller],
        total_duration_s=1,
        dc_bus="671",
    )
    coordinator.run()

    # Two grid steps of 1/2 s in a one-second run.
    assert half_second_grid.step_count == 2

    # The first grid step receives one sample, the second receives five.
    # NOTE(review): presumably both backends fire at tick 0, so the first flush
    # holds only that initial sample -- confirm against the Coordinator.
    sample_counts = [len(samples) for _clock, samples in half_second_grid.step_calls]
    assert sample_counts == [1, 5]
|
|
|
|
def test_coordinator_controller_order():
    """Within one tick: datacenter, then grid, then controllers in list order."""
    call_order: list[str] = []

    class _OrderDC(_StubDC):
        def step(self, clock: SimulationClock, events: EventEmitter) -> DatacenterState:
            call_order.append("dc")
            return super().step(clock, events)

    class _OrderGrid(_StubGrid):
        def step(
            self,
            clock: SimulationClock,
            power_samples_w: list[ThreePhase],
            events: EventEmitter,
        ) -> GridState:
            call_order.append("grid")
            return super().step(clock, power_samples_w, events)

    def _recording_callback(label: str):
        """Build an on_step callback that logs *label* and issues no commands."""

        def _on_step(clock, datacenter, grid, events):
            call_order.append(label)
            return []

        return _on_step

    dc = _OrderDC(dt_s=Fraction(1))
    grid = _OrderGrid(dt_s=Fraction(1))
    ctrl1 = _StubController(dt_s=Fraction(1), on_step=_recording_callback("ctrl1"))
    ctrl2 = _StubController(dt_s=Fraction(1), on_step=_recording_callback("ctrl2"))

    coordinator = Coordinator(dc, grid, [ctrl1, ctrl2], total_duration_s=1, dc_bus="671")
    coordinator.run()

    # Only the first four recorded calls are pinned.
    assert call_order[:4] == ["dc", "grid", "ctrl1", "ctrl2"]
|
|
|
|
def test_coordinator_batch_action_applied():
    """A SetBatchSize command returned by a controller reaches the datacenter exactly once."""
    expected_command = SetBatchSize(batch_size_by_model={"model_a": 64})

    dc = _StubDC(dt_s=Fraction(1))
    grid = _StubGrid(dt_s=Fraction(1))
    controller = _StubController(dt_s=Fraction(1), commands=[expected_command])

    Coordinator(dc, grid, [controller], total_duration_s=1, dc_bus="671").run()

    received = dc.apply_control_calls
    assert received == [expected_command]
|
|
|
|
def test_coordinator_exposes_clock_stamped_controller_events():
    """An event emitted inside a controller step appears in the run log with source, tick and time."""

    def _emit_one_event(
        clock: SimulationClock,
        datacenter: DatacenterBackend,
        grid: GridBackend,
        events: EventEmitter,
    ) -> list[DatacenterCommand | GridCommand]:
        events.emit("controller.test", {"value": 1})
        return []

    stub_dc = _StubDC(dt_s=Fraction(1))
    stub_grid = _StubGrid(dt_s=Fraction(1))
    controller = _StubController(dt_s=Fraction(1), on_step=_emit_one_event)

    coordinator = Coordinator(stub_dc, stub_grid, [controller], total_duration_s=1, dc_bus="671")
    run_log = coordinator.run()

    recorded = run_log.events
    assert len(recorded) == 1

    event = recorded[0]
    assert event.source == "controller"
    assert event.topic == "controller.test"
    assert event.tick == 0
    # Time stamp is compared with a tolerance rather than exact equality.
    assert abs(event.t_s - 0.0) < 1e-12
|
|
|
|
def test_datacenter_events_are_recorded():
    """An event emitted from a datacenter's apply_control lands in the run log."""

    class _EventedDC(DatacenterBackend[DatacenterState]):
        """Datacenter that emits a batch-size event whenever a command is applied."""

        def __init__(self) -> None:
            super().__init__()
            self._dt_s = Fraction(1)

        @property
        def dt_s(self) -> Fraction:
            return self._dt_s

        def reset(self) -> None:
            return None

        def step(self, clock: SimulationClock, events: EventEmitter) -> DatacenterState:
            unit_power = ThreePhase(a=1.0, b=1.0, c=1.0)
            return DatacenterState(time_s=clock.time_s, power_w=unit_power)

        def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
            # The payload is fixed; it does not depend on the command received.
            payload = {"batch_size_by_model": {"model_a": 64}}
            events.emit("datacenter.batch_size.updated", payload)

    command = SetBatchSize(batch_size_by_model={"model_a": 64})
    controller = _StubController(dt_s=Fraction(1), commands=[command])

    coordinator = Coordinator(
        _EventedDC(),
        _StubGrid(dt_s=Fraction(1)),
        [controller],
        total_duration_s=1,
        dc_bus="671",
    )
    run_log = coordinator.run()

    topic = "datacenter.batch_size.updated"
    matching = [event for event in run_log.events if event.topic == topic]
    assert len(matching) == 1
    assert matching[0].data["batch_size_by_model"]["model_a"] == 64
|
|
|
|
def test_controller_generic_types_auto_extracted():
    """The compatible_*_types classmethods report the Controller's generic parameters."""

    class _TypedController(Controller[DatacenterBackend, OpenDSSGrid]):
        @property
        def dt_s(self) -> Fraction:
            return Fraction(1)

        def reset(self) -> None:
            return None

        def step(
            self,
            clock: SimulationClock,
            datacenter: DatacenterBackend,
            grid: OpenDSSGrid,
            events: EventEmitter,
        ) -> list[DatacenterCommand | GridCommand]:
            return []

    dc_types = _TypedController.compatible_datacenter_types()
    grid_types = _TypedController.compatible_grid_types()

    assert dc_types == (DatacenterBackend,)
    assert grid_types == (OpenDSSGrid,)
|
|
|
|
def test_controller_datacenter_mismatch_error_has_underlined_generic_snippet():
    """Running a controller against the wrong datacenter type raises a TypeError naming all three classes."""

    class _ExpectedDC(DatacenterBackend[DatacenterState]):
        """The datacenter type the controller declares it needs."""

        def __init__(self) -> None:
            super().__init__()

        @property
        def dt_s(self) -> Fraction:
            return Fraction(1)

        def reset(self) -> None:
            return None

        def step(self, clock: SimulationClock, events: EventEmitter) -> DatacenterState:
            unit_power = ThreePhase(a=1.0, b=1.0, c=1.0)
            return DatacenterState(time_s=clock.time_s, power_w=unit_power)

        def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
            return None

    class _OtherDC(DatacenterBackend[DatacenterState]):
        """An unrelated datacenter type that is actually supplied."""

        def __init__(self) -> None:
            super().__init__()

        @property
        def dt_s(self) -> Fraction:
            return Fraction(1)

        def reset(self) -> None:
            return None

        def step(self, clock: SimulationClock, events: EventEmitter) -> DatacenterState:
            unit_power = ThreePhase(a=1.0, b=1.0, c=1.0)
            return DatacenterState(time_s=clock.time_s, power_w=unit_power)

        def apply_control(self, command: DatacenterCommand, events: EventEmitter) -> None:
            return None

    class _NeedsExpectedDC(Controller[_ExpectedDC, OpenDSSGrid]):
        @property
        def dt_s(self) -> Fraction:
            return Fraction(1)

        def reset(self) -> None:
            return None

        def step(
            self,
            clock: SimulationClock,
            datacenter: _ExpectedDC,
            grid: OpenDSSGrid,
            events: EventEmitter,
        ) -> list[DatacenterCommand | GridCommand]:
            return []

    coordinator = Coordinator(
        _OtherDC(),
        _StubGrid(dt_s=Fraction(1)),
        [_NeedsExpectedDC()],
        total_duration_s=1,
        dc_bus="671",
    )

    try:
        coordinator.run()
    except TypeError as exc:
        message = str(exc)
    else:
        # Reaching here means run() finished without the expected failure.
        raise AssertionError("Expected TypeError for controller/datacenter incompatibility.")

    for class_name in ("_NeedsExpectedDC", "_ExpectedDC", "_OtherDC"):
        assert class_name in message
|
|
|
|
def test_coordinator_grid_uses_stale_power_when_dc_buffer_empty():
    """With dt_grid < dt_dc the run still completes and the grid steps at its own rate."""
    slow_dc = _StubDC(dt_s=Fraction(1))
    fast_grid = _StubGrid(dt_s=Fraction(1, 2))

    # Warnings raised during construction and the run are silenced here.
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        coordinator = Coordinator(slow_dc, fast_grid, [], total_duration_s=1, dc_bus="671")
        run_log = coordinator.run()

    # Two half-second grid steps in a one-second run.
    assert fast_grid.step_count == 2
    # The log holds one grid state per grid step.
    assert len(run_log.grid_states) == 2
|
|
|
|
def test_coordinator_warns_dt_grid_lt_dt_dc():
    """Constructing a Coordinator with dt_grid < dt_dc emits a warning mentioning both names."""
    slow_dc = _StubDC(dt_s=Fraction(1))
    fast_grid = _StubGrid(dt_s=Fraction(1, 2))

    with warnings.catch_warnings(record=True) as caught:
        # "always" stops Python's once-per-location filter from hiding the warning.
        warnings.simplefilter("always")
        Coordinator(slow_dc, fast_grid, [], total_duration_s=1, dc_bus="671")

    texts = [str(item.message) for item in caught]
    assert any("dt_grid" in text and "dt_dc" in text for text in texts)
|
|
|
|
def test_coordinator_warns_ctrl_dt_lt_grid_dt():
    """A controller faster than the grid triggers a warning mentioning "stale voltages"."""
    dc = _StubDC(dt_s=Fraction(1))
    grid = _StubGrid(dt_s=Fraction(1))
    fast_controller = _StubController(dt_s=Fraction(1, 2))

    with warnings.catch_warnings(record=True) as caught:
        # "always" stops Python's once-per-location filter from hiding the warning.
        warnings.simplefilter("always")
        Coordinator(dc, grid, [fast_controller], total_duration_s=1, dc_bus="671")

    texts = [str(item.message) for item in caught]
    assert any("stale voltages" in text for text in texts)
|
|
|
|
def test_coordinator_run_twice_identical():
    """Two consecutive run() calls on one coordinator give matching logs and step counts."""
    dc = _StubDC(dt_s=Fraction(1, 10))
    grid = _StubGrid(dt_s=Fraction(1))
    controller = _StubController(dt_s=Fraction(1))
    coordinator = Coordinator(dc, grid, [controller], total_duration_s=2, dc_bus="671")

    first_log = coordinator.run()
    second_log = coordinator.run()

    assert len(first_log.dc_states) == len(second_log.dc_states)
    assert len(first_log.grid_states) == len(second_log.grid_states)

    first_times = [state.time_s for state in first_log.grid_states]
    second_times = [state.time_s for state in second_log.grid_states]
    assert first_times == second_times

    # These equal a single run's totals (2 s at dt=1/10 and dt=1), not double.
    assert dc.step_count == 20
    assert grid.step_count == 2
|
|