# RoboticsDiffusionTransformer / data / hdf5_vla_dataset.py
# Uploaded by: euijinrnd
# Commit message: Add files using upload-large-folder tool
# Revision: 9de9fbf (verified)
import os
import fnmatch
import json
import h5py
import yaml
import cv2
import numpy as np
from configs.state_vec import STATE_VEC_IDX_MAPPING
# Names of the 20 per-step values stored for the bimanual end-effector pose,
# in storage order: for each arm, 3 position components, 6 angle components
# and 1 gripper-open value.
TABLETOP_6D_INDICES_NAMES = [
    # Left arm
    'left_eef_pos_x',
    'left_eef_pos_y',
    'left_eef_pos_z',
    'left_eef_angle_0',
    'left_eef_angle_1',
    'left_eef_angle_2',
    'left_eef_angle_3',
    'left_eef_angle_4',
    'left_eef_angle_5',
    'left_gripper_open',
    # Right arm
    'right_eef_pos_x',
    'right_eef_pos_y',
    'right_eef_pos_z',
    'right_eef_angle_0',
    'right_eef_angle_1',
    'right_eef_angle_2',
    'right_eef_angle_3',
    'right_eef_angle_4',
    'right_eef_angle_5',
    'right_gripper_open',
]

# Position of each named value inside the unified state vector.
TABLETOP_6D_INDICES = [
    STATE_VEC_IDX_MAPPING[name] for name in TABLETOP_6D_INDICES_NAMES
]
class TabletopHDF5VLADataset:
    """
    Sample training data from the tabletop embodiment dataset stored in HDF5.

    Every ``*.hdf5`` file found under the dataset directory is treated as one
    episode. Episodes shorter than ``MIN_EPISODE_STEPS`` are dropped
    consistently: they get zero sampling weight and are reported as invalid
    by both ``parse_hdf5_file`` and ``parse_hdf5_file_state_only``.
    """

    # [Modify] Directory that contains one sub-directory per task.
    HDF5_ROOT = "/data5/jellyho/tabletop"
    # [Optional] Episodes with fewer steps than this are treated as invalid.
    MIN_EPISODE_STEPS = 20

    def __init__(self, task_name, hdf5_dir=None,
                 config_path='configs/base.yaml') -> None:
        """Index the episode files of one task and compute sampling weights.

        Args:
            task_name (str): name of the task; used as the dataset name and,
                by default, as the sub-directory of ``HDF5_ROOT`` to scan.
            hdf5_dir (str, optional): directory to scan for ``*.hdf5`` files.
                Defaults to ``f"{HDF5_ROOT}/{task_name}/"``.
            config_path (str, optional): YAML config providing
                ``common.action_chunk_size``, ``common.img_history_size``
                and ``common.state_dim``.
        """
        dataset_name = task_name
        if hdf5_dir is None:
            hdf5_dir = f"{self.HDF5_ROOT}/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        # Each HDF5 file contains one episode. Sorting makes the mapping
        # from index to episode independent of the filesystem's walk order.
        self.file_paths = sorted(
            os.path.join(root, filename)
            for root, _, files in os.walk(hdf5_dir)
            for filename in fnmatch.filter(files, '*.hdf5')
        )

        # Load the config
        with open(config_path, 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Weight every episode by its length; dropped episodes weigh zero.
        episode_lens = np.array(
            [self._usable_num_steps(path) for path in self.file_paths],
            dtype=np.float64,
        )
        total_steps = episode_lens.sum()
        if total_steps > 0:
            self.episode_sample_weights = episode_lens / total_steps
        else:
            # No usable episode: keep finite (all-zero) weights instead of
            # NaNs; get_item() reports the problem with a clear error.
            self.episode_sample_weights = np.zeros_like(episode_lens)

    def __len__(self):
        """Return the number of episode files that were found."""
        return len(self.file_paths)

    def get_dataset_name(self):
        """Return the dataset (task) name given at construction."""
        return self.DATASET_NAME

    def get_item(self, index: int=None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected according
                to ``episode_sample_weights``.
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.
                If the requested episode is invalid, a randomly sampled valid
                episode is returned instead.

        Raises:
            ValueError: if no episode has a positive sampling weight, i.e.
                there is nothing valid to return.
        """
        weights = np.asarray(self.episode_sample_weights, dtype=np.float64)
        if not np.any(weights > 0):
            # Without this guard the retry loop below could never finish.
            raise ValueError(
                f"Dataset '{self.DATASET_NAME}' contains no valid episode."
            )

        parse = (self.parse_hdf5_file_state_only if state_only
                 else self.parse_hdf5_file)
        while True:
            if index is None:
                index = np.random.choice(len(self.file_paths), p=weights)
            valid, sample = parse(self.file_paths[index])
            if valid:
                return sample
            # Invalid episode: draw a replacement from the weighted
            # distribution, which never selects a dropped episode.
            index = None

    def parse_hdf5_file(self, file_path):
        """[Modify] Parse a hdf5 file to generate a training sample at
            a random timestep.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "step_id": int,         # the index of the sampled step,
                                                # also the timestep t.
                        "instruction": str      # the language instruction for this episode.
                    },
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator": ndarray, # indicates the validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera image, (IMG_HISORY_SIZE, H, W, 3)
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                    "cam_high_mask": ndarray,   # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array.
                                                # For the first IMAGE_HISTORY_SIZE-1 timesteps, the mask should be False.
                    "cam_left_wrist": ndarray,  # left wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                                                # If only one wrist, make it right wrist, plz.
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = self._states_dataset(f)[:]
            actions = self._actions_dataset(f)[:]
            num_steps = states.shape[0]

            # [Optional] We drop too-short episode
            if num_steps < self.MIN_EPISODE_STEPS:
                return False, None

            # We randomly sample a timestep
            step_id = np.random.randint(0, num_steps)

            # You can also use precomputed language embeddings (recommended)
            if self.DATASET_NAME == 'aloha_box_into_pot_easy':
                instruction = f['observations']['states']['language_instruction'][0].decode('utf-8')
            else:
                instruction = f"lang_embed/{self.DATASET_NAME}.pt"

            # Assemble the meta
            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction
            }

            states = self._rescale(states)
            # Actions from t onward, padded with the last action if the
            # episode ends before a full chunk is available.
            actions = self._pad_action_chunk(
                self._rescale(actions[step_id:step_id + self.CHUNK_SIZE])
            )

            # Parse the state; the statistics cover the whole episode.
            state = states[step_id:step_id + 1]
            state_std = np.std(states, axis=0)
            state_mean = np.mean(states, axis=0)
            state_norm = np.sqrt(np.mean(states**2, axis=0))

            # Fill the state/action into the unified vector
            state = self._fill_in_state(state)
            state_indicator = self._fill_in_state(np.ones_like(state_std))
            state_std = self._fill_in_state(state_std)
            state_mean = self._fill_in_state(state_mean)
            state_norm = self._fill_in_state(state_norm)
            # If action's format is different from state's,
            # you may implement a separate fill-in for actions
            actions = self._fill_in_state(actions)

            # Parse the images; `cam_high` is the external camera image
            images = f['observations']['images']
            cam_high = self._load_image_history(images, 'back', step_id)
            cam_left_wrist = self._load_image_history(images, 'wrist_left', step_id)
            cam_right_wrist = self._load_image_history(images, 'wrist_right', step_id)

        # All cameras share the same history window, hence the same mask.
        cam_high_mask = self._history_mask(step_id)
        cam_left_wrist_mask = cam_high_mask.copy()
        cam_right_wrist_mask = cam_high_mask.copy()

        # Return the resulting sample
        # For unavailable images, return zero-shape arrays, i.e., (IMG_HISORY_SIZE, 0, 0, 0)
        # E.g., return np.zeros((self.IMG_HISORY_SIZE, 0, 0, 0)) for the key "cam_left_wrist",
        # if the left-wrist camera is unavailable on your robot
        return True, {
            "meta": meta,
            "state": state,
            "state_std": state_std,
            "state_mean": state_mean,
            "state_norm": state_norm,
            "actions": actions,
            "state_indicator": state_indicator,
            "cam_high": cam_high,
            "cam_high_mask": cam_high_mask,
            "cam_left_wrist": cam_left_wrist,
            "cam_left_wrist_mask": cam_left_wrist_mask,
            "cam_right_wrist": cam_right_wrist,
            "cam_right_wrist_mask": cam_right_wrist_mask
        }

    def parse_hdf5_file_state_only(self, file_path):
        """[Modify] Parse a hdf5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,           # state[:], (T, STATE_DIM).
                    "action": ndarray,          # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = self._states_dataset(f)[:]
            actions = self._actions_dataset(f)[:]

        # Same filter as parse_hdf5_file, so both views agree on validity.
        if states.shape[0] < self.MIN_EPISODE_STEPS:
            return False, None

        # The complete trajectories are returned (no timestep is sampled).
        return True, {
            "state": self._fill_in_state(self._rescale(states)),
            "action": self._fill_in_state(self._rescale(actions))
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _states_dataset(f):
        """[Modify] Return the HDF5 dataset holding the per-step states."""
        return f['observations']['states']['ee_6d_pos']

    @staticmethod
    def _actions_dataset(f):
        """[Modify] Return the HDF5 dataset holding the per-step actions."""
        return f['actions']['ee_6d_pos']

    def _usable_num_steps(self, file_path):
        """Return the episode length, or 0 if the episode is too short.

        Only the dataset's shape is inspected, so no array data is loaded.
        """
        with h5py.File(file_path, 'r') as f:
            num_steps = self._states_dataset(f).shape[0]
        return num_steps if num_steps >= self.MIN_EPISODE_STEPS else 0

    @staticmethod
    def _rescale(values):
        """[Modify] Divide each stored dimension by its scale factor.

        The divisors are currently all ones, so values are unchanged and
        only promoted to floating point by the division.
        """
        return values / np.ones((1, len(TABLETOP_6D_INDICES)))

    def _fill_in_state(self, values):
        """Scatter ``values`` (..., 20) into a zeroed (..., STATE_DIM) vector."""
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., TABLETOP_6D_INDICES] = values
        return uni_vec

    def _pad_action_chunk(self, chunk):
        """Repeat the last action until ``chunk`` has CHUNK_SIZE rows."""
        chunk = np.asarray(chunk)
        missing = self.CHUNK_SIZE - chunk.shape[0]
        if missing <= 0:
            return chunk
        return np.concatenate(
            [chunk, np.tile(chunk[-1:], (missing, 1))], axis=0
        )

    def _load_image_history(self, images, key, step_id):
        """Stack the frames of camera ``key`` ending at ``step_id``.

        At most IMG_HISORY_SIZE frames are read; if fewer exist, the earliest
        available frame is repeated at the front to reach that count.
        """
        camera = images[key]
        first = max(step_id - self.IMG_HISORY_SIZE + 1, 0)
        frames = np.stack([camera[i] for i in range(first, step_id + 1)])
        missing = self.IMG_HISORY_SIZE - frames.shape[0]
        if missing > 0:
            # Pad the images using the first image
            frames = np.concatenate(
                [np.tile(frames[:1], (missing, 1, 1, 1)), frames], axis=0
            )
        return frames

    def _history_mask(self, step_id):
        """Return a boolean mask that is False for padded history slots.

        For step_id = 0 exactly one slot (the last) is valid.
        """
        valid_len = min(step_id + 1, self.IMG_HISORY_SIZE)
        return np.array(
            [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len
        )
class AnubisHDF5VLADataset:
    """
    Sample training data from the Anubis embodiment dataset stored in HDF5.

    Every ``*.hdf5`` file found under the dataset directory is treated as one
    episode. Episodes shorter than ``MIN_EPISODE_STEPS`` are dropped
    consistently: they get zero sampling weight and are reported as invalid
    by both ``parse_hdf5_file`` and ``parse_hdf5_file_state_only``.
    """

    # [Modify] Directory that contains one sub-directory per task.
    HDF5_ROOT = "/data5/jellyho/anubis_hdf5"
    # [Optional] Episodes with fewer steps than this are treated as invalid.
    MIN_EPISODE_STEPS = 20

    def __init__(self, task_name, hdf5_dir=None,
                 config_path='configs/base.yaml') -> None:
        """Index the episode files of one task and compute sampling weights.

        Args:
            task_name (str): name of the task; used as the dataset name and,
                by default, as the sub-directory of ``HDF5_ROOT`` to scan.
            hdf5_dir (str, optional): directory to scan for ``*.hdf5`` files.
                Defaults to ``f"{HDF5_ROOT}/{task_name}/"``.
            config_path (str, optional): YAML config providing
                ``common.action_chunk_size``, ``common.img_history_size``
                and ``common.state_dim``.
        """
        dataset_name = task_name
        if hdf5_dir is None:
            hdf5_dir = f"{self.HDF5_ROOT}/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        # Each HDF5 file contains one episode. Sorting makes the mapping
        # from index to episode independent of the filesystem's walk order.
        self.file_paths = sorted(
            os.path.join(root, filename)
            for root, _, files in os.walk(hdf5_dir)
            for filename in fnmatch.filter(files, '*.hdf5')
        )

        # Load the config
        with open(config_path, 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Weight every episode by its length; dropped episodes weigh zero.
        episode_lens = np.array(
            [self._usable_num_steps(path) for path in self.file_paths],
            dtype=np.float64,
        )
        total_steps = episode_lens.sum()
        if total_steps > 0:
            self.episode_sample_weights = episode_lens / total_steps
        else:
            # No usable episode: keep finite (all-zero) weights instead of
            # NaNs; get_item() reports the problem with a clear error.
            self.episode_sample_weights = np.zeros_like(episode_lens)

    def __len__(self):
        """Return the number of episode files that were found."""
        return len(self.file_paths)

    def get_dataset_name(self):
        """Return the dataset (task) name given at construction."""
        return self.DATASET_NAME

    def get_item(self, index: int=None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected according
                to ``episode_sample_weights``.
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.
                If the requested episode is invalid, a randomly sampled valid
                episode is returned instead.

        Raises:
            ValueError: if no episode has a positive sampling weight, i.e.
                there is nothing valid to return.
        """
        weights = np.asarray(self.episode_sample_weights, dtype=np.float64)
        if not np.any(weights > 0):
            # Without this guard the retry loop below could never finish.
            raise ValueError(
                f"Dataset '{self.DATASET_NAME}' contains no valid episode."
            )

        parse = (self.parse_hdf5_file_state_only if state_only
                 else self.parse_hdf5_file)
        while True:
            if index is None:
                index = np.random.choice(len(self.file_paths), p=weights)
            valid, sample = parse(self.file_paths[index])
            if valid:
                return sample
            # Invalid episode: draw a replacement from the weighted
            # distribution, which never selects a dropped episode.
            index = None

    def parse_hdf5_file(self, file_path):
        """[Modify] Parse a hdf5 file to generate a training sample at
            a random timestep.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "step_id": int,         # the index of the sampled step,
                                                # also the timestep t.
                        "instruction": str      # the language instruction for this episode.
                    },
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator": ndarray, # indicates the validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera image, (IMG_HISORY_SIZE, H, W, 3)
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                    "cam_high_mask": ndarray,   # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array.
                                                # For the first IMAGE_HISTORY_SIZE-1 timesteps, the mask should be False.
                    "cam_left_wrist": ndarray,  # left wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                                                # If only one wrist, make it right wrist, plz.
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = self._states_dataset(f)[:]
            actions = self._actions_dataset(f)[:]
            num_steps = states.shape[0]

            # [Optional] We drop too-short episode
            if num_steps < self.MIN_EPISODE_STEPS:
                return False, None

            # We randomly sample a timestep
            step_id = np.random.randint(0, num_steps)

            # You can also use precomputed language embeddings (recommended)
            if self.DATASET_NAME == 'aloha_box_into_pot_easy':
                # NOTE(review): this branch reads the 'observations' group,
                # while everything else in this class reads 'observation';
                # confirm the key exists in Anubis files before relying on it.
                instruction = f['observations']['states']['language_instruction'][0].decode('utf-8')
            else:
                instruction = f"lang_embed/{self.DATASET_NAME}.pt"

            # Assemble the meta
            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction
            }

            states = self._rescale(states)
            # Actions from t onward, padded with the last action if the
            # episode ends before a full chunk is available.
            actions = self._pad_action_chunk(
                self._rescale(actions[step_id:step_id + self.CHUNK_SIZE])
            )

            # Parse the state; the statistics cover the whole episode.
            state = states[step_id:step_id + 1]
            state_std = np.std(states, axis=0)
            state_mean = np.mean(states, axis=0)
            state_norm = np.sqrt(np.mean(states**2, axis=0))

            # Fill the state/action into the unified vector
            state = self._fill_in_state(state)
            state_indicator = self._fill_in_state(np.ones_like(state_std))
            state_std = self._fill_in_state(state_std)
            state_mean = self._fill_in_state(state_mean)
            state_norm = self._fill_in_state(state_norm)
            # If action's format is different from state's,
            # you may implement a separate fill-in for actions
            actions = self._fill_in_state(actions)

            # Parse the images; `cam_high` is the external camera image
            images = f['observation']
            cam_high = self._load_image_history(images, 'agentview_image', step_id)
            cam_left_wrist = self._load_image_history(images, 'wrist_left_image', step_id)
            cam_right_wrist = self._load_image_history(images, 'wrist_right_image', step_id)

        # All cameras share the same history window, hence the same mask.
        cam_high_mask = self._history_mask(step_id)
        cam_left_wrist_mask = cam_high_mask.copy()
        cam_right_wrist_mask = cam_high_mask.copy()

        # Return the resulting sample
        # For unavailable images, return zero-shape arrays, i.e., (IMG_HISORY_SIZE, 0, 0, 0)
        # E.g., return np.zeros((self.IMG_HISORY_SIZE, 0, 0, 0)) for the key "cam_left_wrist",
        # if the left-wrist camera is unavailable on your robot
        return True, {
            "meta": meta,
            "state": state,
            "state_std": state_std,
            "state_mean": state_mean,
            "state_norm": state_norm,
            "actions": actions,
            "state_indicator": state_indicator,
            "cam_high": cam_high,
            "cam_high_mask": cam_high_mask,
            "cam_left_wrist": cam_left_wrist,
            "cam_left_wrist_mask": cam_left_wrist_mask,
            "cam_right_wrist": cam_right_wrist,
            "cam_right_wrist_mask": cam_right_wrist_mask
        }

    def parse_hdf5_file_state_only(self, file_path):
        """[Modify] Parse a hdf5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,           # state[:], (T, STATE_DIM).
                    "action": ndarray,          # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = self._states_dataset(f)[:]
            actions = self._actions_dataset(f)[:]

        # Same filter as parse_hdf5_file, so both views agree on validity.
        if states.shape[0] < self.MIN_EPISODE_STEPS:
            return False, None

        # The complete trajectories are returned (no timestep is sampled).
        return True, {
            "state": self._fill_in_state(self._rescale(states)),
            "action": self._fill_in_state(self._rescale(actions))
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _states_dataset(f):
        """[Modify] Return the HDF5 dataset holding the per-step states."""
        return f['observation']['eef_pose']

    @staticmethod
    def _actions_dataset(f):
        """[Modify] Return the HDF5 dataset holding the per-step actions."""
        return f['action']['eef_pose']

    def _usable_num_steps(self, file_path):
        """Return the episode length, or 0 if the episode is too short.

        Only the dataset's shape is inspected, so no array data is loaded.
        """
        with h5py.File(file_path, 'r') as f:
            num_steps = self._states_dataset(f).shape[0]
        return num_steps if num_steps >= self.MIN_EPISODE_STEPS else 0

    @staticmethod
    def _rescale(values):
        """[Modify] Divide each stored dimension by its scale factor.

        The divisors are currently all ones, so values are unchanged and
        only promoted to floating point by the division.
        """
        return values / np.ones((1, len(TABLETOP_6D_INDICES)))

    def _fill_in_state(self, values):
        """Scatter ``values`` (..., 20) into a zeroed (..., STATE_DIM) vector.

        This class writes to the same unified-vector slots as the tabletop
        dataset (``TABLETOP_6D_INDICES``).
        """
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., TABLETOP_6D_INDICES] = values
        return uni_vec

    def _pad_action_chunk(self, chunk):
        """Repeat the last action until ``chunk`` has CHUNK_SIZE rows."""
        chunk = np.asarray(chunk)
        missing = self.CHUNK_SIZE - chunk.shape[0]
        if missing <= 0:
            return chunk
        return np.concatenate(
            [chunk, np.tile(chunk[-1:], (missing, 1))], axis=0
        )

    def _load_image_history(self, images, key, step_id):
        """Stack the frames of camera ``key`` ending at ``step_id``.

        At most IMG_HISORY_SIZE frames are read; if fewer exist, the earliest
        available frame is repeated at the front to reach that count.
        """
        camera = images[key]
        first = max(step_id - self.IMG_HISORY_SIZE + 1, 0)
        frames = np.stack([camera[i] for i in range(first, step_id + 1)])
        missing = self.IMG_HISORY_SIZE - frames.shape[0]
        if missing > 0:
            # Pad the images using the first image
            frames = np.concatenate(
                [np.tile(frames[:1], (missing, 1, 1, 1)), frames], axis=0
            )
        return frames

    def _history_mask(self, step_id):
        """Return a boolean mask that is False for padded history slots.

        For step_id = 0 exactly one slot (the last) is valid.
        """
        valid_len = min(step_id + 1, self.IMG_HISORY_SIZE)
        return np.array(
            [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len
        )
if __name__ == "__main__":
    # Smoke test: load one training sample from every episode of a task.
    # The dataset constructor requires the task name, so it is taken from
    # the command line instead of being omitted.
    import sys

    if len(sys.argv) != 2:
        print(f"Usage: python {sys.argv[0]} <task_name>")
        sys.exit(1)

    ds = TabletopHDF5VLADataset(sys.argv[1])
    num_episodes = len(ds)
    for i in range(num_episodes):
        print(f"Processing episode {i}/{num_episodes}...")
        ds.get_item(i)