|
"""Environment class.""" |
|
|
|
import os |
|
import tempfile |
|
import time |
|
import cv2 |
|
import imageio |
|
|
|
import gym |
|
import numpy as np |
|
from cliport.tasks import cameras |
|
from cliport.utils import pybullet_utils |
|
from cliport.utils import utils |
|
import string |
|
import pybullet as p |
|
import tempfile |
|
import random |
|
import sys |
|
|
|
PLACE_STEP = 0.0003 |
|
PLACE_DELTA_THRESHOLD = 0.005 |
|
|
|
UR5_URDF_PATH = 'ur5/ur5.urdf' |
|
UR5_WORKSPACE_URDF_PATH = 'ur5/workspace.urdf' |
|
PLANE_URDF_PATH = 'plane/plane.urdf' |
|
|
|
|
|
class Environment(gym.Env): |
|
"""OpenAI Gym-style environment class.""" |
|
|
|
def __init__(self, |
|
assets_root, |
|
task=None, |
|
disp=False, |
|
shared_memory=False, |
|
hz=240, |
|
record_cfg=None): |
|
"""Creates OpenAI Gym-style environment with PyBullet. |
|
|
|
Args: |
|
assets_root: root directory of assets. |
|
task: the task to use. If None, the user must call set_task for the |
|
environment to work properly. |
|
disp: show environment with PyBullet's built-in display viewer. |
|
shared_memory: run with shared memory. |
|
hz: PyBullet physics simulation step speed. Set to 480 for deformables. |
|
|
|
Raises: |
|
RuntimeError: if pybullet cannot load fileIOPlugin. |
|
""" |
|
self.curr_video = [] |
|
self.pix_size = 0.003125 |
|
self.obj_ids = {'fixed': [], 'rigid': [], 'deformable': []} |
|
self.objects = self.obj_ids |
|
|
|
self.homej = np.array([-1, -0.5, 0.5, -0.5, -0.5, 0]) * np.pi |
|
self.agent_cams = cameras.RealSenseD415.CONFIG |
|
self.record_cfg = record_cfg |
|
self.save_video = True |
|
self.step_counter = 0 |
|
|
|
self.assets_root = assets_root |
|
|
|
color_tuple = [ |
|
gym.spaces.Box(0, 255, config['image_size'] + (3,), dtype=np.uint8) |
|
for config in self.agent_cams |
|
] |
|
depth_tuple = [ |
|
gym.spaces.Box(0.0, 20.0, config['image_size'], dtype=np.float32) |
|
for config in self.agent_cams |
|
] |
|
self.observation_space = gym.spaces.Dict({ |
|
'color': gym.spaces.Tuple(color_tuple), |
|
'depth': gym.spaces.Tuple(depth_tuple), |
|
}) |
|
self.position_bounds = gym.spaces.Box( |
|
low=np.array([0.25, -0.5, 0.], dtype=np.float32), |
|
high=np.array([0.75, 0.5, 0.28], dtype=np.float32), |
|
shape=(3,), |
|
dtype=np.float32) |
|
self.bounds = np.array([[0.25, 0.75], [-0.5, 0.5], [0, 0.3]]) |
|
|
|
self.action_space = gym.spaces.Dict({ |
|
'pose0': |
|
gym.spaces.Tuple( |
|
(self.position_bounds, |
|
gym.spaces.Box(-1.0, 1.0, shape=(4,), dtype=np.float32))), |
|
'pose1': |
|
gym.spaces.Tuple( |
|
(self.position_bounds, |
|
gym.spaces.Box(-1.0, 1.0, shape=(4,), dtype=np.float32))) |
|
}) |
|
|
|
|
|
disp_option = p.DIRECT |
|
if disp: |
|
disp_option = p.GUI |
|
if shared_memory: |
|
disp_option = p.SHARED_MEMORY |
|
client = p.connect(disp_option) |
|
file_io = p.loadPlugin('fileIOPlugin', physicsClientId=client) |
|
if file_io < 0: |
|
raise RuntimeError('pybullet: cannot load FileIO!') |
|
if file_io >= 0: |
|
p.executePluginCommand( |
|
file_io, |
|
textArgument=assets_root, |
|
intArgs=[p.AddFileIOAction], |
|
physicsClientId=client) |
|
|
|
p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0) |
|
p.setPhysicsEngineParameter(enableFileCaching=0) |
|
p.setAdditionalSearchPath(assets_root) |
|
p.setAdditionalSearchPath(tempfile.gettempdir()) |
|
p.setTimeStep(1. / hz) |
|
|
|
|
|
if disp: |
|
target = p.getDebugVisualizerCamera()[11] |
|
p.resetDebugVisualizerCamera( |
|
cameraDistance=1.1, |
|
cameraYaw=90, |
|
cameraPitch=-25, |
|
cameraTargetPosition=target) |
|
|
|
if task: |
|
self.set_task(task) |
|
|
|
def __del__(self): |
|
if hasattr(self, 'video_writer'): |
|
self.video_writer.close() |
|
|
|
@property |
|
def is_static(self): |
|
"""Return true if objects are no longer moving.""" |
|
v = [np.linalg.norm(p.getBaseVelocity(i)[0]) |
|
for i in self.obj_ids['rigid']] |
|
return all(np.array(v) < 5e-3) |
|
|
|
def fill_dummy_template(self, template): |
|
"""check if there are empty templates that haven't been fulfilled yet. if so. fill in dummy numbers """ |
|
full_template_path = os.path.join(self.assets_root, template) |
|
with open(full_template_path, 'r') as file: |
|
fdata = file.read() |
|
|
|
fill = False |
|
for field in ['DIMH', 'DIMR', 'DIMX', 'DIMY', 'DIMZ', 'DIM']: |
|
|
|
if field in fdata: |
|
default_replace_vals = np.random.uniform(0.03, 0.05, size=(3,)).tolist() |
|
for i in range(len(default_replace_vals)): |
|
fdata = fdata.replace(f'{field}{i}', str(default_replace_vals[i])) |
|
fill = True |
|
|
|
for field in ['HALF']: |
|
|
|
if field in fdata: |
|
default_replace_vals = np.random.uniform(0.01, 0.03, size=(3,)).tolist() |
|
for i in range(len(default_replace_vals)): |
|
fdata = fdata.replace(f'{field}{i}', str(default_replace_vals[i])) |
|
fill = True |
|
|
|
if fill: |
|
alphabet = string.ascii_lowercase + string.digits |
|
rname = ''.join(random.choices(alphabet, k=16)) |
|
tmpdir = tempfile.gettempdir() |
|
template_filename = os.path.split(template)[-1] |
|
fname = os.path.join(tmpdir, f'{template_filename}.{rname}') |
|
with open(fname, 'w') as file: |
|
file.write(fdata) |
|
|
|
|
|
return fname |
|
else: |
|
return template |
|
|
|
def add_object(self, urdf, pose, category='rigid', color=None, **kwargs): |
|
"""List of (fixed, rigid, or deformable) objects in env.""" |
|
fixed_base = 1 if category == 'fixed' else 0 |
|
|
|
if 'template' in urdf: |
|
if not os.path.exists(os.path.join(self.assets_root, urdf)): |
|
urdf = urdf.replace("-template", "") |
|
|
|
urdf = self.fill_dummy_template(urdf) |
|
|
|
if not os.path.exists(os.path.join(self.assets_root, urdf)): |
|
print(f"missing urdf error: {os.path.join(self.assets_root, urdf)}. use dummy block.") |
|
urdf = 'stacking/block.urdf' |
|
|
|
obj_id = pybullet_utils.load_urdf( |
|
p, |
|
os.path.join(self.assets_root, urdf), |
|
pose[0], |
|
pose[1], |
|
useFixedBase=fixed_base) |
|
|
|
if not obj_id is None: |
|
self.obj_ids[category].append(obj_id) |
|
|
|
if color is not None: |
|
if type(color) is str: |
|
color = utils.COLORS[color] |
|
color = color + [1.] |
|
p.changeVisualShape(obj_id, -1, rgbaColor=color) |
|
|
|
if hasattr(self, 'record_cfg') and 'blender_render' in self.record_cfg and self.record_cfg['blender_render']: |
|
|
|
|
|
|
|
print("color:", color) |
|
|
|
self.blender_recorder.register_object(obj_id, os.path.join(self.assets_root, urdf), color=color) |
|
|
|
return obj_id |
|
|
|
def set_color(self, obj_id, color): |
|
p.changeVisualShape(obj_id, -1, rgbaColor=color + [1]) |
|
|
|
def set_object_color(self, *args, **kwargs): |
|
return self.set_color(*args, **kwargs) |
|
|
|
|
|
|
|
|
|
|
|
def seed(self, seed=None): |
|
self._random = np.random.RandomState(seed) |
|
return seed |
|
|
|
def reset(self): |
|
"""Performs common reset functionality for all supported tasks.""" |
|
if not self.task: |
|
raise ValueError('environment task must be set. Call set_task or pass ' |
|
'the task arg in the environment constructor.') |
|
self.obj_ids = {'fixed': [], 'rigid': [], 'deformable': []} |
|
p.resetSimulation(p.RESET_USE_DEFORMABLE_WORLD) |
|
p.setGravity(0, 0, -9.8) |
|
|
|
|
|
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 0) |
|
|
|
plane = pybullet_utils.load_urdf(p, os.path.join(self.assets_root, PLANE_URDF_PATH), |
|
[0, 0, -0.001]) |
|
workspace = pybullet_utils.load_urdf( |
|
p, os.path.join(self.assets_root, UR5_WORKSPACE_URDF_PATH), [0.5, 0, 0]) |
|
|
|
|
|
|
|
self.ur5 = pybullet_utils.load_urdf( |
|
p, os.path.join(self.assets_root, UR5_URDF_PATH)) |
|
self.ee = self.task.ee(self.assets_root, self.ur5, 9, self.obj_ids) |
|
self.ee_tip = 10 |
|
|
|
if hasattr(self, 'record_cfg') and 'blender_render' in self.record_cfg and self.record_cfg['blender_render']: |
|
from misc.pyBulletSimRecorder import PyBulletRecorder |
|
self.blender_recorder = PyBulletRecorder() |
|
|
|
self.blender_recorder.register_object(plane, os.path.join(self.assets_root, PLANE_URDF_PATH)) |
|
self.blender_recorder.register_object(workspace, os.path.join(self.assets_root, UR5_WORKSPACE_URDF_PATH)) |
|
self.blender_recorder.register_object(self.ur5, os.path.join(self.assets_root, UR5_URDF_PATH)) |
|
|
|
self.blender_recorder.register_object(self.ee.base, self.ee.base_urdf_path) |
|
if hasattr(self.ee, 'body'): |
|
self.blender_recorder.register_object(self.ee.body, self.ee.urdf_path) |
|
|
|
|
|
|
|
n_joints = p.getNumJoints(self.ur5) |
|
joints = [p.getJointInfo(self.ur5, i) for i in range(n_joints)] |
|
self.joints = [j[0] for j in joints if j[2] == p.JOINT_REVOLUTE] |
|
|
|
|
|
for i in range(len(self.joints)): |
|
p.resetJointState(self.ur5, self.joints[i], self.homej[i]) |
|
|
|
|
|
self.ee.release() |
|
|
|
|
|
self.task.reset(self) |
|
|
|
|
|
p.configureDebugVisualizer(p.COV_ENABLE_RENDERING, 1) |
|
|
|
self.step() |
|
|
|
|
|
|
|
def step(self, action=None): |
|
"""Execute action with specified primitive. |
|
|
|
Args: |
|
action: action to execute. |
|
|
|
Returns: |
|
(obs, reward, done, info) tuple containing MDP step data. |
|
""" |
|
if action is not None: |
|
timeout = self.task.primitive(self.movej, self.movep, self.ee, action['pose0'], action['pose1']) |
|
|
|
|
|
|
|
if timeout: |
|
obs = {'color': (), 'depth': ()} |
|
for config in self.agent_cams: |
|
color, depth, _ = self.render_camera(config) |
|
obs['color'] += (color,) |
|
obs['depth'] += (depth,) |
|
return obs, 0.0, True, self.info |
|
|
|
start_time = time.time() |
|
|
|
while not self.is_static: |
|
self.step_simulation() |
|
if time.time() - start_time > 5: |
|
break |
|
|
|
|
|
reward, info = self.task.reward() if action is not None else (0, {}) |
|
done = self.task.done() |
|
|
|
|
|
info.update(self.info) |
|
|
|
obs = self._get_obs() |
|
|
|
if not os.path.exists(self.record_cfg['save_video_path']): |
|
os.mkdir(self.record_cfg['save_video_path']) |
|
self.video_path = os.path.join(self.record_cfg['save_video_path'], "123.mp4") |
|
video_writer = imageio.get_writer(self.video_path, |
|
fps=self.record_cfg['fps'], |
|
format='FFMPEG', |
|
codec='h264', ) |
|
print(f"has {len(self.curr_video)} frames to save") |
|
for color in self.curr_video: |
|
video_writer.append_data(color) |
|
print("save video to ", self.video_path) |
|
video_writer.close() |
|
self.cur_obs = obs |
|
self.cur_reward = reward |
|
self.cur_done = done |
|
self.cur_info = info |
|
yield "Task Generated ==> Asset Generated ==> API Reviewed ==> Error Reviewed ==> Code Generated ==> Running Simulation", self.generated_code, self.video_path |
|
|
|
|
|
def step_simulation(self): |
|
p.stepSimulation() |
|
self.step_counter += 1 |
|
|
|
if self.save_video and self.step_counter % 5 == 0: |
|
self.add_video_frame() |
|
|
|
def render(self, mode='rgb_array'): |
|
|
|
|
|
if mode != 'rgb_array': |
|
raise NotImplementedError('Only rgb_array implemented') |
|
color, _, _ = self.render_camera(self.agent_cams[0]) |
|
|
|
return color |
|
|
|
def render_camera(self, config, image_size=None, shadow=1): |
|
"""Render RGB-D image with specified camera configuration.""" |
|
if not image_size: |
|
image_size = config['image_size'] |
|
|
|
|
|
lookdir = np.float32([0, 0, 1]).reshape(3, 1) |
|
updir = np.float32([0, -1, 0]).reshape(3, 1) |
|
rotation = p.getMatrixFromQuaternion(config['rotation']) |
|
rotm = np.float32(rotation).reshape(3, 3) |
|
lookdir = (rotm @ lookdir).reshape(-1) |
|
updir = (rotm @ updir).reshape(-1) |
|
lookat = config['position'] + lookdir |
|
focal_len = config['intrinsics'][0] |
|
znear, zfar = config['zrange'] |
|
viewm = p.computeViewMatrix(config['position'], lookat, updir) |
|
fovh = (image_size[0] / 2) / focal_len |
|
fovh = 180 * np.arctan(fovh) * 2 / np.pi |
|
|
|
|
|
aspect_ratio = image_size[1] / image_size[0] |
|
projm = p.computeProjectionMatrixFOV(fovh, aspect_ratio, znear, zfar) |
|
|
|
|
|
_, _, color, depth, segm = p.getCameraImage( |
|
width=image_size[1], |
|
height=image_size[0], |
|
viewMatrix=viewm, |
|
projectionMatrix=projm, |
|
shadow=shadow, |
|
flags=p.ER_SEGMENTATION_MASK_OBJECT_AND_LINKINDEX, |
|
renderer=p.ER_BULLET_HARDWARE_OPENGL) |
|
|
|
|
|
color_image_size = (image_size[0], image_size[1], 4) |
|
color = np.array(color, dtype=np.uint8).reshape(color_image_size) |
|
color = color[:, :, :3] |
|
if config['noise']: |
|
color = np.int32(color) |
|
color += np.int32(self._random.normal(0, 3, image_size)) |
|
color = np.uint8(np.clip(color, 0, 255)) |
|
|
|
|
|
depth_image_size = (image_size[0], image_size[1]) |
|
zbuffer = np.array(depth).reshape(depth_image_size) |
|
depth = (zfar + znear - (2. * zbuffer - 1.) * (zfar - znear)) |
|
depth = (2. * znear * zfar) / depth |
|
if config['noise']: |
|
depth += self._random.normal(0, 0.003, depth_image_size) |
|
|
|
|
|
segm = np.uint8(segm).reshape(depth_image_size) |
|
|
|
return color, depth, segm |
|
|
|
@property |
|
def info(self): |
|
"""Environment info variable with object poses, dimensions, and colors.""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
info = {} |
|
for obj_ids in self.obj_ids.values(): |
|
for obj_id in obj_ids: |
|
pos, rot = p.getBasePositionAndOrientation(obj_id) |
|
dim = p.getVisualShapeData(obj_id)[0][3] |
|
info[obj_id] = (pos, rot, dim) |
|
|
|
info['lang_goal'] = self.get_lang_goal() |
|
return info |
|
|
|
def set_task(self, task): |
|
task.set_assets_root(self.assets_root) |
|
self.task = task |
|
|
|
def get_task_name(self): |
|
return type(self.task).__name__ |
|
|
|
def get_lang_goal(self): |
|
if self.task: |
|
return self.task.get_lang_goal() |
|
else: |
|
raise Exception("No task for was set") |
|
|
|
|
|
|
|
|
|
|
|
def movej(self, targj, speed=0.01, timeout=5): |
|
"""Move UR5 to target joint configuration.""" |
|
if self.save_video: |
|
timeout = timeout * 5 |
|
|
|
t0 = time.time() |
|
while (time.time() - t0) < timeout: |
|
currj = [p.getJointState(self.ur5, i)[0] for i in self.joints] |
|
currj = np.array(currj) |
|
diffj = targj - currj |
|
if all(np.abs(diffj) < 1e-2): |
|
return False |
|
|
|
|
|
norm = np.linalg.norm(diffj) |
|
v = diffj / norm if norm > 0 else 0 |
|
stepj = currj + v * speed |
|
gains = np.ones(len(self.joints)) |
|
p.setJointMotorControlArray( |
|
bodyIndex=self.ur5, |
|
jointIndices=self.joints, |
|
controlMode=p.POSITION_CONTROL, |
|
targetPositions=stepj, |
|
positionGains=gains) |
|
self.step_counter += 1 |
|
self.step_simulation() |
|
|
|
print(f'Warning: movej exceeded {timeout} second timeout. Skipping.') |
|
return True |
|
|
|
def start_rec(self, video_filename): |
|
assert self.record_cfg |
|
|
|
|
|
if not os.path.exists(self.record_cfg['save_video_path']): |
|
os.makedirs(self.record_cfg['save_video_path']) |
|
|
|
|
|
if hasattr(self, 'video_writer'): |
|
self.video_writer.close() |
|
|
|
|
|
self.video_writer = imageio.get_writer(os.path.join(self.record_cfg['save_video_path'], |
|
f"{video_filename}.mp4"), |
|
fps=self.record_cfg['fps'], |
|
format='FFMPEG', |
|
codec='h264',) |
|
p.setRealTimeSimulation(False) |
|
self.save_video = True |
|
|
|
def end_rec(self): |
|
if hasattr(self, 'video_writer'): |
|
self.video_writer.close() |
|
|
|
p.setRealTimeSimulation(True) |
|
self.save_video = False |
|
|
|
def add_video_frame(self): |
|
|
|
config = self.agent_cams[0] |
|
image_size = (self.record_cfg['video_height'], self.record_cfg['video_width']) |
|
color, depth, _ = self.render_camera(config, image_size, shadow=0) |
|
color = np.array(color) |
|
|
|
if hasattr(self.record_cfg, 'blender_render') and self.record_cfg['blender_render']: |
|
|
|
self.blender_recorder.add_keyframe() |
|
|
|
|
|
if self.record_cfg['add_text']: |
|
lang_goal = self.get_lang_goal() |
|
reward = f"Success: {self.task.get_reward():.3f}" |
|
|
|
font = cv2.FONT_HERSHEY_DUPLEX |
|
font_scale = 0.65 |
|
font_thickness = 1 |
|
|
|
|
|
line_length = 60 |
|
for i in range(len(lang_goal) // line_length + 1): |
|
lang_textsize = cv2.getTextSize(lang_goal[i*line_length:(i+1)*line_length], font, font_scale, font_thickness)[0] |
|
lang_textX = (image_size[1] - lang_textsize[0]) // 2 |
|
color = cv2.putText(color, lang_goal[i*line_length:(i+1)*line_length], org=(lang_textX, 570+i*30), |
|
fontScale=font_scale, |
|
fontFace=font, |
|
color=(0, 0, 0), |
|
thickness=font_thickness, lineType=cv2.LINE_AA) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
color = np.array(color) |
|
|
|
if 'add_task_text' in self.record_cfg and self.record_cfg['add_task_text']: |
|
lang_goal = self.get_task_name() |
|
reward = f"Success: {self.task.get_reward():.3f}" |
|
|
|
font = cv2.FONT_HERSHEY_DUPLEX |
|
font_scale = 1 |
|
font_thickness = 2 |
|
|
|
|
|
lang_textsize = cv2.getTextSize(lang_goal, font, font_scale, font_thickness)[0] |
|
lang_textX = (image_size[1] - lang_textsize[0]) // 2 |
|
|
|
color = cv2.putText(color, lang_goal, org=(lang_textX, 600), |
|
fontScale=font_scale, |
|
fontFace=font, |
|
color=(255, 0, 0), |
|
thickness=font_thickness, lineType=cv2.LINE_AA) |
|
|
|
color = np.array(color) |
|
|
|
self.curr_video.append(color) |
|
self.video_writer.append_data(color) |
|
|
|
def movep(self, pose, speed=0.01): |
|
"""Move UR5 to target end effector pose.""" |
|
targj = self.solve_ik(pose) |
|
return self.movej(targj, speed) |
|
|
|
def solve_ik(self, pose): |
|
"""Calculate joint configuration with inverse kinematics.""" |
|
joints = p.calculateInverseKinematics( |
|
bodyUniqueId=self.ur5, |
|
endEffectorLinkIndex=self.ee_tip, |
|
targetPosition=pose[0], |
|
targetOrientation=pose[1], |
|
lowerLimits=[-3 * np.pi / 2, -2.3562, -17, -17, -17, -17], |
|
upperLimits=[-np.pi / 2, 0, 17, 17, 17, 17], |
|
jointRanges=[np.pi, 2.3562, 34, 34, 34, 34], |
|
restPoses=np.float32(self.homej).tolist(), |
|
maxNumIterations=100, |
|
residualThreshold=1e-5) |
|
joints = np.float32(joints) |
|
joints[2:] = (joints[2:] + np.pi) % (2 * np.pi) - np.pi |
|
return joints |
|
|
|
def _get_obs(self): |
|
|
|
obs = {'color': (), 'depth': ()} |
|
for config in self.agent_cams: |
|
color, depth, _ = self.render_camera(config) |
|
obs['color'] += (color,) |
|
obs['depth'] += (depth,) |
|
|
|
return obs |
|
|
|
def get_object_pose(self, obj_id): |
|
return p.getBasePositionAndOrientation(obj_id) |
|
|
|
def get_object_size(self, obj_id): |
|
""" approximate object's size using AABB """ |
|
aabb_min, aabb_max = p.getAABB(obj_id) |
|
|
|
size_x = aabb_max[0] - aabb_min[0] |
|
size_y = aabb_max[1] - aabb_min[1] |
|
size_z = aabb_max[2] - aabb_min[2] |
|
return size_z * size_y * size_x |
|
|
|
|
|
|
|
class EnvironmentNoRotationsWithHeightmap(Environment): |
|
"""Environment that disables any rotations and always passes [0, 0, 0, 1].""" |
|
|
|
def __init__(self, |
|
assets_root, |
|
task=None, |
|
disp=False, |
|
shared_memory=False, |
|
hz=240): |
|
super(EnvironmentNoRotationsWithHeightmap, |
|
self).__init__(assets_root, task, disp, shared_memory, hz) |
|
|
|
heightmap_tuple = [ |
|
gym.spaces.Box(0.0, 20.0, (320, 160, 3), dtype=np.float32), |
|
gym.spaces.Box(0.0, 20.0, (320, 160), dtype=np.float32), |
|
] |
|
self.observation_space = gym.spaces.Dict({ |
|
'heightmap': gym.spaces.Tuple(heightmap_tuple), |
|
}) |
|
self.action_space = gym.spaces.Dict({ |
|
'pose0': gym.spaces.Tuple((self.position_bounds,)), |
|
'pose1': gym.spaces.Tuple((self.position_bounds,)) |
|
}) |
|
|
|
def step(self, action=None): |
|
"""Execute action with specified primitive. |
|
|
|
Args: |
|
action: action to execute. |
|
|
|
Returns: |
|
(obs, reward, done, info) tuple containing MDP step data. |
|
""" |
|
if action is not None: |
|
action = { |
|
'pose0': (action['pose0'][0], [0., 0., 0., 1.]), |
|
'pose1': (action['pose1'][0], [0., 0., 0., 1.]), |
|
} |
|
return super(EnvironmentNoRotationsWithHeightmap, self).step(action) |
|
|
|
def _get_obs(self): |
|
obs = {} |
|
|
|
color_depth_obs = {'color': (), 'depth': ()} |
|
for config in self.agent_cams: |
|
color, depth, _ = self.render_camera(config) |
|
color_depth_obs['color'] += (color,) |
|
color_depth_obs['depth'] += (depth,) |
|
cmap, hmap = utils.get_fused_heightmap(color_depth_obs, self.agent_cams, |
|
self.task.bounds, pix_size=0.003125) |
|
obs['heightmap'] = (cmap, hmap) |
|
return obs |
|
|
|
|