LeRobot documentation

Environment Processors

Environment processors are a layer in LeRobot’s data processing architecture that handles environment-specific transformations separately from policy-specific processing. This separation of concerns keeps the code modular and makes it easier to experiment with different environments and policies.

Why Environment Processors?

When working with different robot environments (LIBERO, MetaWorld, Aloha, etc.), each environment often has unique data formats, coordinate systems, and conventions that need standardization before policy processing. Without environment processors, these transformations would be:

  1. Hardcoded in environment code - Making it difficult to experiment with different state representations
  2. Duplicated across policies - Each policy would need to handle environment-specific quirks
  3. Mixed with policy logic - Violating separation of concerns and making debugging harder

Environment processors solve this by providing a dedicated processing layer between raw environment observations and policy inputs.

The Processing Pipeline

Here’s how data flows through the complete processing pipeline during evaluation:

# In lerobot_eval.py rollout() function:

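# 1. Raw environment observation (numpy arrays, various formats)
#    env.step() returns a tuple; only the observation flows through the steps below
raw_observation, reward, terminated, truncated, info = env.step(action)

# 2. Convert numpy to torch, scale images to [0, 1]
observation = preprocess_observation(raw_observation)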

# 3. Add task metadata (for multi-task environments)
observation = add_envs_task(env, observation)

# 4. ENVIRONMENT-SPECIFIC preprocessing (NEW!)
#    - Flatten robot states
#    - Rotate images to match dataset conventions
#    - Handle environment-specific coordinate systems
observation = env_preprocessor(observation)

# 5. POLICY-SPECIFIC preprocessing
#    - Normalize with dataset statistics
#    - Add batch dimensions
#    - Move to GPU
#    - Tokenize language instructions
observation = preprocessor(observation)

# 6. Policy inference
action = policy.select_action(observation)

# 7. POLICY-SPECIFIC postprocessing
#    - Unnormalize actions
#    - Remove batch dimensions
action = postprocessor(action)

# 8. ENVIRONMENT-SPECIFIC postprocessing (NEW!)
#    - Convert action formats if needed
#    - Apply environment-specific constraints
action_transition = {"action": action}
action_transition = env_postprocessor(action_transition)
action = action_transition["action"]

# 9. Execute in environment
env.step(action)
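
The ordering above can be sketched with plain Python callables. This is a toy illustration rather than LeRobot's implementation: every function below is a hypothetical stand-in, and plain lists replace tensors so the snippet runs without dependencies.

```python
from typing import Any

Obs = dict[str, Any]


def env_preprocessor(obs: Obs) -> Obs:
    """Hypothetical environment step: flatten a nested robot state."""
    out = dict(obs)  # shallow copy so the raw observation is left untouched
    robot_state = out.pop("robot_state")
    out["state"] = list(robot_state["eef_pos"]) + list(robot_state["gripper"])
    return out


def policy_preprocessor(obs: Obs) -> Obs:
    """Hypothetical policy step: add a batch dimension."""
    return {**obs, "state": [obs["state"]]}


def policy(obs: Obs) -> list[list[float]]:
    """Stand-in policy: one zero action per batch element."""
    return [[0.0, 0.0, 0.0] for _ in obs["state"]]


def policy_postprocessor(action: list[list[float]]) -> list[float]:
    """Hypothetical policy step: remove the batch dimension."""
    return action[0]


def env_postprocessor(transition: dict[str, Any]) -> dict[str, Any]:
    """Identity, matching the current no-op environment postprocessor."""
    return transition


def run_step(raw_obs: Obs) -> tuple[Obs, list[float]]:
    obs = env_preprocessor(raw_obs)        # environment-specific
    obs = policy_preprocessor(obs)         # policy-specific
    action = policy_postprocessor(policy(obs))
    transition = env_postprocessor({"action": action})
    return obs, transition["action"]


raw = {"robot_state": {"eef_pos": [0.1, 0.2, 0.3], "gripper": [0.0, 0.04]}}
obs, action = run_step(raw)
```

The point of the sketch is the call order: the environment step runs before the policy step on the way in, and after it on the way out.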

The Benefits

1. Separation of Concerns

Environment processors handle transformations specific to the environment’s data format, while policy processors handle transformations specific to the model’s requirements.

# ❌ Before: Mixed concerns
class LiberoVLAPolicy:
    def preprocess(self, obs):
        # Environment-specific: Flatten robot state (shouldn't be in policy!)
        state = self._flatten_robot_state(obs["robot_state"])
        # Policy-specific: Normalize with dataset stats
        state = self.normalizer(state)
        return state

# ✅ After: Clear separation
# Environment processor: Handles LIBERO's nested robot state
env_preprocessor = LiberoProcessorStep()  # Flattens robot_state

# Policy processor: Handles model requirements
policy_preprocessor = NormalizerProcessorStep(stats=dataset_stats)

2. Flexibility and Reusability

The same policy can work with different environment processors, and the same environment processor can work with different policies:

# Use SmolVLA policy with LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg)
smolvla_preprocessor, smolvla_postprocessor = make_pre_post_processors(smolvla_cfg)

# Or use ACT policy with the same LIBERO environment
libero_preprocessor, libero_postprocessor = make_env_pre_post_processors(libero_cfg)
act_preprocessor, act_postprocessor = make_pre_post_processors(act_cfg)

3. Easier Experimentation

Want to try different state representations for LIBERO? Just create a new processor:

# Original: 8D state (pos + quat→axisangle + gripper)
@ProcessorStepRegistry.register("libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        obs = dict(obs)  # shallow copy so the caller's dict is untouched
        robot_state = obs.pop("observation.robot_state")
        eef_pos = robot_state["eef"]["pos"]                         # 3D
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])  # 3D
        gripper = robot_state["gripper"]["qpos"]                    # 2D
        obs["observation.state"] = torch.cat([eef_pos, eef_axisangle, gripper], dim=-1)  # 8D
        return obs

# Experiment: Add velocity for better control
@ProcessorStepRegistry.register("libero_velocity_processor")
class LiberoVelocityProcessorStep(ObservationProcessorStep):
    def _process_observation(self, obs):
        # Include velocities for 14D state
        obs = dict(obs)  # shallow copy so the caller's dict is untouched
        robot_state = obs.pop("observation.robot_state")
        eef_pos = robot_state["eef"]["pos"]                         # 3D
        eef_axisangle = quat2axisangle(robot_state["eef"]["quat"])  # 3D
        eef_vel = robot_state["eef"]["vel"]                         # 3D  (NEW)
        gripper_pos = robot_state["gripper"]["qpos"]                # 2D
        gripper_vel = robot_state["gripper"]["qvel"]                # 3D  (NEW)
        obs["observation.state"] = torch.cat([eef_pos, eef_axisangle, eef_vel,
                                              gripper_pos, gripper_vel], dim=-1)  # 14D
        return obs

4. Cleaner Environment Code

Environments expose all available data without needing to know what downstream models will use:

# LIBERO environment exposes full robot state
observation = {
    "pixels": {"image": img, "image2": img2},
    "robot_state": {
        "eef": {"pos": ..., "quat": ..., "vel": ..., "mat": ..., "axisangle": ...},
        "gripper": {"qpos": ..., "qvel": ...},
        "joints": {"pos": ..., "vel": ...}
    }
}

# Environment processor decides what to use
# Policy processor handles model-specific transformations
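
As a rough illustration of "the environment exposes everything, the processor chooses", the sketch below picks a few fields out of a nested state and concatenates them. The `select_state` helper and its `fields` argument are hypothetical, and plain lists stand in for tensors.

```python
from typing import Any, Sequence


def select_state(
    robot_state: dict[str, Any],
    fields: Sequence[tuple[str, str]],
) -> list[float]:
    """Concatenate the chosen (group, key) entries into one flat vector."""
    flat: list[float] = []
    for group, key in fields:
        flat.extend(robot_state[group][key])
    return flat


robot_state = {
    "eef": {"pos": [0.1, 0.2, 0.3], "vel": [0.0, 0.0, 0.0]},
    "gripper": {"qpos": [0.02, -0.02], "qvel": [0.0, 0.0]},
}

# Two state representations built from the same raw data
pos_only = select_state(robot_state, [("eef", "pos"), ("gripper", "qpos")])
with_vel = select_state(
    robot_state, [("eef", "pos"), ("eef", "vel"), ("gripper", "qpos")]
)
```

Changing the representation means changing the `fields` list; the environment's output stays the same.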

Using Environment Processors

Factory Function

The make_env_pre_post_processors function follows the same pattern as make_pre_post_processors for policies:

from lerobot.envs.factory import make_env_pre_post_processors
from lerobot.envs.configs import LiberoEnv, PushtEnv

# For LIBERO: Returns LiberoProcessorStep in preprocessor
libero_cfg = LiberoEnv(task="libero_spatial", camera_name=["agentview"])
env_preprocessor, env_postprocessor = make_env_pre_post_processors(libero_cfg)

# For other environments: Returns identity processors (no-op)
pusht_cfg = PushtEnv()
env_preprocessor, env_postprocessor = make_env_pre_post_processors(pusht_cfg)

Implementation in envs/factory.py

def make_env_pre_post_processors(
    env_cfg: EnvConfig,
) -> tuple[
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]],
]:
    """
    Create preprocessor and postprocessor pipelines for an environment.

    Args:
        env_cfg: The configuration of the environment.

    Returns:
        A tuple containing:
            - preprocessor: Pipeline that processes environment observations
            - postprocessor: Pipeline that processes actions before they are passed to the environment
    """
    # For LIBERO environments, add the LiberoProcessorStep to preprocessor
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    else:
        # For all other environments, return an identity preprocessor
        preprocessor = PolicyProcessorPipeline(steps=[])

    # Postprocessor is currently identity for all environments
    # Future: Could add environment-specific action transformations
    postprocessor = PolicyProcessorPipeline(steps=[])

    return preprocessor, postprocessor
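
The dispatch logic can be exercised in isolation with stand-in classes. The sketch below mirrors only the branching; `ToyEnvConfig`, `ToyPipeline`, `rename_state`, and `make_toy_env_processors` are hypothetical names that do not exist in LeRobot.

```python
from dataclasses import dataclass, field
from typing import Any, Callable

Step = Callable[[dict[str, Any]], dict[str, Any]]


@dataclass
class ToyEnvConfig:
    type: str


@dataclass
class ToyPipeline:
    steps: list[Step] = field(default_factory=list)

    def __call__(self, data: dict[str, Any]) -> dict[str, Any]:
        # An empty step list leaves the data unchanged (identity pipeline)
        for step in self.steps:
            data = step(data)
        return data


def rename_state(obs: dict[str, Any]) -> dict[str, Any]:
    """Stand-in for an environment-specific step."""
    out = dict(obs)
    out["observation.state"] = out.pop("observation.robot_state")
    return out


def make_toy_env_processors(cfg: ToyEnvConfig) -> tuple[ToyPipeline, ToyPipeline]:
    if "libero" in cfg.type:
        preprocessor = ToyPipeline(steps=[rename_state])
    else:
        preprocessor = ToyPipeline()
    return preprocessor, ToyPipeline()  # postprocessor is the identity


libero_pre, libero_post = make_toy_env_processors(ToyEnvConfig(type="libero"))
pusht_pre, pusht_post = make_toy_env_processors(ToyEnvConfig(type="pusht"))
```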

Integration in Evaluation

In lerobot_eval.py, the environment processors are created once and used throughout:

def eval_main(cfg: EvalPipelineConfig):
    # Create environment
    envs = make_env(cfg.env, n_envs=cfg.eval.batch_size)

    # Create policy
    policy = make_policy(cfg=cfg.policy, env_cfg=cfg.env)

    # Create policy processors
    preprocessor, postprocessor = make_pre_post_processors(
        policy_cfg=cfg.policy,
        pretrained_path=cfg.policy.pretrained_path,
    )

    # Create environment processors (NEW!)
    env_preprocessor, env_postprocessor = make_env_pre_post_processors(env_cfg=cfg.env)

    # Run evaluation with both processor types
    eval_policy_all(
        envs=envs,
        policy=policy,
        env_preprocessor=env_preprocessor,      # Environment-specific
        env_postprocessor=env_postprocessor,    # Environment-specific
        preprocessor=preprocessor,              # Policy-specific
        postprocessor=postprocessor,            # Policy-specific
        n_episodes=cfg.eval.n_episodes,
    )

Example: LIBERO Environment Processor

The LiberoProcessorStep demonstrates a real-world environment processor:

from dataclasses import dataclass

import torch

from lerobot.processor.pipeline import ObservationProcessorStep, ProcessorStepRegistry

@dataclass
@ProcessorStepRegistry.register(name="libero_processor")
class LiberoProcessorStep(ObservationProcessorStep):
    """
    Processes LIBERO observations into the LeRobot format.

    **State Processing:**
    - Extracts end-effector position (3D)
    - Converts quaternion to axis-angle representation (3D)
    - Extracts gripper joint positions (2D)
    - Concatenates into 8D state vector

    **Image Processing:**
    - Rotates images 180° to match HuggingFaceVLA/libero convention
    """

    def _process_observation(self, observation):
        processed_obs = observation.copy()

        # Process images: rotate 180° to match the dataset's camera convention
        for key in list(processed_obs.keys()):
            if key.startswith("observation.images."):
                img = processed_obs[key]
                img = torch.flip(img, dims=[2, 3])  # Flipping both H and W of (B, C, H, W) is a 180° rotation
                processed_obs[key] = img

        # Process robot_state: Flatten to 8D vector
        if "observation.robot_state" in processed_obs:
            robot_state = processed_obs.pop("observation.robot_state")

            eef_pos = robot_state["eef"]["pos"]           # (B, 3)
            eef_quat = robot_state["eef"]["quat"]         # (B, 4)
            gripper_qpos = robot_state["gripper"]["qpos"] # (B, 2)

            # Convert quaternion to axis-angle
            eef_axisangle = self._quat2axisangle(eef_quat)  # (B, 3)

            # Concatenate into single state vector
            state = torch.cat((eef_pos, eef_axisangle, gripper_qpos), dim=-1)
            state = state.float()

            processed_obs["observation.state"] = state

        return processed_obs

Why These Transformations?

  1. Image Rotation: The HuggingFaceVLA/libero dataset has images rotated 180° from the raw LIBERO simulator. The processor handles this convention mismatch so policies trained on the dataset work seamlessly.

  2. State Flattening: The raw LIBERO environment exposes nested dictionaries with all available state information (position, quaternion, velocity, matrix representation, etc.). The processor:

    • Selects the relevant components (pos, quat, gripper)
    • Converts quaternion to axis-angle (more suitable for learning)
    • Flattens to a single 8D vector that policies expect

  3. Flexibility: The environment still exposes all raw data. If you want to try different state representations (e.g., including velocities, using matrix representation instead of axis-angle), you can create a new processor without modifying the environment code.
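
The `_quat2axisangle` helper is not shown above. A minimal sketch of the standard quaternion to axis-angle conversion follows, assuming a unit quaternion in `(x, y, z, w)` order; the ordering LIBERO actually uses is an assumption here and should be checked against the environment. The function name is hypothetical, and plain floats replace batched tensors.

```python
import math


def quat_to_axisangle(quat: tuple[float, float, float, float]) -> tuple[float, float, float]:
    """Convert a unit quaternion (x, y, z, w) to an axis-angle vector.

    The result is the rotation axis scaled by the rotation angle in radians.
    """
    x, y, z, w = quat
    w = max(-1.0, min(1.0, w))         # guard acos against rounding error
    sin_half = math.sqrt(1.0 - w * w)  # norm of (x, y, z) for a unit quaternion
    if math.isclose(sin_half, 0.0, abs_tol=1e-8):
        return (0.0, 0.0, 0.0)         # no rotation: the axis is undefined
    angle = 2.0 * math.acos(w)
    scale = angle / sin_half
    return (x * scale, y * scale, z * scale)


identity = quat_to_axisangle((0.0, 0.0, 0.0, 1.0))
quarter_turn_z = quat_to_axisangle(
    (0.0, 0.0, math.sin(math.pi / 4), math.cos(math.pi / 4))
)
```

Here `quarter_turn_z` is a 90° rotation about the z axis, so its third component is approximately π/2 and the other two are zero.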

Adding Environment Processors for New Environments

To add environment processors for a new environment:

1. Create the Processor Step

# In src/lerobot/processor/env_processor.py

@dataclass
@ProcessorStepRegistry.register(name="myenv_processor")
class MyEnvProcessorStep(ObservationProcessorStep):
    """Process observations from MyEnv."""

    def _process_observation(self, observation):
        processed = observation.copy()

        # Your environment-specific transformations
        if "myenv.specific.state" in processed:
            state = processed.pop("myenv.specific.state")
            # Transform to standard format
            processed["observation.state"] = self._transform_state(state)

        return processed

2. Update the Factory

# In src/lerobot/envs/factory.py

def make_env_pre_post_processors(env_cfg: EnvConfig):
    if isinstance(env_cfg, LiberoEnv) or "libero" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[LiberoProcessorStep()])
    elif isinstance(env_cfg, MyEnvConfig) or "myenv" in env_cfg.type:
        preprocessor = PolicyProcessorPipeline(steps=[MyEnvProcessorStep()])
    else:
        preprocessor = PolicyProcessorPipeline(steps=[])

    postprocessor = PolicyProcessorPipeline(steps=[])
    return preprocessor, postprocessor

3. Use in Evaluation

No changes needed! The evaluation script automatically uses the appropriate processor:

# --env.type=myenv automatically selects MyEnvProcessorStep
lerobot-eval \
    --policy.path=lerobot/my_policy \
    --env.type=myenv \
    --eval.n_episodes=10

Future: Environment Postprocessors

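Currently, postprocessors are the identity (no-op) for all environments. The sketches in this section illustrate possible future uses; attributes such as ik_solver, min_action, max_action, and world_to_base_transform are placeholders that a real step would need to define.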

Action Space Transformations

@dataclass
class MyEnvActionPostprocessor(ProcessorStep):
    """Convert policy actions to environment-specific format."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Example: Convert from Cartesian to joint space
        if self.action_space == "joint":
            action = self.ik_solver(action)

        # Example: Apply environment-specific safety limits
        action = torch.clamp(action, self.min_action, self.max_action)

        transition["action"] = action
        return transition
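
For the safety-limit case, the clamping itself can be sketched without any framework. `clamp_action` and the limit values are hypothetical; a real step would more likely read its limits from the environment's action space.

```python
from typing import Any


def clamp_action(transition: dict[str, Any], low: float, high: float) -> dict[str, Any]:
    """Return a new transition whose action is clipped element-wise to [low, high]."""
    clipped = [min(max(a, low), high) for a in transition["action"]]
    return {**transition, "action": clipped}


transition = {"action": [-1.5, 0.2, 3.0]}
safe = clamp_action(transition, low=-1.0, high=1.0)
```

Returning a new dictionary instead of mutating the input keeps the original transition available for logging or debugging.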

Coordinate System Conversions

@dataclass
class CoordinateTransformPostprocessor(ProcessorStep):
    """Transform actions between coordinate systems."""

    def __call__(self, transition: EnvTransition) -> EnvTransition:
        action = transition["action"]

        # Example: Policy outputs in world frame, env expects base frame
        action = self.world_to_base_transform(action)

        transition["action"] = action
        return transition
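
A frame change of this kind can be sketched for a position target, assuming the base frame differs from the world frame only by a translation and a yaw rotation about z. The function name and arguments are hypothetical. With rotation R and translation t describing the base pose in the world frame, the conversion is p_base = Rᵀ (p_world − t).

```python
import math


def world_to_base(
    p_world: tuple[float, float, float],
    base_xyz: tuple[float, float, float],
    base_yaw: float,
) -> tuple[float, float, float]:
    """Express a world-frame point in the base frame: p_base = R^T (p_world - t)."""
    dx = p_world[0] - base_xyz[0]
    dy = p_world[1] - base_xyz[1]
    dz = p_world[2] - base_xyz[2]
    c, s = math.cos(base_yaw), math.sin(base_yaw)
    # R^T for a rotation about z is [[c, s, 0], [-s, c, 0], [0, 0, 1]]
    return (c * dx + s * dy, -s * dx + c * dy, dz)


# Base sits at x = 1 and is rotated 90° counter-clockwise, so its x axis
# points along world y. A point one unit "up" in world y is straight ahead.
p_base = world_to_base((1.0, 1.0, 0.0), base_xyz=(1.0, 0.0, 0.0), base_yaw=math.pi / 2)
```

For delta (relative) actions only the rotation would apply, since a displacement does not depend on the frame origin.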

Best Practices

  1. Keep environment processors simple: They should only handle environment-specific data format issues, not complex learning-related transformations.

  2. Use policy processors for model requirements: Normalization, batching, device placement, and tokenization belong in policy processors.

  3. Expose all data from environments: Let processors decide what to use rather than hardcoding choices in the environment.

  4. Document conventions: Clearly document any coordinate system conventions, camera orientations, or data formats that your processor handles.

  5. Test independently: Environment processors should be testable without loading full policies or environments.
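
Testing a processor on its own can be as simple as feeding it a hand-built dictionary. The sketch below uses a hypothetical `flatten_state` function in place of a real processor step, and plain `assert` statements in place of a test framework.

```python
from typing import Any


def flatten_state(observation: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical stand-in for an environment processor step."""
    processed = dict(observation)  # shallow copy; the input is not mutated
    if "observation.robot_state" in processed:
        robot_state = processed.pop("observation.robot_state")
        processed["observation.state"] = (
            list(robot_state["eef_pos"]) + list(robot_state["gripper_qpos"])
        )
    return processed


def test_flatten_state() -> None:
    obs = {
        "observation.robot_state": {
            "eef_pos": [1.0, 2.0, 3.0],
            "gripper_qpos": [0.1, 0.2],
        }
    }
    out = flatten_state(obs)
    assert out["observation.state"] == [1.0, 2.0, 3.0, 0.1, 0.2]
    assert "observation.robot_state" not in out
    assert "observation.robot_state" in obs  # input left untouched
    # Observations without the nested state pass through unchanged
    assert flatten_state({"other": 1}) == {"other": 1}


test_flatten_state()
```

No policy, simulator, or dataset is loaded, which is what makes this kind of check cheap to run.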

Summary

Environment processors provide a clean separation between environment-specific data transformations and policy-specific model requirements. This architecture:

  • ✅ Enables easy experimentation with different state representations
  • ✅ Allows policies to work seamlessly across different environments
  • ✅ Keeps environment code focused on simulation/hardware interface
  • ✅ Makes processor pipelines more maintainable and debuggable
  • ✅ Follows the single responsibility principle

The key insight: Environments define data formats, processors standardize them, policies consume standardized data. Each layer has a clear, focused responsibility.
