Spaces:
Sleeping
Sleeping
File size: 12,195 Bytes
2d9a728 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 |
"""Environments using kitchen and Franka robot."""
import logging
import sys
from pathlib import Path
sys.path.append((Path(__file__).parent.parent / 'third_party' / 'relay-policy-learning' / 'adept_envs').__str__())
import adept_envs
from adept_envs.franka.kitchen_multitask_v0 import KitchenTaskRelaxV1
import os
import numpy as np
from dm_control.mujoco import engine
OBS_ELEMENT_INDICES = {
"bottom burner": np.array([11, 12]),
"top burner": np.array([15, 16]),
"light switch": np.array([17, 18]),
"slide cabinet": np.array([19]),
"hinge cabinet": np.array([20, 21]),
"microwave": np.array([22]),
"kettle": np.array([23, 24, 25, 26, 27, 28, 29]),
}
OBS_ELEMENT_GOALS = {
"bottom burner": np.array([-0.88, -0.01]),
"top burner": np.array([-0.92, -0.01]),
"light switch": np.array([-0.69, -0.05]),
"slide cabinet": np.array([0.37]),
"hinge cabinet": np.array([0.0, 1.45]),
"microwave": np.array([-0.75]),
"kettle": np.array([-0.23, 0.75, 1.62, 0.99, 0.0, 0.0, -0.06]),
}
BONUS_THRESH = 0.3
logging.basicConfig(
level="INFO",
format="%(asctime)s [%(levelname)s] %(message)s",
filemode="w",
)
logger = logging.getLogger()
XPOS_NAMES = {
"light switch" : "lightswitchroot",
"slide cabinet" : "slidelink",
"microwave" : "microdoorroot",
"kettle" : "kettle",
}
class KitchenBase(KitchenTaskRelaxV1):
# A string of element names. The robot's task is then to modify each of
# these elements appropriately.
TASK_ELEMENTS = []
ALL_TASKS = [
"bottom burner",
"top burner",
"light switch",
"slide cabinet",
"hinge cabinet",
"microwave",
"kettle",
]
REMOVE_TASKS_WHEN_COMPLETE = True
TERMINATE_ON_TASK_COMPLETE = True
TERMINATE_ON_WRONG_COMPLETE = False
COMPLETE_IN_ANY_ORDER = (
True # This allows for the tasks to be completed in arbitrary order.
)
GRIPPER_DISTANCE_REW = False
def __init__(
self, dense=True, dataset_url=None, ref_max_score=None, ref_min_score=None, **kwargs
):
self.tasks_to_complete = list(self.TASK_ELEMENTS)
self.goal_masking = True
self.dense = dense
self.use_grasp_rewards = False
super(KitchenBase, self).__init__(**kwargs)
def set_goal_masking(self, goal_masking=True):
"""Sets goal masking for goal-conditioned approaches (like RPL)."""
self.goal_masking = goal_masking
def _get_task_goal(self, task=None, actually_return_goal=False):
if task is None:
task = ["microwave", "kettle", "bottom burner", "light switch"]
new_goal = np.zeros_like(self.goal)
if self.goal_masking and not actually_return_goal:
return new_goal
for element in task:
element_idx = OBS_ELEMENT_INDICES[element]
element_goal = OBS_ELEMENT_GOALS[element]
new_goal[element_idx] = element_goal
return new_goal
def reset_model(self):
self.tasks_to_complete = list(self.TASK_ELEMENTS)
return super(KitchenBase, self).reset_model()
def _get_reward_n_score(self, obs_dict):
reward_dict, score = super(KitchenBase, self)._get_reward_n_score(obs_dict)
next_q_obs = obs_dict["qp"]
next_obj_obs = obs_dict["obj_qp"]
idx_offset = len(next_q_obs)
completions = []
dense = 0
if self.GRIPPER_DISTANCE_REW:
assert len(self.tasks_to_complete) == 1
element = next(iter(self.tasks_to_complete))
gripper_pos = (self.sim.named.data.xpos['panda0_leftfinger'] + self.sim.named.data.xpos['panda0_rightfinger']) / 2
object_pos = self.sim.named.data.xpos[XPOS_NAMES[element]]
gripper_obj_dist = np.linalg.norm(object_pos - gripper_pos)
if self.dense:
reward_dict["bonus"] = -gripper_obj_dist
reward_dict["r_total"] = -gripper_obj_dist
score = -gripper_obj_dist
else:
reward_dict["bonus"] = gripper_obj_dist < 0.15
reward_dict["r_total"] = gripper_obj_dist < 0.15
score = gripper_obj_dist < 0.15
return reward_dict, score
for element in self.tasks_to_complete:
element_idx = OBS_ELEMENT_INDICES[element]
distance = np.linalg.norm(
next_obj_obs[..., element_idx - idx_offset] - OBS_ELEMENT_GOALS[element]
)
dense += -1 * distance # reward must be negative distance for RL
is_grasped = True
if not self.initializing and self.use_grasp_rewards:
if element == "slide cabinet":
is_grasped = False
for i in range(1, 6):
obj_pos = self.get_site_xpos("schandle{}".format(i))
left_pad = self.get_site_xpos("leftpad")
right_pad = self.get_site_xpos("rightpad")
within_sphere_left = np.linalg.norm(obj_pos - left_pad) < 0.07
within_sphere_right = np.linalg.norm(obj_pos - right_pad) < 0.07
right = right_pad[0] < obj_pos[0]
left = obj_pos[0] < left_pad[0]
if (
right
and left
and within_sphere_right
and within_sphere_left
):
is_grasped = True
if element == "top left burner":
is_grasped = False
obj_pos = self.get_site_xpos("tlbhandle")
left_pad = self.get_site_xpos("leftpad")
right_pad = self.get_site_xpos("rightpad")
within_sphere_left = np.linalg.norm(obj_pos - left_pad) < 0.035
within_sphere_right = np.linalg.norm(obj_pos - right_pad) < 0.04
right = right_pad[0] < obj_pos[0]
left = obj_pos[0] < left_pad[0]
if within_sphere_right and within_sphere_left and right and left:
is_grasped = True
if element == "microwave":
is_grasped = False
for i in range(1, 6):
obj_pos = self.get_site_xpos("mchandle{}".format(i))
left_pad = self.get_site_xpos("leftpad")
right_pad = self.get_site_xpos("rightpad")
within_sphere_left = np.linalg.norm(obj_pos - left_pad) < 0.05
within_sphere_right = np.linalg.norm(obj_pos - right_pad) < 0.05
if (
right_pad[0] < obj_pos[0]
and obj_pos[0] < left_pad[0]
and within_sphere_right
and within_sphere_left
):
is_grasped = True
if element == "hinge cabinet":
is_grasped = False
for i in range(1, 6):
obj_pos = self.get_site_xpos("hchandle{}".format(i))
left_pad = self.get_site_xpos("leftpad")
right_pad = self.get_site_xpos("rightpad")
within_sphere_left = np.linalg.norm(obj_pos - left_pad) < 0.06
within_sphere_right = np.linalg.norm(obj_pos - right_pad) < 0.06
if (
right_pad[0] < obj_pos[0]
and obj_pos[0] < left_pad[0]
and within_sphere_right
):
is_grasped = True
if element == "light switch":
is_grasped = False
for i in range(1, 4):
obj_pos = self.get_site_xpos("lshandle{}".format(i))
left_pad = self.get_site_xpos("leftpad")
right_pad = self.get_site_xpos("rightpad")
within_sphere_left = np.linalg.norm(obj_pos - left_pad) < 0.045
within_sphere_right = np.linalg.norm(obj_pos - right_pad) < 0.03
if within_sphere_right and within_sphere_left:
is_grasped = True
complete = distance < BONUS_THRESH # and is_grasped
if complete:
completions.append(element)
if self.REMOVE_TASKS_WHEN_COMPLETE:
[self.tasks_to_complete.remove(element) for element in completions]
bonus = float(len(completions))
reward_dict["bonus"] = bonus
reward_dict["r_total"] = bonus
if self.dense:
reward_dict["r_total"] = dense
score = bonus
return reward_dict, score
def step(self, a, b=None):
obs, reward, done, env_info = super(KitchenBase, self).step(a, b=b)
if self.TERMINATE_ON_TASK_COMPLETE:
done = not self.tasks_to_complete
if self.TERMINATE_ON_WRONG_COMPLETE:
all_goal = self._get_task_goal(task=self.ALL_TASKS)
for wrong_task in list(set(self.ALL_TASKS) - set(self.TASK_ELEMENTS)):
element_idx = OBS_ELEMENT_INDICES[wrong_task]
distance = np.linalg.norm(obs[..., element_idx] - all_goal[element_idx])
complete = distance < BONUS_THRESH
if complete:
done = True
break
env_info["completed_tasks"] = set(self.TASK_ELEMENTS) - set(
self.tasks_to_complete
)
return obs, reward, done, env_info
def get_goal(self):
"""Loads goal state from dataset for goal-conditioned approaches (like RPL)."""
raise NotImplementedError
def _split_data_into_seqs(self, data):
"""Splits dataset object into list of sequence dicts."""
seq_end_idxs = np.where(data["terminals"])[0]
start = 0
seqs = []
for end_idx in seq_end_idxs:
seqs.append(
dict(
states=data["observations"][start : end_idx + 1],
actions=data["actions"][start : end_idx + 1],
)
)
start = end_idx + 1
return seqs
def render(self, mode='rgb_array', resolution=(64,64)):
if mode =='rgb_array':
camera = engine.MovableCamera(self.sim, *resolution)
camera.set_pose(distance=2.2, lookat=[-0.2, .5, 2.], azimuth=70, elevation=-35)
img = camera.render()
return img
else:
super(KitchenTaskRelaxV1, self).render()
class KitchenSlideV0(KitchenBase):
TASK_ELEMENTS = ["slide cabinet",]
COMPLETE_IN_ANY_ORDER = False
class KitchenHingeV0(KitchenBase):
TASK_ELEMENTS = ["hinge cabinet",]
COMPLETE_IN_ANY_ORDER = False
class KitchenLightV0(KitchenBase):
TASK_ELEMENTS = ["light switch",]
COMPLETE_IN_ANY_ORDER = False
class KitchenKettleV0(KitchenBase):
TASK_ELEMENTS = ["kettle",]
COMPLETE_IN_ANY_ORDER = False
class KitchenMicrowaveV0(KitchenBase):
TASK_ELEMENTS = ["microwave",]
COMPLETE_IN_ANY_ORDER = False
class KitchenBurnerV0(KitchenBase):
TASK_ELEMENTS = ["bottom burner",]
COMPLETE_IN_ANY_ORDER = False
class KitchenTopBurnerV0(KitchenBase):
TASK_ELEMENTS = ["top burner",]
COMPLETE_IN_ANY_ORDER = False
class KitchenMicrowaveKettleBottomBurnerLightV0(KitchenBase):
TASK_ELEMENTS = ["microwave", "kettle", "bottom burner", "light switch"]
COMPLETE_IN_ANY_ORDER = False
class KitchenMicrowaveKettleLightSliderV0(KitchenBase):
TASK_ELEMENTS = ["microwave", "kettle", "light switch", "slide cabinet"]
COMPLETE_IN_ANY_ORDER = False
class KitchenKettleMicrowaveLightSliderV0(KitchenBase):
TASK_ELEMENTS = ["kettle", "microwave", "light switch", "slide cabinet"]
COMPLETE_IN_ANY_ORDER = False
class KitchenAllV0(KitchenBase):
TASK_ELEMENTS = KitchenBase.ALL_TASKS |