| from __future__ import annotations |
|
|
| import os |
|
|
| import pytest |
|
|
| from trenches_env.env import FogOfWarDiplomacyEnv |
| from trenches_env.models import LiveControlRequest |
| from trenches_env.source_ingestion import HttpSourceFetcher, SourceHarvester |
|
|
|
|
# Opt-in guard: every test in this module is skipped unless the environment
# variable TRENCHES_RUN_LIVE_SOURCE_TESTS is set to exactly "1".
pytestmark = pytest.mark.skipif(
    os.environ.get("TRENCHES_RUN_LIVE_SOURCE_TESTS") != "1",
    reason="set TRENCHES_RUN_LIVE_SOURCE_TESTS=1 to run live network probes",
)
|
|
|
|
def test_live_source_monitor_delivers_packets_and_briefs_per_agent() -> None:
    """Check that the source monitor reports packets and briefs for each agent.

    Builds an environment around a non-auto-starting ``SourceHarvester`` with
    a 10-second HTTP fetcher timeout, creates a seeded session, switches it to
    live mode (no auto-stepping), and requests the source-monitor report.

    For every agent in the report the test requires a non-zero training packet
    count and a non-zero delivered training brief count.  The live packet and
    live brief counts are only required to be non-zero for agents that have at
    least one configured live source.  All shortfalls are gathered first and
    reported together, one per line, in a single assertion message.

    The environment is always shut down, whether or not the assertion passes.
    """
    fetcher = HttpSourceFetcher(timeout_seconds=10.0)
    harvester = SourceHarvester(fetcher=fetcher, auto_start=False)
    env = FogOfWarDiplomacyEnv(source_harvester=harvester).enable_source_warm_start()
    try:
        session = env.create_session(seed=7)
        live_control = LiveControlRequest(
            enabled=True,
            auto_step=False,
            poll_interval_ms=15_000,
        )
        session = env.configure_live_session(session, live_control)
        report = env.source_monitor(session)

        problems: list[str] = []
        for agent in report.agents:
            # Live checks apply only to agents that have live sources configured;
            # the `and` keeps the live counters unread for all other agents.
            expects_live = agent.configured_live_sources > 0
            checks = (
                (
                    agent.available_training_packet_count == 0,
                    "no healthy training packet",
                ),
                (
                    agent.delivered_training_brief_count == 0,
                    "no training brief visible to the model",
                ),
                (
                    expects_live and agent.available_live_packet_count == 0,
                    "no healthy live packet",
                ),
                (
                    expects_live and agent.delivered_live_brief_count == 0,
                    "no live brief visible to the model",
                ),
            )
            problems.extend(
                f"{agent.agent_id}: {reason}" for failed, reason in checks if failed
            )

        # Report every shortfall at once rather than stopping at the first.
        assert not problems, "\n".join(problems)
    finally:
        env.shutdown()
|
|