Spaces:
Running
Running
# Copyright 2018 The TensorFlow Authors All Rights Reserved. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
# ============================================================================== | |
"""Adapted from rllab maze_env.py.""" | |
import os | |
import tempfile | |
import xml.etree.ElementTree as ET | |
import math | |
import numpy as np | |
import gym | |
from environments import maze_env_utils | |
# Directory that contains mujoco xml files. | |
MODEL_DIR = 'environments/assets' | |
class MazeEnv(gym.Env): | |
MODEL_CLASS = None | |
MAZE_HEIGHT = None | |
MAZE_SIZE_SCALING = None | |
def __init__( | |
self, | |
maze_id=None, | |
maze_height=0.5, | |
maze_size_scaling=8, | |
n_bins=0, | |
sensor_range=3., | |
sensor_span=2 * math.pi, | |
observe_blocks=False, | |
put_spin_near_agent=False, | |
top_down_view=False, | |
manual_collision=False, | |
*args, | |
**kwargs): | |
self._maze_id = maze_id | |
model_cls = self.__class__.MODEL_CLASS | |
if model_cls is None: | |
raise "MODEL_CLASS unspecified!" | |
xml_path = os.path.join(MODEL_DIR, model_cls.FILE) | |
tree = ET.parse(xml_path) | |
worldbody = tree.find(".//worldbody") | |
self.MAZE_HEIGHT = height = maze_height | |
self.MAZE_SIZE_SCALING = size_scaling = maze_size_scaling | |
self._n_bins = n_bins | |
self._sensor_range = sensor_range * size_scaling | |
self._sensor_span = sensor_span | |
self._observe_blocks = observe_blocks | |
self._put_spin_near_agent = put_spin_near_agent | |
self._top_down_view = top_down_view | |
self._manual_collision = manual_collision | |
self.MAZE_STRUCTURE = structure = maze_env_utils.construct_maze(maze_id=self._maze_id) | |
self.elevated = any(-1 in row for row in structure) # Elevate the maze to allow for falling. | |
self.blocks = any( | |
any(maze_env_utils.can_move(r) for r in row) | |
for row in structure) # Are there any movable blocks? | |
torso_x, torso_y = self._find_robot() | |
self._init_torso_x = torso_x | |
self._init_torso_y = torso_y | |
self._init_positions = [ | |
(x - torso_x, y - torso_y) | |
for x, y in self._find_all_robots()] | |
self._xy_to_rowcol = lambda x, y: (2 + (y + size_scaling / 2) / size_scaling, | |
2 + (x + size_scaling / 2) / size_scaling) | |
self._view = np.zeros([5, 5, 3]) # walls (immovable), chasms (fall), movable blocks | |
height_offset = 0. | |
if self.elevated: | |
# Increase initial z-pos of ant. | |
height_offset = height * size_scaling | |
torso = tree.find(".//body[@name='torso']") | |
torso.set('pos', '0 0 %.2f' % (0.75 + height_offset)) | |
if self.blocks: | |
# If there are movable blocks, change simulation settings to perform | |
# better contact detection. | |
default = tree.find(".//default") | |
default.find('.//geom').set('solimp', '.995 .995 .01') | |
self.movable_blocks = [] | |
for i in range(len(structure)): | |
for j in range(len(structure[0])): | |
struct = structure[i][j] | |
if struct == 'r' and self._put_spin_near_agent: | |
struct = maze_env_utils.Move.SpinXY | |
if self.elevated and struct not in [-1]: | |
# Create elevated platform. | |
ET.SubElement( | |
worldbody, "geom", | |
name="elevated_%d_%d" % (i, j), | |
pos="%f %f %f" % (j * size_scaling - torso_x, | |
i * size_scaling - torso_y, | |
height / 2 * size_scaling), | |
size="%f %f %f" % (0.5 * size_scaling, | |
0.5 * size_scaling, | |
height / 2 * size_scaling), | |
type="box", | |
material="", | |
contype="1", | |
conaffinity="1", | |
rgba="0.9 0.9 0.9 1", | |
) | |
if struct == 1: # Unmovable block. | |
# Offset all coordinates so that robot starts at the origin. | |
ET.SubElement( | |
worldbody, "geom", | |
name="block_%d_%d" % (i, j), | |
pos="%f %f %f" % (j * size_scaling - torso_x, | |
i * size_scaling - torso_y, | |
height_offset + | |
height / 2 * size_scaling), | |
size="%f %f %f" % (0.5 * size_scaling, | |
0.5 * size_scaling, | |
height / 2 * size_scaling), | |
type="box", | |
material="", | |
contype="1", | |
conaffinity="1", | |
rgba="0.4 0.4 0.4 1", | |
) | |
elif maze_env_utils.can_move(struct): # Movable block. | |
# The "falling" blocks are shrunk slightly and increased in mass to | |
# ensure that it can fall easily through a gap in the platform blocks. | |
name = "movable_%d_%d" % (i, j) | |
self.movable_blocks.append((name, struct)) | |
falling = maze_env_utils.can_move_z(struct) | |
spinning = maze_env_utils.can_spin(struct) | |
x_offset = 0.25 * size_scaling if spinning else 0.0 | |
y_offset = 0.0 | |
shrink = 0.1 if spinning else 0.99 if falling else 1.0 | |
height_shrink = 0.1 if spinning else 1.0 | |
movable_body = ET.SubElement( | |
worldbody, "body", | |
name=name, | |
pos="%f %f %f" % (j * size_scaling - torso_x + x_offset, | |
i * size_scaling - torso_y + y_offset, | |
height_offset + | |
height / 2 * size_scaling * height_shrink), | |
) | |
ET.SubElement( | |
movable_body, "geom", | |
name="block_%d_%d" % (i, j), | |
pos="0 0 0", | |
size="%f %f %f" % (0.5 * size_scaling * shrink, | |
0.5 * size_scaling * shrink, | |
height / 2 * size_scaling * height_shrink), | |
type="box", | |
material="", | |
mass="0.001" if falling else "0.0002", | |
contype="1", | |
conaffinity="1", | |
rgba="0.9 0.1 0.1 1" | |
) | |
if maze_env_utils.can_move_x(struct): | |
ET.SubElement( | |
movable_body, "joint", | |
armature="0", | |
axis="1 0 0", | |
damping="0.0", | |
limited="true" if falling else "false", | |
range="%f %f" % (-size_scaling, size_scaling), | |
margin="0.01", | |
name="movable_x_%d_%d" % (i, j), | |
pos="0 0 0", | |
type="slide" | |
) | |
if maze_env_utils.can_move_y(struct): | |
ET.SubElement( | |
movable_body, "joint", | |
armature="0", | |
axis="0 1 0", | |
damping="0.0", | |
limited="true" if falling else "false", | |
range="%f %f" % (-size_scaling, size_scaling), | |
margin="0.01", | |
name="movable_y_%d_%d" % (i, j), | |
pos="0 0 0", | |
type="slide" | |
) | |
if maze_env_utils.can_move_z(struct): | |
ET.SubElement( | |
movable_body, "joint", | |
armature="0", | |
axis="0 0 1", | |
damping="0.0", | |
limited="true", | |
range="%f 0" % (-height_offset), | |
margin="0.01", | |
name="movable_z_%d_%d" % (i, j), | |
pos="0 0 0", | |
type="slide" | |
) | |
if maze_env_utils.can_spin(struct): | |
ET.SubElement( | |
movable_body, "joint", | |
armature="0", | |
axis="0 0 1", | |
damping="0.0", | |
limited="false", | |
name="spinable_%d_%d" % (i, j), | |
pos="0 0 0", | |
type="ball" | |
) | |
torso = tree.find(".//body[@name='torso']") | |
geoms = torso.findall(".//geom") | |
for geom in geoms: | |
if 'name' not in geom.attrib: | |
raise Exception("Every geom of the torso must have a name " | |
"defined") | |
_, file_path = tempfile.mkstemp(text=True, suffix='.xml') | |
tree.write(file_path) | |
self.wrapped_env = model_cls(*args, file_path=file_path, **kwargs) | |
def get_ori(self): | |
return self.wrapped_env.get_ori() | |
def get_top_down_view(self): | |
self._view = np.zeros_like(self._view) | |
def valid(row, col): | |
return self._view.shape[0] > row >= 0 and self._view.shape[1] > col >= 0 | |
def update_view(x, y, d, row=None, col=None): | |
if row is None or col is None: | |
x = x - self._robot_x | |
y = y - self._robot_y | |
th = self._robot_ori | |
row, col = self._xy_to_rowcol(x, y) | |
update_view(x, y, d, row=row, col=col) | |
return | |
row, row_frac, col, col_frac = int(row), row % 1, int(col), col % 1 | |
if row_frac < 0: | |
row_frac += 1 | |
if col_frac < 0: | |
col_frac += 1 | |
if valid(row, col): | |
self._view[row, col, d] += ( | |
(min(1., row_frac + 0.5) - max(0., row_frac - 0.5)) * | |
(min(1., col_frac + 0.5) - max(0., col_frac - 0.5))) | |
if valid(row - 1, col): | |
self._view[row - 1, col, d] += ( | |
(max(0., 0.5 - row_frac)) * | |
(min(1., col_frac + 0.5) - max(0., col_frac - 0.5))) | |
if valid(row + 1, col): | |
self._view[row + 1, col, d] += ( | |
(max(0., row_frac - 0.5)) * | |
(min(1., col_frac + 0.5) - max(0., col_frac - 0.5))) | |
if valid(row, col - 1): | |
self._view[row, col - 1, d] += ( | |
(min(1., row_frac + 0.5) - max(0., row_frac - 0.5)) * | |
(max(0., 0.5 - col_frac))) | |
if valid(row, col + 1): | |
self._view[row, col + 1, d] += ( | |
(min(1., row_frac + 0.5) - max(0., row_frac - 0.5)) * | |
(max(0., col_frac - 0.5))) | |
if valid(row - 1, col - 1): | |
self._view[row - 1, col - 1, d] += ( | |
(max(0., 0.5 - row_frac)) * max(0., 0.5 - col_frac)) | |
if valid(row - 1, col + 1): | |
self._view[row - 1, col + 1, d] += ( | |
(max(0., 0.5 - row_frac)) * max(0., col_frac - 0.5)) | |
if valid(row + 1, col + 1): | |
self._view[row + 1, col + 1, d] += ( | |
(max(0., row_frac - 0.5)) * max(0., col_frac - 0.5)) | |
if valid(row + 1, col - 1): | |
self._view[row + 1, col - 1, d] += ( | |
(max(0., row_frac - 0.5)) * max(0., 0.5 - col_frac)) | |
# Draw ant. | |
robot_x, robot_y = self.wrapped_env.get_body_com("torso")[:2] | |
self._robot_x = robot_x | |
self._robot_y = robot_y | |
self._robot_ori = self.get_ori() | |
structure = self.MAZE_STRUCTURE | |
size_scaling = self.MAZE_SIZE_SCALING | |
height = self.MAZE_HEIGHT | |
# Draw immovable blocks and chasms. | |
for i in range(len(structure)): | |
for j in range(len(structure[0])): | |
if structure[i][j] == 1: # Wall. | |
update_view(j * size_scaling - self._init_torso_x, | |
i * size_scaling - self._init_torso_y, | |
0) | |
if structure[i][j] == -1: # Chasm. | |
update_view(j * size_scaling - self._init_torso_x, | |
i * size_scaling - self._init_torso_y, | |
1) | |
# Draw movable blocks. | |
for block_name, block_type in self.movable_blocks: | |
block_x, block_y = self.wrapped_env.get_body_com(block_name)[:2] | |
update_view(block_x, block_y, 2) | |
return self._view | |
def get_range_sensor_obs(self): | |
"""Returns egocentric range sensor observations of maze.""" | |
robot_x, robot_y, robot_z = self.wrapped_env.get_body_com("torso")[:3] | |
ori = self.get_ori() | |
structure = self.MAZE_STRUCTURE | |
size_scaling = self.MAZE_SIZE_SCALING | |
height = self.MAZE_HEIGHT | |
segments = [] | |
# Get line segments (corresponding to outer boundary) of each immovable | |
# block or drop-off. | |
for i in range(len(structure)): | |
for j in range(len(structure[0])): | |
if structure[i][j] in [1, -1]: # There's a wall or drop-off. | |
cx = j * size_scaling - self._init_torso_x | |
cy = i * size_scaling - self._init_torso_y | |
x1 = cx - 0.5 * size_scaling | |
x2 = cx + 0.5 * size_scaling | |
y1 = cy - 0.5 * size_scaling | |
y2 = cy + 0.5 * size_scaling | |
struct_segments = [ | |
((x1, y1), (x2, y1)), | |
((x2, y1), (x2, y2)), | |
((x2, y2), (x1, y2)), | |
((x1, y2), (x1, y1)), | |
] | |
for seg in struct_segments: | |
segments.append(dict( | |
segment=seg, | |
type=structure[i][j], | |
)) | |
# Get line segments (corresponding to outer boundary) of each movable | |
# block within the agent's z-view. | |
for block_name, block_type in self.movable_blocks: | |
block_x, block_y, block_z = self.wrapped_env.get_body_com(block_name)[:3] | |
if (block_z + height * size_scaling / 2 >= robot_z and | |
robot_z >= block_z - height * size_scaling / 2): # Block in view. | |
x1 = block_x - 0.5 * size_scaling | |
x2 = block_x + 0.5 * size_scaling | |
y1 = block_y - 0.5 * size_scaling | |
y2 = block_y + 0.5 * size_scaling | |
struct_segments = [ | |
((x1, y1), (x2, y1)), | |
((x2, y1), (x2, y2)), | |
((x2, y2), (x1, y2)), | |
((x1, y2), (x1, y1)), | |
] | |
for seg in struct_segments: | |
segments.append(dict( | |
segment=seg, | |
type=block_type, | |
)) | |
sensor_readings = np.zeros((self._n_bins, 3)) # 3 for wall, drop-off, block | |
for ray_idx in range(self._n_bins): | |
ray_ori = (ori - self._sensor_span * 0.5 + | |
(2 * ray_idx + 1.0) / (2 * self._n_bins) * self._sensor_span) | |
ray_segments = [] | |
# Get all segments that intersect with ray. | |
for seg in segments: | |
p = maze_env_utils.ray_segment_intersect( | |
ray=((robot_x, robot_y), ray_ori), | |
segment=seg["segment"]) | |
if p is not None: | |
ray_segments.append(dict( | |
segment=seg["segment"], | |
type=seg["type"], | |
ray_ori=ray_ori, | |
distance=maze_env_utils.point_distance(p, (robot_x, robot_y)), | |
)) | |
if len(ray_segments) > 0: | |
# Find out which segment is intersected first. | |
first_seg = sorted(ray_segments, key=lambda x: x["distance"])[0] | |
seg_type = first_seg["type"] | |
idx = (0 if seg_type == 1 else # Wall. | |
1 if seg_type == -1 else # Drop-off. | |
2 if maze_env_utils.can_move(seg_type) else # Block. | |
None) | |
if first_seg["distance"] <= self._sensor_range: | |
sensor_readings[ray_idx][idx] = (self._sensor_range - first_seg["distance"]) / self._sensor_range | |
return sensor_readings | |
def _get_obs(self): | |
wrapped_obs = self.wrapped_env._get_obs() | |
if self._top_down_view: | |
view = [self.get_top_down_view().flat] | |
else: | |
view = [] | |
if self._observe_blocks: | |
additional_obs = [] | |
for block_name, block_type in self.movable_blocks: | |
additional_obs.append(self.wrapped_env.get_body_com(block_name)) | |
wrapped_obs = np.concatenate([wrapped_obs[:3]] + additional_obs + | |
[wrapped_obs[3:]]) | |
range_sensor_obs = self.get_range_sensor_obs() | |
return np.concatenate([wrapped_obs, | |
range_sensor_obs.flat] + | |
view + [[self.t * 0.001]]) | |
def reset(self): | |
self.t = 0 | |
self.trajectory = [] | |
self.wrapped_env.reset() | |
if len(self._init_positions) > 1: | |
xy = random.choice(self._init_positions) | |
self.wrapped_env.set_xy(xy) | |
return self._get_obs() | |
def viewer(self): | |
return self.wrapped_env.viewer | |
def render(self, *args, **kwargs): | |
return self.wrapped_env.render(*args, **kwargs) | |
def observation_space(self): | |
shape = self._get_obs().shape | |
high = np.inf * np.ones(shape) | |
low = -high | |
return gym.spaces.Box(low, high) | |
def action_space(self): | |
return self.wrapped_env.action_space | |
def _find_robot(self): | |
structure = self.MAZE_STRUCTURE | |
size_scaling = self.MAZE_SIZE_SCALING | |
for i in range(len(structure)): | |
for j in range(len(structure[0])): | |
if structure[i][j] == 'r': | |
return j * size_scaling, i * size_scaling | |
assert False, 'No robot in maze specification.' | |
def _find_all_robots(self): | |
structure = self.MAZE_STRUCTURE | |
size_scaling = self.MAZE_SIZE_SCALING | |
coords = [] | |
for i in range(len(structure)): | |
for j in range(len(structure[0])): | |
if structure[i][j] == 'r': | |
coords.append((j * size_scaling, i * size_scaling)) | |
return coords | |
def _is_in_collision(self, pos): | |
x, y = pos | |
structure = self.MAZE_STRUCTURE | |
size_scaling = self.MAZE_SIZE_SCALING | |
for i in range(len(structure)): | |
for j in range(len(structure[0])): | |
if structure[i][j] == 1: | |
minx = j * size_scaling - size_scaling * 0.5 - self._init_torso_x | |
maxx = j * size_scaling + size_scaling * 0.5 - self._init_torso_x | |
miny = i * size_scaling - size_scaling * 0.5 - self._init_torso_y | |
maxy = i * size_scaling + size_scaling * 0.5 - self._init_torso_y | |
if minx <= x <= maxx and miny <= y <= maxy: | |
return True | |
return False | |
def step(self, action): | |
self.t += 1 | |
if self._manual_collision: | |
old_pos = self.wrapped_env.get_xy() | |
inner_next_obs, inner_reward, done, info = self.wrapped_env.step(action) | |
new_pos = self.wrapped_env.get_xy() | |
if self._is_in_collision(new_pos): | |
self.wrapped_env.set_xy(old_pos) | |
else: | |
inner_next_obs, inner_reward, done, info = self.wrapped_env.step(action) | |
next_obs = self._get_obs() | |
done = False | |
return next_obs, inner_reward, done, info | |