Spaces:
Sleeping
Sleeping
import pybullet | |
from pybullet_utils.bullet_client import BulletClient | |
import pybullet_data | |
import threading | |
from time import sleep | |
import numpy as np | |
import os | |
from consts import BOUNDS, COLORS, PIXEL_SIZE, CORNER_POS | |
from shapely.geometry import box | |
import random | |
# Gripper (Robotiq 2F85) code | |
class Robotiq2F85: | |
"""Gripper handling for Robotiq 2F85.""" | |
def __init__(self, robot, tool, p): | |
self.robot = robot | |
self.tool = tool | |
self._p = p | |
pos = [0.1339999999999999, -0.49199999999872496, 0.5] | |
rot = self._p.getQuaternionFromEuler([np.pi, 0, np.pi]) | |
urdf = 'robotiq_2f_85/robotiq_2f_85.urdf' | |
self.body = self._p.loadURDF(urdf, pos, rot) | |
self.n_joints = self._p.getNumJoints(self.body) | |
self.activated = False | |
# Connect gripper base to robot tool. | |
self._p.createConstraint(self.robot, tool, self.body, 0, jointType=self._p.JOINT_FIXED, jointAxis=[0, 0, 0], parentFramePosition=[0, 0, 0], childFramePosition=[0, 0, -0.07], childFrameOrientation=self._p.getQuaternionFromEuler([0, 0, np.pi / 2])) | |
# Set friction coefficients for gripper fingers. | |
for i in range(self._p.getNumJoints(self.body)): | |
self._p.changeDynamics(self.body, i, lateralFriction=10.0, spinningFriction=1.0, rollingFriction=1.0, frictionAnchor=True) | |
# Start thread to handle additional gripper constraints. | |
self.motor_joint = 1 | |
self.constraints_thread = threading.Thread(target=self.step) | |
self.constraints_thread.daemon = True | |
self.constraints_thread.start() | |
# Control joint positions by enforcing hard contraints on gripper behavior. | |
# Set one joint as the open/close motor joint (other joints should mimic). | |
def step(self): | |
while True: | |
try: | |
currj = [self._p.getJointState(self.body, i)[0] for i in range(self.n_joints)] | |
indj = [6, 3, 8, 5, 10] | |
targj = [currj[1], -currj[1], -currj[1], currj[1], currj[1]] | |
self._p.setJointMotorControlArray(self.body, indj, self._p.POSITION_CONTROL, targj, positionGains=np.ones(5)) | |
except: | |
return | |
sleep(0.001) | |
# Close gripper fingers. | |
def activate(self): | |
self._p.setJointMotorControl2(self.body, self.motor_joint, self._p.VELOCITY_CONTROL, targetVelocity=1, force=10) | |
self.activated = True | |
# Open gripper fingers. | |
def release(self): | |
self._p.setJointMotorControl2(self.body, self.motor_joint, self._p.VELOCITY_CONTROL, targetVelocity=-1, force=10) | |
self.activated = False | |
# If activated and object in gripper: check object contact. | |
# If activated and nothing in gripper: check gripper contact. | |
# If released: check proximity to surface (disabled). | |
def detect_contact(self): | |
obj, _, ray_frac = self.check_proximity() | |
if self.activated: | |
empty = self.grasp_width() < 0.01 | |
cbody = self.body if empty else obj | |
if obj == self.body or obj == 0: | |
return False | |
return self.external_contact(cbody) | |
# else: | |
# return ray_frac < 0.14 or self.external_contact() | |
# Return if body is in contact with something other than gripper | |
def external_contact(self, body=None): | |
if body is None: | |
body = self.body | |
pts = self._p.getContactPoints(bodyA=body) | |
pts = [pt for pt in pts if pt[2] != self.body] | |
return len(pts) > 0 # pylint: disable=g-explicit-length-test | |
def check_grasp(self): | |
while self.moving(): | |
sleep(0.001) | |
success = self.grasp_width() > 0.01 | |
return success | |
def grasp_width(self): | |
lpad = np.array(self._p.getLinkState(self.body, 4)[0]) | |
rpad = np.array(self._p.getLinkState(self.body, 9)[0]) | |
dist = np.linalg.norm(lpad - rpad) - 0.047813 | |
return dist | |
def check_proximity(self): | |
ee_pos = np.array(self._p.getLinkState(self.robot, self.tool)[0]) | |
tool_pos = np.array(self._p.getLinkState(self.body, 0)[0]) | |
vec = (tool_pos - ee_pos) / np.linalg.norm((tool_pos - ee_pos)) | |
ee_targ = ee_pos + vec | |
ray_data = self._p.rayTest(ee_pos, ee_targ)[0] | |
obj, link, ray_frac = ray_data[0], ray_data[1], ray_data[2] | |
return obj, link, ray_frac | |
# Gym-style environment code | |
class PickPlaceEnv(): | |
def __init__(self, render=False, high_res=False, high_frame_rate=False): | |
self.dt = 1/480 | |
self.sim_step = 0 | |
# Configure and start PyBullet | |
# self._p = pybullet.connect(pybullet.DIRECT) | |
self._p = BulletClient(connection_mode=pybullet.DIRECT) | |
self._p.configureDebugVisualizer(self._p.COV_ENABLE_GUI, 0) | |
self._p.setPhysicsEngineParameter(enableFileCaching=0) | |
assets_path = os.path.dirname(os.path.abspath("")) | |
self._p.setAdditionalSearchPath(assets_path) | |
self._p.setAdditionalSearchPath(pybullet_data.getDataPath()) | |
self._p.setTimeStep(self.dt) | |
self.home_joints = (np.pi / 2, -np.pi / 2, np.pi / 2, -np.pi / 2, 3 * np.pi / 2, 0) # Joint angles: (J0, J1, J2, J3, J4, J5). | |
self.home_ee_euler = (np.pi, 0, np.pi) # (RX, RY, RZ) rotation in Euler angles. | |
self.ee_link_id = 9 # Link ID of UR5 end effector. | |
self.tip_link_id = 10 # Link ID of gripper finger tips. | |
self.gripper = None | |
self.render = render | |
self.high_res = high_res | |
self.high_frame_rate = high_frame_rate | |
def reset(self, object_list): | |
self._p.resetSimulation(self._p.RESET_USE_DEFORMABLE_WORLD) | |
self._p.setGravity(0, 0, -9.8) | |
self.cache_video = [] | |
# Temporarily disable rendering to load URDFs faster. | |
self._p.configureDebugVisualizer(self._p.COV_ENABLE_RENDERING, 0) | |
# Add robot. | |
self._p.loadURDF("plane.urdf", [0, 0, -0.001]) | |
self.robot_id = self._p.loadURDF("ur5e/ur5e.urdf", [0, 0, 0], flags=self._p.URDF_USE_MATERIAL_COLORS_FROM_MTL) | |
self.ghost_id = self._p.loadURDF("ur5e/ur5e.urdf", [0, 0, -10]) # For forward kinematics. | |
self.joint_ids = [self._p.getJointInfo(self.robot_id, i) for i in range(self._p.getNumJoints(self.robot_id))] | |
self.joint_ids = [j[0] for j in self.joint_ids if j[2] == self._p.JOINT_REVOLUTE] | |
# Move robot to home configuration. | |
for i in range(len(self.joint_ids)): | |
self._p.resetJointState(self.robot_id, self.joint_ids[i], self.home_joints[i]) | |
# Add gripper. | |
if self.gripper is not None: | |
while self.gripper.constraints_thread.is_alive(): | |
self.constraints_thread_active = False | |
self.gripper = Robotiq2F85(self.robot_id, self.ee_link_id, self._p) | |
self.gripper.release() | |
# Add workspace. | |
plane_shape = self._p.createCollisionShape(self._p.GEOM_BOX, halfExtents=[0.3, 0.3, 0.001]) | |
plane_visual = self._p.createVisualShape(self._p.GEOM_BOX, halfExtents=[0.3, 0.3, 0.001]) | |
plane_id = self._p.createMultiBody(0, plane_shape, plane_visual, basePosition=[0, -0.5, 0]) | |
self._p.changeVisualShape(plane_id, -1, rgbaColor=[0.2, 0.2, 0.2, 1.0]) | |
# Load objects according to config. | |
self.object_list = object_list | |
self.obj_name_to_id = {} | |
obj_xyz = np.zeros((0, 3)) | |
for obj_name in object_list: | |
if ('block' in obj_name) or ('bowl' in obj_name): | |
# Get random position 15cm+ from other objects. | |
while True: | |
rand_x = np.random.uniform(BOUNDS[0, 0] + 0.1, BOUNDS[0, 1] - 0.1) | |
rand_y = np.random.uniform(BOUNDS[1, 0] + 0.1, BOUNDS[1, 1] - 0.1) | |
rand_xyz = np.float32([rand_x, rand_y, 0.03]).reshape(1, 3) | |
if len(obj_xyz) == 0: | |
obj_xyz = np.concatenate((obj_xyz, rand_xyz), axis=0) | |
break | |
else: | |
nn_dist = np.min(np.linalg.norm(obj_xyz - rand_xyz, axis=1)).squeeze() | |
if nn_dist > 0.15: | |
obj_xyz = np.concatenate((obj_xyz, rand_xyz), axis=0) | |
break | |
object_color = COLORS[obj_name.split(' ')[0]] | |
object_type = obj_name.split(' ')[1] | |
object_position = rand_xyz.squeeze() | |
if object_type == 'block': | |
object_shape = self._p.createCollisionShape(self._p.GEOM_BOX, halfExtents=[0.02, 0.02, 0.02]) | |
object_visual = self._p.createVisualShape(self._p.GEOM_BOX, halfExtents=[0.02, 0.02, 0.02]) | |
object_id = self._p.createMultiBody(0.01, object_shape, object_visual, basePosition=object_position) | |
elif object_type == 'bowl': | |
object_position[2] = 0 | |
object_id = self._p.loadURDF("bowl/bowl.urdf", object_position, useFixedBase=1) | |
self._p.changeVisualShape(object_id, -1, rgbaColor=object_color) | |
self.obj_name_to_id[obj_name] = object_id | |
# Re-enable rendering. | |
self._p.configureDebugVisualizer(self._p.COV_ENABLE_RENDERING, 1) | |
for _ in range(200): | |
self._p.stepSimulation() | |
# record object positions at reset | |
self.init_pos = {name: self.get_obj_pos(name) for name in object_list} | |
return self.get_observation() | |
def servoj(self, joints): | |
"""Move to target joint positions with position control.""" | |
self._p.setJointMotorControlArray( | |
bodyIndex=self.robot_id, | |
jointIndices=self.joint_ids, | |
controlMode=self._p.POSITION_CONTROL, | |
targetPositions=joints, | |
positionGains=[0.01]*6) | |
def movep(self, position): | |
"""Move to target end effector position.""" | |
joints = self._p.calculateInverseKinematics( | |
bodyUniqueId=self.robot_id, | |
endEffectorLinkIndex=self.tip_link_id, | |
targetPosition=position, | |
targetOrientation=self._p.getQuaternionFromEuler(self.home_ee_euler), | |
maxNumIterations=100) | |
self.servoj(joints) | |
def get_ee_pos(self): | |
ee_xyz = np.float32(self._p.getLinkState(self.robot_id, self.tip_link_id)[0]) | |
return ee_xyz | |
def step(self, action=None): | |
"""Do pick and place motion primitive.""" | |
pick_pos, place_pos = action['pick'].copy(), action['place'].copy() | |
# Set fixed primitive z-heights. | |
hover_xyz = np.float32([pick_pos[0], pick_pos[1], 0.2]) | |
if pick_pos.shape[-1] == 2: | |
pick_xyz = np.append(pick_pos, 0.025) | |
else: | |
pick_xyz = pick_pos | |
pick_xyz[2] = 0.025 | |
if place_pos.shape[-1] == 2: | |
place_xyz = np.append(place_pos, 0.15) | |
else: | |
place_xyz = place_pos | |
place_xyz[2] = 0.15 | |
# Move to object. | |
ee_xyz = self.get_ee_pos() | |
while np.linalg.norm(hover_xyz - ee_xyz) > 0.01: | |
self.movep(hover_xyz) | |
self.step_sim_and_render() | |
ee_xyz = self.get_ee_pos() | |
while np.linalg.norm(pick_xyz - ee_xyz) > 0.01: | |
self.movep(pick_xyz) | |
self.step_sim_and_render() | |
ee_xyz = self.get_ee_pos() | |
# Pick up object. | |
self.gripper.activate() | |
for _ in range(240): | |
self.step_sim_and_render() | |
while np.linalg.norm(hover_xyz - ee_xyz) > 0.01: | |
self.movep(hover_xyz) | |
self.step_sim_and_render() | |
ee_xyz = self.get_ee_pos() | |
for _ in range(50): | |
self.step_sim_and_render() | |
# Move to place location. | |
while np.linalg.norm(place_xyz - ee_xyz) > 0.01: | |
self.movep(place_xyz) | |
self.step_sim_and_render() | |
ee_xyz = self.get_ee_pos() | |
# Place down object. | |
while (not self.gripper.detect_contact()) and (place_xyz[2] > 0.03): | |
place_xyz[2] -= 0.001 | |
self.movep(place_xyz) | |
for _ in range(3): | |
self.step_sim_and_render() | |
self.gripper.release() | |
for _ in range(240): | |
self.step_sim_and_render() | |
place_xyz[2] = 0.2 | |
ee_xyz = self.get_ee_pos() | |
while np.linalg.norm(place_xyz - ee_xyz) > 0.01: | |
self.movep(place_xyz) | |
self.step_sim_and_render() | |
ee_xyz = self.get_ee_pos() | |
place_xyz = np.float32([0, -0.5, 0.2]) | |
while np.linalg.norm(place_xyz - ee_xyz) > 0.01: | |
self.movep(place_xyz) | |
self.step_sim_and_render() | |
ee_xyz = self.get_ee_pos() | |
observation = self.get_observation() | |
reward = self.get_reward() | |
done = False | |
info = {} | |
return observation, reward, done, info | |
def set_alpha_transparency(self, alpha: float) -> None: | |
for id in range(20): | |
visual_shape_data = self._p.getVisualShapeData(id) | |
for i in range(len(visual_shape_data)): | |
object_id, link_index, _, _, _, _, _, rgba_color = visual_shape_data[i] | |
rgba_color = list(rgba_color[0:3]) + [alpha] | |
self._p.changeVisualShape( | |
self.robot_id, linkIndex=i, rgbaColor=rgba_color) | |
self._p.changeVisualShape( | |
self.gripper.body, linkIndex=i, rgbaColor=rgba_color) | |
def step_sim_and_render(self): | |
self._p.stepSimulation() | |
self.sim_step += 1 | |
interval = 40 if self.high_frame_rate else 60 | |
# Render current image at 8 FPS. | |
if self.sim_step % interval == 0 and self.render: | |
self.cache_video.append(self.get_camera_image()) | |
def get_camera_image(self): | |
if not self.high_res: | |
image_size = (240, 240) | |
intrinsics = (120., 0, 120., 0, 120., 120., 0, 0, 1) | |
else: | |
image_size=(360, 360) | |
intrinsics=(180., 0, 180., 0, 180., 180., 0, 0, 1) | |
color, _, _, _, _ = self.render_image(image_size, intrinsics) | |
return color | |
def get_reward(self): | |
return None | |
def get_observation(self): | |
observation = {} | |
# Render current image. | |
color, depth, position, orientation, intrinsics = self.render_image() | |
# Get heightmaps and colormaps. | |
points = self.get_pointcloud(depth, intrinsics) | |
position = np.float32(position).reshape(3, 1) | |
rotation = self._p.getMatrixFromQuaternion(orientation) | |
rotation = np.float32(rotation).reshape(3, 3) | |
transform = np.eye(4) | |
transform[:3, :] = np.hstack((rotation, position)) | |
points = self.transform_pointcloud(points, transform) | |
heightmap, colormap, xyzmap = self.get_heightmap(points, color, BOUNDS, PIXEL_SIZE) | |
observation["image"] = colormap | |
observation["xyzmap"] = xyzmap | |
return observation | |
def render_image(self, image_size=(720, 720), intrinsics=(360., 0, 360., 0, 360., 360., 0, 0, 1)): | |
# Camera parameters. | |
position = (0, -0.85, 0.4) | |
orientation = (np.pi / 4 + np.pi / 48, np.pi, np.pi) | |
orientation = self._p.getQuaternionFromEuler(orientation) | |
zrange = (0.01, 10.) | |
noise=True | |
# OpenGL camera settings. | |
lookdir = np.float32([0, 0, 1]).reshape(3, 1) | |
updir = np.float32([0, -1, 0]).reshape(3, 1) | |
rotation = self._p.getMatrixFromQuaternion(orientation) | |
rotm = np.float32(rotation).reshape(3, 3) | |
lookdir = (rotm @ lookdir).reshape(-1) | |
updir = (rotm @ updir).reshape(-1) | |
lookat = position + lookdir | |
focal_len = intrinsics[0] | |
znear, zfar = (0.01, 10.) | |
viewm = self._p.computeViewMatrix(position, lookat, updir) | |
fovh = (image_size[0] / 2) / focal_len | |
fovh = 180 * np.arctan(fovh) * 2 / np.pi | |
# Notes: 1) FOV is vertical FOV 2) aspect must be float | |
aspect_ratio = image_size[1] / image_size[0] | |
projm = self._p.computeProjectionMatrixFOV(fovh, aspect_ratio, znear, zfar) | |
# Render with OpenGL camera settings. | |
_, _, color, depth, segm = self._p.getCameraImage( | |
width=image_size[1], | |
height=image_size[0], | |
viewMatrix=viewm, | |
projectionMatrix=projm, | |
shadow=1, | |
flags=self._p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX, | |
renderer=self._p.ER_BULLET_HARDWARE_OPENGL) | |
# Get color image. | |
color_image_size = (image_size[0], image_size[1], 4) | |
color = np.array(color, dtype=np.uint8).reshape(color_image_size) | |
color = color[:, :, :3] # remove alpha channel | |
if noise: | |
color = np.int32(color) | |
color += np.int32(np.random.normal(0, 3, color.shape)) | |
color = np.uint8(np.clip(color, 0, 255)) | |
# Get depth image. | |
depth_image_size = (image_size[0], image_size[1]) | |
zbuffer = np.float32(depth).reshape(depth_image_size) | |
depth = (zfar + znear - (2 * zbuffer - 1) * (zfar - znear)) | |
depth = (2 * znear * zfar) / depth | |
if noise: | |
depth += np.random.normal(0, 0.003, depth.shape) | |
intrinsics = np.float32(intrinsics).reshape(3, 3) | |
return color, depth, position, orientation, intrinsics | |
def get_pointcloud(self, depth, intrinsics): | |
"""Get 3D pointcloud from perspective depth image. | |
Args: | |
depth: HxW float array of perspective depth in meters. | |
intrinsics: 3x3 float array of camera intrinsics matrix. | |
Returns: | |
points: HxWx3 float array of 3D points in camera coordinates. | |
""" | |
height, width = depth.shape | |
xlin = np.linspace(0, width - 1, width) | |
ylin = np.linspace(0, height - 1, height) | |
px, py = np.meshgrid(xlin, ylin) | |
px = (px - intrinsics[0, 2]) * (depth / intrinsics[0, 0]) | |
py = (py - intrinsics[1, 2]) * (depth / intrinsics[1, 1]) | |
points = np.float32([px, py, depth]).transpose(1, 2, 0) | |
return points | |
def transform_pointcloud(self, points, transform): | |
"""Apply rigid transformation to 3D pointcloud. | |
Args: | |
points: HxWx3 float array of 3D points in camera coordinates. | |
transform: 4x4 float array representing a rigid transformation matrix. | |
Returns: | |
points: HxWx3 float array of transformed 3D points. | |
""" | |
padding = ((0, 0), (0, 0), (0, 1)) | |
homogen_points = np.pad(points.copy(), padding, | |
'constant', constant_values=1) | |
for i in range(3): | |
points[Ellipsis, i] = np.sum(transform[i, :] * homogen_points, axis=-1) | |
return points | |
def get_heightmap(self, points, colors, bounds, pixel_size): | |
"""Get top-down (z-axis) orthographic heightmap image from 3D pointcloud. | |
Args: | |
points: HxWx3 float array of 3D points in world coordinates. | |
colors: HxWx3 uint8 array of values in range 0-255 aligned with points. | |
bounds: 3x2 float array of values (rows: X,Y,Z; columns: min,max) defining | |
region in 3D space to generate heightmap in world coordinates. | |
pixel_size: float defining size of each pixel in meters. | |
Returns: | |
heightmap: HxW float array of height (from lower z-bound) in meters. | |
colormap: HxWx3 uint8 array of backprojected color aligned with heightmap. | |
xyzmap: HxWx3 float array of XYZ points in world coordinates. | |
""" | |
width = int(np.round((bounds[0, 1] - bounds[0, 0]) / pixel_size)) | |
height = int(np.round((bounds[1, 1] - bounds[1, 0]) / pixel_size)) | |
heightmap = np.zeros((height, width), dtype=np.float32) | |
colormap = np.zeros((height, width, colors.shape[-1]), dtype=np.uint8) | |
xyzmap = np.zeros((height, width, 3), dtype=np.float32) | |
# Filter out 3D points that are outside of the predefined bounds. | |
ix = (points[Ellipsis, 0] >= bounds[0, 0]) & (points[Ellipsis, 0] < bounds[0, 1]) | |
iy = (points[Ellipsis, 1] >= bounds[1, 0]) & (points[Ellipsis, 1] < bounds[1, 1]) | |
iz = (points[Ellipsis, 2] >= bounds[2, 0]) & (points[Ellipsis, 2] < bounds[2, 1]) | |
valid = ix & iy & iz | |
points = points[valid] | |
colors = colors[valid] | |
# Sort 3D points by z-value, which works with array assignment to simulate | |
# z-buffering for rendering the heightmap image. | |
iz = np.argsort(points[:, -1]) | |
points, colors = points[iz], colors[iz] | |
px = np.int32(np.floor((points[:, 0] - bounds[0, 0]) / pixel_size)) | |
py = np.int32(np.floor((points[:, 1] - bounds[1, 0]) / pixel_size)) | |
px = np.clip(px, 0, width - 1) | |
py = np.clip(py, 0, height - 1) | |
heightmap[py, px] = points[:, 2] - bounds[2, 0] | |
for c in range(colors.shape[-1]): | |
colormap[py, px, c] = colors[:, c] | |
xyzmap[py, px, c] = points[:, c] | |
colormap = colormap[::-1, :, :] # Flip up-down. | |
xv, yv = np.meshgrid(np.linspace(BOUNDS[0, 0], BOUNDS[0, 1], height), | |
np.linspace(BOUNDS[1, 0], BOUNDS[1, 1], width)) | |
xyzmap[:, :, 0] = xv | |
xyzmap[:, :, 1] = yv | |
xyzmap = xyzmap[::-1, :, :] # Flip up-down. | |
heightmap = heightmap[::-1, :] # Flip up-down. | |
return heightmap, colormap, xyzmap | |
def on_top_of(self, obj_a, obj_b): | |
""" | |
check if obj_a is on top of obj_b | |
condition 1: l2 distance on xy plane is less than a threshold | |
condition 2: obj_a is higher than obj_b | |
""" | |
obj_a_pos = self.get_obj_pos(obj_a) | |
obj_b_pos = self.get_obj_pos(obj_b) | |
xy_dist = np.linalg.norm(obj_a_pos[:2] - obj_b_pos[:2]) | |
if obj_b in CORNER_POS: | |
is_near = xy_dist < 0.06 | |
return is_near | |
elif 'bowl' in obj_b: | |
is_near = xy_dist < 0.06 | |
is_higher = obj_a_pos[2] > obj_b_pos[2] | |
return is_near and is_higher | |
else: | |
is_near = xy_dist < 0.04 | |
is_higher = obj_a_pos[2] > obj_b_pos[2] | |
return is_near and is_higher | |
def get_obj_id(self, obj_name): | |
try: | |
if obj_name in self.obj_name_to_id: | |
obj_id = self.obj_name_to_id[obj_name] | |
else: | |
obj_name = obj_name.replace('circle', 'bowl').replace('square', 'block').replace('small', '').strip() | |
obj_id = self.obj_name_to_id[obj_name] | |
return obj_id | |
except: | |
raise Exception('Object name "{}" not found'.format(obj_name)) | |
def get_obj_pos(self, obj_name): | |
obj_name = obj_name.replace('the', '').replace('_', ' ').strip() | |
if obj_name in CORNER_POS: | |
position = np.float32(np.array(CORNER_POS[obj_name])) | |
else: | |
pick_id = self.get_obj_id(obj_name) | |
pose = self._p.getBasePositionAndOrientation(pick_id) | |
position = np.float32(pose[0]) | |
return position | |
def get_bounding_box(self, obj_name): | |
obj_id = self.get_obj_id(obj_name) | |
return self._p.getAABB(obj_id) | |
class LMP_wrapper(): | |
def __init__(self, env, cfg, render=False): | |
self.env = env | |
self._cfg = cfg | |
self.object_names = list(self._cfg['env']['init_objs']) | |
self._min_xy = np.array(self._cfg['env']['coords']['bottom_left']) | |
self._max_xy = np.array(self._cfg['env']['coords']['top_right']) | |
self._range_xy = self._max_xy - self._min_xy | |
self._table_z = self._cfg['env']['coords']['table_z'] | |
self.render = render | |
def is_obj_visible(self, obj_name): | |
return obj_name in self.object_names | |
def get_obj_names(self): | |
return self.object_names[::] | |
def denormalize_xy(self, pos_normalized): | |
return pos_normalized * self._range_xy + self._min_xy | |
def get_corner_positions(self): | |
unit_square = box(0, 0, 1, 1) | |
normalized_corners = np.array(list(unit_square.exterior.coords))[:4] | |
corners = np.array(([self.denormalize_xy(corner) for corner in normalized_corners])) | |
return corners | |
def get_side_positions(self): | |
side_xs = np.array([0, 0.5, 0.5, 1]) | |
side_ys = np.array([0.5, 0, 1, 0.5]) | |
normalized_side_positions = np.c_[side_xs, side_ys] | |
side_positions = np.array(([self.denormalize_xy(corner) for corner in normalized_side_positions])) | |
return side_positions | |
def get_obj_pos(self, obj_name): | |
# return the xy position of the object in robot base frame | |
return self.env.get_obj_pos(obj_name)[:2] | |
def get_obj_position_np(self, obj_name): | |
return self.get_pos(obj_name) | |
def get_bbox(self, obj_name): | |
# return the axis-aligned object bounding box in robot base frame (not in pixels) | |
# the format is (min_x, min_y, max_x, max_y) | |
bbox = self.env.get_bounding_box(obj_name) | |
return bbox | |
def get_color(self, obj_name): | |
for color, rgb in COLORS.items(): | |
if color in obj_name: | |
return rgb | |
def pick_place(self, pick_pos, place_pos): | |
pick_pos_xyz = np.r_[pick_pos, [self._table_z]] | |
place_pos_xyz = np.r_[place_pos, [self._table_z]] | |
pass | |
def put_first_on_second(self, arg1, arg2): | |
# put the object with obj_name on top of target | |
# target can either be another object name, or it can be an x-y position in robot base frame | |
pick_pos = self.get_obj_pos(arg1) if isinstance(arg1, str) else arg1 | |
place_pos = self.get_obj_pos(arg2) if isinstance(arg2, str) else arg2 | |
self.env.step(action={'pick': pick_pos, 'place': place_pos}) | |
def get_robot_pos(self): | |
# return robot end-effector xy position in robot base frame | |
return self.env.get_ee_pos() | |
def goto_pos(self, position_xy): | |
# move the robot end-effector to the desired xy position while maintaining same z | |
ee_xyz = self.env.get_ee_pos() | |
position_xyz = np.concatenate([position_xy, ee_xyz[-1]]) | |
while np.linalg.norm(position_xyz - ee_xyz) > 0.01: | |
self.env.movep(position_xyz) | |
self.env.step_sim_and_render() | |
ee_xyz = self.env.get_ee_pos() | |
def follow_traj(self, traj): | |
for pos in traj: | |
self.goto_pos(pos) | |
def get_corner_positions(self): | |
normalized_corners = np.array([ | |
[0, 1], | |
[1, 1], | |
[0, 0], | |
[1, 0] | |
]) | |
return np.array(([self.denormalize_xy(corner) for corner in normalized_corners])) | |
def get_side_positions(self): | |
normalized_sides = np.array([ | |
[0.5, 1], | |
[1, 0.5], | |
[0.5, 0], | |
[0, 0.5] | |
]) | |
return np.array(([self.denormalize_xy(side) for side in normalized_sides])) | |
def get_corner_name(self, pos): | |
corner_positions = self.get_corner_positions() | |
corner_idx = np.argmin(np.linalg.norm(corner_positions - pos, axis=1)) | |
return ['top left corner', 'top right corner', 'bottom left corner', 'botom right corner'][corner_idx] | |
def get_side_name(self, pos): | |
side_positions = self.get_side_positions() | |
side_idx = np.argmin(np.linalg.norm(side_positions - pos, axis=1)) | |
return ['top side', 'right side', 'bottom side', 'left side'][side_idx] | |
class DummyClass: | |
def __getattr__(self, attr): | |
return None | |
class VoxPoserWrapper(): | |
def __init__(self, env, cfg, render=False): | |
self.env = env | |
self._cfg = cfg | |
def parse_query_obj(self, *args, **kwargs): | |
print("`parse_query_obj` called.\n") | |
return DummyClass() | |
def get_empty_affordance_map(self, *args, **kwargs): | |
print("`get_empty_affordance_map` called.\n") | |
return DummyClass() | |
def set_voxel_by_radius(self, *args, **kwargs): | |
print("`set_voxel_by_radius` called.\n") | |
def cm2index(self, *args, **kwargs): | |
print("`cm2index` called.\n") | |
def detect(self, *args, **kwargs): | |
print("`detect` called.\n") | |
return DummyClass() | |
def get_empty_avoidance_map(self, *args, **kwargs): | |
print("`get_empty_avoidance_map` called.\n") | |
return DummyClass() | |
def get_empty_gripper_map(self, *args, **kwargs): | |
print("`get_empty_gripper_map` called.\n") | |
return DummyClass() | |
def get_empty_rotation_map(self, *args, **kwargs): | |
print("`get_empty_rotation_map` called.\n") | |
return DummyClass() | |
def vec2quat(self, *args, **kwargs): | |
print("`vec2quat` called.\n") | |
return DummyClass() | |
def euler2quat(self, *args, **kwargs): | |
print("`euler2quat` called.\n") | |
return DummyClass() | |
def quat2euler(self, *args, **kwargs): | |
print("`quat2euler` called.\n") | |
return DummyClass() | |
def qmult(self, *args, **kwargs): | |
print("`qmult` called.\n") | |
return DummyClass() | |
def qinverse(self, *args, **kwargs): | |
print("`qinverse` called.\n") | |
return DummyClass() | |
def get_empty_velocity_map(self, *args, **kwargs): | |
print("`get_empty_velocity_map` called.\n") | |
return DummyClass() | |
def execute(self, *args, **kwargs): | |
print("`execute` called.\n") | |
def reset_to_default_pose(self, *args, **kwargs): | |
print("`reset_to_default_pose` called.\n") | |
class AgibotWrapper(): | |
def __init__(self, env, cfg, render=False): | |
self.env = env | |
self._cfg = cfg | |
def track(self, *args, **kwargs): | |
print("`track` called.") | |
print("detect") | |
print("establish tracking") | |
return DummyClass() | |
def get_target_pose(self, *args, **kwargs): | |
print("`get_target_pose` called.\n") | |
return DummyClass() | |
def get_operation(self, *args, **kwargs): | |
print("`get_operation` called.\n") | |
return DummyClass() | |
def execute(self, movable, *args, target_pose=None, operation=None, **kwargs) -> bool: | |
print("`execute` called.") | |
while not self.lock_object(target_pose): | |
pass | |
print("locked\n") | |
if not self.observe_object_if_moving(movable): | |
print("object is moving, execution exit.\n") | |
return False | |
if target_pose is not None: | |
contact_ret = self.contact_object(target_pose) | |
print("contact_object:", "success" if contact_ret else "failed") | |
if not contact_ret: | |
print("execution failed\n") | |
return False | |
if operation is not None: | |
operation_ret = self.operate_object(operation) | |
print("operate_object:", "success" if operation_ret else "failed") | |
if not operation_ret: | |
print("execution failed\n") | |
return False | |
print("execution done\n") | |
return True | |
def lock_object(self, *args, **kwargs): | |
print("`lock_object` called.\n") | |
sleep(1.0) | |
return True | |
def observe_object_if_moving(self, *args, **kwargs): | |
return random.choice([True, True, False]) | |
def contact_object(self, *args, **kwargs): | |
print("`contact_object` called.") | |
return random.choice([True, True, False]) | |
def operate_object(self, *args, **kwargs): | |
print("`operate_object` called.") | |
return random.choice([True, True, False]) | |
def reset_to_default_pose(self, *args, **kwargs): | |
print("`reset_to_default_pose` called.\n") | |