A2C playing BreakoutNoFrameskip-v4 from https://github.com/sgoodfriend/rl-algo-impls/tree/983cb75e43e51cf4ef57f177194ab9a4a1a8808b
233c511
from typing import Tuple, TypeVar | |
import gym | |
import numpy as np | |
from numpy.typing import NDArray | |
from rl_algo_impls.wrappers.vectorable_wrapper import ( | |
VecotarableWrapper, | |
single_observation_space, | |
) | |
RunningMeanStdSelf = TypeVar("RunningMeanStdSelf", bound="RunningMeanStd")


class RunningMeanStd:
    """Running estimate of the per-element mean and variance of batched data.

    Statistics are accumulated batch by batch along axis 0 and can be saved
    to / restored from an ``.npz`` archive or copied from another instance.
    """

    def __init__(self, episilon: float = 1e-4, shape: Tuple[int, ...] = ()) -> None:
        """Initialise with zero mean, unit variance and a tiny pseudo-count.

        Args:
            episilon: Initial value of ``count``. The (misspelled) name is
                kept as-is so existing keyword callers keep working.
            shape: Shape of a single sample, i.e. of ``mean`` and ``var``.
        """
        self.mean = np.zeros(shape, np.float64)
        self.var = np.ones(shape, np.float64)
        self.count = episilon

    def update(self, x: NDArray) -> None:
        """Fold a batch of samples into the running mean and variance.

        Args:
            x: Batch whose axis 0 enumerates samples. An empty batch is
                ignored, because its mean/variance would be NaN and would
                contaminate the running statistics for good.
        """
        batch_count = x.shape[0]
        if batch_count == 0:
            return

        batch_mean = np.mean(x, axis=0)
        batch_var = np.var(x, axis=0)

        # delta must be taken against the *old* mean; it is reused below
        # after self.mean has already been moved.
        delta = batch_mean - self.mean
        total_count = self.count + batch_count

        self.mean += delta * batch_count / total_count

        # Combine the two sums of squared deviations, plus a correction term
        # for the distance between the running mean and the batch mean.
        m_a = self.var * self.count
        m_b = batch_var * batch_count
        m2 = m_a + m_b + np.square(delta) * self.count * batch_count / total_count
        self.var = m2 / total_count
        self.count = total_count

    def save(self, path: str) -> None:
        """Write ``mean``, ``var`` and ``count`` to a compressed npz archive.

        NOTE: ``np.savez_compressed`` appends ``.npz`` to ``path`` when it is
        missing, so pass a path ending in ``.npz`` if the same string will
        later be handed to ``load``.
        """
        np.savez_compressed(
            path,
            mean=self.mean,
            var=self.var,
            count=self.count,
        )

    def load(self, path: str) -> None:
        """Replace the statistics with those stored in the archive at ``path``."""
        # np.load returns an NpzFile holding an open file handle; the context
        # manager closes it once the arrays have been read out.
        with np.load(path) as data:
            self.mean = data["mean"]
            self.var = data["var"]
            self.count = data["count"]

    def load_from(self: RunningMeanStdSelf, existing: RunningMeanStdSelf) -> None:
        """Copy the statistics of ``existing`` into this instance.

        Copies are taken so later updates to either object do not affect
        the other.
        """
        self.mean = np.copy(existing.mean)
        self.var = np.copy(existing.var)
        self.count = np.copy(existing.count)
NormalizeObservationSelf = TypeVar(
    "NormalizeObservationSelf", bound="NormalizeObservation"
)


class NormalizeObservation(VecotarableWrapper):
    """Wrapper that standardises observations with running statistics.

    Observations returned by ``step`` and ``reset`` are shifted by the
    running mean, scaled by the running standard deviation and clipped to
    ``[-clip, clip]``.
    """

    def __init__(
        self,
        env: gym.Env,
        training: bool = True,
        epsilon: float = 1e-8,
        clip: float = 10.0,
    ) -> None:
        """Wrap ``env``.

        Args:
            env: Environment to wrap.
            training: When true, every normalised observation also updates
                the running statistics.
            epsilon: Added to the variance before the square root.
            clip: Absolute bound applied to normalised observations.
        """
        super().__init__(env)
        obs_shape = single_observation_space(env).shape
        self.rms = RunningMeanStd(shape=obs_shape)
        self.training = training
        self.epsilon = epsilon
        self.clip = clip

    def step(self, action):
        """Step the wrapped env and normalise the returned observation."""
        raw_obs, reward, done, info = self.env.step(action)
        return self.normalize(raw_obs), reward, done, info

    def reset(self, **kwargs):
        """Reset the wrapped env and normalise the initial observation."""
        first_obs = self.env.reset(**kwargs)
        return self.normalize(first_obs)

    def normalize(self, obs: NDArray) -> NDArray:
        """Return ``obs`` standardised and clipped.

        When ``is_vector_env`` is false, the observation is given a leading
        axis of length 1 for the computation and that axis is removed again
        before returning.
        """
        if self.is_vector_env:
            batch = obs
        else:
            batch = np.array([obs])

        if self.training:
            self.rms.update(batch)

        centered = batch - self.rms.mean
        scale = np.sqrt(self.rms.var + self.epsilon)
        bounded = np.clip(centered / scale, -self.clip, self.clip)

        if self.is_vector_env:
            return bounded
        return bounded[0]

    def save(self, path: str) -> None:
        """Persist the running statistics to ``path``."""
        self.rms.save(path)

    def load(self, path: str) -> None:
        """Restore the running statistics from ``path``."""
        self.rms.load(path)

    def load_from(
        self: NormalizeObservationSelf, existing: NormalizeObservationSelf
    ) -> None:
        """Copy the running statistics from another wrapper instance."""
        self.rms.load_from(existing.rms)
NormalizeRewardSelf = TypeVar("NormalizeRewardSelf", bound="NormalizeReward")


class NormalizeReward(VecotarableWrapper):
    """Wrapper that rescales rewards by the spread of discounted returns.

    A running discounted sum of rewards is kept per environment; rewards are
    divided by the running standard deviation of that sum (they are not
    mean-centred) and clipped to ``[-clip, clip]``.
    """

    def __init__(
        self,
        env: gym.Env,
        training: bool = True,
        gamma: float = 0.99,
        epsilon: float = 1e-8,
        clip: float = 10.0,
    ) -> None:
        """Wrap ``env``.

        Args:
            env: Environment to wrap.
            training: When true, each step updates the discounted returns
                and the running statistics.
            gamma: Discount applied to the accumulated returns each step.
            epsilon: Added to the variance before the square root.
            clip: Absolute bound applied to normalised rewards.
        """
        super().__init__(env)
        self.rms = RunningMeanStd(shape=())
        self.training = training
        self.gamma = gamma
        self.epsilon = epsilon
        self.clip = clip
        self.returns = np.zeros(self.num_envs)

    def step(self, action):
        """Step the wrapped env, normalise the reward, reset finished returns.

        ``obs``, ``done`` and ``info`` are passed through unchanged.
        """
        obs, reward, done, info = self.env.step(action)

        if self.is_vector_env:
            reward = self.normalize(reward)
            done_flags = done
        else:
            # Give the scalar a batch axis for normalize(), then drop it.
            reward = self.normalize(np.array([reward]))[0]
            done_flags = [done]

        # Force a boolean mask: integer 0/1 flags would otherwise be used as
        # positional indices and zero the wrong entries (or raise IndexError).
        done_mask = np.asarray(done_flags, dtype=bool)
        self.returns[done_mask] = 0
        return obs, reward, done, info

    def reset(self, **kwargs):
        """Zero the accumulated returns and reset the wrapped env."""
        self.returns = np.zeros(self.num_envs)
        return self.env.reset(**kwargs)

    def normalize(self, rewards):
        """Return ``rewards`` scaled by the running return std and clipped.

        In training mode the discounted returns and the running statistics
        are updated first; otherwise the stored statistics are used as-is.
        """
        if self.training:
            self.returns = self.returns * self.gamma + rewards
            self.rms.update(self.returns)
        return np.clip(
            rewards / np.sqrt(self.rms.var + self.epsilon), -self.clip, self.clip
        )

    def save(self, path: str) -> None:
        """Persist the running statistics to ``path``."""
        self.rms.save(path)

    def load(self, path: str) -> None:
        """Restore the running statistics from ``path``."""
        self.rms.load(path)

    def load_from(self: NormalizeRewardSelf, existing: NormalizeRewardSelf) -> None:
        """Copy the running statistics from another wrapper instance."""
        self.rms.load_from(existing.rms)