"""
Author: Minh Pham-Dinh
Created: Feb 4th, 2024
Last Modified: Feb 7th, 2024
Email: mhpham26@colby.edu

Description:
    File containing wrappers for different environment types.
"""

import os

import cv2
import gymnasium as gym
import numpy as np
from dm_control import suite
from dm_control.suite.wrappers import pixels


class ActionRepeat:
    """A wrapper that repeats each action for a fixed number of environment steps.

    Args:
        env: The environment to be wrapped.
        repeats (int): Number of times each action is applied per step() call.
    """

    def __init__(self, env, repeats):
        self.env = env
        self.repeats = repeats

    def __getattr__(self, name):
        # Delegate any other attribute access to the wrapped environment.
        return getattr(self.env, name)

    def step(self, action):
        done = False
        total_reward = 0
        current_step = 0
        # Apply the same action until the repeat budget is spent or the
        # episode ends, accumulating reward across the repeated steps.
        while current_step < self.repeats and not done:
            obs, reward, termination, truncation, info = self.env.step(action)
            total_reward += reward
            current_step += 1
            done = termination or truncation
        return obs, total_reward, termination, truncation, info
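
# A minimal usage sketch for ActionRepeat (assumes a standard Gymnasium
# environment id such as "CartPole-v1"; any env with the five-tuple step API works):
#
#     env = ActionRepeat(gym.make("CartPole-v1"), repeats=4)
#     obs, info = env.reset()
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())

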
class NormalizeActions:
    """A wrapper class that normalizes the action space of an environment.

    Args:
        env (gym.Env): The environment to be wrapped.

    Attributes:
        _env (gym.Env): The original environment.
        _mask (numpy.ndarray): A boolean mask indicating which action dimensions are finite.
        _low (numpy.ndarray): The lower bounds of the action space.
        _high (numpy.ndarray): The upper bounds of the action space.
    """

    def __init__(self, env):
        self._env = env
        # Only dimensions with finite bounds are rescaled; unbounded dimensions
        # are passed through unchanged.
        self._mask = np.logical_and(
            np.isfinite(env.action_space.low),
            np.isfinite(env.action_space.high))
        self._low = np.where(self._mask, env.action_space.low, -1)
        self._high = np.where(self._mask, env.action_space.high, 1)

    def __getattr__(self, name):
        """Delegate attribute access to the original environment.

        Args:
            name (str): The name of the attribute.

        Returns:
            Any: The value of the attribute in the original environment.
        """
        return getattr(self._env, name)

    @property
    def action_space(self):
        """Get the normalized action space.

        Returns:
            gym.spaces.Box: The normalized action space, with every finite
            dimension rescaled to [-1, 1].
        """
        low = np.where(self._mask, -np.ones_like(self._low), self._low)
        high = np.where(self._mask, np.ones_like(self._low), self._high)
        return gym.spaces.Box(low, high, dtype=np.float32)

    def step(self, action):
        """Take a step in the environment with a normalized action.

        Args:
            action (numpy.ndarray): The action in the normalized [-1, 1] range.

        Returns:
            Tuple: The (observation, reward, termination, truncation, info)
            tuple returned by the wrapped environment.
        """
        # Map the normalized action from [-1, 1] back to the original bounds.
        original = (action + 1) / 2 * (self._high - self._low) + self._low
        original = np.where(self._mask, original, action)
        return self._env.step(original)
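
# A minimal usage sketch for NormalizeActions (assumes an environment with a
# bounded Box action space; "Pendulum-v1" has torque bounds [-2, 2]):
#
#     env = NormalizeActions(gym.make("Pendulum-v1"))
#     env.action_space              # Box(-1.0, 1.0, (1,), float32)
#     obs, info = env.reset()
#     env.step(np.array([0.5]))     # rescaled to (0.5 + 1) / 2 * 4 - 2 = 1.0

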
class DMCtoGymWrapper(gym.Env):
    """Wrapper that converts a DeepMind Control Suite environment into a Gymnasium
    environment, with optional video recording and episode truncation.

    Args:
        domain_name (str): The name of the domain.
        task_name (str): The name of the task.
        task_kwargs (dict, optional): Additional kwargs for the task.
        visualize_reward (bool, optional): Whether to visualize the reward. Defaults to False.
        resize (tuple, optional): Size (height, width) at which observations are rendered. Defaults to (64, 64).
        record (bool, optional): Whether to record episodes. Defaults to False.
        record_freq (int, optional): Frequency (in episodes) to record. Defaults to 100.
        record_path (str, optional): Path to save recorded videos. Defaults to '../'.
        max_episode_steps (int, optional): Maximum steps per episode for truncation. Defaults to 1000.
        camera (int, optional): Camera id used for rendering. Defaults to a per-domain choice.
    """

    def __init__(self, domain_name, task_name, task_kwargs=None, visualize_reward=False,
                 resize=(64, 64), record=False, record_freq=100, record_path='../',
                 max_episode_steps=1000, camera=None):
        super().__init__()
        self.env = suite.load(domain_name, task_name, task_kwargs=task_kwargs,
                              visualize_reward=visualize_reward)
        self.episode_count = -1
        self.record = record
        self.record_freq = record_freq
        self.record_path = record_path
        self.max_episode_steps = max_episode_steps
        self.current_step = 0
        self.total_reward = 0
        self.recorder = None

        # Expose the dm_control action spec as a Gymnasium Box space.
        action_spec = self.env.action_spec()
        self.action_space = gym.spaces.Box(low=action_spec.minimum.astype(np.float32),
                                           high=action_spec.maximum.astype(np.float32),
                                           dtype=np.float32)

        # Observations are rendered pixels, scaled to [-0.5, 0.5] and stored
        # channels-first (see _get_obs).
        self.env = pixels.Wrapper(self.env, pixels_only=True)
        self.resize = resize
        self.observation_space = gym.spaces.Box(low=-0.5, high=0.5, shape=(3, *resize), dtype=np.float32)

        # Some domains look better from a non-default camera (e.g. quadruped).
        if camera is None:
            camera = dict(quadruped=2).get(domain_name, 0)
        self._camera = camera

    def step(self, action):
        time_step = self.env.step(action)
        obs = self._get_obs()

        # dm_control may return reward=None (e.g. on the first step of an episode).
        reward = time_step.reward if time_step.reward is not None else 0
        self.total_reward += reward
        self.current_step += 1

        termination = time_step.last()
        truncation = (self.current_step >= self.max_episode_steps)
        info = {}
        if termination or truncation:
            # Mirror the episode statistics dict used by Gymnasium's
            # RecordEpisodeStatistics wrapper.
            info = {
                'episode': {
                    'r': [self.total_reward],
                    'l': self.current_step
                }
            }

        if self.recorder:
            # OpenCV's VideoWriter expects BGR frames.
            frame = cv2.cvtColor(self.env.physics.render(camera_id=self._camera), cv2.COLOR_RGB2BGR)
            self.recorder.write(frame)
            if termination or truncation:
                self._reset_recorder()
                info['video_path'] = os.path.join(self.record_path, f"episode_{self.episode_count}.webm")

        return obs, reward, termination, truncation, info

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self.total_reward = 0
        self.episode_count += 1

        self.env.reset()
        obs = self._get_obs()

        # Start a new recording every record_freq episodes.
        if self.record and self.episode_count % self.record_freq == 0:
            self._start_recording(self.env.physics.render(camera_id=self._camera))

        return obs, {}

    def _start_recording(self, frame):
        os.makedirs(self.record_path, exist_ok=True)
        video_file = os.path.join(self.record_path, f"episode_{self.episode_count}.webm")
        height, width, _ = frame.shape
        # 'VP80' is the VP8 fourcc, matching the .webm container.
        self.recorder = cv2.VideoWriter(video_file, cv2.VideoWriter_fourcc(*'VP80'), 30, (width, height))
        # Convert the first frame to BGR, as in step().
        self.recorder.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))

    def _reset_recorder(self):
        if self.recorder:
            self.recorder.release()
            self.recorder = None

    def _get_obs(self):
        obs = self.render()
        # Scale pixels from [0, 255] to [-0.5, 0.5] and reorder HWC -> CHW.
        obs = obs / 255 - 0.5
        return obs.transpose([2, 0, 1]).astype(np.float32)

    def render(self, mode='rgb_array'):
        # dm_control's physics.render takes (height, width) positionally.
        return self.env.physics.render(*self.resize, camera_id=self._camera)
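
# A minimal usage sketch for DMCtoGymWrapper (assumes dm_control is installed;
# "walker"/"walk" is one of the standard suite tasks):
#
#     env = DMCtoGymWrapper("walker", "walk", resize=(64, 64), max_episode_steps=1000)
#     obs, info = env.reset()      # obs is a (3, 64, 64) float32 array in [-0.5, 0.5]
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())

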
class AtariPreprocess(gym.Wrapper):
    """A custom Gym wrapper that integrates multiple environment processing steps:
    - Records episode statistics and videos.
    - Resizes observations to a specified shape.
    - Scales and reorders observation channels.
    - Scales rewards using the tanh function.

    Parameters:
    - env (gym.Env): The original environment to wrap.
    - new_obs_size (tuple): The target size for observation resizing (height, width).
    - record (bool): If True, enable video recording.
    - record_path (str): The directory path where videos will be saved.
    - record_freq (int): Frequency (in episodes) at which to record videos.
    """

    def __init__(self, env, new_obs_size, record=False, record_path='../videos/', record_freq=100):
        super().__init__(env)
        # Chain the standard Gymnasium wrappers; step()/reset() delegate to
        # self.env, so reassigning it applies the whole chain.
        self.env = gym.wrappers.RecordEpisodeStatistics(env)
        if record:
            self.env = gym.wrappers.RecordVideo(self.env, record_path,
                                                episode_trigger=lambda episode_id: episode_id % record_freq == 0)
        self.env = gym.wrappers.ResizeObservation(self.env, shape=new_obs_size)

        self.new_obs_size = new_obs_size
        self.observation_space = gym.spaces.Box(
            low=-0.5, high=0.5,
            shape=(3, new_obs_size[0], new_obs_size[1]),
            dtype=np.float32
        )

    def step(self, action):
        obs, reward, termination, truncation, info = super().step(action)
        obs = self.process_observation(obs)
        # Squash rewards into (-1, 1) to keep their scale comparable across games.
        reward = np.tanh(reward)
        return obs, reward, termination, truncation, info

    def reset(self, **kwargs):
        obs, info = super().reset(**kwargs)
        obs = self.process_observation(obs)
        return obs, info

    def process_observation(self, observation):
        """Process and return the observation from the environment.
        - Scales pixel values to the range [-0.5, 0.5].
        - Reorders channels to CHW format (channels, height, width).

        Parameters:
        - observation (np.ndarray): The original observation from the environment.

        Returns:
        - np.ndarray: The processed observation.
        """
        # Some wrappers return dict observations with the frame under 'pixels';
        # guard with isinstance so the membership test is never run on an array.
        if isinstance(observation, dict) and 'pixels' in observation:
            observation = observation['pixels']
        observation = observation / 255.0 - 0.5
        return np.transpose(observation, (2, 0, 1)).astype(np.float32)
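
# A minimal usage sketch for AtariPreprocess (assumes the Gymnasium Atari extras
# / ale-py are installed so an id like "ALE/Breakout-v5" is available):
#
#     env = AtariPreprocess(gym.make("ALE/Breakout-v5", render_mode="rgb_array"), new_obs_size=(64, 64))
#     obs, info = env.reset()      # obs is a (3, 64, 64) array in [-0.5, 0.5]
#     obs, reward, terminated, truncated, info = env.step(env.action_space.sample())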