# hma/sim/robomimic/robomimic_runner.py
# --------------------------------------------------------
# Licensed under The MIT License [see LICENSE for details]
# --------------------------------------------------------
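"""Rollout utilities for robomimic/robosuite environments.

Includes helpers to convert robomimic hdf5 demonstrations into image-based episode
dictionaries, generators that roll out learned policies (including a diffusion-policy
style variant with an observation buffer), and a RolloutRunner for evaluating policies
and optionally saving rollout videos.
"""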
import numpy as np
from robomimic.envs.env_robosuite import EnvRobosuite
import os
import robomimic.utils.file_utils as FileUtils
import robomimic.utils.env_utils as EnvUtils
import robomimic.utils.obs_utils as ObsUtils
from matplotlib import pyplot as plt
import torch
import h5py
from tqdm import tqdm
from sim.robomimic.robomimic_wrapper import RobomimicLowdimWrapper
import cv2
import sys
from collections import OrderedDict
import traceback
OBS_KEYS = ["object", "robot0_eef_pos", "robot0_eef_quat", "robot0_gripper_qpos"]
RESOLUTION = (128, 128)
global_idx = 0
if sys.platform == "linux" and os.getenv("DISPLAY") is None:
try:
from pyvirtualdisplay import Display
import signal
def handler(signum, frame):
raise TimeoutError("Timeout while starting virtual display")
signal.signal(signal.SIGALRM, handler)
signal.alarm(5)
virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()
signal.alarm(0)
except Exception as e:
print(f"When instantiate pyvirtualdisplay: {e}")
def render_step(env, state):
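    """Restore the MuJoCo sim from a flattened state and return a resized rendered frame."""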
env.env.env.sim.set_state_from_flattened(state)
env.env.env.sim.forward()
img = env.render()
img = cv2.resize(img, RESOLUTION)
return img
def convert_img_dataset(
dataset_dir="data/robomimic/datasets", env_names=None, gui=False, episode_num_pertask=2000, **kwargs
):
# convert to a list of episodes that can be added to replay buffer
for env_name in env_names:
dataset_dir_task = os.path.join(dataset_dir, env_name, "ph", "image.hdf5")
env_meta = FileUtils.get_env_metadata_from_dataset(dataset_dir_task)
env_meta["use_image_obs"] = True
env = create_env(env_meta=env_meta, obs_keys=OBS_KEYS)
env = RobomimicLowdimWrapper(env=env)
        with h5py.File(dataset_dir_task, "r") as file:
demos = file["data"]
for i in tqdm(range(episode_num_pertask), desc="Loading hdf5 to ReplayBuffer"):
if f"demo_{i}" not in demos:
continue
demo = demos[f"demo_{i}"]
obs = demo["obs"]
states = demo["states"]
action = demo["actions"][:].astype(np.float32)
step_obs = np.concatenate([obs[key] for key in OBS_KEYS], axis=-1).astype(np.float32)
steps = []
for a, o, s in zip(action, step_obs, states):
# break into step dict
image = render_step(env, s)
step = {
"observation": {"state": o, "image": image},
"action": a,
"language_instruction": f"{env_name}",
}
steps.append(OrderedDict(step))
data_dict = {"steps": steps}
yield data_dict
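# Example usage (sketch, assuming datasets exist under data/robomimic/datasets/<env>/ph/image.hdf5):
# for episode in convert_img_dataset(env_names=["lift"], episode_num_pertask=2):
#     step = episode["steps"][0]
#     print(step["observation"]["image"].shape, step["action"].shape)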
def writer_for(tag, fps, res, src_folder="data/robomimic_policy_videos"):
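    """Create an mp4 cv2.VideoWriter for the given tag, fps, and resolution."""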
    os.makedirs(src_folder, exist_ok=True)
return cv2.VideoWriter(
f"{src_folder}/{tag}.mp4",
cv2.VideoWriter_fourcc("m", "p", "4", "v"),
fps,
res,
)
def create_env(env_meta, obs_keys):
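    """Create a robosuite env from robomimic metadata with offscreen rendering enabled."""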
ObsUtils.initialize_obs_modality_mapping_from_dict({"image": obs_keys})
env = EnvUtils.create_env_from_metadata(
env_meta=env_meta,
render=False,
render_offscreen=True,
use_image_obs=True,
)
return env
@torch.no_grad()
def learner_trajectory_generator(env, policy, camera_name="view_1", env_name="lift"):
"""generate a trajectory rollout from the policy and a environment"""
o = env.reset()
# env.env.env.sim.set_state_from_flattened(state)
img = env.render()
prompt_image = np.tile(
img, (policy.prompt_horizon, 1, 1, 1)
).astype(np.uint8)
prompt_action = np.zeros(
(policy.prompt_horizon, policy.action_stride, 7) # robomimic dof=7
)
policy.set_initial_state((prompt_image, prompt_action))
policy.reset()
img = env.render()
img = cv2.resize(img, RESOLUTION).astype(np.uint8)
step_data = {"state": o, "image": img, "language_instruction": f"{env_name}"} # B x T
max_path_length = 100
action_dim = len(env.action_space.low)
i = 0
while i < max_path_length:
actions = policy.generate_action(step_data)
for a in actions:
i += 1
o, r, done, info = env.step(a)
img = env.render()
img = cv2.resize(img, RESOLUTION).astype(np.uint8)
ret = [o, r, done, info, img]
            step_data = OrderedDict({"state": o, "image": img, "language_instruction": f"{env_name}"})
yield ret
@torch.no_grad()
def dp_learner_trajectory_generator(env, policy, max_length=100, camera_name="view_1", env_name="lift"):
"""generate a trajectory rollout from the policy and a environment"""
DP_RES = 84
policy.reset()
o = env.reset()
# env.env.env.sim.set_state_from_flattened(state)
img = env.render()
img = cv2.resize(img, (DP_RES, DP_RES)).astype(np.uint8)
obs_dict = {"agentview_image": img.transpose(2, 0, 1)} # (C, H, W)
obs_buff = {
k: v[np.newaxis].repeat(policy.n_obs_steps, axis=0)
for k, v in obs_dict.items()
}
for _ in range(max_length):
traj = policy.generate_action({
k: torch.from_numpy(v).to(device=policy.device, dtype=policy.dtype).unsqueeze(0)
for k, v in obs_buff.items()
})['action'].squeeze(0).detach().cpu().numpy()
for a in traj:
o, r, done, info = env.step(a)
img = env.render()
img = cv2.resize(img, (DP_RES, DP_RES)).astype(np.uint8)
ret = [o, r, done, info, img]
obs_dict = {"agentview_image": img.transpose(2, 0, 1)}
obs_buff = {
k: np.roll(v, shift=-1, axis=0)
for k, v in obs_buff.items()
}
for k, v in obs_dict.items():
obs_buff[k][-1] = v
yield ret
class RolloutRunner:
"""evaluate policy rollouts"""
    def __init__(self, env_names, episode_num=200, save_video=False):
        self.save_video = save_video
        self.episode_num = episode_num
        self.envs = []
        self.env_names = []
for env_name in env_names:
dataset_dir = "data/robomimic/datasets"
dataset_dir_task = os.path.join(dataset_dir, env_name, "ph", "image.hdf5")
env_meta = FileUtils.get_env_metadata_from_dataset(dataset_dir_task)
env = create_env(env_meta=env_meta, obs_keys=OBS_KEYS)
            env = RobomimicLowdimWrapper(env=env)
self.envs.append(env)
self.env_names.append(env_name)
def step_render(self, curr_state, actions, freq=1):
"""
render a sequence of frames given current state and a sequence of actions
"""
images = []
prompt_steps = len(actions)
env = self.envs[0]
env.env.env.sim.set_state_from_flattened(curr_state)
env.env.env.sim.forward()
img = env.render()
img = cv2.resize(img, RESOLUTION)
# first image
images.append(img)
for action in actions:
# action expands based on the frequency
action = action.reshape(freq, -1)
for sub_action in action:
env.step(sub_action)
img = env.render()
img = cv2.resize(img, RESOLUTION)
images.append(img)
        # visualize the rendered frames side by side
        fig, ax = plt.subplots(1, prompt_steps + 1, figsize=(20, 20))
        for i, img in enumerate(images):
            ax[i].imshow(img)
            ax[i].axis("off")
        # plt.savefig("output/sim_video_mix/test.png")
        plt.close(fig)  # avoid leaking figures when called repeatedly
        return images
@torch.no_grad()
def run(
self, policy, save_video=False, gui=False, video_postfix="", seed=233, env_name=None, episode_num=-1, **kwargs
):
if episode_num == -1:
episode_num = self.episode_num
for env_idx, env in enumerate(self.envs):
env.seed(seed)
curr_env_name = self.env_names[env_idx]
if env_name is not None:
if str(env_name[0]) != str(curr_env_name):
continue
print(f"selected env name: {env_name} currente env: {curr_env_name}")
if self.save_video:
writer = writer_for(
f"{self.env_names[env_idx]}_{video_postfix}",
10,
RESOLUTION,
src_folder="data/robomimic_policy_videos",
)
print(f"save to video file {self.env_names[env_idx]}_{video_postfix}")
total_success = 0
total_reward = 0
pbar = tqdm(range(episode_num), position=1, leave=True)
for i in pbar:
try:
eps_reward = 0
traj_length = 0
for o, r, done, info, img in learner_trajectory_generator(env, policy):
traj_length += 1
eps_reward += r
if self.save_video and i <= 10:
if gui:
cv2.imshow("img", img)
cv2.waitKey(1)
img = cv2.resize(img, RESOLUTION)
writer.write(img[..., [2, 1, 0]])
if done or eps_reward >= 1:
break
actual_reward = eps_reward >= 1
pbar.set_description(f"success: {actual_reward}")
total_success += actual_reward
total_reward += actual_reward
                except Exception:
                    print(traceback.format_exc())
                    continue
            if self.save_video:
                writer.release()
            return total_success / episode_num, total_reward / episode_num
if __name__ == "__main__":
# generate for all tasks
runner = RolloutRunner(["all"], 200)
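    # Sketch of how a policy could then be evaluated (``policy`` is a placeholder for a
    # model implementing the generate_action interface used above):
    # success_rate, avg_reward = runner.run(policy, save_video=True, video_postfix="eval")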