zjowowen's picture
init space
079c32c
from typing import Any, Union, List
from collections import namedtuple
from easydict import EasyDict
import gym
import copy
import numpy as np
from overcooked_ai_py.mdp.actions import Action, Direction
from overcooked_ai_py.mdp.overcooked_mdp import PlayerState, OvercookedGridworld, OvercookedState, ObjectState, \
SoupState, Recipe
from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv, DEFAULT_ENV_PARAMS
from ding.envs import BaseEnv
from ding.utils import ENV_REGISTRY, deep_merge_dicts
OvercookEnvTimestep = namedtuple('OvercookEnvTimestep', ['obs', 'reward', 'done', 'info'])
# n, s = Direction.NORTH, Direction.SOUTH
# e, w = Direction.EAST, Direction.WEST
# stay, interact = Action.STAY, Action.INTERACT
# Action.ALL_ACTIONS: [n, s, e, w, stay, interact]
@ENV_REGISTRY.register('overcooked')
class OvercookEnv(BaseEnv):
config = EasyDict(
dict(
env_name="cramped_room",
horizon=400,
concat_obs=False,
action_mask=True,
shape_reward=True,
)
)
def __init__(self, cfg) -> None:
self._cfg = deep_merge_dicts(self.config, cfg)
self._env_name = self._cfg.env_name
self._horizon = self._cfg.horizon
self._concat_obs = self._cfg.concat_obs
self._action_mask = self._cfg.action_mask
self._shape_reward = self._cfg.shape_reward
self.mdp = OvercookedGridworld.from_layout_name(self._env_name)
self.base_env = OvercookedEnv.from_mdp(self.mdp, horizon=self._horizon, info_level=0)
# rightnow overcook environment encoding only support 2 agent game
self.agent_num = 2
self.action_dim = len(Action.ALL_ACTIONS)
self.action_space = gym.spaces.Discrete(len(Action.ALL_ACTIONS))
# set up obs shape
featurize_fn = lambda mdp, state: mdp.lossless_state_encoding(state)
self.featurize_fn = featurize_fn
dummy_mdp = self.base_env.mdp
dummy_state = dummy_mdp.get_standard_start_state()
obs_shape = self.featurize_fn(dummy_mdp, dummy_state)[0].shape # (5, 4, 26)
obs_shape = (obs_shape[-1], *obs_shape[:-1]) # permute channel first
if self._concat_obs:
obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
else:
obs_shape = (2, ) + obs_shape
self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.int64)
if self._action_mask:
self.observation_space = gym.spaces.Dict(
{
'agent_state': self.observation_space,
'action_mask': gym.spaces.Box(
low=0, high=1, shape=(self.agent_num, self.action_dim), dtype=np.int64
)
}
)
self.reward_space = gym.spaces.Box(low=0, high=100, shape=(1, ), dtype=np.float32)
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def close(self) -> None:
# Note: the real env instance only has a empty close method, only pas
pass
def random_action(self):
return [self.action_space.sample() for _ in range(self.agent_num)]
def step(self, action):
assert all(self.action_space.contains(a) for a in action), "%r (%s) invalid" % (action, type(action))
agent_action, other_agent_action = [Action.INDEX_TO_ACTION[a] for a in action]
if self.agent_idx == 0:
joint_action = (agent_action, other_agent_action)
else:
joint_action = (other_agent_action, agent_action)
next_state, reward, done, env_info = self.base_env.step(joint_action)
reward = np.array([float(reward)])
self._eval_episode_return += reward
if self._shape_reward:
self._eval_episode_return += sum(env_info['shaped_r_by_agent'])
reward += sum(env_info['shaped_r_by_agent'])
ob_p0, ob_p1 = self.featurize_fn(self.mdp, next_state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
env_info["policy_agent_idx"] = self.agent_idx
env_info["eval_episode_return"] = self._eval_episode_return
env_info["other_agent_env_idx"] = 1 - self.agent_idx
action_mask = self.get_action_mask()
if self._action_mask:
obs = {
"agent_state": both_agents_ob,
# "overcooked_state": self.base_env.state,
"action_mask": action_mask
}
else:
obs = both_agents_ob
return OvercookEnvTimestep(obs, reward, done, env_info)
def obs_preprocess(self, obs):
obs = obs.transpose(2, 0, 1)
return obs
def reset(self):
self.base_env.reset()
self._eval_episode_return = 0
self.mdp = self.base_env.mdp
# random init agent index
self.agent_idx = np.random.choice([0, 1])
ob_p0, ob_p1 = self.featurize_fn(self.mdp, self.base_env.state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
action_mask = self.get_action_mask()
if self._action_mask:
obs = {"agent_state": both_agents_ob, "action_mask": action_mask}
else:
obs = both_agents_ob
return obs
def get_available_actions(self):
return self.mdp.get_actions(self.base_env.state)
def get_action_mask(self):
available_actions = self.get_available_actions()
action_masks = np.zeros((self.agent_num, self.action_dim)).astype(np.int64)
for i in range(self.action_dim):
if Action.INDEX_TO_ACTION[i] in available_actions[0]:
action_masks[0][i] = 1
if Action.INDEX_TO_ACTION[i] in available_actions[1]:
action_masks[1][i] = 1
return action_masks
def __repr__(self):
return "DI-engine Overcooked Env"
@ENV_REGISTRY.register('overcooked_game')
class OvercookGameEnv(BaseEnv):
config = EasyDict(
dict(
env_name="cramped_room",
horizon=400,
concat_obs=False,
action_mask=False,
shape_reward=True,
)
)
def __init__(self, cfg) -> None:
self._cfg = deep_merge_dicts(self.config, cfg)
self._env_name = self._cfg.env_name
self._horizon = self._cfg.horizon
self._concat_obs = self._cfg.concat_obs
self._action_mask = self._cfg.action_mask
self._shape_reward = self._cfg.shape_reward
self.mdp = OvercookedGridworld.from_layout_name(self._env_name)
self.base_env = OvercookedEnv.from_mdp(self.mdp, horizon=self._horizon, info_level=0)
# rightnow overcook environment encoding only support 2 agent game
self.agent_num = 2
self.action_dim = len(Action.ALL_ACTIONS)
self.action_space = gym.spaces.Discrete(len(Action.ALL_ACTIONS))
# set up obs shape
featurize_fn = lambda mdp, state: mdp.lossless_state_encoding(state)
self.featurize_fn = featurize_fn
dummy_mdp = self.base_env.mdp
dummy_state = dummy_mdp.get_standard_start_state()
obs_shape = self.featurize_fn(dummy_mdp, dummy_state)[0].shape # (5, 4, 26)
obs_shape = (obs_shape[-1], *obs_shape[:-1]) # permute channel first
if self._concat_obs:
obs_shape = (obs_shape[0] * 2, *obs_shape[1:])
else:
obs_shape = (2, ) + obs_shape
self.observation_space = gym.spaces.Box(low=0, high=1, shape=obs_shape, dtype=np.int64)
if self._action_mask:
self.observation_space = gym.spaces.Dict(
{
'agent_state': self.observation_space,
'action_mask': gym.spaces.Box(
low=0, high=1, shape=(self.agent_num, self.action_dim), dtype=np.int64
)
}
)
self.reward_space = gym.spaces.Box(low=0, high=100, shape=(1, ), dtype=np.float32)
def seed(self, seed: int, dynamic_seed: bool = True) -> None:
self._seed = seed
self._dynamic_seed = dynamic_seed
np.random.seed(self._seed)
def close(self) -> None:
# Note: the real env instance only has a empty close method, only pass
pass
def random_action(self):
return [self.action_space.sample() for _ in range(self.agent_num)]
def step(self, action):
assert all(self.action_space.contains(a) for a in action), "%r (%s) invalid" % (action, type(action))
agent_action, other_agent_action = [Action.INDEX_TO_ACTION[a] for a in action]
if self.agent_idx == 0:
joint_action = (agent_action, other_agent_action)
else:
joint_action = (other_agent_action, agent_action)
next_state, reward, done, env_info = self.base_env.step(joint_action)
reward = np.array([float(reward)])
self._eval_episode_return += reward
if self._shape_reward:
self._eval_episode_return += sum(env_info['shaped_r_by_agent'])
reward += sum(env_info['shaped_r_by_agent'])
ob_p0, ob_p1 = self.featurize_fn(self.mdp, next_state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
env_info["policy_agent_idx"] = self.agent_idx
env_info["eval_episode_return"] = self._eval_episode_return
env_info["other_agent_env_idx"] = 1 - self.agent_idx
action_mask = self.get_action_mask()
if self._action_mask:
obs = {"agent_state": both_agents_ob, "action_mask": action_mask}
else:
obs = both_agents_ob
return OvercookEnvTimestep(obs, reward, done, env_info)
def obs_preprocess(self, obs):
obs = obs.transpose(2, 0, 1)
return obs
def reset(self):
self.base_env.reset()
self._eval_episode_return = 0
self.mdp = self.base_env.mdp
# random init agent index
self.agent_idx = np.random.choice([0, 1])
#fix init agent index
self.agent_idx = 0
ob_p0, ob_p1 = self.featurize_fn(self.mdp, self.base_env.state)
ob_p0, ob_p1 = self.obs_preprocess(ob_p0), self.obs_preprocess(ob_p1)
if self.agent_idx == 0:
both_agents_ob = [ob_p0, ob_p1]
else:
both_agents_ob = [ob_p1, ob_p0]
if self._concat_obs:
both_agents_ob = np.concatenate(both_agents_ob)
else:
both_agents_ob = np.stack(both_agents_ob)
action_mask = self.get_action_mask()
if self._action_mask:
obs = {"agent_state": both_agents_ob, "action_mask": action_mask}
else:
obs = both_agents_ob
return obs
def get_available_actions(self):
return self.mdp.get_actions(self.base_env.state)
def get_action_mask(self):
available_actions = self.get_available_actions()
action_masks = np.zeros((self.agent_num, self.action_dim)).astype(np.int64)
for i in range(self.action_dim):
if Action.INDEX_TO_ACTION[i] in available_actions[0]:
action_masks[0][i] = 1
if Action.INDEX_TO_ACTION[i] in available_actions[1]:
action_masks[1][i] = 1
return action_masks
def __repr__(self):
return "DI-engine Overcooked GameEnv"