| from __future__ import annotations |
|
|
| import os |
| from concurrent.futures import ThreadPoolExecutor |
|
|
| import pytest |
|
|
| from trenches_env.source_catalog import get_all_sources |
| from trenches_env.source_ingestion import HttpSourceFetcher, SourceHarvester |
|
|
|
|
| pytestmark = pytest.mark.skipif( |
| os.getenv("TRENCHES_RUN_LIVE_SOURCE_TESTS") != "1", |
| reason="set TRENCHES_RUN_LIVE_SOURCE_TESTS=1 to run live network probes", |
| ) |
|
|
|
|
| def test_all_sources_have_a_live_accessible_probe() -> None: |
| harvester = SourceHarvester(fetcher=HttpSourceFetcher(timeout_seconds=10.0), auto_start=False) |
| sources = get_all_sources() |
|
|
| def probe(source_id: str) -> tuple[str, str, str | None]: |
| source = next(source for source in sources if source.id == source_id) |
| packet = harvester.probe_source(source) |
| return source.name, packet.status, packet.error |
|
|
| failures: list[str] = [] |
| try: |
| with ThreadPoolExecutor(max_workers=12) as executor: |
| for source_name, status, error in executor.map(probe, [source.id for source in sources]): |
| if status != "ok": |
| failures.append(f"{source_name}: {error or 'probe failed'}") |
| finally: |
| harvester.stop() |
|
|
| assert not failures, "\n".join(failures) |
|
|