VLAdaptorBench / code /scripts /launch_oven_fuller_logging_batch.py
lsnu's picture
Add files using upload-large-folder tool
09fbe32 verified
from pathlib import Path
import argparse
import json
import os
import shutil
import subprocess
import sys
import time
from typing import Dict, List, Optional, Sequence
# Directory one level above the folder that contains this script.
PROJECT_ROOT = Path(__file__).resolve().parents[1]

# Put that directory at the front of the import path exactly once; presumably
# this is what makes the project-local packages imported further down resolvable.
if str(PROJECT_ROOT) not in sys.path:
    sys.path[0:0] = [str(PROJECT_ROOT)]
def _configure_thread_env() -> None:
    """Seed threading and allocator defaults in ``os.environ``.

    Every variable is written only when it is absent, so a value already
    exported by the calling environment is left untouched.
    """
    single_thread_vars = (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    )
    for name in single_thread_vars:
        if name not in os.environ:
            os.environ[name] = "1"
    if "MALLOC_ARENA_MAX" not in os.environ:
        os.environ["MALLOC_ARENA_MAX"] = "2"
def _configure_coppeliasim_env() -> None:
    """Make sure the CoppeliaSim root is listed in ``LD_LIBRARY_PATH``.

    ``COPPELIASIM_ROOT`` falls back to ``/workspace/coppelia_sim`` when unset.
    The root is prepended to ``LD_LIBRARY_PATH`` unless it is already one of
    its entries; empty entries are dropped when the variable is rewritten.
    """
    sim_root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
    current_value = os.environ.get("LD_LIBRARY_PATH", "")
    # filter(None, ...) discards the empty strings produced by "::" or an unset variable.
    entries = list(filter(None, current_value.split(":")))
    if sim_root not in entries:
        entries = [sim_root, *entries]
    os.environ["LD_LIBRARY_PATH"] = ":".join(entries)
# Apply the thread-limit and CoppeliaSim library-path defaults at import time,
# before the project import below. Presumably the ordering is deliberate so the
# variables are already set when that module (and whatever it imports) loads
# -- TODO confirm.
_configure_thread_env()
_configure_coppeliasim_env()
# Project-local helpers; presumably resolved through the PROJECT_ROOT entry
# inserted into sys.path above -- verify if the package is also installed.
from rr_label_study.oven_study import _aggregate_summary, _episode_dirs
def _select_episode_indices(
    total_episodes: int,
    episode_offset: int,
    max_episodes: Optional[int],
    episode_indices: Optional[Sequence[int]],
) -> List[int]:
    """Return the episode indices to process, in processing order.

    Two selection modes are supported:

    * Explicit: when ``episode_indices`` is not ``None`` it wins outright.
      Each entry is coerced with ``int``, duplicates are dropped while the
      first-seen order is kept, and ``episode_offset`` / ``max_episodes`` are
      ignored.
    * Window: otherwise a contiguous run starting at ``episode_offset`` is
      returned, capped at ``max_episodes`` entries (no cap when ``None``) and
      at the number of available episodes.

    Args:
        total_episodes: Number of available episodes; valid indices are
            ``0 .. total_episodes - 1``.
        episode_offset: First index of the window (window mode only).
        max_episodes: Maximum window length, or ``None`` for no limit.
        episode_indices: Explicit indices, or ``None`` to use window mode.

    Returns:
        The selected indices; possibly empty.

    Raises:
        ValueError: If an explicit index is out of range, or if
            ``episode_offset`` is negative in window mode.
    """
    if episode_indices is not None:
        selected: List[int] = []
        seen = set()
        for raw_index in episode_indices:
            episode_index = int(raw_index)
            if not (0 <= episode_index < total_episodes):
                raise ValueError(
                    f"episode index {episode_index} outside available range 0..{total_episodes - 1}"
                )
            if episode_index in seen:
                continue
            selected.append(episode_index)
            seen.add(episode_index)
        return selected

    # A negative offset used to leak negative indices into the result, which
    # silently wrap around when used to index the episode list.
    if episode_offset < 0:
        raise ValueError(f"episode offset {episode_offset} must be non-negative")

    stop = total_episodes
    if max_episodes is not None:
        stop = min(stop, episode_offset + max_episodes)
    # range() is empty whenever stop <= episode_offset, which covers an offset
    # past the end as well as a zero or negative max_episodes.
    return list(range(episode_offset, stop))
def _is_complete_episode_dir(output_dir: Path, episode_name: str) -> bool:
    """Report whether ``output_dir`` holds every expected artifact.

    The directory is complete when the four per-episode files
    (``<episode_name>.dense.csv``, ``.keyframes.csv``, ``.debug.jsonl``,
    ``.metrics.json``) and the three shared files (``summary.json``,
    ``templates.json``, ``templates.pkl``) all exist inside it.
    """
    per_episode_suffixes = (".dense.csv", ".keyframes.csv", ".debug.jsonl", ".metrics.json")
    shared_files = ("summary.json", "templates.json", "templates.pkl")

    expected_names = [f"{episode_name}{suffix}" for suffix in per_episode_suffixes]
    expected_names.extend(shared_files)

    for file_name in expected_names:
        if not (output_dir / file_name).exists():
            return False
    return True
def _load_metrics(result_dir: Path, episode_names: Sequence[str]) -> List[Dict[str, object]]:
    """Load the per-episode metrics JSON files that exist under ``result_dir``.

    For each name the file ``<result_dir>/<name>/<name>.metrics.json`` is
    parsed when present; episodes without that file are skipped. Results keep
    the order of ``episode_names``.
    """
    loaded: List[Dict[str, object]] = []
    for name in episode_names:
        candidate = result_dir / name / f"{name}.metrics.json"
        if not candidate.exists():
            continue
        loaded.append(json.loads(candidate.read_text(encoding="utf-8")))
    return loaded
def _write_json(path: Path, payload: Dict[str, object]) -> None:
    """Serialise ``payload`` to ``path`` as indented UTF-8 JSON, atomically.

    Parent directories are created as needed. The data is first written to a
    hidden sibling file (``.<name>.tmp``) and then moved over ``path`` with
    ``os.replace``, so an existing file is never left truncated if
    serialisation fails or the process is interrupted mid-write.

    Args:
        path: Destination file.
        payload: JSON-serialisable mapping.

    Raises:
        TypeError: Propagated from ``json.dump`` when ``payload`` contains a
            value that cannot be serialised; ``path`` is left unchanged.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Same directory as the target so the final rename stays on one filesystem.
    tmp_path = path.with_name(f".{path.name}.tmp")
    try:
        with tmp_path.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=2)
        os.replace(tmp_path, path)
    finally:
        # Only present here if the write or the replace failed.
        if tmp_path.exists():
            tmp_path.unlink()
def _run_episode(
    dataset_root: Path,
    episode_dir: Path,
    output_dir: Path,
    checkpoint_stride: int,
    num_workers: int,
    base_display: int,
    templates_json: Path,
    stagger_seconds: float,
    thread_count: int,
    log_path: Path,
) -> int:
    """Run ``scripts/recompute_oven_episode_parallel.py`` for one episode.

    The script is launched with the current interpreter from ``PROJECT_ROOT``,
    and its stdout and stderr are both written to ``log_path`` (truncated
    first). The child inherits the current environment with the
    threading variables forced to ``thread_count``, ``MALLOC_ARENA_MAX`` set to
    ``"2"`` and ``PYTHONUNBUFFERED`` set to ``"1"``.

    Returns:
        The child's exit code, after blocking until it terminates.
    """
    child_env = dict(os.environ)
    threads = str(thread_count)
    for name in (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    ):
        child_env[name] = threads
    child_env.update(MALLOC_ARENA_MAX="2", PYTHONUNBUFFERED="1")

    worker_script = PROJECT_ROOT / "scripts" / "recompute_oven_episode_parallel.py"
    flag_values = (
        ("--dataset-root", dataset_root),
        ("--episode-dir", episode_dir),
        ("--output-dir", output_dir),
        ("--checkpoint-stride", checkpoint_stride),
        ("--num-workers", num_workers),
        ("--base-display", base_display),
        ("--templates-json", templates_json),
        ("--stagger-seconds", stagger_seconds),
    )
    command = [sys.executable, str(worker_script)]
    for flag, value in flag_values:
        command += [flag, str(value)]

    with log_path.open("w", encoding="utf-8") as log_handle:
        child = subprocess.Popen(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,  # interleave stderr into the same log
            cwd=str(PROJECT_ROOT),
            env=child_env,
        )
        return child.wait()
def main() -> int:
    """Recompute a batch of oven episodes one at a time, with retries.

    Workflow:

    1. Parse the command line, create the result directory and check that the
       templates JSON exists.
    2. Pick the episode indices and write ``run_manifest.json``.
    3. For each selected episode: skip it when its final output directory is
       already complete; otherwise run the worker into a hidden temp directory
       up to ``max_retries + 1`` times. A successful attempt is renamed into
       place; a failed attempt's output is moved under ``failed_attempts/``.
    4. ``progress.json`` is rewritten before every attempt and after every
       episode; ``summary.json`` is refreshed after every freshly computed
       episode and once more at the end.

    Returns:
        ``0`` when every selected episode finished.

    Raises:
        FileNotFoundError: If the templates JSON does not exist.
        RuntimeError: As soon as one episode exhausts all of its attempts.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    cli.add_argument("--result-dir", required=True)
    cli.add_argument("--templates-json", required=True)
    cli.add_argument("--episode-offset", type=int, default=0)
    cli.add_argument("--max-episodes", type=int, default=100)
    cli.add_argument("--episode-indices")
    cli.add_argument("--checkpoint-stride", type=int, default=16)
    cli.add_argument("--num-workers", type=int, default=24)
    cli.add_argument("--base-display", type=int, default=900)
    cli.add_argument("--stagger-seconds", type=float, default=0.15)
    cli.add_argument("--thread-count", type=int, default=1)
    cli.add_argument("--max-retries", type=int, default=2)
    args = cli.parse_args()

    dataset_root = Path(args.dataset_root)
    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)
    templates_json = Path(args.templates_json)
    if not templates_json.exists():
        raise FileNotFoundError(f"missing templates json: {templates_json}")

    all_episode_dirs = _episode_dirs(dataset_root)

    # --episode-indices is a comma-separated list; blank entries are ignored.
    explicit_episode_indices = None
    if args.episode_indices:
        tokens = (chunk.strip() for chunk in args.episode_indices.split(","))
        explicit_episode_indices = [int(token) for token in tokens if token]

    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episode_dirs),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    selected_episode_names = [f"episode{index}" for index in selected_episode_indices]

    _write_json(
        result_dir / "run_manifest.json",
        {
            "dataset_root": str(dataset_root.resolve()),
            "result_dir": str(result_dir.resolve()),
            "templates_json": str(templates_json.resolve()),
            "episode_indices": selected_episode_indices,
            "checkpoint_stride": args.checkpoint_stride,
            "num_workers": args.num_workers,
            "base_display": args.base_display,
            "stagger_seconds": args.stagger_seconds,
            "thread_count": args.thread_count,
            "max_retries": args.max_retries,
            "started_at_epoch": time.time(),
        },
    )

    progress_path = result_dir / "progress.json"
    logs_dir = result_dir / "logs"
    logs_dir.mkdir(parents=True, exist_ok=True)

    completed: List[int] = []
    failed: List[Dict[str, object]] = []

    def save_progress(
        current_episode: Optional[str],
        attempt: Optional[int] = None,
        stamp_key: str = "updated_at_epoch",
    ) -> None:
        # Keys are inserted in a fixed order so the file layout stays stable.
        payload: Dict[str, object] = {"current_episode": current_episode}
        if attempt is not None:
            payload["current_attempt"] = attempt
        payload["completed_episode_indices"] = completed
        payload["failed"] = failed
        payload["total_selected"] = len(selected_episode_indices)
        payload[stamp_key] = time.time()
        _write_json(progress_path, payload)

    def refresh_summary() -> None:
        # Aggregate whatever metrics files exist so far; skip when there are none.
        metrics = _load_metrics(result_dir, selected_episode_names)
        if metrics:
            _write_json(result_dir / "summary.json", _aggregate_summary(metrics))

    for episode_index in selected_episode_indices:
        episode_name = f"episode{episode_index}"
        episode_dir = all_episode_dirs[episode_index]
        final_output_dir = result_dir / episode_name

        # Output left by an earlier run: count it as done and move on.
        if _is_complete_episode_dir(final_output_dir, episode_name):
            completed.append(episode_index)
            save_progress(None)
            continue

        temp_output_dir = result_dir / f".{episode_name}.tmp"
        last_failure: Optional[Dict[str, object]] = None

        for attempt_index in range(1, args.max_retries + 2):
            # Start every attempt from an empty scratch directory.
            if temp_output_dir.exists():
                shutil.rmtree(temp_output_dir)
            log_path = logs_dir / f"{episode_name}.attempt{attempt_index:02d}.log"
            save_progress(episode_name, attempt=attempt_index)

            return_code = _run_episode(
                dataset_root=dataset_root,
                episode_dir=episode_dir,
                output_dir=temp_output_dir,
                checkpoint_stride=args.checkpoint_stride,
                num_workers=args.num_workers,
                base_display=args.base_display,
                templates_json=templates_json,
                stagger_seconds=args.stagger_seconds,
                thread_count=args.thread_count,
                log_path=log_path,
            )

            # Success needs both a clean exit and the full set of artifacts.
            if return_code == 0 and _is_complete_episode_dir(temp_output_dir, episode_name):
                if final_output_dir.exists():
                    shutil.rmtree(final_output_dir)
                temp_output_dir.rename(final_output_dir)
                completed.append(episode_index)
                break

            last_failure = {
                "episode_index": episode_index,
                "episode_name": episode_name,
                "attempt": attempt_index,
                "return_code": return_code,
                "log_path": str(log_path),
                "updated_at_epoch": time.time(),
            }
            # Keep the partial output of the failed attempt for inspection.
            if temp_output_dir.exists():
                attempt_archive = (
                    result_dir / "failed_attempts" / f"{episode_name}.attempt{attempt_index:02d}"
                )
                attempt_archive.parent.mkdir(parents=True, exist_ok=True)
                if attempt_archive.exists():
                    shutil.rmtree(attempt_archive)
                temp_output_dir.rename(attempt_archive)
        else:
            # Reached only when no attempt hit `break`, i.e. every attempt
            # failed (or none was made because max_retries is negative).
            failed.append(
                last_failure or {"episode_index": episode_index, "episode_name": episode_name}
            )
            save_progress(None)
            raise RuntimeError(f"failed to produce complete result for {episode_name}")

        refresh_summary()
        save_progress(None)

    refresh_summary()
    save_progress(None, stamp_key="finished_at_epoch")
    return 0
if __name__ == "__main__":
raise SystemExit(main())