| |
| """Probe local HPC resources and suggest safe EDA concurrency settings. |
| |
| This script dynamically calculates memory per worker from the EDA config file |
| (configs/eda_optimized.yaml) using max_memory_gib / max_workers. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import os |
| import platform |
| import shutil |
| import sys |
| from pathlib import Path |
| import yaml |
|
|
|
|
| def _mem_available_gib() -> float: |
| meminfo = Path("/proc/meminfo") |
| if not meminfo.exists(): |
| return 0.0 |
| for line in meminfo.read_text().splitlines(): |
| if line.startswith("MemAvailable:"): |
| kb = int(line.split()[1]) |
| return kb / (1024 * 1024) |
| return 0.0 |
|
|
|
|
| def _mem_total_gib() -> float: |
| meminfo = Path("/proc/meminfo") |
| if not meminfo.exists(): |
| return 0.0 |
| for line in meminfo.read_text().splitlines(): |
| if line.startswith("MemTotal:"): |
| kb = int(line.split()[1]) |
| return kb / (1024 * 1024) |
| return 0.0 |
|
|
|
|
| def _recommend_workers(cpu_count: int, mem_available_gib: float, mem_per_worker_gib: float) -> int: |
| |
| by_cpu = max(1, int(cpu_count * 0.75)) |
| by_mem = max(1, int(mem_available_gib // max(1.0, mem_per_worker_gib))) |
| return max(1, min(by_cpu, by_mem)) |
|
|
|
|
| def main() -> None: |
| parser = argparse.ArgumentParser(description=__doc__) |
| parser.add_argument( |
| "--config", |
| type=Path, |
| default=Path(__file__).parent.parent / "configs" / "eda_optimized.yaml", |
| help="Path to YAML configuration file.", |
| ) |
| parser.add_argument( |
| "--workdir", |
| type=Path, |
| help="Path to check disk usage for (if not specified, uses first input_dir from config).", |
| ) |
| args = parser.parse_args() |
|
|
| |
| config_path = args.config |
| if not config_path.exists(): |
| print(f"Error: Config file not found: {config_path}", file=sys.stderr) |
| sys.exit(1) |
| |
| with open(config_path) as f: |
| config = yaml.safe_load(f) |
| max_memory_gib = config['resources']['max_memory_gib'] |
| max_workers = config['resources']['max_workers'] |
| mem_per_worker_gib = max_memory_gib / max_workers |
| |
| |
| workdir = args.workdir |
| if workdir is None: |
| input_dirs = config.get('paths', {}).get('input_dirs', []) |
| if input_dirs: |
| workdir = Path(input_dirs[0]).parent.parent |
| else: |
| workdir = Path.cwd() |
|
|
| cpu_count = os.cpu_count() or 1 |
| mem_total_gib = _mem_total_gib() |
| mem_available_gib = _mem_available_gib() |
| disk_total, disk_used, disk_free = shutil.disk_usage(workdir) |
|
|
| recommended_workers = _recommend_workers( |
| cpu_count=cpu_count, |
| mem_available_gib=mem_available_gib, |
| mem_per_worker_gib=mem_per_worker_gib, |
| ) |
| recommended_shards = max(1, min(8, cpu_count // max(1, recommended_workers))) |
|
|
| report = { |
| "hostname": platform.node(), |
| "platform": platform.platform(), |
| "cpu_count": cpu_count, |
| "memory_total_gib": round(mem_total_gib, 2), |
| "memory_available_gib": round(mem_available_gib, 2), |
| "disk_total_gib": round(disk_total / (1024**3), 2), |
| "disk_used_gib": round(disk_used / (1024**3), 2), |
| "disk_free_gib": round(disk_free / (1024**3), 2), |
| "assumptions": {"mem_per_worker_gib": mem_per_worker_gib}, |
| "recommendation": { |
| "workers_per_node": recommended_workers, |
| "num_shards_suggestion": recommended_shards, |
| "chunk_size_suggestion": 4096, |
| }, |
| } |
| print(json.dumps(report, indent=2)) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|