# Source: gomoku/DI-engine/dizoo/mujoco/envs/mujoco_disc_env.py
# Last commit by zjowowen: "init space" (079c32c)
import copy
import os
from itertools import product
from typing import Union, List, Optional
import gym
import numpy as np
from easydict import EasyDict
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.envs.common import save_frames_as_gif
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
from .mujoco_wrappers import wrap_mujoco
@ENV_REGISTRY.register('mujoco-disc')
class MujocoDiscEnv(BaseEnv):
    """
    Overview:
        The modified Mujoco environment with a manually discretized action space. Each dimension of the \
        original continuous action is divided equally into ``each_dim_disc_size`` bins, and the Cartesian \
        product of the per-dimension bins forms the handcrafted discrete actions.
    Notes:
        For bin index ``k`` in ``[0, n)`` (``n = each_dim_disc_size``) the continuous value sent to the wrapped \
        env is ``-1 + 2 * k / n``, i.e. the grid covers ``[-1, 1 - 2 / n]`` in every dimension. This presumably \
        assumes the raw action range is ``[-1, 1]``; the bounds of the raw action space are not consulted.
    """

    @classmethod
    def default_config(cls: type) -> EasyDict:
        """
        Overview:
            Return a deep copy of the class-level ``config`` as an ``EasyDict``, tagged with ``cfg_type``.
        Returns:
            - cfg (:obj:`EasyDict`): Default config with ``cfg_type`` set to ``<ClassName>Dict``.
        """
        cfg = EasyDict(copy.deepcopy(cls.config))
        cfg.cfg_type = cls.__name__ + 'Dict'
        return cfg

    config = dict(
        # Whether to clip the continuous action into [-1, 1] before stepping the wrapped env.
        action_clip=False,
        # Forwarded to ``wrap_mujoco`` as ``delay_reward_step``.
        delay_reward_step=0,
        # Not read by this class; the video folder is set through ``enable_save_replay``.
        replay_path=None,
        # Whether to render every step and dump one GIF per finished episode.
        save_replay_gif=False,
        # Directory for the GIF files; must be set when ``save_replay_gif`` is True.
        replay_path_gif=None,
    )

    def __init__(self, cfg: dict) -> None:
        """
        Overview:
            Store the config; the wrapped env itself is created lazily on the first ``reset``.
        Arguments:
            - cfg (:obj:`dict`): Env config. Accessed by attribute, so an ``EasyDict``-like object is expected. \
                Besides the keys in ``config`` it must provide ``env_id`` and ``each_dim_disc_size``.
        """
        self._cfg = cfg
        self._action_clip = cfg.action_clip
        self._delay_reward_step = cfg.delay_reward_step
        self._init_flag = False
        self._replay_path = None
        self._replay_path_gif = cfg.replay_path_gif
        self._save_replay_gif = cfg.save_replay_gif
        # Initialised here so that GIF saving in ``step`` works even if ``enable_save_replay`` was never called.
        self._save_replay_count = 0
        # Replay folder the current ``self._env`` has already been wrapped for (None: not wrapped).
        self._wrapped_replay_path = None

    def reset(self) -> np.ndarray:
        """
        Overview:
            Reset the env. On the first call (or the first call after ``close``) create the wrapped env, \
            its spaces and the discrete action table. Apply the stored seed if any, and attach the video \
            recorder if a replay path has been enabled.
        Returns:
            - obs (:obj:`np.ndarray`): Initial observation cast to ``float32``.
        """
        if not self._init_flag:
            self._env = self._make_env()
            self._env.observation_space.dtype = np.float32  # To unify the format of envs in DI-engine
            self._observation_space = self._env.observation_space
            self._raw_action_space = self._env.action_space
            self._reward_space = gym.spaces.Box(
                low=self._env.reward_range[0], high=self._env.reward_range[1], shape=(1, ), dtype=np.float32
            )
            # The action table only depends on the raw action space and the config, so build it once per env
            # instead of on every reset.
            self._build_discrete_actions()
            # A freshly created env carries no video wrapper yet.
            self._wrapped_replay_path = None
            self._init_flag = True
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            # Dynamic seed: offset the base seed with a random value so that episodes differ.
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        # Wrap only once per (env, replay path); wrapping on every reset would nest RecordVideo wrappers.
        if self._replay_path is not None and self._replay_path != self._wrapped_replay_path:
            self._env = gym.wrappers.RecordVideo(
                self._env,
                video_folder=self._replay_path,
                episode_trigger=lambda episode_id: True,
                name_prefix='rl-video-{}'.format(id(self))
            )
            self._wrapped_replay_path = self._replay_path
        if self._save_replay_gif:
            # Start a fresh frame buffer for the new episode.
            self._frames = []
        obs = self._env.reset()
        obs = to_ndarray(obs).astype('float32')
        self._eval_episode_return = 0.
        return obs

    def _build_discrete_actions(self) -> None:
        """
        Overview:
            Build the table mapping every discrete action index to its tuple of per-dimension bin indices, \
            and the resulting ``Discrete`` action space. Requires ``self._raw_action_space`` to be set.
        """
        # disc_to_cont: transform discrete action index to original continuous action
        self.m = self._raw_action_space.shape[0]  # number of raw action dimensions
        self.n = self._cfg.each_dim_disc_size  # number of bins per dimension
        self.K = self.n ** self.m  # total number of discrete actions
        self.disc_to_cont = list(product(*[list(range(self.n)) for _ in range(self.m)]))
        # the modified discrete action space
        self._action_space = gym.spaces.Discrete(self.K)

    def close(self) -> None:
        """
        Overview:
            Close the wrapped env if it has been created; the next ``reset`` will create a new one.
        """
        if self._init_flag:
            self._env.close()
        self._init_flag = False

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        """
        Overview:
            Store the seed used by subsequent ``reset`` calls and seed numpy's global random generator.
        Arguments:
            - seed (:obj:`int`): Base seed.
            - dynamic_seed (:obj:`bool`): If True, ``reset`` adds a random offset to the base seed each time.
        """
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def step(self, action: Union[np.ndarray, list]) -> BaseEnvTimestep:
        """
        Overview:
            Map the discrete action index to its continuous action, step the wrapped env and accumulate \
            the episode return. When the episode ends, optionally save the rendered frames as a GIF and \
            put ``eval_episode_return`` into ``info``.
        Arguments:
            - action (:obj:`Union[np.ndarray, list]`): Discrete action index; it is converted with ``int()``.
        Returns:
            - timestep (:obj:`BaseEnvTimestep`): ``(obs, rew, done, info)`` where ``obs`` is ``float32`` and \
                ``rew`` is a ``float32`` array of shape ``(1, )``.
        """
        # disc_to_cont: transform discrete action index to original continuous action
        action = [-1 + 2 / self.n * k for k in self.disc_to_cont[int(action)]]
        action = to_ndarray(action)
        if self._save_replay_gif:
            # The frame is rendered before the env is stepped.
            self._frames.append(self._env.render(mode='rgb_array'))
        if self._action_clip:
            action = np.clip(action, -1, 1)
        obs, rew, done, info = self._env.step(action)
        self._eval_episode_return += rew

        if done:
            if self._save_replay_gif:
                path = os.path.join(
                    self._replay_path_gif, '{}_episode_{}.gif'.format(self._cfg.env_id, self._save_replay_count)
                )
                save_frames_as_gif(self._frames, path)
                self._save_replay_count += 1
            info['eval_episode_return'] = self._eval_episode_return

        obs = to_ndarray(obs).astype(np.float32)
        rew = to_ndarray([rew]).astype(np.float32)
        return BaseEnvTimestep(obs, rew, done, info)

    def _make_env(self):
        """
        Overview:
            Create the wrapped Mujoco env through ``wrap_mujoco``. ``norm_obs`` and ``norm_reward`` are \
            optional config entries and are passed as None when absent.
        """
        return wrap_mujoco(
            self._cfg.env_id,
            norm_obs=self._cfg.get('norm_obs', None),
            norm_reward=self._cfg.get('norm_reward', None),
            delay_reward_step=self._delay_reward_step
        )

    def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
        """
        Overview:
            Enable video recording; the recorder is attached on the next ``reset``. Also resets the \
            counter used to name saved GIF files.
        Arguments:
            - replay_path (:obj:`Optional[str]`): Video folder, ``'./video'`` when None.
        """
        if replay_path is None:
            replay_path = './video'
        self._replay_path = replay_path
        self._save_replay = True
        self._save_replay_count = 0

    def random_action(self) -> np.ndarray:
        """
        Overview:
            Sample a random discrete action index from ``action_space`` (available after ``reset``).
        """
        return self.action_space.sample()

    def __repr__(self) -> str:
        return "DI-engine modified Mujoco Env({}) with manually discretized action space".format(self._cfg.env_id)

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Build the per-env config list for collector envs.
        Arguments:
            - cfg (:obj:`dict`): Env config; ``collector_env_num`` (default 1) is removed from the copy.
        Returns:
            - cfgs (:obj:`List[dict]`): ``collector_env_num`` references to one deep copy of ``cfg``.
        """
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        return [collector_cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Build the per-env config list for evaluator envs, with ``norm_reward.use_norm`` forced to False \
            when a ``norm_reward`` entry is present.
        Arguments:
            - cfg (:obj:`dict`): Env config; ``evaluator_env_num`` (default 1) is removed from the copy.
        Returns:
            - cfgs (:obj:`List[dict]`): ``evaluator_env_num`` references to one deep copy of ``cfg``.
        """
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        # ``norm_reward`` is optional (``_make_env`` reads it with a None default), so only override it when
        # it exists. Item assignment works for both plain dicts and EasyDict.
        norm_reward = evaluator_cfg.get('norm_reward', None)
        if norm_reward is not None:
            norm_reward['use_norm'] = False
        return [evaluator_cfg for _ in range(evaluator_env_num)]

    @property
    def observation_space(self) -> gym.spaces.Space:
        return self._observation_space

    @property
    def action_space(self) -> gym.spaces.Space:
        return self._action_space

    @property
    def reward_space(self) -> gym.spaces.Space:
        return self._reward_space