|
import fnmatch
import json
import os
import sys

import h5py
import yaml
import cv2
import numpy as np

from configs.state_vec import STATE_VEC_IDX_MAPPING
|
# Names of the unified-state-vector entries filled by these datasets.
# For each arm: 3 end-effector position components, 6 end-effector angle
# components and 1 gripper-opening value (20 entries in total).
TABLETOP_6D_INDICES_NAMES = [
    # Left arm
    'left_eef_pos_x',
    'left_eef_pos_y',
    'left_eef_pos_z',
    'left_eef_angle_0',
    'left_eef_angle_1',
    'left_eef_angle_2',
    'left_eef_angle_3',
    'left_eef_angle_4',
    'left_eef_angle_5',
    'left_gripper_open',
    # Right arm
    'right_eef_pos_x',
    'right_eef_pos_y',
    'right_eef_pos_z',
    'right_eef_angle_0',
    'right_eef_angle_1',
    'right_eef_angle_2',
    'right_eef_angle_3',
    'right_eef_angle_4',
    'right_eef_angle_5',
    'right_gripper_open',
]

# Position of each name above inside the unified state vector, in the same
# order as TABLETOP_6D_INDICES_NAMES.
TABLETOP_6D_INDICES = [
    STATE_VEC_IDX_MAPPING[entry_name]
    for entry_name in TABLETOP_6D_INDICES_NAMES
]
|
|
|
class TabletopHDF5VLADataset:
    """
    This class is used to sample episodes from the tabletop embodiment
    dataset stored in HDF5.

    Every ``*.hdf5`` file found (recursively) under
    ``/data5/jellyho/tabletop/<task_name>/`` is treated as one episode.
    Episodes are sampled with probability proportional to their length.
    """

    # Episodes with fewer steps than this are considered invalid and dropped
    # by both parsers, so that the sampling weights only cover episodes that
    # can actually be turned into a training sample.
    MIN_EPISODE_STEPS = 20

    def __init__(self, task_name) -> None:
        """Index the episode files of a task and compute sampling weights.

        Args:
            task_name (str): name of the task. It is used as the dataset
                name and as the sub-directory searched for HDF5 files.
        """
        dataset_name = task_name
        HDF5_DIR = f"/data5/jellyho/tabletop/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        # Collect every episode file below the task directory.
        self.file_paths = []
        for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, '*.hdf5'):
                self.file_paths.append(os.path.join(root, filename))

        # Load the shared settings (path is relative to the working directory).
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Sampling weight of an episode is proportional to its number of
        # steps; invalid episodes get weight 0 and are never drawn.
        episode_lens = []
        for file_path in self.file_paths:
            valid, res = self.parse_hdf5_file_state_only(file_path)
            episode_lens.append(res['state'].shape[0] if valid else 0)
        episode_lens = np.asarray(episode_lens, dtype=np.float64)
        total_steps = episode_lens.sum()
        if total_steps > 0:
            self.episode_sample_weights = episode_lens / total_steps
        else:
            # No usable episode: avoid a 0/0 division producing NaN weights.
            self.episode_sample_weights = np.zeros_like(episode_lens)

    def __len__(self):
        """Return the number of episode files that were found."""
        return len(self.file_paths)

    def get_dataset_name(self):
        """Return the dataset name (the task name given at construction)."""
        return self.DATASET_NAME

    def get_item(self, index: int=None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected
                (weighted by episode length).
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory
                rather than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.

        Raises:
            RuntimeError: if the dataset contains no valid episode.
        """
        # Without this guard the retry loop below could never terminate.
        if not np.any(self.episode_sample_weights > 0):
            raise RuntimeError(
                f"Dataset '{self.DATASET_NAME}' contains no valid episode."
            )

        parse = (self.parse_hdf5_file_state_only if state_only
                 else self.parse_hdf5_file)
        while True:
            if index is None:
                file_path = np.random.choice(
                    self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths[index]
            valid, sample = parse(file_path)
            if valid:
                return sample
            # The requested episode was dropped: retry with a random one.
            index = np.random.randint(0, len(self.file_paths))

    @staticmethod
    def _rescale(values):
        """Divide the 20 state/action entries by their per-entry scale.

        The scale is currently all ones, so values are unchanged apart from
        being converted to floating point; edit the divisor if some entries
        need rescaling.
        """
        return values / np.ones((1, 20))

    def _fill_in_state(self, values):
        """Scatter ``values`` into a zero vector of width ``STATE_DIM``.

        The last axis of ``values`` is written to the positions listed in
        ``TABLETOP_6D_INDICES``; all other positions stay zero.
        """
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., TABLETOP_6D_INDICES] = values
        return uni_vec

    def _pad_action_chunk(self, actions):
        """Repeat the last action until the chunk has ``CHUNK_SIZE`` rows."""
        missing = self.CHUNK_SIZE - actions.shape[0]
        if missing <= 0:
            return actions
        return np.concatenate(
            [actions, np.tile(actions[-1:], (missing, 1))], axis=0)

    def _history_mask(self, step_id):
        """Return the boolean validity mask of the image history.

        Entries that had to be padded (because ``step_id`` is too close to
        the start of the episode) are False; real frames are True.
        """
        valid_len = min(step_id + 1, self.IMG_HISORY_SIZE)
        return np.array(
            [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len
        )

    def _load_image_history(self, image_group, key, step_id):
        """Load the last ``IMG_HISORY_SIZE`` frames up to ``step_id``.

        If fewer frames exist, the earliest available frame is repeated at
        the front so the result always has ``IMG_HISORY_SIZE`` frames.
        """
        frames = image_group[key]
        first = max(step_id - self.IMG_HISORY_SIZE + 1, 0)
        imgs = np.stack([frames[i] for i in range(first, step_id + 1)])
        missing = self.IMG_HISORY_SIZE - imgs.shape[0]
        if missing > 0:
            imgs = np.concatenate(
                [np.tile(imgs[:1], (missing, 1, 1, 1)), imgs], axis=0)
        return imgs

    def parse_hdf5_file(self, file_path):
        """[Modify] Parse a hdf5 file to generate a training sample at
            a random timestep.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "step_id": int,         # the index of the sampled step,
                                                # also the timestep t.
                        "instruction": str      # the language instruction for this episode,
                                                # or the path of its precomputed embedding.
                    },
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator", ndarray, # indicates the validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera image, (IMG_HISORY_SIZE, H, W, 3)
                    "cam_high_mask": ndarray,   # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array.
                                                # Padded (repeated) leading frames are marked False.
                    "cam_left_wrist": ndarray,  # left wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observations']['states']['ee_6d_pos'][:]
            actions = f['actions']['ee_6d_pos'][:]
            num_steps = states.shape[0]

            # Drop too-short episodes.
            if num_steps < self.MIN_EPISODE_STEPS:
                return False, None

            # Randomly sample the timestep t of this training sample.
            step_id = np.random.randint(0, num_steps)

            # One dataset stores its instruction as text inside the file; all
            # others refer to a precomputed language-embedding file.
            if self.DATASET_NAME == 'aloha_box_into_pot_easy':
                instruction = f['observations']['states']['language_instruction'][0].decode('utf-8')
            else:
                instruction = f"lang_embed/{self.DATASET_NAME}.pt"

            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction
            }

            states = self._rescale(states)
            actions = self._rescale(actions[step_id:step_id+self.CHUNK_SIZE])

            # Current state and whole-episode statistics.
            state = states[step_id:step_id+1]
            state_std = np.std(states, axis=0)
            state_mean = np.mean(states, axis=0)
            state_norm = np.sqrt(np.mean(states**2, axis=0))

            # Near the end of the episode the chunk is short: pad it by
            # repeating the last action.
            actions = self._pad_action_chunk(actions)

            # Fill state/action into the unified vector.
            state = self._fill_in_state(state)
            state_indicator = self._fill_in_state(np.ones_like(state_std))
            state_std = self._fill_in_state(state_std)
            state_mean = self._fill_in_state(state_mean)
            state_norm = self._fill_in_state(state_norm)
            actions = self._fill_in_state(actions)

            # Image histories; all three cameras share the same validity mask.
            image_group = f['observations']['images']
            cam_high = self._load_image_history(image_group, 'back', step_id)
            cam_high_mask = self._history_mask(step_id)
            cam_left_wrist = self._load_image_history(
                image_group, 'wrist_left', step_id)
            cam_left_wrist_mask = cam_high_mask.copy()
            cam_right_wrist = self._load_image_history(
                image_group, 'wrist_right', step_id)
            cam_right_wrist_mask = cam_high_mask.copy()

            return True, {
                "meta": meta,
                "state": state,
                "state_std": state_std,
                "state_mean": state_mean,
                "state_norm": state_norm,
                "actions": actions,
                "state_indicator": state_indicator,
                "cam_high": cam_high,
                "cam_high_mask": cam_high_mask,
                "cam_left_wrist": cam_left_wrist,
                "cam_left_wrist_mask": cam_left_wrist_mask,
                "cam_right_wrist": cam_right_wrist,
                "cam_right_wrist_mask": cam_right_wrist_mask
            }

    def parse_hdf5_file_state_only(self, file_path):
        """[Modify] Parse a hdf5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,           # state[:], (T, STATE_DIM).
                    "action": ndarray,          # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observations']['states']['ee_6d_pos'][:]
            actions = f['actions']['ee_6d_pos'][:]

        # Same validity rule as parse_hdf5_file, so that episodes which can
        # never yield a training sample also get zero sampling weight.
        if states.shape[0] < self.MIN_EPISODE_STEPS:
            return False, None

        # Return the complete trajectories (not a random action chunk).
        state = self._fill_in_state(self._rescale(states))
        action = self._fill_in_state(self._rescale(actions))

        return True, {
            "state": state,
            "action": action
        }
|
|
|
class AnubisHDF5VLADataset:
    """
    This class is used to sample episodes from the Anubis embodiment
    dataset stored in HDF5.

    Every ``*.hdf5`` file found (recursively) under
    ``/data5/jellyho/anubis_hdf5/<task_name>/`` is treated as one episode.
    Episodes are sampled with probability proportional to their length.
    """

    # Episodes with fewer steps than this are considered invalid and dropped
    # by both parsers, so that the sampling weights only cover episodes that
    # can actually be turned into a training sample.
    MIN_EPISODE_STEPS = 20

    def __init__(self, task_name) -> None:
        """Index the episode files of a task and compute sampling weights.

        Args:
            task_name (str): name of the task. It is used as the dataset
                name and as the sub-directory searched for HDF5 files.
        """
        dataset_name = task_name
        HDF5_DIR = f"/data5/jellyho/anubis_hdf5/{dataset_name}/"
        self.DATASET_NAME = dataset_name

        # Collect every episode file below the task directory.
        self.file_paths = []
        for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, '*.hdf5'):
                self.file_paths.append(os.path.join(root, filename))

        # Load the shared settings (path is relative to the working directory).
        with open('configs/base.yaml', 'r') as file:
            config = yaml.safe_load(file)
        self.CHUNK_SIZE = config['common']['action_chunk_size']
        self.IMG_HISORY_SIZE = config['common']['img_history_size']
        self.STATE_DIM = config['common']['state_dim']

        # Sampling weight of an episode is proportional to its number of
        # steps; invalid episodes get weight 0 and are never drawn.
        episode_lens = []
        for file_path in self.file_paths:
            valid, res = self.parse_hdf5_file_state_only(file_path)
            episode_lens.append(res['state'].shape[0] if valid else 0)
        episode_lens = np.asarray(episode_lens, dtype=np.float64)
        total_steps = episode_lens.sum()
        if total_steps > 0:
            self.episode_sample_weights = episode_lens / total_steps
        else:
            # No usable episode: avoid a 0/0 division producing NaN weights.
            self.episode_sample_weights = np.zeros_like(episode_lens)

    def __len__(self):
        """Return the number of episode files that were found."""
        return len(self.file_paths)

    def get_dataset_name(self):
        """Return the dataset name (the task name given at construction)."""
        return self.DATASET_NAME

    def get_item(self, index: int=None, state_only=False):
        """Get a training sample at a random timestep.

        Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected
                (weighted by episode length).
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory
                rather than a single timestep. Defaults to False.

        Returns:
            sample (dict): a dictionary containing the training sample.

        Raises:
            RuntimeError: if the dataset contains no valid episode.
        """
        # Without this guard the retry loop below could never terminate.
        if not np.any(self.episode_sample_weights > 0):
            raise RuntimeError(
                f"Dataset '{self.DATASET_NAME}' contains no valid episode."
            )

        parse = (self.parse_hdf5_file_state_only if state_only
                 else self.parse_hdf5_file)
        while True:
            if index is None:
                file_path = np.random.choice(
                    self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths[index]
            valid, sample = parse(file_path)
            if valid:
                return sample
            # The requested episode was dropped: retry with a random one.
            index = np.random.randint(0, len(self.file_paths))

    @staticmethod
    def _rescale(values):
        """Divide the 20 state/action entries by their per-entry scale.

        The scale is currently all ones, so values are unchanged apart from
        being converted to floating point; edit the divisor if some entries
        need rescaling.
        """
        return values / np.ones((1, 20))

    def _fill_in_state(self, values):
        """Scatter ``values`` into a zero vector of width ``STATE_DIM``.

        The last axis of ``values`` is written to the positions listed in
        ``TABLETOP_6D_INDICES`` (this class uses the same index list as the
        tabletop dataset); all other positions stay zero.
        """
        uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM,))
        uni_vec[..., TABLETOP_6D_INDICES] = values
        return uni_vec

    def _pad_action_chunk(self, actions):
        """Repeat the last action until the chunk has ``CHUNK_SIZE`` rows."""
        missing = self.CHUNK_SIZE - actions.shape[0]
        if missing <= 0:
            return actions
        return np.concatenate(
            [actions, np.tile(actions[-1:], (missing, 1))], axis=0)

    def _history_mask(self, step_id):
        """Return the boolean validity mask of the image history.

        Entries that had to be padded (because ``step_id`` is too close to
        the start of the episode) are False; real frames are True.
        """
        valid_len = min(step_id + 1, self.IMG_HISORY_SIZE)
        return np.array(
            [False] * (self.IMG_HISORY_SIZE - valid_len) + [True] * valid_len
        )

    def _load_image_history(self, image_group, key, step_id):
        """Load the last ``IMG_HISORY_SIZE`` frames up to ``step_id``.

        If fewer frames exist, the earliest available frame is repeated at
        the front so the result always has ``IMG_HISORY_SIZE`` frames.
        """
        frames = image_group[key]
        first = max(step_id - self.IMG_HISORY_SIZE + 1, 0)
        imgs = np.stack([frames[i] for i in range(first, step_id + 1)])
        missing = self.IMG_HISORY_SIZE - imgs.shape[0]
        if missing > 0:
            imgs = np.concatenate(
                [np.tile(imgs[:1], (missing, 1, 1, 1)), imgs], axis=0)
        return imgs

    def parse_hdf5_file(self, file_path):
        """[Modify] Parse a hdf5 file to generate a training sample at
            a random timestep.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "step_id": int,         # the index of the sampled step,
                                                # also the timestep t.
                        "instruction": str      # the language instruction for this episode,
                                                # or the path of its precomputed embedding.
                    },
                    "state": ndarray,           # state[t], (1, STATE_DIM).
                    "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                    "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                    "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                    "actions": ndarray,         # action[t:t+CHUNK_SIZE], (CHUNK_SIZE, STATE_DIM).
                    "state_indicator", ndarray, # indicates the validness of each dim, (STATE_DIM,).
                    "cam_high": ndarray,        # external camera image, (IMG_HISORY_SIZE, H, W, 3)
                    "cam_high_mask": ndarray,   # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array.
                                                # Padded (repeated) leading frames are marked False.
                    "cam_left_wrist": ndarray,  # left wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_left_wrist_mask": ndarray,
                    "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                    "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observation']['eef_pose'][:]
            actions = f['action']['eef_pose'][:]
            num_steps = states.shape[0]

            # Drop too-short episodes.
            if num_steps < self.MIN_EPISODE_STEPS:
                return False, None

            # Randomly sample the timestep t of this training sample.
            step_id = np.random.randint(0, num_steps)

            # One dataset stores its instruction as text inside the file; all
            # others refer to a precomputed language-embedding file.
            # NOTE(review): this branch reads 'observations'/'states', while
            # the rest of this class reads 'observation' -- confirm that this
            # layout exists in Anubis files before relying on it.
            if self.DATASET_NAME == 'aloha_box_into_pot_easy':
                instruction = f['observations']['states']['language_instruction'][0].decode('utf-8')
            else:
                instruction = f"lang_embed/{self.DATASET_NAME}.pt"

            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction
            }

            states = self._rescale(states)
            actions = self._rescale(actions[step_id:step_id+self.CHUNK_SIZE])

            # Current state and whole-episode statistics.
            state = states[step_id:step_id+1]
            state_std = np.std(states, axis=0)
            state_mean = np.mean(states, axis=0)
            state_norm = np.sqrt(np.mean(states**2, axis=0))

            # Near the end of the episode the chunk is short: pad it by
            # repeating the last action.
            actions = self._pad_action_chunk(actions)

            # Fill state/action into the unified vector.
            state = self._fill_in_state(state)
            state_indicator = self._fill_in_state(np.ones_like(state_std))
            state_std = self._fill_in_state(state_std)
            state_mean = self._fill_in_state(state_mean)
            state_norm = self._fill_in_state(state_norm)
            actions = self._fill_in_state(actions)

            # Image histories; all three cameras share the same validity mask.
            image_group = f['observation']
            cam_high = self._load_image_history(
                image_group, 'agentview_image', step_id)
            cam_high_mask = self._history_mask(step_id)
            cam_left_wrist = self._load_image_history(
                image_group, 'wrist_left_image', step_id)
            cam_left_wrist_mask = cam_high_mask.copy()
            cam_right_wrist = self._load_image_history(
                image_group, 'wrist_right_image', step_id)
            cam_right_wrist_mask = cam_high_mask.copy()

            return True, {
                "meta": meta,
                "state": state,
                "state_std": state_std,
                "state_mean": state_mean,
                "state_norm": state_norm,
                "actions": actions,
                "state_indicator": state_indicator,
                "cam_high": cam_high,
                "cam_high_mask": cam_high_mask,
                "cam_left_wrist": cam_left_wrist,
                "cam_left_wrist_mask": cam_left_wrist_mask,
                "cam_right_wrist": cam_right_wrist,
                "cam_right_wrist_mask": cam_right_wrist_mask
            }

    def parse_hdf5_file_state_only(self, file_path):
        """[Modify] Parse a hdf5 file to generate a state trajectory.

        Args:
            file_path (str): the path to the hdf5 file

        Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                    "state": ndarray,           # state[:], (T, STATE_DIM).
                    "action": ndarray,          # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
        """
        with h5py.File(file_path, 'r') as f:
            states = f['observation']['eef_pose'][:]
            actions = f['action']['eef_pose'][:]

        # Same validity rule as parse_hdf5_file, so that episodes which can
        # never yield a training sample also get zero sampling weight.
        if states.shape[0] < self.MIN_EPISODE_STEPS:
            return False, None

        # Return the complete trajectories (not a random action chunk).
        state = self._fill_in_state(self._rescale(states))
        action = self._fill_in_state(self._rescale(actions))

        return True, {
            "state": state,
            "action": action
        }
|
|
|
if __name__ == "__main__":
    # Smoke test: load one training sample from every episode of a task.
    # The dataset constructor requires a task name, so it is taken from the
    # command line instead of calling the constructor without arguments.
    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} TASK_NAME")

    ds = TabletopHDF5VLADataset(sys.argv[1])
    num_episodes = len(ds)
    for i in range(num_episodes):
        print(f"Processing episode {i}/{num_episodes}...")
        ds.get_item(i)
|
|