| from pathlib import Path |
| import argparse |
| import json |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import time |
| from typing import Dict, List, Optional, Sequence |
|
|
|
|
# Repository root: one directory above the directory containing this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
# Make project-local packages (e.g. rr_label_study, imported below) resolvable
# when this file is executed directly as a script.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
|
|
|
|
| def _configure_thread_env() -> None: |
| defaults = { |
| "OMP_NUM_THREADS": "1", |
| "OPENBLAS_NUM_THREADS": "1", |
| "MKL_NUM_THREADS": "1", |
| "NUMEXPR_NUM_THREADS": "1", |
| "VECLIB_MAXIMUM_THREADS": "1", |
| "BLIS_NUM_THREADS": "1", |
| "MALLOC_ARENA_MAX": "2", |
| } |
| for key, value in defaults.items(): |
| os.environ.setdefault(key, value) |
|
|
|
|
| def _configure_coppeliasim_env() -> None: |
| coppeliasim_root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim") |
| ld_library_path_parts = [ |
| part for part in os.environ.get("LD_LIBRARY_PATH", "").split(":") if part |
| ] |
| if coppeliasim_root not in ld_library_path_parts: |
| ld_library_path_parts.insert(0, coppeliasim_root) |
| os.environ["LD_LIBRARY_PATH"] = ":".join(ld_library_path_parts) |
|
|
|
|
# Configure the environment before the project import below — presumably so
# that thread limits and library paths are in place before anything the
# import pulls in reads them (TODO confirm against rr_label_study's imports).
_configure_thread_env()
_configure_coppeliasim_env()


from rr_label_study.oven_study import _aggregate_summary, _episode_dirs
|
|
|
|
| def _select_episode_indices( |
| total_episodes: int, |
| episode_offset: int, |
| max_episodes: Optional[int], |
| episode_indices: Optional[Sequence[int]], |
| ) -> List[int]: |
| if episode_indices is not None: |
| selected: List[int] = [] |
| seen = set() |
| for raw_index in episode_indices: |
| episode_index = int(raw_index) |
| if not (0 <= episode_index < total_episodes): |
| raise ValueError( |
| f"episode index {episode_index} outside available range 0..{total_episodes - 1}" |
| ) |
| if episode_index in seen: |
| continue |
| selected.append(episode_index) |
| seen.add(episode_index) |
| return selected |
|
|
| remaining = max(0, total_episodes - episode_offset) |
| if max_episodes is not None: |
| remaining = min(remaining, max_episodes) |
| if remaining <= 0: |
| return [] |
| return list(range(episode_offset, episode_offset + remaining)) |
|
|
|
|
| def _is_complete_episode_dir(output_dir: Path, episode_name: str) -> bool: |
| required = [ |
| output_dir.joinpath(f"{episode_name}.dense.csv"), |
| output_dir.joinpath(f"{episode_name}.keyframes.csv"), |
| output_dir.joinpath(f"{episode_name}.debug.jsonl"), |
| output_dir.joinpath(f"{episode_name}.metrics.json"), |
| output_dir.joinpath("summary.json"), |
| output_dir.joinpath("templates.json"), |
| output_dir.joinpath("templates.pkl"), |
| ] |
| return all(path.exists() for path in required) |
|
|
|
|
| def _load_metrics(result_dir: Path, episode_names: Sequence[str]) -> List[Dict[str, object]]: |
| metrics: List[Dict[str, object]] = [] |
| for episode_name in episode_names: |
| metrics_path = result_dir.joinpath(episode_name, f"{episode_name}.metrics.json") |
| if metrics_path.exists(): |
| with metrics_path.open("r", encoding="utf-8") as handle: |
| metrics.append(json.load(handle)) |
| return metrics |
|
|
|
|
| def _write_json(path: Path, payload: Dict[str, object]) -> None: |
| path.parent.mkdir(parents=True, exist_ok=True) |
| with path.open("w", encoding="utf-8") as handle: |
| json.dump(payload, handle, indent=2) |
|
|
|
|
def _run_episode(
    dataset_root: Path,
    episode_dir: Path,
    output_dir: Path,
    checkpoint_stride: int,
    num_workers: int,
    base_display: int,
    templates_json: Path,
    stagger_seconds: float,
    thread_count: int,
    log_path: Path,
) -> int:
    """Run the per-episode recompute script as a blocking subprocess.

    The child's combined stdout/stderr is streamed into ``log_path``. The
    threading env vars are overridden with ``thread_count`` for the child
    only (the parent environment is not mutated).

    Returns:
        The child process's exit code.
    """
    env = os.environ.copy()
    threads = str(thread_count)
    for key in (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    ):
        env[key] = threads
    env["MALLOC_ARENA_MAX"] = "2"
    # Unbuffered child output keeps the log file current while it runs.
    env["PYTHONUNBUFFERED"] = "1"
    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "recompute_oven_episode_parallel.py")),
        "--dataset-root",
        str(dataset_root),
        "--episode-dir",
        str(episode_dir),
        "--output-dir",
        str(output_dir),
        "--checkpoint-stride",
        str(checkpoint_stride),
        "--num-workers",
        str(num_workers),
        "--base-display",
        str(base_display),
        "--templates-json",
        str(templates_json),
        "--stagger-seconds",
        str(stagger_seconds),
    ]
    with log_path.open("w", encoding="utf-8") as log_handle:
        # subprocess.run blocks until completion (replaces Popen + wait());
        # check=False so failures surface via the return code, not an exception.
        completed = subprocess.run(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=env,
            check=False,
        )
    return completed.returncode
|
|
|
|
def main() -> int:
    """Batch-run the per-episode recompute with resume, retries, and progress.

    For each selected episode: skip it when a complete result directory
    already exists (resume), otherwise run up to ``max_retries + 1`` attempts
    into a temp directory that is atomically renamed into place on success.
    Failed attempt outputs are preserved under ``failed_attempts/`` for
    debugging. Aggregated metrics are written to ``summary.json`` at the end.

    Returns 0 on success.

    Raises:
        FileNotFoundError: when the templates JSON is missing.
        RuntimeError: as soon as one episode exhausts its retry budget.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument("--result-dir", required=True)
    parser.add_argument("--templates-json", required=True)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int, default=100)
    parser.add_argument("--episode-indices")
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=24)
    parser.add_argument("--base-display", type=int, default=900)
    parser.add_argument("--stagger-seconds", type=float, default=0.15)
    parser.add_argument("--thread-count", type=int, default=1)
    parser.add_argument("--max-retries", type=int, default=2)
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)
    templates_json = Path(args.templates_json)
    if not templates_json.exists():
        raise FileNotFoundError(f"missing templates json: {templates_json}")

    all_episode_dirs = _episode_dirs(dataset_root)
    explicit_episode_indices = None
    if args.episode_indices:
        # Comma-separated list, e.g. "0,3,7"; blank chunks are ignored.
        explicit_episode_indices = [
            int(chunk.strip()) for chunk in args.episode_indices.split(",") if chunk.strip()
        ]
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episode_dirs),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    selected_episode_names = [f"episode{index}" for index in selected_episode_indices]

    # Record the run configuration so results are reproducible/auditable.
    manifest = {
        "dataset_root": str(dataset_root.resolve()),
        "result_dir": str(result_dir.resolve()),
        "templates_json": str(templates_json.resolve()),
        "episode_indices": selected_episode_indices,
        "checkpoint_stride": args.checkpoint_stride,
        "num_workers": args.num_workers,
        "base_display": args.base_display,
        "stagger_seconds": args.stagger_seconds,
        "thread_count": args.thread_count,
        "max_retries": args.max_retries,
        "started_at_epoch": time.time(),
    }
    _write_json(result_dir.joinpath("run_manifest.json"), manifest)

    progress_path = result_dir.joinpath("progress.json")
    logs_dir = result_dir.joinpath("logs")
    logs_dir.mkdir(parents=True, exist_ok=True)

    completed: List[int] = []
    failed: List[Dict[str, object]] = []

    def _write_progress(
        current_episode: Optional[str] = None,
        current_attempt: Optional[int] = None,
        timestamp_key: str = "updated_at_epoch",
    ) -> None:
        # Single source of truth for the progress payload (was repeated five
        # times inline). `completed`/`failed` are captured by closure.
        payload: Dict[str, object] = {"current_episode": current_episode}
        if current_attempt is not None:
            payload["current_attempt"] = current_attempt
        payload["completed_episode_indices"] = completed
        payload["failed"] = failed
        payload["total_selected"] = len(selected_episode_indices)
        payload[timestamp_key] = time.time()
        _write_json(progress_path, payload)

    for episode_index in selected_episode_indices:
        episode_name = f"episode{episode_index}"
        episode_dir = all_episode_dirs[episode_index]
        final_output_dir = result_dir.joinpath(episode_name)
        if _is_complete_episode_dir(final_output_dir, episode_name):
            # Resume support: a fully-populated directory is accepted as-is.
            completed.append(episode_index)
            _write_progress()
            continue

        attempt_success = False
        current_failure: Optional[Dict[str, object]] = None
        # max_retries counts *re*-tries, so total attempts = max_retries + 1.
        for attempt_index in range(1, args.max_retries + 2):
            temp_output_dir = result_dir.joinpath(f".{episode_name}.tmp")
            if temp_output_dir.exists():
                shutil.rmtree(temp_output_dir)
            log_path = logs_dir.joinpath(f"{episode_name}.attempt{attempt_index:02d}.log")
            _write_progress(current_episode=episode_name, current_attempt=attempt_index)
            return_code = _run_episode(
                dataset_root=dataset_root,
                episode_dir=episode_dir,
                output_dir=temp_output_dir,
                checkpoint_stride=args.checkpoint_stride,
                num_workers=args.num_workers,
                base_display=args.base_display,
                templates_json=templates_json,
                stagger_seconds=args.stagger_seconds,
                thread_count=args.thread_count,
                log_path=log_path,
            )
            if return_code == 0 and _is_complete_episode_dir(temp_output_dir, episode_name):
                # Atomic publish: the result only appears under its final name
                # once the attempt is known to be complete.
                if final_output_dir.exists():
                    shutil.rmtree(final_output_dir)
                temp_output_dir.rename(final_output_dir)
                completed.append(episode_index)
                attempt_success = True
                current_failure = None
                break

            current_failure = {
                "episode_index": episode_index,
                "episode_name": episode_name,
                "attempt": attempt_index,
                "return_code": return_code,
                "log_path": str(log_path),
                "updated_at_epoch": time.time(),
            }
            if temp_output_dir.exists():
                # Preserve partial output of the failed attempt for debugging.
                failed_dir = result_dir.joinpath(
                    "failed_attempts", f"{episode_name}.attempt{attempt_index:02d}"
                )
                failed_dir.parent.mkdir(parents=True, exist_ok=True)
                if failed_dir.exists():
                    shutil.rmtree(failed_dir)
                temp_output_dir.rename(failed_dir)

        if not attempt_success:
            failed.append(
                current_failure
                or {"episode_index": episode_index, "episode_name": episode_name}
            )
            _write_progress()
            raise RuntimeError(f"failed to produce complete result for {episode_name}")

    # Aggregate per-episode metrics once. (The original performed this whole
    # load/summarize/progress pass twice back to back; the redundant first
    # pass has been removed — only the final `finished_at_epoch` write stays.)
    metrics = _load_metrics(result_dir, selected_episode_names)
    if metrics:
        _write_json(result_dir.joinpath("summary.json"), _aggregate_summary(metrics))
    _write_progress(timestamp_key="finished_at_epoch")
    return 0
|
|
|
|
# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|
|