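# NOTE: the lines below appear to be page residue from the web file viewer this
# file was copied from; they are kept as comments so the module stays parseable.
# zjowowen's picture
# init space
# 079c32c
# raw
# history blame
# 8.61 kB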
import copy
import os
from typing import Union, List, Optional
import gym
import numpy as np
import torch
from easydict import EasyDict
from ding.envs import BaseEnv, BaseEnvTimestep
from ding.envs.common import save_frames_as_gif
from ding.torch_utils import to_ndarray
from ding.utils import ENV_REGISTRY
from .mujoco_wrappers import wrap_mujoco
@ENV_REGISTRY.register('mujoco')
class MujocoEnv(BaseEnv):
    """
    Overview:
        DI-engine environment wrapper for MuJoCo tasks. The underlying environment is built by \
        ``wrap_mujoco`` on the first call to ``reset`` and is re-used by later resets until ``close``.
    """

    @classmethod
    def default_config(cls: type) -> EasyDict:
        """
        Overview:
            Return a deep copy of the class-level ``config`` as an ``EasyDict`` with an extra \
            ``cfg_type`` field set to ``<ClassName>Dict``.
        """
        cfg = EasyDict(copy.deepcopy(cls.config))
        cfg.cfg_type = cls.__name__ + 'Dict'
        return cfg

    config = dict(
        # If True, ``step`` clips the action to [-1, 1] before passing it to the underlying env.
        action_clip=False,
        # Forwarded to ``wrap_mujoco`` as ``delay_reward_step``.
        delay_reward_step=0,
        # Not read in this class; video recording is switched on through ``enable_save_replay``.
        replay_path=None,
        # If True, ``step`` renders a frame per step and writes a GIF when the episode ends.
        save_replay_gif=False,
        # Directory the GIFs are written to; must be set when ``save_replay_gif`` is True.
        replay_path_gif=None,
        # If truthy, incoming actions are bin indices that ``map_action`` rescales to [-1, 1].
        action_bins_per_branch=None,
    )

    def __init__(self, cfg: dict) -> None:
        """
        Overview:
            Store the configuration. The underlying env is not created here; see ``reset``.
        Arguments:
            - cfg (:obj:`dict`): Config with attribute access (e.g. ``EasyDict``) that provides the \
                keys listed in ``config``; ``env_id`` is read later by ``_make_env``, ``step`` and \
                ``__repr__``.
        """
        self._cfg = cfg
        self._action_clip = cfg.action_clip
        self._delay_reward_step = cfg.delay_reward_step
        self._init_flag = False
        self._replay_path = None
        self._replay_path_gif = cfg.replay_path_gif
        self._save_replay_gif = cfg.save_replay_gif
        self._action_bins_per_branch = cfg.action_bins_per_branch
        # GIF bookkeeping used by ``step``. These must exist before the first step, otherwise
        # enabling ``save_replay_gif`` fails with AttributeError.
        self._frames = []
        self._save_replay_count = 0

    def map_action(self, action: Union[np.ndarray, list]) -> Union[np.ndarray, list]:
        """
        Overview:
            Map the discretized action index to the action in the original action space.
        Arguments:
            - action (:obj:`np.ndarray or list`): The discretized action index. \
                The value ranges is {0, 1, ..., self._action_bins_per_branch - 1}.
        Returns:
            - outputs (:obj:`list`): The action in the original action space. \
                The value ranges is [-1, 1].
        Examples:
            >>> inputs = [2, 0, 4]
            >>> self._action_bins_per_branch = 5
            >>> outputs = map_action(inputs)
            >>> assert isinstance(outputs, list) and outputs == [0.0, -1.0, 1.0]
        """
        return [2 * x / (self._action_bins_per_branch - 1) - 1 for x in action]

    def reset(self) -> np.ndarray:
        """
        Overview:
            Lazily build the underlying env on first use, apply the stored seed (if ``seed`` was \
            called), reset the env and clear the per-episode state.
        Returns:
            - obs (:obj:`np.ndarray`): The initial observation as ``float32``.
        """
        if not self._init_flag:
            self._env = self._make_env()
            if self._replay_path is not None:
                self._env = gym.wrappers.RecordVideo(
                    self._env,
                    video_folder=self._replay_path,
                    episode_trigger=lambda episode_id: True,
                    name_prefix='rl-video-{}'.format(id(self))
                )
            self._env.observation_space.dtype = np.float32  # To unify the format of envs in DI-engine
            self._observation_space = self._env.observation_space
            self._action_space = self._env.action_space
            self._reward_space = gym.spaces.Box(
                low=self._env.reward_range[0], high=self._env.reward_range[1], shape=(1, ), dtype=np.float32
            )
            self._init_flag = True
        # ``_seed`` / ``_dynamic_seed`` only exist after ``seed`` has been called.
        if hasattr(self, '_seed') and hasattr(self, '_dynamic_seed') and self._dynamic_seed:
            # Dynamic seeding: offset the base seed by a random amount on each reset.
            np_seed = 100 * np.random.randint(1, 1000)
            self._env.seed(self._seed + np_seed)
        elif hasattr(self, '_seed'):
            self._env.seed(self._seed)
        obs = self._env.reset()
        obs = to_ndarray(obs).astype('float32')
        self._eval_episode_return = 0.
        # Start every episode with an empty frame buffer so each GIF holds exactly one episode.
        self._frames = []
        return obs

    def close(self) -> None:
        """
        Overview:
            Close the underlying env if it has been created; a later ``reset`` builds a new one.
        """
        if self._init_flag:
            self._env.close()
        self._init_flag = False

    def seed(self, seed: int, dynamic_seed: bool = True) -> None:
        """
        Overview:
            Store the seed settings used by ``reset`` and seed NumPy's global random generator.
        Arguments:
            - seed (:obj:`int`): Base seed.
            - dynamic_seed (:obj:`bool`): If True, ``reset`` adds a random offset to ``seed`` each time.
        """
        self._seed = seed
        self._dynamic_seed = dynamic_seed
        np.random.seed(self._seed)

    def step(self, action: Union[np.ndarray, list]) -> BaseEnvTimestep:
        """
        Overview:
            Run one step in the underlying env, accumulating the episode return.
        Arguments:
            - action (:obj:`np.ndarray or list`): The action; bin indices when \
                ``action_bins_per_branch`` is set, otherwise values in the original action space.
        Returns:
            - timestep (:obj:`BaseEnvTimestep`): ``(obs, rew, done, info)`` with ``float32`` ``obs`` \
                and a ``float32`` reward array of shape ``(1, )``. When ``done`` is True, \
                ``info['eval_episode_return']`` holds the accumulated episode return.
        """
        if self._action_bins_per_branch:
            action = self.map_action(action)
        action = to_ndarray(action)
        if self._save_replay_gif:
            self._frames.append(self._env.render(mode='rgb_array'))
        if self._action_clip:
            action = np.clip(action, -1, 1)
        obs, rew, done, info = self._env.step(action)
        self._eval_episode_return += rew
        if done:
            if self._save_replay_gif:
                path = os.path.join(
                    self._replay_path_gif, '{}_episode_{}.gif'.format(self._cfg.env_id, self._save_replay_count)
                )
                save_frames_as_gif(self._frames, path)
                self._save_replay_count += 1
            info['eval_episode_return'] = self._eval_episode_return
        obs = to_ndarray(obs).astype(np.float32)
        rew = to_ndarray([rew]).astype(np.float32)
        return BaseEnvTimestep(obs, rew, done, info)

    def _make_env(self):
        """
        Overview:
            Build the underlying env through ``wrap_mujoco`` from ``env_id`` and the optional \
            ``norm_obs`` / ``norm_reward`` config entries.
        """
        return wrap_mujoco(
            self._cfg.env_id,
            norm_obs=self._cfg.get('norm_obs', None),
            norm_reward=self._cfg.get('norm_reward', None),
            delay_reward_step=self._delay_reward_step
        )

    def enable_save_replay(self, replay_path: Optional[str] = None) -> None:
        """
        Overview:
            Set the video folder used when ``reset`` builds the env. It has no effect on an env \
            that is already built.
        Arguments:
            - replay_path (:obj:`Optional[str]`): Target folder; defaults to ``'./video'``.
        """
        if replay_path is None:
            replay_path = './video'
        self._replay_path = replay_path

    def random_action(self) -> np.ndarray:
        """
        Overview:
            Sample an action from the action space (available after the first ``reset``).
        """
        return self.action_space.sample()

    def __repr__(self) -> str:
        return "DI-engine Mujoco Env({})".format(self._cfg.env_id)

    @staticmethod
    def create_collector_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Build the per-env config list for collectors.
        Arguments:
            - cfg (:obj:`dict`): Env config; ``collector_env_num`` (default 1) sets the list length.
        Returns:
            - cfgs (:obj:`List[dict]`): ``collector_env_num`` references to one deep copy of ``cfg`` \
                with ``collector_env_num`` removed.
        """
        collector_cfg = copy.deepcopy(cfg)
        collector_env_num = collector_cfg.pop('collector_env_num', 1)
        return [collector_cfg for _ in range(collector_env_num)]

    @staticmethod
    def create_evaluator_env_cfg(cfg: dict) -> List[dict]:
        """
        Overview:
            Build the per-env config list for evaluators, with reward normalization switched off.
        Arguments:
            - cfg (:obj:`dict`): Env config; ``evaluator_env_num`` (default 1) sets the list length.
        Returns:
            - cfgs (:obj:`List[dict]`): ``evaluator_env_num`` references to one deep copy of ``cfg`` \
                with ``evaluator_env_num`` removed.
        """
        evaluator_cfg = copy.deepcopy(cfg)
        evaluator_env_num = evaluator_cfg.pop('evaluator_env_num', 1)
        # ``norm_reward`` is optional (``_make_env`` reads it with ``.get``), so only touch it
        # when it is actually configured instead of failing on configs that omit it.
        norm_reward = evaluator_cfg.get('norm_reward', None)
        if norm_reward is not None:
            norm_reward['use_norm'] = False
        return [evaluator_cfg for _ in range(evaluator_env_num)]

    @property
    def observation_space(self) -> gym.spaces.Space:
        return self._observation_space

    @property
    def action_space(self) -> gym.spaces.Space:
        return self._action_space

    @property
    def reward_space(self) -> gym.spaces.Space:
        return self._reward_space
@ENV_REGISTRY.register('mbmujoco')
class MBMujocoEnv(MujocoEnv):
    """
    Overview:
        ``MujocoEnv`` variant that additionally provides ``termination_fn``, a batched check of \
        whether observations correspond to terminated states.
    """

    def termination_fn(self, next_obs: torch.Tensor) -> torch.Tensor:
        """
        Overview:
            Decide, row by row, whether each observation in a batch is a terminated state for the \
            configured ``env_id``.
        Arguments:
            - next_obs (:obj:`torch.Tensor`): A 2-dim batch of observations, one observation per row.
        Returns:
            - done (:obj:`torch.Tensor`): One boolean per row; ``True`` marks a terminated state.
        Raises:
            - KeyError: If no termination rule is implemented for ``env_id``.

        .. note::
            This is a collection of termination functions for mujocos used in MBPO (arXiv: 1906.08253),\
            directly copied from MBPO repo https://github.com/jannerm/mbpo/tree/master/mbpo/static.
        """
        assert len(next_obs.shape) == 2
        env_id = self._cfg.env_id

        if env_id == "Hopper-v2":
            # Alive while every value is finite and bounded, and height / angle stay in range.
            all_finite = torch.isfinite(next_obs).all(-1)
            bounded = (torch.abs(next_obs[:, 1:]) < 100).all(-1)
            tall_enough = next_obs[:, 0] > .7
            upright = torch.abs(next_obs[:, 1]) < .2
            alive = all_finite & bounded & tall_enough & upright
            return ~alive

        if env_id == "Walker2d-v2":
            h = next_obs[:, 0]
            a = next_obs[:, 1]
            alive = (h > 0.8) & (h < 2.0) & (a > -1.0) & (a < 1.0)
            return ~alive

        if 'walker_' in env_id:
            # These variants read torso height / angle from the last two observation columns.
            h = next_obs[:, -2]
            a = next_obs[:, -1]
            offset = 0. if ('walker_7' in env_id or 'walker_5' in env_id) else 0.26
            alive = (h > 0.8 - offset) & (h < 2.0 - offset) & (a > -1.0) & (a < 1.0)
            return ~alive

        if env_id == "HalfCheetah-v3":
            # Never terminates: an all-False mask with one entry per row.
            per_row = next_obs.sum(-1)
            return torch.zeros_like(per_row).bool()

        if env_id in ['Ant-v2', 'AntTruncatedObs-v2']:
            first = next_obs[:, 0]
            all_finite = torch.isfinite(next_obs).all(axis=-1)
            alive = all_finite & (first >= 0.2) & (first <= 1.0)
            return ~alive

        if env_id in ['Humanoid-v2', 'HumanoidTruncatedObs-v2']:
            first = next_obs[:, 0]
            return (first < 1.0) | (first > 2.0)

        raise KeyError("not implemented env_id: {}".format(env_id))