| """IsaacLab Arena EnvHub Environment. |
| |
| For more information, visit https://huggingface.co/docs/lerobot/envhub_isaaclab_arena |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import importlib |
| import importlib.util |
| import logging |
| import os |
| import sys |
| from pathlib import Path |
| import yaml |
|
|
| import gymnasium as gym |
|
|
| from huggingface_hub import hf_hub_download |
| from lerobot.envs.configs import EnvConfig |
|
|
| |
# Hugging Face Hub repository id passed to hf_hub_download for every file this module fetches.
HUB_REPO_ID = "nvidia/isaaclab-arena-envs"
# YAML file in that repository; resolve_environment_alias downloads and parses it
# to turn an environment alias into a dotted import path.
EXAMPLE_ENVS = "example_envs.yaml"
|
|
def _download_hub_file(filename: str):
    """Fetch ``filename`` from the ``HUB_REPO_ID`` repository on the Hub.

    Args:
        filename: Name of the file inside the Hub repository.

    Returns:
        Whatever ``hf_hub_download`` returns; callers in this module use it
        as the local filesystem path of the downloaded file.
    """
    downloaded = hf_hub_download(filename=filename, repo_id=HUB_REPO_ID)
    return downloaded
|
|
def _download_and_import(filename: str):
    """Download a Python file from the Hub and import it as a top-level module.

    The downloaded source is loaded without a parent package, so its
    package-relative imports of the sibling files (``from .errors import``,
    ``from .isaaclab_env_wrapper import``) are rewritten to absolute imports
    before execution, and the download directory is placed on ``sys.path``.

    Args:
        filename: Name of the ``.py`` file inside the Hub repository.

    Returns:
        The executed module. It is also registered in ``sys.modules`` under
        ``filename`` with its trailing ``.py`` removed.

    Raises:
        ImportError: If no import spec can be built for the downloaded file.
    """
    local_path = _download_hub_file(filename)
    module_dir = os.path.dirname(local_path)

    # Let the rewritten absolute imports find sibling files next to this one.
    if module_dir not in sys.path:
        sys.path.insert(0, module_dir)

    # Strip only the trailing extension; str.replace would also remove a
    # ".py" occurring in the middle of the name.
    module_name = filename.removesuffix(".py")

    # Read as UTF-8 (the default Python source encoding) instead of relying on
    # the platform's locale encoding.
    with open(local_path, encoding="utf-8") as f:
        content = f.read()

    # Turn the package-relative imports into absolute ones.
    for sibling in ("errors", "isaaclab_env_wrapper"):
        content = content.replace(f"from .{sibling} import", f"from {sibling} import")

    spec = importlib.util.spec_from_file_location(module_name, local_path)
    if spec is None:
        raise ImportError(f"Cannot build an import spec for {local_path!r}")
    module = importlib.util.module_from_spec(spec)

    # Register before executing so modules loaded later can import this one by name.
    sys.modules[module_name] = module

    # NOTE(security): this executes code downloaded from HUB_REPO_ID; it is only
    # as trustworthy as that repository.
    try:
        exec(compile(content, local_path, "exec"), module.__dict__)
    except BaseException:
        # Do not leave a half-initialised module importable after a failure.
        sys.modules.pop(module_name, None)
        raise

    return module
|
|
|
|
try:
    from .errors import IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError
    from .isaaclab_env_wrapper import IsaacLabEnvWrapper, cleanup_isaaclab
except ImportError:
    # The relative imports failed, so fetch the sibling files from the Hub.
    # Keep this order: errors.py is registered in sys.modules first, which lets
    # a rewritten ``from errors import ...`` in a later file resolve.
    _errors = _download_and_import("errors.py")
    _isaaclab_wrapper = _download_and_import("isaaclab_env_wrapper.py")
    IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError = (
        _errors.IsaacLabArenaCameraKeyError,
        _errors.IsaacLabArenaStateKeyError,
    )
    IsaacLabEnvWrapper, cleanup_isaaclab = (
        _isaaclab_wrapper.IsaacLabEnvWrapper,
        _isaaclab_wrapper.cleanup_isaaclab,
    )
|
|
|
|
| def validate_config( |
| env, |
| state_keys: tuple[str, ...], |
| camera_keys: tuple[str, ...], |
| cfg_state_dim: int, |
| cfg_action_dim: int, |
| ) -> None: |
| """Validate observation keys and dimensions against IsaacLab managers.""" |
| obs_manager = env.observation_manager |
| active_terms = obs_manager.active_terms |
| policy_terms = set(active_terms.get("policy", [])) |
| camera_terms = set(active_terms.get("camera_obs", [])) |
|
|
| |
| missing_state = [k for k in state_keys if k not in policy_terms] |
| if missing_state: |
| raise IsaacLabArenaStateKeyError(missing_state, policy_terms) |
|
|
| missing_cam = [k for k in camera_keys if k not in camera_terms] |
| if missing_cam: |
| raise IsaacLabArenaCameraKeyError(missing_cam, camera_terms) |
|
|
| |
| env_action_dim = env.action_space.shape[-1] |
| if cfg_action_dim != env_action_dim: |
| raise ValueError(f"action_dim mismatch: config={cfg_action_dim}, env={env_action_dim}") |
|
|
| |
| policy_dims = obs_manager.group_obs_dim.get("policy", []) |
| policy_names = active_terms.get("policy", []) |
| term_dims = dict(zip(policy_names, policy_dims, strict=False)) |
|
|
| expected_state_dim = 0 |
| for key in state_keys: |
| if key in term_dims: |
| shape = term_dims[key] |
| dim = 1 |
| for s in shape if isinstance(shape, (tuple, list)) else [shape]: |
| dim *= s |
| expected_state_dim += dim |
|
|
| if cfg_state_dim != expected_state_dim: |
| raise ValueError( |
| f"state_dim mismatch: config={cfg_state_dim}, " |
| f"computed={expected_state_dim}. " |
| f"Term dims: {term_dims}" |
| ) |
|
|
| logging.info(f"Validated: state_keys={state_keys}, camera_keys={camera_keys}") |
|
|
|
|
def resolve_environment_alias(environment: str) -> str:
    """Translate an environment alias into a dotted import path.

    Downloads ``EXAMPLE_ENVS`` from the Hub, parses it as YAML and joins
    ``repo.base_module`` with the entry of ``repo.envs`` named ``environment``.

    Args:
        environment: Alias to look up under ``repo.envs`` in the YAML file.

    Returns:
        ``"<base_module>.<envs[environment]>"``.

    Raises:
        KeyError: If ``environment`` is not listed under ``repo.envs`` (the
            message names the available aliases), or if the YAML file lacks
            the expected ``repo`` / ``base_module`` / ``envs`` keys.
    """
    mapping_path = _download_hub_file(EXAMPLE_ENVS)
    # Explicit encoding keeps parsing independent of the platform locale.
    with open(mapping_path, encoding="utf-8") as f:
        envs_mapping = yaml.safe_load(f)

    repo = envs_mapping["repo"]
    base_module = repo["base_module"]
    aliases = repo["envs"]
    if environment not in aliases:
        available = ", ".join(str(alias) for alias in aliases)
        raise KeyError(f"Unknown environment alias {environment!r}; available aliases: {available}")

    return f"{base_module}.{aliases[environment]}"
|
|
def _create_isaaclab_env(config: dict, n_envs: int) -> dict[str, dict[int, gym.vector.VectorEnv]]:
    """Create IsaacLab Arena environment from configuration.

    Args:
        config: Configuration dictionary. It is not modified; a copy with
            ``num_envs`` set to ``n_envs`` is turned into an
            ``argparse.Namespace`` for ``AppLauncher`` and the env builder.
            Keys read here: ``environment`` (required), ``enable_pinocchio``
            (default True), ``enable_cameras`` (default False), ``state_keys``
            and ``camera_keys`` (comma-separated strings), ``state_dim``
            (default 54), ``action_dim`` (default 36), ``task`` and
            ``episode_length`` (default 300).
        n_envs: Number of parallel environments.

    Returns:
        Dict mapping environment name to {task_id: VectorEnv}; here a single
        entry ``{environment: {0: wrapped_env}}``.

    Raises:
        ValueError: If ``environment`` is missing, or validation finds a
            dimension mismatch.
        IsaacLabArenaStateKeyError, IsaacLabArenaCameraKeyError: If a
            configured key is not an active observation term.
    """
    # Reject a missing environment before the simulation app is started, so a
    # misconfiguration does not leave a launched app behind.
    environment = config.get("environment")
    if environment is None:
        raise ValueError("cfg.environment must be specified")

    from isaaclab.app import AppLauncher

    if config.get("enable_pinocchio", True):
        # Imported for its side effects only; the name is never used.
        # NOTE(review): presumably it has to load before the app launches - confirm.
        import pinocchio  # noqa: F401

    # Work on a copy so the caller's dictionary is left untouched.
    config = {**config, "num_envs": n_envs}

    # AppLauncher and the env builder take the settings as a Namespace.
    as_isaaclab_argparse = argparse.Namespace(**config)

    logging.info("Launching IsaacLab simulation app...")
    app_launcher = AppLauncher(as_isaaclab_argparse)

    # Imported only after the app has been launched, as in the original flow.
    from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder

    # Resolve the alias to "<module path>.<class name>" and instantiate the class.
    environment_path = resolve_environment_alias(environment)
    logging.info(f"Creating environment: {environment_path}")

    module_path, class_name = environment_path.rsplit(".", 1)
    environment_module = importlib.import_module(module_path)
    environment_instance = getattr(environment_module, class_name)()

    env_builder = ArenaEnvBuilder(environment_instance.get_env(as_isaaclab_argparse), as_isaaclab_argparse)

    # Rendering is requested only when cameras are enabled.
    render_mode = "rgb_array" if config.get("enable_cameras", False) else None

    raw_env = env_builder.make_registered()

    if render_mode and hasattr(raw_env, "render_mode"):
        raw_env.render_mode = render_mode
        logging.info(f"Set render_mode={render_mode} on underlying IsaacLab env")

    # Keys arrive as comma-separated strings; blanks are dropped.
    state_keys = tuple(k.strip() for k in (config.get("state_keys") or "").split(",") if k.strip())
    camera_keys = tuple(k.strip() for k in (config.get("camera_keys") or "").split(",") if k.strip())
    try:
        validate_config(
            raw_env,
            state_keys,
            camera_keys,
            config.get("state_dim", 54),
            config.get("action_dim", 36),
        )
    except Exception as e:
        # Any failure tears the environment and app down before propagating;
        # only the log message distinguishes expected from unexpected errors.
        if isinstance(e, (IsaacLabArenaCameraKeyError, IsaacLabArenaStateKeyError, ValueError)):
            logging.error(f"Validation failed: {e}")
        else:
            logging.error(f"Validation failed with unexpected error: {type(e).__name__}: {e}")
        cleanup_isaaclab(raw_env, app_launcher)
        raise

    # Fall back to a task description derived from the environment name.
    task = config.get("task")
    if task is None:
        task = f"Complete the {environment.replace('_', ' ')} task."

    wrapped_env = IsaacLabEnvWrapper(
        raw_env,
        episode_length=config.get("episode_length", 300),
        task=task,
        render_mode=render_mode,
        simulation_app=app_launcher,
    )
    logging.info(f"Created: {environment} with {wrapped_env.num_envs} envs, render_mode={render_mode}")

    return {environment: {0: wrapped_env}}
|
|
|
|
def make_env(
    n_envs: int = 1,
    use_async_envs: bool = False,
    cfg: EnvConfig | None = None,
) -> dict[str, dict[int, gym.vector.VectorEnv]]:
    """Create IsaacLab Arena environments (EnvHub-compatible API).

    Copies the relevant attributes of ``cfg`` into a plain dictionary and
    hands it to ``_create_isaaclab_env`` together with ``n_envs``.

    Args:
        n_envs: Number of parallel environments; must be at least 1.
        use_async_envs: Accepted for API compatibility; not used by this
            function.
        cfg: Configuration object. ``environment`` must be set, and every
            attribute named in ``forwarded`` below is read from it, so a
            missing one raises ``AttributeError``. A falsy ``camera_keys``
            is forwarded as ``""``.

    Returns:
        Dict mapping environment name to {task_id: VectorEnv}, as returned
        by ``_create_isaaclab_env``.

    Raises:
        ValueError: If ``n_envs`` is below 1, or ``cfg`` has no
            ``environment`` attribute or it is ``None``.
    """
    if n_envs < 1:
        raise ValueError("`n_envs` must be at least 1")

    # Covers cfg=None, a cfg without the attribute, and an explicit None value.
    if getattr(cfg, "environment", None) is None:
        raise ValueError(
            "No 'environment' specified. Pass it via kwargs or create "
            "configs/config.yaml with environment settings."
        )

    # Attribute names copied from cfg; the order fixes the key order of the dict.
    forwarded = (
        "environment",
        "embodiment",
        "object",
        "mimic",
        "teleop_device",
        "seed",
        "device",
        "disable_fabric",
        "enable_cameras",
        "headless",
        "enable_pinocchio",
        "episode_length",
        "state_dim",
        "action_dim",
        "camera_height",
        "camera_width",
        "video",
        "video_length",
        "video_interval",
        "state_keys",
        "camera_keys",
        "task",
    )
    config = {field: getattr(cfg, field) for field in forwarded}
    # Reassigning an existing key keeps its position in the dict.
    config["camera_keys"] = config["camera_keys"] or ""

    logging.info(f"EnvHub make_env: environment={config.get('environment')}, n_envs={n_envs}")
    logging.info(f"Config: headless={config.get('headless')}, enable_cameras={config.get('enable_cameras')}")
    logging.info(f"EnvHub Config: {config}")

    return _create_isaaclab_env(config, n_envs)
|
|