# GR00T — scripts/validate_hf_config_alignment.py
# (Hugging Face repository page residue: uploaded by yqi19,
#  commit "add: source files (batch 4)", b88b79e, verified.)
# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Validate HuggingFace config alignment against source-of-truth definitions.
Usage:
# Internal consistency checks only (no HF download required):
uv run python scripts/validate_hf_config_alignment.py
# Full check with HF configs (requires auth + local dirs):
uv run python scripts/validate_hf_config_alignment.py --hf-config-dir /tmp/hf_configs
"""
import argparse
import json
import math
from pathlib import Path
import re
import sys
REPO_ROOT = Path(__file__).resolve().parent.parent
PASS = "\033[92mβœ“ PASS\033[0m"
FAIL = "\033[91mβœ— FAIL\033[0m"
WARN = "\033[93m⚠ WARN\033[0m"
INFO = "\033[94mβ„Ή INFO\033[0m"
SKIP = "\033[90m⊘ SKIP\033[0m"
pass_count = 0
fail_count = 0
warn_count = 0
skip_count = 0
def check(condition, msg, *, warn_only=False, skip=False):
    """Record and print a single validation result.

    Precedence: skip overrides everything, then a truthy condition passes,
    then warn_only demotes a failure to a warning. Increments the matching
    global counter and returns False only for a hard failure.
    """
    global pass_count, fail_count, warn_count, skip_count
    if skip:
        skip_count += 1
        label, outcome = SKIP, True
    elif condition:
        pass_count += 1
        label, outcome = PASS, True
    elif warn_only:
        warn_count += 1
        label, outcome = WARN, True
    else:
        fail_count += 1
        label, outcome = FAIL, False
    print(f" {label} {msg}")
    return outcome
def info(msg):
    """Print an informational line; does not affect the pass/fail tallies."""
    print(" {} {}".format(INFO, msg))
# ──────────────────────── Source-of-Truth Loaders ────────────────────────
def load_modality_configs():
    """Return MODALITY_CONFIGS (from embodiment_configs.py) as plain serializable dicts."""
    # Make the repo importable when the script is run from anywhere.
    sys.path.insert(0, str(REPO_ROOT))
    from gr00t.configs.data.embodiment_configs import MODALITY_CONFIGS
    from gr00t.data.utils import to_json_serializable

    return to_json_serializable(MODALITY_CONFIGS)
def load_model_config_defaults():
    """Instantiate and return Gr00tN1d7Config with all default values."""
    sys.path.insert(0, str(REPO_ROOT))
    from gr00t.configs.model.gr00t_n1d7 import Gr00tN1d7Config

    return Gr00tN1d7Config()
def load_embodiment_tags():
    """Return (EmbodimentTag enum, PRETRAIN_TAGS, POSTTRAIN_TAGS) from the source tree."""
    sys.path.insert(0, str(REPO_ROOT))
    from gr00t.data.embodiment_tags import POSTTRAIN_TAGS, PRETRAIN_TAGS, EmbodimentTag

    return EmbodimentTag, PRETRAIN_TAGS, POSTTRAIN_TAGS
def load_projector_index():
    """Return the EMBODIMENT_TAG_TO_PROJECTOR_INDEX mapping from the processing module."""
    sys.path.insert(0, str(REPO_ROOT))
    from gr00t.model.gr00t_n1d7.processing_gr00t_n1d7 import (
        EMBODIMENT_TAG_TO_PROJECTOR_INDEX,
    )

    return EMBODIMENT_TAG_TO_PROJECTOR_INDEX
# ──────────────────────── HF Model Definitions ────────────────────────
# Registry of published checkpoints to validate.
#   hf_id:           HuggingFace repo id (also checked against README.md in E1).
#   type:            "base" or "finetuned" (informational label; not read by checks).
#   embodiment_tags: tags whose modality configs must appear in the checkpoint's
#                    processor_config.json (Dimensions A and D iterate these).
#   subdir:          optional sub-folder inside the downloaded config directory
#                    holding the JSON files (see load_hf_json), or None.
HF_MODELS = {
    "GR00T-N1.7-3B": {
        "hf_id": "nvidia/GR00T-N1.7-3B",
        "type": "base",
        "embodiment_tags": [
            "oxe_droid_relative_eef_relative_joint",
            "xdof_relative_eef_relative_joint",
            "xdof_relative_eef_relative_joint_subtask",
            "real_g1_relative_eef_relative_joints",
            "real_r1_pro_sharpa_relative_eef",
            "real_r1_pro_sharpa_relative_eef_human",
            "real_r1_pro_sharpa_relative_eef_maxinsights",
            "real_r1_pro_sharpa_relative_eef_mecka",
        ],
        "subdir": None,
    },
    "GR00T-N1.7-DROID": {
        "hf_id": "nvidia/GR00T-N1.7-DROID",
        "type": "finetuned",
        "embodiment_tags": ["oxe_droid_relative_eef_relative_joint"],
        "subdir": None,
    },
    "GR00T-N1.7-LIBERO": {
        "hf_id": "nvidia/GR00T-N1.7-LIBERO",
        "type": "finetuned",
        "embodiment_tags": ["libero_sim"],
        "subdir": "libero_10",
    },
    "SimplerEnv-Fractal": {
        "hf_id": "nvidia/GR00T-N1.7-SimplerEnv-Fractal",
        "type": "finetuned",
        "embodiment_tags": ["simpler_env_google"],
        "subdir": None,
    },
    "SimplerEnv-Bridge": {
        "hf_id": "nvidia/GR00T-N1.7-SimplerEnv-Bridge",
        "type": "finetuned",
        "embodiment_tags": ["simpler_env_widowx"],
        "subdir": None,
    },
}
# ──────────────────────── Dimension F & Internal Consistency ────────────────────────
def check_dim_f_internal_consistency():
    """Dimension F — Cross-file consistency (source-of-truth only)."""
    print("\n" + "=" * 70)
    print("DIMENSION F — Internal Source-of-Truth Consistency")
    print("=" * 70)
    modality_configs = load_modality_configs()
    model_cfg = load_model_config_defaults()
    EmbodimentTag, PRETRAIN_TAGS, POSTTRAIN_TAGS = load_embodiment_tags()
    projector_index = load_projector_index()

    # F3: each embodiment's actual action horizon must fit the model maximum.
    print("\n[F3] Action horizon ≤ model max capacity")
    max_horizon = model_cfg.action_horizon
    for emb_tag, emb_cfg in modality_configs.items():
        horizon = len(emb_cfg["action"]["delta_indices"])
        check(
            horizon <= max_horizon,
            f" {emb_tag}: actual={horizon} ≤ max={max_horizon}",
        )

    # F5: tags with modality configs must have projector indices, and every
    # projector-index key must be a real EmbodimentTag value.
    print("\n[F5] EMBODIMENT_TAG_TO_PROJECTOR_INDEX ↔ EmbodimentTag enum")
    for member in EmbodimentTag:
        if member.value not in modality_configs:
            continue
        check(
            member.value in projector_index,
            f" {member.value} in MODALITY_CONFIGS → has projector index: {projector_index.get(member.value, 'MISSING')}",
        )
    enum_values = {m.value for m in EmbodimentTag}
    for key in projector_index:
        check(
            key in enum_values,
            f" projector index key '{key}' → is valid EmbodimentTag value",
        )

    # F6: informational notes about known key-name differences between
    # the model config and the processor config.
    print("\n[F6] Known naming mismatches (informational)")
    info(f"Model config: action_horizon={model_cfg.action_horizon}")
    info("Processor uses: max_action_horizon (same value, different key name)")
    info(f"Model config: use_albumentations_transforms={model_cfg.use_albumentations_transforms}")
    info("Processor uses: use_albumentations (same semantics, different key name)")
def check_dim_e_documentation():
    """Dimension E — README & Documentation Consistency.

    E1: checkpoint ids appear in the top-level README; E2: example commands
    use EmbodimentTag enum NAMES; E4: DROID README mentions the DROID
    modality keys; E5: example modality.json keys match MODALITY_CONFIGS;
    E7: report --action-horizon values found in example commands.
    """
    print("\n" + "=" * 70)
    print("DIMENSION E — README & Documentation Consistency")
    print("=" * 70)
    EmbodimentTag, PRETRAIN_TAGS, POSTTRAIN_TAGS = load_embodiment_tags()
    modality_configs = load_modality_configs()

    # E1: Checkpoint table in README.md
    print("\n[E1] Checkpoint table in README.md")
    readme = (REPO_ROOT / "README.md").read_text()
    for model_name, model_info in HF_MODELS.items():
        check(
            model_info["hf_id"] in readme,
            f" {model_info['hf_id']} found in README.md",
        )

    # E2: --embodiment-tag in example commands uses enum NAMES
    print("\n[E2] --embodiment-tag uses enum NAMES in example commands")
    example_readmes = {
        "DROID": REPO_ROOT / "examples/DROID/README.md",
        "LIBERO": REPO_ROOT / "examples/LIBERO/README.md",
        "SimplerEnv": REPO_ROOT / "examples/SimplerEnv/README.md",
    }
    tag_name_to_value = {m.name: m.value for m in EmbodimentTag}
    for name, path in example_readmes.items():
        if not path.exists():
            check(False, f" {path} exists", skip=True)
            continue
        content = path.read_text()
        tags_in_commands = re.findall(r"--embodiment-tag\s+(\S+)", content)
        for tag in tags_in_commands:
            is_enum_name = tag in tag_name_to_value
            is_enum_value = tag in {m.value for m in EmbodimentTag}
            check(
                is_enum_name,
                f" {name}: --embodiment-tag {tag} is valid enum NAME"
                + (" (used value instead of name)" if is_enum_value and not is_enum_name else ""),
            )

    # E4: DROID modality table
    print("\n[E4] DROID modality table matches MODALITY_CONFIGS")
    droid_readme_path = REPO_ROOT / "examples/DROID/README.md"
    if not droid_readme_path.exists():
        # Fix: previously this read_text() ran unconditionally and crashed
        # with FileNotFoundError when the README was absent, even though the
        # E2 block above tolerates missing example READMEs. Record a skip.
        check(False, f" {droid_readme_path} exists", skip=True)
    else:
        droid_readme = droid_readme_path.read_text()
        droid_cfg = modality_configs.get("oxe_droid_relative_eef_relative_joint", {})
        if droid_cfg:
            for vkey in droid_cfg["video"]["modality_keys"]:
                check(vkey in droid_readme, f" Video key '{vkey}' mentioned in DROID README")
            for skey in droid_cfg["state"]["modality_keys"]:
                check(skey in droid_readme, f" State key '{skey}' mentioned in DROID README")
            # A lower-cased search covers both "17D" and "17d" (the original
            # also tested "17D" separately, which was redundant).
            check(
                "17d" in droid_readme.lower(),
                " 17D dimension mentioned in DROID README",
                warn_only=True,
            )

    # E5: Example modality.json files match MODALITY_CONFIGS
    print("\n[E5] Example modality.json ↔ MODALITY_CONFIGS key consistency")
    modality_json_map = {
        "simpler_env_google": REPO_ROOT / "examples/SimplerEnv/fractal_modality.json",
        "simpler_env_widowx": REPO_ROOT / "examples/SimplerEnv/bridge_modality.json",
        "libero_sim": REPO_ROOT / "examples/LIBERO/modality.json",
    }
    for tag, json_path in modality_json_map.items():
        if not json_path.exists():
            check(False, f" {json_path} exists", skip=True)
            continue
        with open(json_path) as f:
            mj = json.load(f)
        code_cfg = modality_configs.get(tag, {})
        if not code_cfg:
            check(False, f" {tag} in MODALITY_CONFIGS")
            continue
        # Key ORDER matters here: dict order in modality.json must equal the
        # list order in MODALITY_CONFIGS.
        mj_state_keys = list(mj.get("state", {}).keys())
        code_state_keys = code_cfg["state"]["modality_keys"]
        check(
            mj_state_keys == code_state_keys,
            f" {tag} state keys: modality.json={mj_state_keys} vs code={code_state_keys}",
        )
        mj_action_keys = list(mj.get("action", {}).keys())
        code_action_keys = code_cfg["action"]["modality_keys"]
        check(
            mj_action_keys == code_action_keys,
            f" {tag} action keys: modality.json={mj_action_keys} vs code={code_action_keys}",
        )
        mj_video_keys = list(mj.get("video", {}).keys())
        code_video_keys = code_cfg["video"]["modality_keys"]
        check(
            mj_video_keys == code_video_keys,
            f" {tag} video keys: modality.json={mj_video_keys} vs code={code_video_keys}",
        )

    # E7: --action-horizon in commands (reported, not asserted)
    print("\n[E7] --action-horizon in commands ≤ embodiment actual horizon")
    for name, path in example_readmes.items():
        if not path.exists():
            continue
        content = path.read_text()
        horizons = re.findall(r"--action-horizon\s+(\d+)", content)
        for h in horizons:
            info(f" {name}: --action-horizon {h} found in commands")
def check_dim_f2_modality_json():
    """Dimension F2 — MODALITY_CONFIGS ↔ examples/*/modality.json.

    Structural check only: compares state/action key COUNTS between each
    example modality.json and the code-side MODALITY_CONFIGS entry.
    """
    print("\n" + "=" * 70)
    print("DIMENSION F2 — MODALITY_CONFIGS ↔ modality.json Structural Check")
    print("=" * 70)
    modality_configs = load_modality_configs()
    modality_json_files = {
        "simpler_env_google": REPO_ROOT / "examples/SimplerEnv/fractal_modality.json",
        "simpler_env_widowx": REPO_ROOT / "examples/SimplerEnv/bridge_modality.json",
        "libero_sim": REPO_ROOT / "examples/LIBERO/modality.json",
    }
    for tag, json_path in modality_json_files.items():
        print(f"\n [{tag}]")
        if not json_path.exists():
            check(False, f" {json_path.name} exists", skip=True)
            continue
        with open(json_path) as f:
            mj = json.load(f)
        # Fix: the original indexed modality_configs[tag] directly, so a tag
        # missing from MODALITY_CONFIGS raised KeyError and aborted the whole
        # run; now it is recorded as a normal check failure (mirrors E5).
        code_cfg = modality_configs.get(tag)
        if code_cfg is None:
            check(False, f" {tag} in MODALITY_CONFIGS")
            continue
        code_state_count = len(code_cfg["state"]["modality_keys"])
        mj_state_count = len(mj.get("state", {}))
        check(
            code_state_count == mj_state_count,
            f" State key count: code={code_state_count} vs modality.json={mj_state_count}",
        )
        code_action_count = len(code_cfg["action"]["modality_keys"])
        mj_action_count = len(mj.get("action", {}))
        check(
            code_action_count == mj_action_count,
            f" Action key count: code={code_action_count} vs modality.json={mj_action_count}",
        )
# ──────────────────────── Dimension J β€” Enum Serialization ────────────────────────
def check_dim_j_enum_serialization():
    """Dimension J — Verify enum serialization uses names not values."""
    print("\n" + "=" * 70)
    print("DIMENSION J — Enum Serialization Format (code-level)")
    print("=" * 70)
    modality_configs = load_modality_configs()
    # Allowed serialized enum NAMES (values like "relative" would be wrong).
    valid_rep_names = {"RELATIVE", "ABSOLUTE"}
    valid_type_names = {"EEF", "NON_EEF"}
    valid_format_names = {"DEFAULT", "XYZ_ROT6D", "ROTATION_6D", "SCALAR"}
    for tag, cfg in modality_configs.items():
        entries = cfg.get("action", {}).get("action_configs")
        if not entries:
            # Tags without per-action configs have nothing to validate.
            continue
        print(f"\n [{tag}]")
        for i, entry in enumerate(entries):
            rep = entry.get("rep")
            rep_ok = rep in valid_rep_names
            check(
                rep_ok,
                f" action_configs[{i}].rep = '{rep}' (valid name: {rep_ok})",
            )
            atype = entry.get("type")
            type_ok = atype in valid_type_names
            check(
                type_ok,
                f" action_configs[{i}].type = '{atype}' (valid name: {type_ok})",
            )
            afmt = entry.get("format")
            if afmt:
                fmt_ok = afmt in valid_format_names
                check(
                    fmt_ok,
                    f" action_configs[{i}].format = '{afmt}' (valid name: {fmt_ok})",
                )
# ──────────────────────── HF Config Checks (require downloads) ────────────────────────
def load_hf_json(base_dir, model_name, filename, subdir=None):
    """Read a JSON file from a downloaded model directory.

    Looks under base_dir/model_name (plus optional subdir). Returns the
    parsed object, or None when the file does not exist.
    """
    parts = [model_name] + ([subdir] if subdir else []) + [filename]
    path = Path(base_dir).joinpath(*parts)
    if not path.exists():
        return None
    return json.loads(path.read_text())
def check_dim_a_processor_config(hf_dir, model_name, model_def):
    """Dimension A — processor_config.json checks for a single model.

    Compares the downloaded processor_config.json against the in-repo
    MODALITY_CONFIGS (per-tag checks A1-A9) and a table of expected scalar
    processor kwargs (A10-A31).

    Args:
        hf_dir: Root directory holding one sub-directory per model.
        model_name: Key into HF_MODELS (also the sub-directory name).
        model_def: The HF_MODELS entry (embodiment_tags, subdir, ...).
    """
    print(f"\n--- {model_name} ---")
    pc = load_hf_json(hf_dir, model_name, "processor_config.json", model_def.get("subdir"))
    if pc is None:
        # Config not downloaded locally — record a skip, not a failure.
        check(False, "processor_config.json found", skip=True)
        return
    modality_configs = load_modality_configs()
    # A10: processor_class must name the expected processor implementation.
    check(
        pc.get("processor_class") == "Gr00tN1d7Processor",
        f"[A10] processor_class = '{pc.get('processor_class')}' (expected 'Gr00tN1d7Processor')",
    )
    pk = pc.get("processor_kwargs", {})
    # A1: every embodiment tag this model claims must appear in the
    # published modality_configs.
    hf_modality_keys = list(pk.get("modality_configs", {}).keys())
    for expected_tag in model_def["embodiment_tags"]:
        check(
            expected_tag in hf_modality_keys,
            f"[A1] modality_configs contains '{expected_tag}'",
        )
    # Per-tag modality checks (A2-A9)
    for tag in model_def["embodiment_tags"]:
        hf_mc = pk.get("modality_configs", {}).get(tag)
        code_mc = modality_configs.get(tag)
        if not hf_mc:
            check(False, f"[A1] {tag} present in HF modality_configs")
            continue
        if not code_mc:
            # Tag has no code-side config; treated as a pretrain-only tag.
            info(f" {tag} not in code MODALITY_CONFIGS (pretrain-only tag, expected)")
            continue
        # A2: video.delta_indices must match exactly.
        check(
            hf_mc["video"]["delta_indices"] == code_mc["video"]["delta_indices"],
            f"[A2] {tag} video.delta_indices: HF={hf_mc['video']['delta_indices']} vs code={code_mc['video']['delta_indices']}",
        )
        # A3: video.modality_keys — only the COUNT is compared.
        hf_vkeys = hf_mc["video"]["modality_keys"]
        code_vkeys = code_mc["video"]["modality_keys"]
        check(
            len(hf_vkeys) == len(code_vkeys),
            f"[A3] {tag} video key count: HF={len(hf_vkeys)} ({hf_vkeys}) vs code={len(code_vkeys)} ({code_vkeys})",
        )
        # A4: state.delta_indices
        check(
            hf_mc["state"]["delta_indices"] == code_mc["state"]["delta_indices"],
            f"[A4] {tag} state.delta_indices: HF={hf_mc['state']['delta_indices']} vs code={code_mc['state']['delta_indices']}",
        )
        # A5: state.modality_keys (order-sensitive list equality)
        check(
            hf_mc["state"]["modality_keys"] == code_mc["state"]["modality_keys"],
            f"[A5] {tag} state.modality_keys match",
        )
        # A6: action.delta_indices
        check(
            hf_mc["action"]["delta_indices"] == code_mc["action"]["delta_indices"],
            f"[A6] {tag} action.delta_indices: HF len={len(hf_mc['action']['delta_indices'])} vs code len={len(code_mc['action']['delta_indices'])}",
        )
        # A7: action.modality_keys
        check(
            hf_mc["action"]["modality_keys"] == code_mc["action"]["modality_keys"],
            f"[A7] {tag} action.modality_keys match",
        )
        # A8: action.action_configs — presence, count, then per-entry fields.
        hf_ac = hf_mc["action"].get("action_configs")
        code_ac = code_mc["action"].get("action_configs")
        if code_ac:
            check(
                hf_ac is not None,
                f"[A8] {tag} action.action_configs present in HF",
            )
            if hf_ac:
                check(
                    len(hf_ac) == len(code_ac),
                    f"[A8] {tag} action_configs count: HF={len(hf_ac)} vs code={len(code_ac)}",
                )
                for i, (h, c) in enumerate(zip(hf_ac, code_ac)):
                    for field in ("rep", "type", "format"):
                        check(
                            h.get(field) == c.get(field),
                            f"[A8] {tag} action_configs[{i}].{field}: HF={h.get(field)} vs code={c.get(field)}",
                        )
        # A9: language.modality_keys
        check(
            hf_mc["language"]["modality_keys"] == code_mc["language"]["modality_keys"],
            f"[A9] {tag} language.modality_keys match",
        )
    # A11-A31: scalar parameters. A None expected value means "report only"
    # (no single source-of-truth value); a concrete value is asserted.
    scalar_checks = {
        "max_state_dim": ("A11", None),
        "max_action_dim": ("A12", None),
        "max_action_horizon": ("A13", None),
        "model_name": ("A14", "nvidia/Cosmos-Reason2-2B"),
        "model_type": ("A15", "qwen"),
        "use_percentiles": ("A16", None),
        "apply_sincos_state_encoding": ("A17", None),
        "use_relative_action": ("A18", None),
        "formalize_language": ("A19", True),
        "clip_outliers": ("A20", True),
        "use_mean_std": ("A21", False),
        "letter_box_transform": ("A22", None),
        "exclude_state": ("A23", None),
        "state_dropout_prob": ("A24", None),
        "image_crop_size": ("A25", None),
        "image_target_size": ("A26", None),
        "shortest_image_edge": ("A27", 256),
        "crop_fraction": ("A28", 0.95),
        "use_albumentations": ("A29", None),
        "random_rotation_angle": ("A30", None),
        "color_jitter_params": ("A31", None),
    }
    for field, (item_id, expected) in scalar_checks.items():
        actual = pk.get(field)
        if expected is not None:
            check(
                actual == expected,
                f"[{item_id}] {field}: HF={actual!r} (expected {expected!r})",
            )
        else:
            info(f"[{item_id}] {field} = {actual!r}")
def check_dim_b_config_json(hf_dir, model_name, model_def):
    """Dimension B — config.json checks for a single model.

    Compares the downloaded config.json against Gr00tN1d7Config defaults
    (B1-B30), the nested diffusion_model_cfg (B8-B10), legacy field names
    (I4), dtype reporting (B26/I2), and the architectures list (I1).

    Args:
        hf_dir: Root directory holding one sub-directory per model.
        model_name: Key into HF_MODELS (also the sub-directory name).
        model_def: The HF_MODELS entry (embodiment_tags, subdir, ...).
    """
    print(f"\n--- {model_name} ---")
    cfg = load_hf_json(hf_dir, model_name, "config.json", model_def.get("subdir"))
    if cfg is None:
        # Config not downloaded locally — record a skip, not a failure.
        check(False, "config.json found", skip=True)
        return
    model_cfg = load_model_config_defaults()
    # Item id -> (field name, expected value). A None expected value means
    # "report only"; otherwise the HF value must equal the code default.
    b_checks = {
        "B1": ("model_type", "Gr00tN1d7"),
        "B2": ("max_state_dim", None),
        "B3": ("max_action_dim", None),
        "B4": ("action_horizon", model_cfg.action_horizon),
        "B5": ("backbone_embedding_dim", model_cfg.backbone_embedding_dim),
        "B6": ("hidden_size", model_cfg.hidden_size),
        "B7": ("input_embedding_dim", model_cfg.input_embedding_dim),
        "B11": ("num_inference_timesteps", model_cfg.num_inference_timesteps),
        "B12": ("max_num_embodiments", model_cfg.max_num_embodiments),
        "B13": ("model_name", "nvidia/Cosmos-Reason2-2B"),
        "B14": ("select_layer", model_cfg.select_layer),
        "B15": ("state_history_length", model_cfg.state_history_length),
        "B16": ("noise_beta_alpha", model_cfg.noise_beta_alpha),
        "B17": ("noise_beta_beta", model_cfg.noise_beta_beta),
        "B18": ("noise_s", model_cfg.noise_s),
        "B19": ("num_timestep_buckets", model_cfg.num_timestep_buckets),
        "B20": ("add_pos_embed", model_cfg.add_pos_embed),
        "B21": ("attn_dropout", model_cfg.attn_dropout),
        "B22": ("use_vlln", model_cfg.use_vlln),
        "B23": ("max_seq_len", model_cfg.max_seq_len),
        "B24": ("use_alternate_vl_dit", model_cfg.use_alternate_vl_dit),
        "B25": ("attend_text_every_n_blocks", model_cfg.attend_text_every_n_blocks),
        "B27": ("backbone_model_type", model_cfg.backbone_model_type),
        "B28": ("reproject_vision", model_cfg.reproject_vision),
        "B29": ("use_percentiles", model_cfg.use_percentiles),
        "B30": ("use_relative_action", model_cfg.use_relative_action),
    }
    for item_id, (field, expected) in b_checks.items():
        actual = cfg.get(field)
        if expected is not None:
            check(
                actual == expected,
                f"[{item_id}] {field}: HF={actual!r} (expected {expected!r})",
            )
        else:
            info(f"[{item_id}] {field} = {actual!r}")
    # B8-B10: nested diffusion_model_cfg values (hard-coded expectations).
    diff_cfg = cfg.get("diffusion_model_cfg", {})
    check(
        diff_cfg.get("num_layers") == 16,
        f"[B8] diffusion_model_cfg.num_layers: {diff_cfg.get('num_layers')} (expected 16)",
    )
    check(
        diff_cfg.get("num_attention_heads") == 32,
        f"[B9] diffusion_model_cfg.num_attention_heads: {diff_cfg.get('num_attention_heads')} (expected 32)",
    )
    check(
        diff_cfg.get("attention_head_dim") == 48,
        f"[B10] diffusion_model_cfg.attention_head_dim: {diff_cfg.get('attention_head_dim')} (expected 48)",
    )
    # I4: legacy/internal field names must not leak into the published config.
    legacy_fields = ["vlm_model_path", "GrootN1d7"]
    for lf in legacy_fields:
        check(lf not in cfg, f"[I4] No legacy field '{lf}' in config.json")
    # B26 / I2: dtype is reported only — either key name is accepted.
    dtype_val = cfg.get("torch_dtype") or cfg.get("model_dtype")
    info(f"[B26/I2] torch_dtype/model_dtype = {dtype_val!r}")
    # I1: architectures list should include the model class when present.
    archs = cfg.get("architectures")
    if archs is not None:
        check(
            "Gr00tN1d7" in archs,
            f"[I1] architectures contains 'Gr00tN1d7': {archs}",
        )
    else:
        info("[I1] 'architectures' field not present")
def check_dim_c_embodiment_id(hf_dir, model_name, model_def):
    """Dimension C — embodiment_id.json checks.

    C1: every HF entry must agree with the code-side projector index
    (unknown tags are warnings). C2: every pretrain tag must be present.
    """
    print(f"\n--- {model_name} ---")
    eid = load_hf_json(hf_dir, model_name, "embodiment_id.json", model_def.get("subdir"))
    if eid is None:
        check(False, "embodiment_id.json found", skip=True)
        return
    projector_index = load_projector_index()
    # C1: all entries match code
    for tag, idx in eid.items():
        code_idx = projector_index.get(tag)
        if code_idx is None:
            check(
                False, f"[C1] {tag} not in code EMBODIMENT_TAG_TO_PROJECTOR_INDEX", warn_only=True
            )
        else:
            check(
                idx == code_idx,
                f"[C1] {tag}: HF={idx} vs code={code_idx}",
            )
    # C2: pretrain tags present (derived from source of truth)
    _, PRETRAIN_TAGS, _ = load_embodiment_tags()
    for tag_value in (t.value for t in PRETRAIN_TAGS):
        check(
            tag_value in eid,
            f"[C2] Pretrain tag '{tag_value}' present in embodiment_id.json",
        )
def check_dim_d_statistics(hf_dir, model_name, model_def):
    """Dimension D — statistics.json checks.

    D1/D2: per-embodiment structure; D3: modality key coverage against
    processor_config; D4: required normalization fields; D6: every numeric
    value in the statistics tree is finite.
    """
    print(f"\n--- {model_name} ---")
    stats = load_hf_json(hf_dir, model_name, "statistics.json", model_def.get("subdir"))
    pc = load_hf_json(hf_dir, model_name, "processor_config.json", model_def.get("subdir"))
    if stats is None:
        check(False, "statistics.json found", skip=True)
        return
    pk = pc.get("processor_kwargs", {}) if pc else {}
    # Percentile fields are required only when the processor normalizes with
    # percentiles (treated as True when the key is absent).
    use_percentiles = pk.get("use_percentiles", True)

    # D6 helper: recursively assert every numeric leaf is finite.
    # Fix: the original list branch only inspected top-level numeric items,
    # so NaN/Inf hiding in nested containers (e.g. a list of lists) was
    # silently skipped; non-numeric list items now recurse. Also hoisted out
    # of the per-tag loop (it was redefined on every iteration).
    def check_finite(obj, path=""):
        if isinstance(obj, dict):
            for k, v in obj.items():
                check_finite(v, f"{path}/{k}")
        elif isinstance(obj, list):
            for i, v in enumerate(obj):
                if isinstance(v, (int, float)):
                    check(
                        math.isfinite(v),
                        f"[D6] {path}[{i}] = {v} is finite",
                    )
                else:
                    check_finite(v, f"{path}[{i}]")
        elif isinstance(obj, (int, float)):
            check(math.isfinite(obj), f"[D6] {path} = {obj} is finite")

    for tag in model_def["embodiment_tags"]:
        tag_stats = stats.get(tag)
        check(tag_stats is not None, f"[D1] Top-level key '{tag}' in statistics.json")
        if not tag_stats:
            continue
        # D2: state/action sub-dicts
        check("state" in tag_stats, f"[D2] {tag} has 'state' sub-dict")
        check("action" in tag_stats, f"[D2] {tag} has 'action' sub-dict")
        # D3: every modality key promised by processor_config has statistics.
        hf_mc = pk.get("modality_configs", {}).get(tag, {})
        for modality in ("state", "action"):
            if modality not in tag_stats or modality not in hf_mc:
                continue
            expected_keys = hf_mc[modality].get("modality_keys", [])
            actual_keys = list(tag_stats[modality].keys())
            for ek in expected_keys:
                check(
                    ek in actual_keys,
                    f"[D3] {tag}/{modality}: key '{ek}' in statistics",
                )
        # D4: min/max always required; percentile fields when use_percentiles.
        for modality in ("state", "action"):
            if modality not in tag_stats:
                continue
            for key, key_stats in tag_stats[modality].items():
                check(
                    "min" in key_stats and "max" in key_stats,
                    f"[D4] {tag}/{modality}/{key}: has min/max",
                )
                if use_percentiles:
                    has_pct = "q01" in key_stats or "p01" in key_stats
                    check(
                        has_pct,
                        f"[D4] {tag}/{modality}/{key}: has percentile fields (use_percentiles={use_percentiles})",
                    )
        # D6: No NaN/Inf anywhere in this tag's statistics tree.
        check_finite(tag_stats, f"{tag}")
def check_dim_f1_cross_file(hf_dir, model_name, model_def):
    """Dimension F1 — config.json ↔ processor_config.json agreement."""
    print(f"\n--- {model_name} ---")
    cfg = load_hf_json(hf_dir, model_name, "config.json", model_def.get("subdir"))
    pc = load_hf_json(hf_dir, model_name, "processor_config.json", model_def.get("subdir"))
    if cfg is None or pc is None:
        check(False, "Both config.json and processor_config.json found", skip=True)
        return
    pk = pc.get("processor_kwargs", {})
    # (item id, config.json key, processor_kwargs key) — the two files must
    # agree on each pair; action_horizon is stored under a different key name
    # on the processor side.
    comparisons = [
        ("F1", "max_state_dim", "max_state_dim"),
        ("F1", "max_action_dim", "max_action_dim"),
        ("F1", "action_horizon", "max_action_horizon"),
        ("F7", "use_percentiles", "use_percentiles"),
        ("F7", "use_relative_action", "use_relative_action"),
        ("B13", "model_name", "model_name"),
    ]
    for item_id, cfg_key, pk_key in comparisons:
        cfg_val = cfg.get(cfg_key)
        pk_val = pk.get(pk_key)
        if cfg_key == pk_key:
            msg = f"[{item_id}] {cfg_key}: config.json={cfg_val} vs processor={pk_val}"
        else:
            msg = f"[{item_id}] {cfg_key}={cfg_val} vs {pk_key}={pk_val}"
        check(cfg_val == pk_val, msg)
# ──────────────────────── Test Fixture Check ────────────────────────
def check_test_fixture():
    """Check the test fixture processor_config against source of truth.

    Validates tests/fixtures/processor_config/ (processor_config.json,
    embodiment_id.json, statistics.json): processor class, the libero_sim
    modality config, notable scalar params (reported), newer fields that may
    be missing in older fixtures (warn-only), projector indices, and basic
    statistics structure. Skips entirely if any fixture file is absent.
    """
    print("\n" + "=" * 70)
    print("TEST FIXTURE — tests/fixtures/processor_config/ Check")
    print("=" * 70)
    fixture_dir = REPO_ROOT / "tests/fixtures/processor_config"
    pc_path = fixture_dir / "processor_config.json"
    eid_path = fixture_dir / "embodiment_id.json"
    stats_path = fixture_dir / "statistics.json"
    if not pc_path.exists():
        check(False, "Test fixture processor_config.json exists", skip=True)
        return
    with open(pc_path) as f:
        pc = json.load(f)
    if not eid_path.exists():
        check(False, "Test fixture embodiment_id.json exists", skip=True)
        return
    with open(eid_path) as f:
        eid = json.load(f)
    if not stats_path.exists():
        check(False, "Test fixture statistics.json exists", skip=True)
        return
    with open(stats_path) as f:
        stats = json.load(f)
    modality_configs = load_modality_configs()
    model_cfg = load_model_config_defaults()
    projector_index = load_projector_index()
    pk = pc.get("processor_kwargs", {})
    # processor_class must name the expected processor implementation.
    check(
        pc.get("processor_class") == "Gr00tN1d7Processor",
        f"processor_class = '{pc.get('processor_class')}'",
    )
    # modality_configs: the fixture is expected to carry the libero_sim tag.
    hf_mc = pk.get("modality_configs", {}).get("libero_sim")
    code_mc = modality_configs.get("libero_sim")
    check(hf_mc is not None, "modality_configs contains 'libero_sim'")
    if hf_mc and code_mc:
        # video delta_indices must match exactly
        check(
            hf_mc["video"]["delta_indices"] == code_mc["video"]["delta_indices"],
            f"video.delta_indices: fixture={hf_mc['video']['delta_indices']} vs code={code_mc['video']['delta_indices']}",
        )
        # video keys — only the COUNT is compared
        check(
            len(hf_mc["video"]["modality_keys"]) == len(code_mc["video"]["modality_keys"]),
            f"video key count: fixture={len(hf_mc['video']['modality_keys'])} vs code={len(code_mc['video']['modality_keys'])}",
        )
        # state keys (order-sensitive list equality)
        check(
            hf_mc["state"]["modality_keys"] == code_mc["state"]["modality_keys"],
            "state.modality_keys match",
        )
        # action delta_indices
        check(
            hf_mc["action"]["delta_indices"] == code_mc["action"]["delta_indices"],
            f"action.delta_indices: fixture len={len(hf_mc['action']['delta_indices'])} vs code len={len(code_mc['action']['delta_indices'])}",
        )
        # action keys
        check(
            hf_mc["action"]["modality_keys"] == code_mc["action"]["modality_keys"],
            "action.modality_keys match",
        )
        # language keys
        check(
            hf_mc["language"]["modality_keys"] == code_mc["language"]["modality_keys"],
            "language.modality_keys match",
        )
    # Scalar params — reported (not asserted) so mismatches are visible
    # without failing the run.
    print("\n Scalar Parameter Comparison (fixture vs model config defaults):")
    info(
        f"max_state_dim: fixture={pk.get('max_state_dim')} vs model_cfg default={model_cfg.max_state_dim}"
    )
    info(
        f"max_action_dim: fixture={pk.get('max_action_dim')} vs model_cfg default={model_cfg.max_action_dim}"
    )
    info(
        f"max_action_horizon: fixture={pk.get('max_action_horizon')} vs model_cfg.action_horizon={model_cfg.action_horizon}"
    )
    info(
        f"use_percentiles: fixture={pk.get('use_percentiles')} vs model_cfg={model_cfg.use_percentiles}"
    )
    info(
        f"apply_sincos_state_encoding: fixture={pk.get('apply_sincos_state_encoding')} vs model_cfg={model_cfg.apply_sincos_state_encoding}"
    )
    info(
        f"use_relative_action: fixture={pk.get('use_relative_action')} vs model_cfg={model_cfg.use_relative_action}"
    )
    # Fields added to save_pretrained later on may legitimately be absent
    # from older fixtures, hence warn-only.
    expected_fields = [
        "letter_box_transform",
        "exclude_state",
        "state_dropout_prob",
        "use_mean_std",
    ]
    print("\n New Fields Check (may be missing in older fixtures):")
    for field in expected_fields:
        present = field in pk
        check(present, f"Field '{field}' present in fixture processor_config", warn_only=True)
    # embodiment_id.json: every fixture entry must match the code-side index.
    print("\n Embodiment ID Check:")
    for tag, idx in eid.items():
        code_idx = projector_index.get(tag)
        check(
            code_idx is not None and idx == code_idx,
            f" {tag}: fixture={idx} vs code={code_idx}",
        )
    # statistics.json: each configured tag needs state/action statistics.
    print("\n Statistics Structure Check:")
    for tag in pk.get("modality_configs", {}).keys():
        check(tag in stats, f" statistics.json has key '{tag}'")
        if tag in stats:
            check("state" in stats[tag], f" {tag}/state present")
            check("action" in stats[tag], f" {tag}/action present")
# ──────────────────────── Main ────────────────────────
def main():
    """Run all alignment validations, print a summary, and exit 1 on failure."""
    parser = argparse.ArgumentParser(description="Validate HF config alignment")
    parser.add_argument(
        "--hf-config-dir",
        type=str,
        default=None,
        help="Directory containing downloaded HF configs (subdirs per model)",
    )
    args = parser.parse_args()
    print("╔" + "═" * 68 + "╗")
    print("║ HuggingFace Config Alignment Validation ║")
    print("╚" + "═" * 68 + "╝")
    # Internal consistency suites always run (no HF downloads required).
    for suite in (
        check_dim_f_internal_consistency,
        check_dim_e_documentation,
        check_dim_f2_modality_json,
        check_dim_j_enum_serialization,
        check_test_fixture,
    ):
        suite()
    if args.hf_config_dir:
        hf_dir = Path(args.hf_config_dir)
        if not hf_dir.exists():
            print(f"\n[ERROR] HF config directory not found: {hf_dir}")
            sys.exit(1)
        # Per-model suites: run each dimension across every registered model.
        hf_suites = [
            ("DIMENSION A — processor_config.json", check_dim_a_processor_config),
            ("DIMENSION B — config.json", check_dim_b_config_json),
            ("DIMENSION C — embodiment_id.json", check_dim_c_embodiment_id),
            ("DIMENSION D — statistics.json", check_dim_d_statistics),
            ("DIMENSION F1 — Cross-file", check_dim_f1_cross_file),
        ]
        for header, suite_fn in hf_suites:
            for model_name, model_def in HF_MODELS.items():
                print("\n" + "=" * 70)
                print(f"{header}: {model_name}")
                print("=" * 70)
                suite_fn(hf_dir, model_name, model_def)
    else:
        print("\n" + "=" * 70)
        print("HF CONFIG CHECKS SKIPPED — No --hf-config-dir provided")
        print("To run full checks, download HF configs first:")
        print(" uv run huggingface-cli login")
        print(" # Then download configs for each model (see checklist)")
        print(
            " uv run python scripts/validate_hf_config_alignment.py --hf-config-dir /tmp/hf_configs"
        )
        print("=" * 70)
    # Summary of global tallies accumulated by check().
    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f" {PASS}: {pass_count}")
    print(f" {FAIL}: {fail_count}")
    print(f" {WARN}: {warn_count}")
    print(f" {SKIP}: {skip_count}")
    total = pass_count + fail_count
    if total > 0:
        print(f" Pass rate: {pass_count}/{total} ({100 * pass_count / total:.1f}%)")
    if fail_count > 0:
        sys.exit(1)
# Allow use both as a script and as an importable module.
if __name__ == "__main__":
    main()