|
from typing import List |
|
|
|
import copy |
|
import numpy as np |
|
import gym |
|
from gym import spaces |
|
from gym.utils import seeding |
|
|
|
from ding.envs import BaseEnvTimestep |
|
from ding.utils import ENV_REGISTRY |
|
|
|
|
|
@ENV_REGISTRY.register('maze')
class Maze(gym.Env):
    """
    Environment with random maze layouts. The ASCII representation of the mazes include the following objects:
    - `<SPACE>`: empty
    - `x`: wall
    - `S`: the start location (optional)
    - `T`: the target location.

    The maze itself is stored twice: as an integer array of ``KEY_*`` codes (``self._maze``) and as a
    nested list of the ASCII characters above (``self._map``). The agent position is ``(self._x, self._y)``
    where ``x`` indexes the first array axis and ``y`` the second.
    """
    KEY_EMPTY = 0
    KEY_WALL = 1
    KEY_TARGET = 2
    KEY_START = 3
    ASCII_MAP = {
        KEY_EMPTY: ' ',
        KEY_WALL: 'x',
        KEY_TARGET: 'T',
        KEY_START: 'S',
    }
    # An episode is force-terminated once the step counter becomes larger than this value.
    _MAX_STEP = 100

    def __init__(
            self,
            cfg,
    ):
        """Store the configuration; the maze itself is only built by ``reset`` / ``active_init``.

        Args:
            cfg: config object; only ``cfg.size`` (side length of the square maze) is read here.
        """
        self._size = cfg.size
        self._init_flag = False
        self._random_start = True
        self._seed = None
        self._step = 0

    def reset(self):
        """Rebuild the maze, put the agent on the start cell and return the first observation.

        Returns:
            The ``[size, size, 3]`` array produced by ``process_states``.
        """
        self.active_init()
        obs = self._get_obs()
        self._step = 0
        return self.process_states(obs, self.get_maze_map())

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        """Remember ``seed`` for maze generation and seed NumPy's global random state with it.

        ``dynamic_seed`` is only stored; nothing in this class reads it back.
        """
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def active_init(
            self,
            tabular_obs=False,
            reward_fn=lambda x, y, tx, ty: 1 if (x == tx and y == ty) else 0,
            done_fn=lambda x, y, tx, ty: x == tx and y == ty
    ):
        """Generate a 'tunnel' maze and (re)initialise all state derived from it.

        Args:
            tabular_obs: if True ``_get_obs`` returns a flat state index and the observation space is
                ``Discrete``; otherwise it returns the ``(x, y)`` pair and the space is a ``Box``.
            reward_fn: callable ``(x, y, target_x, target_y) -> reward``. ``None`` selects a function
                returning ``1.0`` on the target cell and ``0.0`` elsewhere.
            done_fn: callable ``(x, y, target_x, target_y) -> bool``. ``None`` selects a function that
                never signals termination.

        Raises:
            ValueError: if the generated map is empty, is not rectangular, or has no target cell.
        """
        self._maze = self.generate_maze(self.size, self._seed, 'tunnel')
        self._num_maze_keys = len(Maze.ASCII_MAP.keys())
        nav_map = self.maze_to_ascii(self._maze)
        self._map = nav_map
        self._tabular_obs = tabular_obs
        self._reward_fn = reward_fn
        self._done_fn = done_fn
        if self._reward_fn is None:
            self._reward_fn = lambda x, y, tx, ty: float(x == tx and y == ty)
        if self._done_fn is None:
            self._done_fn = lambda x, y, tx, ty: False

        self._max_x = len(self._map)
        if not self._max_x:
            raise ValueError('Invalid map.')
        self._max_y = len(self._map[0])
        if not all(len(m) == self._max_y for m in self._map):
            raise ValueError('Invalid map.')
        self._start_x, self._start_y = self._find_initial_point()
        self._target_x, self._target_y = self._find_target_point()
        self._x, self._y = self._start_x, self._start_y

        self._n_state = self._max_x * self._max_y
        self._n_action = 4

        if self._tabular_obs:
            self.observation_space = spaces.Discrete(self._n_state)
        else:
            # ``process_states`` emits a [size, size, 3] array, so the declared shape follows the
            # configured size rather than a fixed 16x16 grid.
            self.observation_space = spaces.Box(low=0.0, high=np.inf, shape=(self._size, self._size, 3))

        self.action_space = spaces.Discrete(self._n_action)
        self.reward_space = spaces.Box(low=0, high=1, shape=(1, ), dtype=np.float32)

    def random_start(self):
        """Return the processed observation of a randomly drawn non-wall cell.

        The agent is moved only temporarily: its position is restored before returning.
        """
        init_x, init_y = self._x, self._y
        # Rejection-sample coordinates until the drawn cell is not a wall.
        while True:
            self._x = self.np_random.integers(self._max_x)
            self._y = self.np_random.integers(self._max_y)
            if self._map[self._x][self._y] != 'x':
                break
        ret = copy.deepcopy(self.process_states(self._get_obs(), self.get_maze_map()))
        self._x, self._y = init_x, init_y
        return ret

    def close(self) -> None:
        """Close the wrapped ``_env`` if one was attached and clear the init flag."""
        # This class never assigns ``_env`` itself, so look it up defensively instead of assuming
        # the attribute exists whenever ``_init_flag`` is set.
        env = getattr(self, '_env', None)
        if self._init_flag and env is not None:
            env.close()
        self._init_flag = False

    @property
    def num_maze_keys(self):
        """Number of distinct cell codes in ``ASCII_MAP`` (set by ``active_init``)."""
        return self._num_maze_keys

    @property
    def size(self):
        """Side length of the square maze, as given by ``cfg.size``."""
        return self._size

    def process_states(self, observations, maze_maps):
        """Returns [W, W, 3] int64 values. Channels are (wall; goal; obs).

        Args:
            observations: the agent position; ``observations[0]`` and ``observations[1]`` are used as
                the x and y indices.
            maze_maps: the ``[W, W, 2]`` (wall, goal) array from ``get_maze_map``.
        """
        # Pick row ``x * size + y`` of the identity matrix: a one-hot encoding of the flat position.
        loc = np.eye(self._size * self._size, dtype=np.int64)[observations[0] * self._size + observations[1]]
        loc = np.reshape(loc, [self._size, self._size])
        maze_maps = maze_maps.astype(np.int64)

        states = np.concatenate([maze_maps, loc[Ellipsis, None]], axis=-1, dtype=np.int64)
        return states

    def get_maze_map(self, stacked=True):
        """Return the maze layout.

        Args:
            stacked: if False, return a copy of the raw ``KEY_*`` array. If True, return a
                ``[size, size, 2]`` array whose channel 0 is the maze with the target and start cells
                cleared to 0 and whose channel 1 marks the target cell with 1.
        """
        if not stacked:
            return self._maze.copy()
        wall = self._maze.copy()
        target_x, target_y = self.target_location
        assert wall[target_x][target_y] == Maze.KEY_TARGET
        wall[target_x][target_y] = 0
        target = np.zeros((self._size, self._size))
        target[target_x][target_y] = 1
        assert wall[self._start_x][self._start_y] == Maze.KEY_START
        wall[self._start_x][self._start_y] = 0
        return np.stack([wall, target], axis=-1)

    def generate_maze(self, size, seed, wall_type):
        """Build a ``size`` x ``size`` integer maze with one target and one start cell.

        Also records the sampled start cell in ``self._start_x`` / ``self._start_y``.

        Args:
            size: side length of the maze.
            seed: seed handed to ``gym.utils.seeding.np_random``.
            wall_type: ``'none'`` (border walls only), ``'tunnel'`` (corridors carved by
                ``sample_wall``) or ``'blocks:<n>'`` (border walls plus ``n`` single-cell walls).

        Raises:
            ValueError: for an unknown ``wall_type`` or when no empty cell can be sampled.
        """
        rng, _ = seeding.np_random(seed)
        maze = np.full((size, size), fill_value=Maze.KEY_EMPTY, dtype=int)

        if wall_type == 'none':
            maze[[0, -1], :] = Maze.KEY_WALL
            maze[:, [0, -1]] = Maze.KEY_WALL
        elif wall_type == 'tunnel':
            self.sample_wall(maze, rng)
        elif wall_type.startswith('blocks:'):
            maze[[0, -1], :] = Maze.KEY_WALL
            maze[:, [0, -1]] = Maze.KEY_WALL
            self.sample_blocks(maze, rng, int(wall_type.split(':')[-1]))
        else:
            raise ValueError('Unknown wall type: %s' % wall_type)

        # The target is placed first, so the start can never land on the same cell.
        loc_target = self.sample_location(maze, rng)
        maze[loc_target] = Maze.KEY_TARGET

        loc_start = self.sample_location(maze, rng)
        maze[loc_start] = Maze.KEY_START
        self._start_x, self._start_y = loc_start

        return maze

    def sample_blocks(self, maze, rng, num_blocks):
        """Sample single-block 'wall' or 'obstacles'."""
        for _ in range(num_blocks):
            loc = self.sample_location(maze, rng)
            maze[loc] = Maze.KEY_WALL

    def sample_wall(
            self, maze, rng, shortcut_prob=0.1, inner_wall_thickness=1, outer_wall_thickness=1, corridor_thickness=2
    ):
        """Fill ``maze`` with walls in place, then carve corridors with a randomised depth-first walk.

        Args:
            maze: integer array modified in place.
            rng: random generator providing ``permutation`` and ``random``.
            shortcut_prob: probability of also opening a passage towards an already carved cell.
            inner_wall_thickness: wall cells left between neighbouring corridors.
            outer_wall_thickness: wall cells kept along the border.
            corridor_thickness: width of a carved corridor in cells.
        """
        room = maze

        # Start from a solid block of wall.
        room[:] = Maze.KEY_WALL

        # One move spans a corridor plus the wall separating it from the next corridor.
        delta = inner_wall_thickness + corridor_thickness
        dx = [delta, -delta, 0, 0]
        dy = [0, 0, delta, -delta]

        def get_loc_type(y, x):
            # A corridor anchored at (y, x) must fit entirely inside the outer wall.
            if (y < outer_wall_thickness or y + corridor_thickness - 1 >= room.shape[0] - outer_wall_thickness):
                return 'invalid'
            if (x < outer_wall_thickness or x + corridor_thickness - 1 >= room.shape[1] - outer_wall_thickness):
                return 'invalid'

            # An empty anchor cell means this location was carved before.
            if room[y, x] == Maze.KEY_EMPTY:
                return 'occupied'
            return 'valid'

        def connect_pixel(y, x, ny, nx):
            # Clear the corridor-wide rectangle joining the two anchor cells.
            pixel = Maze.KEY_EMPTY
            if ny == y:
                room[y:y + corridor_thickness, min(x, nx):max(x, nx) + corridor_thickness] = pixel
            else:
                room[min(y, ny):max(y, ny) + corridor_thickness, x:x + corridor_thickness] = pixel

        def carve_passage_from(y, x):
            room[y, x] = Maze.KEY_EMPTY
            for direction in rng.permutation(len(dx)):
                ny = y + dy[direction]
                nx = x + dx[direction]

                loc_type = get_loc_type(ny, nx)
                if loc_type == 'invalid':
                    continue
                elif loc_type == 'valid':
                    connect_pixel(y, x, ny, nx)
                    # Continue the walk from the freshly connected location.
                    carve_passage_from(ny, nx)
                else:
                    # Already carved: occasionally connect anyway, which creates a second route.
                    if rng.random() < shortcut_prob:
                        connect_pixel(y, x, ny, nx)

        carve_passage_from(outer_wall_thickness, outer_wall_thickness)

    def sample_location(self, maze, rng):
        """Draw a random empty cell with both coordinates in ``[1, size)``.

        Raises:
            ValueError: if none of 1000 draws hits an empty cell.
        """
        for _ in range(1000):
            x, y = rng.integers(low=1, high=self._size, size=2)
            if maze[x, y] == Maze.KEY_EMPTY:
                return x, y
        raise ValueError('Cannot sample empty location, make maze bigger?')

    @staticmethod
    def key_to_ascii(key):
        """Translate a ``KEY_*`` cell code into its ASCII character.

        Raises:
            ValueError: if ``key`` is not one of the codes in ``ASCII_MAP``.
        """
        if key in Maze.ASCII_MAP:
            return Maze.ASCII_MAP[key]
        # Only the four codes in ASCII_MAP exist in this class, so any other key is an error.
        raise ValueError('Unknown maze key: %r' % (key, ))

    def maze_to_ascii(self, maze):
        """Convert an integer maze into a nested list of ASCII characters."""
        return [[Maze.key_to_ascii(x) for x in row] for row in maze]

    def tabular_obs_action(self, status_obs, action, include_maze_layout=False):
        """Combine a position observation and an action into a single integer index.

        Computes ``multiplier * tabular_obs + action`` where ``multiplier`` is the number of actions,
        increased by the number of maze keys when ``include_maze_layout`` is True.
        """
        tabular_obs = self.get_tabular_obs(status_obs)
        multiplier = self._n_action
        if include_maze_layout:
            multiplier += self._num_maze_keys
        return multiplier * tabular_obs + action

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        """Return ``collector_env_num`` references to one training config (``is_train = True``).

        ``collector_env_num`` is popped from the passed-in ``cfg`` before it is copied.
        """
        collector_env_num = cfg.pop('collector_env_num')
        cfg = copy.deepcopy(cfg)
        cfg.is_train = True
        return [cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        """Return ``evaluator_env_num`` references to one evaluation config (``is_train = False``).

        ``evaluator_env_num`` is popped from the passed-in ``cfg`` before it is copied.
        """
        evaluator_env_num = cfg.pop('evaluator_env_num')
        cfg = copy.deepcopy(cfg)
        cfg.is_train = False
        return [cfg for _ in range(evaluator_env_num)]

    @property
    def nav_map(self):
        """The maze as a nested list of ASCII characters."""
        return self._map

    @property
    def n_state(self):
        """Number of cells in the maze (``max_x * max_y``)."""
        return self._n_state

    @property
    def n_action(self):
        """Number of discrete actions (4)."""
        return self._n_action

    @property
    def target_location(self):
        """``(x, y)`` of the target cell."""
        return self._target_x, self._target_y

    @property
    def tabular_obs(self):
        """Whether observations are flat state indices instead of ``(x, y)`` pairs."""
        return self._tabular_obs

    def _find_symbol(self, symbol):
        """Return ``(x, y)`` of the first cell equal to ``symbol`` in row-major order, else ``None``."""
        for x in range(self._max_x):
            for y in range(self._max_y):
                if self._map[x][y] == symbol:
                    return x, y
        return None

    def _find_initial_point(self):
        """Return the ``(x, y)`` of the start cell ``'S'``, or ``(None, None)`` if the map has none."""
        location = self._find_symbol('S')
        if location is None:
            return None, None
        return location

    def _find_target_point(self):
        """Return the ``(x, y)`` of the target cell ``'T'``.

        Raises:
            ValueError: if the map contains no target cell.
        """
        location = self._find_symbol('T')
        if location is None:
            raise ValueError('Target point not found in map.')
        return location

    def _get_obs(self):
        """Return the agent position: a flat index in tabular mode, else ``np.array([x, y])``."""
        if self._tabular_obs:
            return self._x * self._max_y + self._y
        else:
            return np.array([self._x, self._y])

    def get_tabular_obs(self, status_obs):
        """Flatten ``(x, y)`` stored along the last axis of ``status_obs`` into ``max_y * x + y``."""
        return self._max_y * status_obs[..., 0] + status_obs[..., 1]

    def get_xy(self, state):
        """Inverse of the flat index ``x * max_y + y``: return the integer ``(x, y)`` pair."""
        # Floor division is required here; true division would yield a fractional row index.
        x, y = divmod(state, self._max_y)
        return x, y

    def step(self, action):
        """Move the agent by one cell and return the resulting timestep.

        Actions: 0 -> x + 1, 1 -> y + 1, 2 -> x - 1, 3 -> y - 1. A move that would leave the grid or
        end on a wall leaves the agent where it was; any other action value does not move it.

        Returns:
            ``BaseEnvTimestep(obs, reward, done, info)``. When ``done`` is True, ``info`` carries the
            last reward under both ``'final_eval_reward'`` and ``'eval_episode_return'``.
        """
        last_x, last_y = self._x, self._y
        if action == 0:
            if self._x < self._max_x - 1:
                self._x += 1
        elif action == 1:
            if self._y < self._max_y - 1:
                self._y += 1
        elif action == 2:
            if self._x > 0:
                self._x -= 1
        elif action == 3:
            if self._y > 0:
                self._y -= 1

        # Walking into a wall is undone.
        if self._map[self._x][self._y] == 'x':
            self._x, self._y = last_x, last_y
        self._step += 1
        reward = self._reward_fn(self._x, self._y, self._target_x, self._target_y)
        done = self._done_fn(self._x, self._y, self._target_x, self._target_y)
        info = {}
        # Cut the episode off once the step budget is exceeded.
        if self._step > self._MAX_STEP:
            done = True
        if done:
            info['final_eval_reward'] = reward
            info['eval_episode_return'] = reward
        return BaseEnvTimestep(self.process_states(self._get_obs(), self.get_maze_map()), reward, done, info)
|
|
|
|
|
def get_value_map(env):
    """Returns [W, W, A] one-hot VI actions.

    Runs a breadth-first search outwards from the target over non-wall cells. Every cell reached is
    assigned the action that leads back to the cell it was discovered from; the target itself is
    assigned action 0. Cells that are walls or cannot be reached keep an all-zero action vector.

    Args:
        env: object exposing ``target_location``, ``nav_map`` (grid of ASCII cells with ``'x'`` for
            walls), ``size`` and ``n_action``.

    Returns:
        Array of shape ``[env.size, env.size, env.n_action]``.
    """
    target_location = env.target_location
    nav_map = env.nav_map
    # Doubles as the visited set: a cell gets an entry exactly when it is first discovered.
    chosen_actions = {target_location: 0}
    frontier = [target_location]

    while frontier:
        next_frontier = []
        for point_x, point_y in frontier:
            # Each neighbour is paired with the action that moves from it back to the current cell.
            neighbours = (
                (0, (point_x - 1, point_y)),
                (1, (point_x, point_y - 1)),
                (2, (point_x + 1, point_y)),
                (3, (point_x, point_y + 1)),
            )
            for action, (next_x, next_y) in neighbours:
                if (next_x, next_y) in chosen_actions:
                    continue

                if not (0 <= next_x < len(nav_map) and 0 <= next_y < len(nav_map[next_x])):
                    continue

                if nav_map[next_x][next_y] == 'x':
                    continue

                next_frontier.append((next_x, next_y))
                chosen_actions[(next_x, next_y)] = action
        frontier = next_frontier

    value_map = np.zeros([env.size, env.size, env.n_action])
    for (x, y), action in chosen_actions.items():
        value_map[x][y][action] = 1
    return value_map
|
|