File size: 4,241 Bytes
ee5d423
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

from gym.spaces import Box, Discrete
from stable_baselines3.common.preprocessing import get_flattened_obs_dim
from typing import Sequence, Type


class FeatureExtractor(nn.Module):
    """Turn raw observations into flat float feature vectors.

    Supported observation spaces:

    * ``Box`` with a 3-D shape, read as (channels, height, width): the
      observation is cast to float, divided by 255 and passed through the
      DQN "Nature" CNN followed by a linear layer of width
      ``cnn_feature_dim``.
    * ``Box`` with a 1-D shape: the observation is cast to float and passed
      through ``nn.Flatten``.
    * ``Discrete``: the observation is one-hot encoded into ``obs_space.n``
      floats.

    In every mode a single un-batched observation gets a leading batch axis
    added before it reaches ``feature_extractor``.

    Attributes:
        feature_extractor: Module applied to the preprocessed observation.
        out_dim: Number of features produced per observation.
    """

    def __init__(
        self,
        obs_space: gym.Space,
        activation: Type[nn.Module],
        init_layers_orthogonal: bool = False,
        cnn_feature_dim: int = 512,
    ) -> None:
        """Build the extractor for ``obs_space``.

        Args:
            obs_space: Observation space the extractor will consume.
            activation: Module class instantiated after each CNN layer.
            init_layers_orthogonal: Forwarded to ``layer_init`` for every
                convolutional/linear layer created here.
            cnn_feature_dim: Output width of the linear layer that follows
                the CNN (only used for 3-D ``Box`` spaces).

        Raises:
            ValueError: ``obs_space`` is a ``Box`` whose shape is neither
                1-D nor 3-D.
            NotImplementedError: ``obs_space`` is neither ``Box`` nor
                ``Discrete``.
        """
        super().__init__()
        # Preprocessing is configured through plain attributes rather than a
        # closure/lambda stored on the instance, so the module can be pickled.
        self._num_categories = None  # set only for Discrete spaces
        self._unbatched_ndim = 0  # rank of one observation without batch axis
        self._scale_pixels = False  # divide by 255 for image observations

        if isinstance(obs_space, Box):
            # Conv2D: (channels, height, width)
            if len(obs_space.shape) == 3:
                self._unbatched_ndim = 3
                self._scale_pixels = True
                # CNN from DQN Nature paper: Mnih, Volodymyr, et al.
                # "Human-level control through deep reinforcement learning."
                # Nature 518.7540 (2015): 529-533.
                cnn = nn.Sequential(
                    layer_init(
                        nn.Conv2d(obs_space.shape[0], 32, kernel_size=8, stride=4),
                        init_layers_orthogonal,
                    ),
                    activation(),
                    layer_init(
                        nn.Conv2d(32, 64, kernel_size=4, stride=2),
                        init_layers_orthogonal,
                    ),
                    activation(),
                    layer_init(
                        nn.Conv2d(64, 64, kernel_size=3, stride=1),
                        init_layers_orthogonal,
                    ),
                    activation(),
                    nn.Flatten(),
                )

                # Push one sampled observation through the CNN to discover
                # the flattened size feeding the linear layer.
                with torch.no_grad():
                    cnn_out = cnn(
                        self.preprocess(torch.as_tensor(obs_space.sample()))
                    )
                self.feature_extractor = nn.Sequential(
                    cnn,
                    layer_init(
                        nn.Linear(cnn_out.shape[1], cnn_feature_dim),
                        init_layers_orthogonal,
                    ),
                    activation(),
                )
                self.out_dim = cnn_feature_dim
            elif len(obs_space.shape) == 1:
                self._unbatched_ndim = 1
                self.feature_extractor = nn.Flatten()
                self.out_dim = get_flattened_obs_dim(obs_space)
            else:
                raise ValueError(f"Unsupported observation space: {obs_space}")
        elif isinstance(obs_space, Discrete):
            self._num_categories = int(obs_space.n)
            self.feature_extractor = nn.Flatten()
            self.out_dim = self._num_categories
        else:
            raise NotImplementedError(
                f"Unsupported observation space: {obs_space}"
            )

    def preprocess(self, obs: torch.Tensor) -> torch.Tensor:
        """Convert a raw observation tensor into a batched float tensor.

        Args:
            obs: A single observation or a batch of observations.

        Returns:
            A float tensor with a leading batch axis: pixel values divided
            by 255 for image spaces, the values cast to float for 1-D
            ``Box`` spaces, or a one-hot encoding for ``Discrete`` spaces.
        """
        if self._num_categories is not None:
            # F.one_hot only accepts int64 index tensors.
            obs = obs.long()
            if obs.dim() == 0:
                # A scalar observation would one-hot to a 1-D tensor, which
                # nn.Flatten (start_dim=1) cannot handle; add the batch axis.
                obs = obs.unsqueeze(0)
            return F.one_hot(obs, self._num_categories).float()

        if obs.dim() == self._unbatched_ndim:
            obs = obs.unsqueeze(0)
        obs = obs.float()
        return obs / 255.0 if self._scale_pixels else obs

    def forward(self, obs: torch.Tensor) -> torch.Tensor:
        """Preprocess ``obs`` and return its ``(batch, out_dim)`` features."""
        return self.feature_extractor(self.preprocess(obs))


def mlp(
    layer_sizes: Sequence[int],
    activation: Type[nn.Module],
    output_activation: Type[nn.Module] = nn.Identity,
    init_layers_orthogonal: bool = False,
    final_layer_gain: float = np.sqrt(2),
) -> nn.Module:
    """Assemble a fully connected network as an ``nn.Sequential``.

    Args:
        layer_sizes: Layer widths from input to output; consecutive entries
            become the in/out features of one ``nn.Linear``. At least two
            entries are required.
        activation: Module class instantiated after every hidden linear layer.
        output_activation: Module class instantiated after the last linear
            layer.
        init_layers_orthogonal: Forwarded to ``layer_init`` for every linear
            layer.
        final_layer_gain: Passed to ``layer_init`` as ``std`` for the last
            linear layer only.

    Returns:
        The stacked linear and activation modules.
    """
    modules = []
    # Every adjacent width pair except the last one forms a hidden layer.
    for fan_in, fan_out in zip(layer_sizes[:-2], layer_sizes[1:-1]):
        hidden = layer_init(nn.Linear(fan_in, fan_out), init_layers_orthogonal)
        modules.extend((hidden, activation()))

    head = layer_init(
        nn.Linear(layer_sizes[-2], layer_sizes[-1]),
        init_layers_orthogonal,
        std=final_layer_gain,
    )
    modules.extend((head, output_activation()))
    return nn.Sequential(*modules)


def layer_init(
    layer: nn.Module, init_layers_orthogonal: bool, std: float = np.sqrt(2)
) -> nn.Module:
    """Optionally apply orthogonal weight initialization to ``layer`` in place.

    Args:
        layer: Module exposing a ``weight`` tensor (and optionally a ``bias``).
        init_layers_orthogonal: When False the layer is returned untouched.
        std: Gain passed to ``nn.init.orthogonal_`` for the weight.

    Returns:
        The same ``layer`` object, for convenient inline use.
    """
    if not init_layers_orthogonal:
        return layer
    nn.init.orthogonal_(layer.weight, std)  # type: ignore
    # Layers built with ``bias=False`` expose ``bias`` as None; there is
    # nothing to zero in that case.
    if getattr(layer, "bias", None) is not None:
        nn.init.constant_(layer.bias, 0.0)  # type: ignore
    return layer