lsnu committed on
Commit
1bb6705
·
verified ·
1 Parent(s): ed4f6d2

Add missing fuller logging launcher

Browse files
code/scripts/launch_oven_fuller_logging_batch.py ADDED
@@ -0,0 +1,336 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from pathlib import Path
2
+ import argparse
3
+ import json
4
+ import os
5
+ import shutil
6
+ import subprocess
7
+ import sys
8
+ import time
9
+ from typing import Dict, List, Optional, Sequence
10
+
11
+
12
# Make the project root (the parent of scripts/) importable before any
# project-local imports below.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
15
+
16
+
17
def _configure_thread_env() -> None:
    """Pin native thread pools to a single thread unless already configured.

    Uses ``setdefault`` so explicit settings made by the caller's shell are
    never overridden.
    """
    for variable in (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    ):
        os.environ.setdefault(variable, "1")
    # Limit glibc malloc arenas to curb memory fragmentation in workers.
    os.environ.setdefault("MALLOC_ARENA_MAX", "2")
29
+
30
+
31
def _configure_coppeliasim_env() -> None:
    """Point the process at the CoppeliaSim installation.

    Sets ``COPPELIASIM_ROOT`` (keeping any pre-existing value) and ensures
    that directory appears first on ``LD_LIBRARY_PATH``.
    """
    root = os.environ.setdefault("COPPELIASIM_ROOT", "/workspace/coppelia_sim")
    existing = os.environ.get("LD_LIBRARY_PATH", "")
    parts = [segment for segment in existing.split(":") if segment]
    if root not in parts:
        parts.insert(0, root)
    os.environ["LD_LIBRARY_PATH"] = ":".join(parts)
39
+
40
+
41
# Apply thread-pool and CoppeliaSim environment settings at import time,
# before the project import below, so any native libraries it loads see them.
_configure_thread_env()
_configure_coppeliasim_env()

# Deliberately imported after the environment setup; relies on PROJECT_ROOT
# having been inserted into sys.path earlier in this file.
from rr_label_study.oven_study import _aggregate_summary, _episode_dirs
45
+
46
+
47
def _select_episode_indices(
    total_episodes: int,
    episode_offset: int,
    max_episodes: Optional[int],
    episode_indices: Optional[Sequence[int]],
) -> List[int]:
    """Choose which episode indices to process.

    When *episode_indices* is given, validate each against the available
    range and return them in first-seen order with duplicates removed.
    Otherwise return the contiguous window starting at *episode_offset*,
    capped by *max_episodes* (empty when nothing remains).
    """
    if episode_indices is not None:
        ordered: List[int] = []
        for raw in episode_indices:
            index = int(raw)
            if index < 0 or index >= total_episodes:
                raise ValueError(
                    f"episode index {index} outside available range 0..{total_episodes - 1}"
                )
            if index not in ordered:
                ordered.append(index)
        return ordered

    count = max(0, total_episodes - episode_offset)
    if max_episodes is not None:
        count = min(count, max_episodes)
    if count <= 0:
        return []
    return list(range(episode_offset, episode_offset + count))
74
+
75
+
76
def _is_complete_episode_dir(output_dir: Path, episode_name: str) -> bool:
    """Return True when *output_dir* contains every artifact of a finished run."""
    per_episode_suffixes = (".dense.csv", ".keyframes.csv", ".debug.jsonl", ".metrics.json")
    shared_names = ("summary.json", "templates.json", "templates.pkl")
    expected = [output_dir / f"{episode_name}{suffix}" for suffix in per_episode_suffixes]
    expected.extend(output_dir / name for name in shared_names)
    return all(candidate.exists() for candidate in expected)
87
+
88
+
89
def _load_metrics(result_dir: Path, episode_names: Sequence[str]) -> List[Dict[str, object]]:
    """Load each episode's metrics payload, silently skipping missing files."""
    loaded: List[Dict[str, object]] = []
    for name in episode_names:
        candidate = result_dir / name / f"{name}.metrics.json"
        if not candidate.exists():
            continue
        with candidate.open("r", encoding="utf-8") as stream:
            loaded.append(json.load(stream))
    return loaded
97
+
98
+
99
def _write_json(path: Path, payload: Dict[str, object]) -> None:
    """Write *payload* as pretty-printed JSON, creating parent dirs as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as sink:
        json.dump(payload, sink, indent=2)
103
+
104
+
105
def _run_episode(
    dataset_root: Path,
    episode_dir: Path,
    output_dir: Path,
    checkpoint_stride: int,
    num_workers: int,
    base_display: int,
    templates_json: Path,
    stagger_seconds: float,
    thread_count: int,
    log_path: Path,
) -> int:
    """Run the per-episode recompute script as a child process.

    The child's stdout/stderr are streamed into *log_path*; the child's exit
    code is returned. Thread-pool env vars are forced to *thread_count* for
    the subprocess only.
    """
    child_env = os.environ.copy()
    threads = str(thread_count)
    for variable in (
        "OMP_NUM_THREADS",
        "OPENBLAS_NUM_THREADS",
        "MKL_NUM_THREADS",
        "NUMEXPR_NUM_THREADS",
        "VECLIB_MAXIMUM_THREADS",
        "BLIS_NUM_THREADS",
    ):
        child_env[variable] = threads
    child_env["MALLOC_ARENA_MAX"] = "2"
    # Unbuffered child output so the log reflects progress in real time.
    child_env["PYTHONUNBUFFERED"] = "1"

    command = [
        sys.executable,
        str(PROJECT_ROOT.joinpath("scripts", "recompute_oven_episode_parallel.py")),
        "--dataset-root", str(dataset_root),
        "--episode-dir", str(episode_dir),
        "--output-dir", str(output_dir),
        "--checkpoint-stride", str(checkpoint_stride),
        "--num-workers", str(num_workers),
        "--base-display", str(base_display),
        "--templates-json", str(templates_json),
        "--stagger-seconds", str(stagger_seconds),
    ]
    with log_path.open("w", encoding="utf-8") as log_handle:
        child = subprocess.Popen(
            command,
            stdout=log_handle,
            stderr=subprocess.STDOUT,
            cwd=str(PROJECT_ROOT),
            env=child_env,
        )
        return child.wait()
155
+
156
+
157
def main() -> int:
    """Batch-launch per-episode recompute jobs with retries and resumable state.

    Each selected episode is rendered into a hidden temp directory, promoted
    to its final location only when the output is complete, and retried up to
    ``--max-retries`` extra times. A run manifest, per-attempt logs, rolling
    ``progress.json`` and an aggregated ``summary.json`` are written under
    ``--result-dir``. Episodes already complete from a previous run are
    skipped, making the launcher restartable.

    Returns:
        0 on success.

    Raises:
        FileNotFoundError: if ``--templates-json`` does not exist.
        RuntimeError: if an episode still fails after all retries.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--dataset-root",
        default="/workspace/data/bimanual_take_tray_out_of_oven_train_128",
    )
    parser.add_argument("--result-dir", required=True)
    parser.add_argument("--templates-json", required=True)
    parser.add_argument("--episode-offset", type=int, default=0)
    parser.add_argument("--max-episodes", type=int, default=100)
    parser.add_argument("--episode-indices")
    parser.add_argument("--checkpoint-stride", type=int, default=16)
    parser.add_argument("--num-workers", type=int, default=24)
    parser.add_argument("--base-display", type=int, default=900)
    parser.add_argument("--stagger-seconds", type=float, default=0.15)
    parser.add_argument("--thread-count", type=int, default=1)
    parser.add_argument("--max-retries", type=int, default=2)
    args = parser.parse_args()

    dataset_root = Path(args.dataset_root)
    result_dir = Path(args.result_dir)
    result_dir.mkdir(parents=True, exist_ok=True)
    templates_json = Path(args.templates_json)
    if not templates_json.exists():
        raise FileNotFoundError(f"missing templates json: {templates_json}")

    all_episode_dirs = _episode_dirs(dataset_root)
    explicit_episode_indices = None
    if args.episode_indices:
        # Comma-separated list, e.g. "--episode-indices 3,7,11".
        explicit_episode_indices = [
            int(chunk.strip()) for chunk in args.episode_indices.split(",") if chunk.strip()
        ]
    selected_episode_indices = _select_episode_indices(
        total_episodes=len(all_episode_dirs),
        episode_offset=args.episode_offset,
        max_episodes=args.max_episodes,
        episode_indices=explicit_episode_indices,
    )
    # NOTE(review): assumes dataset episodes are named "episode<i>" in the same
    # order _episode_dirs() returns them — confirm against oven_study.
    selected_episode_names = [f"episode{index}" for index in selected_episode_indices]

    manifest = {
        "dataset_root": str(dataset_root.resolve()),
        "result_dir": str(result_dir.resolve()),
        "templates_json": str(templates_json.resolve()),
        "episode_indices": selected_episode_indices,
        "checkpoint_stride": args.checkpoint_stride,
        "num_workers": args.num_workers,
        "base_display": args.base_display,
        "stagger_seconds": args.stagger_seconds,
        "thread_count": args.thread_count,
        "max_retries": args.max_retries,
        "started_at_epoch": time.time(),
    }
    _write_json(result_dir.joinpath("run_manifest.json"), manifest)

    progress_path = result_dir.joinpath("progress.json")
    logs_dir = result_dir.joinpath("logs")
    logs_dir.mkdir(parents=True, exist_ok=True)

    completed: List[int] = []
    failed: List[Dict[str, object]] = []

    def _write_progress(
        current_episode: Optional[str],
        current_attempt: Optional[int] = None,
        final: bool = False,
    ) -> None:
        # Single place that persists progress so every update carries the same
        # bookkeeping fields (the original repeated this payload inline).
        payload: Dict[str, object] = {"current_episode": current_episode}
        if current_attempt is not None:
            payload["current_attempt"] = current_attempt
        payload["completed_episode_indices"] = completed
        payload["failed"] = failed
        payload["total_selected"] = len(selected_episode_indices)
        payload["finished_at_epoch" if final else "updated_at_epoch"] = time.time()
        _write_json(progress_path, payload)

    for episode_index in selected_episode_indices:
        episode_name = f"episode{episode_index}"
        episode_dir = all_episode_dirs[episode_index]
        final_output_dir = result_dir.joinpath(episode_name)
        if _is_complete_episode_dir(final_output_dir, episode_name):
            # Finished in a previous run: count it and move on (resume support).
            completed.append(episode_index)
            _write_progress(None)
            continue

        attempt_success = False
        current_failure: Optional[Dict[str, object]] = None
        # max_retries counts extra attempts after the first, hence "+ 2".
        for attempt_index in range(1, args.max_retries + 2):
            temp_output_dir = result_dir.joinpath(f".{episode_name}.tmp")
            if temp_output_dir.exists():
                shutil.rmtree(temp_output_dir)
            log_path = logs_dir.joinpath(f"{episode_name}.attempt{attempt_index:02d}.log")
            _write_progress(episode_name, current_attempt=attempt_index)
            return_code = _run_episode(
                dataset_root=dataset_root,
                episode_dir=episode_dir,
                output_dir=temp_output_dir,
                checkpoint_stride=args.checkpoint_stride,
                num_workers=args.num_workers,
                base_display=args.base_display,
                templates_json=templates_json,
                stagger_seconds=args.stagger_seconds,
                thread_count=args.thread_count,
                log_path=log_path,
            )
            if return_code == 0 and _is_complete_episode_dir(temp_output_dir, episode_name):
                # Promote the temp dir so the final location only ever holds
                # complete results.
                if final_output_dir.exists():
                    shutil.rmtree(final_output_dir)
                temp_output_dir.rename(final_output_dir)
                completed.append(episode_index)
                attempt_success = True
                current_failure = None
                break

            current_failure = {
                "episode_index": episode_index,
                "episode_name": episode_name,
                "attempt": attempt_index,
                "return_code": return_code,
                "log_path": str(log_path),
                "updated_at_epoch": time.time(),
            }
            if temp_output_dir.exists():
                # Keep partial output around for post-mortem instead of deleting.
                failed_dir = result_dir.joinpath(
                    "failed_attempts", f"{episode_name}.attempt{attempt_index:02d}"
                )
                failed_dir.parent.mkdir(parents=True, exist_ok=True)
                if failed_dir.exists():
                    shutil.rmtree(failed_dir)
                temp_output_dir.rename(failed_dir)

        if not attempt_success:
            failed.append(
                current_failure
                or {"episode_index": episode_index, "episode_name": episode_name}
            )
            _write_progress(None)
            raise RuntimeError(f"failed to produce complete result for {episode_name}")

    # Aggregate metrics and finalize progress exactly once. (The original
    # duplicated this entire tail, rewriting summary.json and progress.json
    # a second time.)
    metrics = _load_metrics(result_dir, selected_episode_names)
    if metrics:
        _write_json(result_dir.joinpath("summary.json"), _aggregate_summary(metrics))
    _write_progress(None, final=True)
    return 0
333
+
334
+
335
# Script entry point: the process exit status mirrors main()'s return value.
if __name__ == "__main__":
    raise SystemExit(main())