File size: 4,690 Bytes
271ebe6
 
894cec0
 
 
 
 
 
 
 
 
271ebe6
 
894cec0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271ebe6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
894cec0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271ebe6
894cec0
 
271ebe6
 
 
 
 
 
 
 
 
894cec0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
271ebe6
894cec0
 
271ebe6
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from typing import Tuple, TypeVar

import gym
import numpy as np
from numpy.typing import NDArray

from rl_algo_impls.wrappers.vectorable_wrapper import (
    VecotarableWrapper,
    single_observation_space,
)

RunningMeanStdSelf = TypeVar("RunningMeanStdSelf", bound="RunningMeanStd")


class RunningMeanStd:
    """Running per-element mean and variance of data that arrives in batches.

    Each ``update`` merges the batch's mean/variance/count into the stored
    ``mean``/``var``/``count``; individual samples are never kept.
    """

    def __init__(self, episilon: float = 1e-4, shape: Tuple[int, ...] = ()) -> None:
        """Start from zero mean, unit variance and a count of ``episilon``.

        Args:
            episilon: Initial pseudo-count given to the starting statistics.
                (The spelling is kept so existing keyword callers still work.)
            shape: Shape of the ``mean`` and ``var`` arrays.
        """
        self.mean = np.zeros(shape, np.float64)
        self.var = np.ones(shape, np.float64)
        self.count = episilon

    def update(self, x: NDArray) -> None:
        """Merge a batch of samples, stacked along axis 0, into the statistics.

        An empty batch is ignored: its mean and variance are NaN and would
        otherwise contaminate every later estimate.
        """
        batch_count = x.shape[0]
        if batch_count == 0:
            return

        batch_mean = np.mean(x, axis=0)
        batch_var = np.var(x, axis=0)

        # delta must be taken against the *old* mean; it is reused below.
        delta = batch_mean - self.mean
        total_count = self.count + batch_count

        self.mean += delta * batch_count / total_count

        # Combine the two sums of squared deviations, plus the correction
        # term that accounts for the gap between the two means.
        m_a = self.var * self.count
        m_b = batch_var * batch_count
        M2 = m_a + m_b + np.square(delta) * self.count * batch_count / total_count
        self.var = M2 / total_count
        self.count = total_count

    def save(self, path: str) -> None:
        """Write ``mean``, ``var`` and ``count`` to a compressed ``.npz`` archive.

        NumPy appends ``.npz`` to ``path`` when it does not already end with it.
        """
        np.savez_compressed(
            path,
            mean=self.mean,
            var=self.var,
            count=self.count,
        )

    def load(self, path: str) -> None:
        """Restore ``mean``, ``var`` and ``count`` from an archive written by ``save``.

        ``path`` is tried as given; if that file does not exist and the name
        lacks the ``.npz`` suffix that ``save`` would have appended, the
        suffixed name is tried as well.

        Raises:
            FileNotFoundError: If neither candidate file exists.
        """
        try:
            archive = np.load(path)
        except FileNotFoundError:
            if str(path).endswith(".npz"):
                raise
            archive = np.load(f"{path}.npz")
        # The archive holds an open file handle; close it once the arrays
        # have been read into memory.
        with archive as data:
            self.mean = data["mean"]
            self.var = data["var"]
            self.count = data["count"]

    def load_from(self: RunningMeanStdSelf, existing: RunningMeanStdSelf) -> None:
        """Copy the statistics of ``existing`` so the two objects share no arrays."""
        self.mean = np.copy(existing.mean)
        self.var = np.copy(existing.var)
        self.count = np.copy(existing.count)


NormalizeObservationSelf = TypeVar(
    "NormalizeObservationSelf", bound="NormalizeObservation"
)


class NormalizeObservation(VecotarableWrapper):
    """Environment wrapper that standardizes and clips observations.

    Observations returned by ``step`` and ``reset`` are shifted by a running
    mean, scaled by the running standard deviation and clipped to
    ``[-clip, clip]``. The running statistics are only updated while
    ``training`` is true.
    """

    def __init__(
        self,
        env: gym.Env,
        training: bool = True,
        epsilon: float = 1e-8,
        clip: float = 10.0,
    ) -> None:
        """Wrap ``env``.

        Args:
            env: Environment to wrap.
            training: Whether ``normalize`` also updates the running statistics.
            epsilon: Added to the variance before the square root.
            clip: Magnitude bound applied to normalized observations.
        """
        super().__init__(env)
        obs_shape = single_observation_space(env).shape
        self.rms = RunningMeanStd(shape=obs_shape)
        self.training = training
        self.epsilon = epsilon
        self.clip = clip

    def step(self, action):
        """Step the wrapped env and normalize the observation it returns."""
        raw_obs, reward, done, info = self.env.step(action)
        normalized_obs = self.normalize(raw_obs)
        return normalized_obs, reward, done, info

    def reset(self, **kwargs):
        """Reset the wrapped env and normalize the initial observation."""
        return self.normalize(self.env.reset(**kwargs))

    def normalize(self, obs: NDArray) -> NDArray:
        """Standardize ``obs`` with the running statistics and clip the result.

        For a non-vector env the observation is given a temporary leading
        axis, which is stripped again before returning.
        """
        if self.is_vector_env:
            batch = obs
        else:
            batch = np.array([obs])

        if self.training:
            self.rms.update(batch)

        centered = batch - self.rms.mean
        scale = np.sqrt(self.rms.var + self.epsilon)
        bounded = np.clip(centered / scale, -self.clip, self.clip)

        if self.is_vector_env:
            return bounded
        return bounded[0]

    def save(self, path: str) -> None:
        """Persist the running statistics to ``path``."""
        self.rms.save(path)

    def load(self, path: str) -> None:
        """Restore the running statistics from ``path``."""
        self.rms.load(path)

    def load_from(
        self: NormalizeObservationSelf, existing: NormalizeObservationSelf
    ) -> None:
        """Copy the running statistics from another wrapper instance."""
        self.rms.load_from(existing.rms)


NormalizeRewardSelf = TypeVar("NormalizeRewardSelf", bound="NormalizeReward")


class NormalizeReward(VecotarableWrapper):
    """Environment wrapper that rescales and clips rewards.

    A per-env discounted return (``returns * gamma + reward``) is tracked,
    and rewards are divided by the running standard deviation of that
    return, then clipped to ``[-clip, clip]``. The return accumulator and
    the running statistics are only updated while ``training`` is true.
    """

    def __init__(
        self,
        env: gym.Env,
        training: bool = True,
        gamma: float = 0.99,
        epsilon: float = 1e-8,
        clip: float = 10.0,
    ) -> None:
        """Wrap ``env``.

        Args:
            env: Environment to wrap.
            training: Whether ``normalize`` also updates the running statistics.
            gamma: Discount applied to the accumulated return each step.
            epsilon: Added to the variance before the square root.
            clip: Magnitude bound applied to normalized rewards.
        """
        super().__init__(env)
        self.rms = RunningMeanStd(shape=())
        self.training = training
        self.gamma = gamma
        self.epsilon = epsilon
        self.clip = clip

        self.returns = np.zeros(self.num_envs)

    def step(self, action):
        """Step the wrapped env, normalize its reward, and clear finished returns."""
        obs, reward, done, info = self.env.step(action)

        # normalize() works on arrays; wrap and unwrap the scalar reward
        # of a non-vector env.
        if not self.is_vector_env:
            reward = np.array([reward])
        reward = self.normalize(reward)
        if not self.is_vector_env:
            reward = reward[0]

        # Force a boolean mask. An integer 0/1 ``done`` array would be read
        # by NumPy as positions to index, zeroing the wrong return slots.
        dones = np.asarray(done if self.is_vector_env else [done], dtype=bool)
        self.returns[dones] = 0

        return obs, reward, done, info

    def reset(self, **kwargs):
        """Clear the accumulated returns and reset the wrapped env."""
        self.returns = np.zeros(self.num_envs)
        return self.env.reset(**kwargs)

    def normalize(self, rewards):
        """Scale ``rewards`` by the running return std and clip the result.

        While training, the discounted return accumulator is advanced first
        and folded into the running statistics.
        """
        if self.training:
            self.returns = self.returns * self.gamma + rewards
            self.rms.update(self.returns)
        return np.clip(
            rewards / np.sqrt(self.rms.var + self.epsilon), -self.clip, self.clip
        )

    def save(self, path: str) -> None:
        """Persist the running statistics to ``path``."""
        self.rms.save(path)

    def load(self, path: str) -> None:
        """Restore the running statistics from ``path``."""
        self.rms.load(path)

    def load_from(self: NormalizeRewardSelf, existing: NormalizeRewardSelf) -> None:
        """Copy the running statistics from another wrapper instance."""
        self.rms.load_from(existing.rms)