File size: 1,919 Bytes
0e936e1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
from abc import ABC, abstractmethod
from typing import Optional, Tuple, Type, Union
import gym
import numpy as np
import torch
import torch.nn as nn
from rl_algo_impls.shared.module.module import layer_init
# Type of the value exposed by an encoder's ``out_dim`` property: either a
# single int or a tuple of ints.
EncoderOutDim = Union[int, Tuple[int, ...]]
class CnnEncoder(nn.Module, ABC):
    """Abstract base class for CNN encoders that rescale raw observations.

    The constructor records the width of the observation space's value range
    (largest upper bound minus smallest lower bound). ``preprocess`` converts
    observations to float and divides them by that width; note that it does
    not subtract the lower bound. Subclasses must implement ``__init__`` and
    the ``out_dim`` property.
    """

    @abstractmethod
    def __init__(
        self,
        obs_space: gym.Space,
        **kwargs,
    ) -> None:
        """Initialize the module and compute ``range_size`` from ``obs_space``.

        Args:
            obs_space: Observation space; its ``high`` and ``low`` bounds are
                read to determine the value range.
            **kwargs: Accepted so subclasses can forward extra options; they
                are not used here.
        """
        super().__init__()
        # Promote the bounds to Python floats *before* subtracting. NumPy
        # scalar arithmetic stays in the bounds' dtype, so a signed-integer
        # space such as int8 [-128, 127] would otherwise wrap around
        # (127 - (-128) evaluates to -1 in int8) and corrupt the scale.
        upper = float(np.max(obs_space.high))  # type: ignore
        lower = float(np.min(obs_space.low))  # type: ignore
        self.range_size = upper - lower

    def preprocess(self, obs: torch.Tensor) -> torch.Tensor:
        """Return ``obs`` as a float tensor divided by ``range_size``.

        A 3-dimensional input first gets a leading dimension of size 1
        (presumably a single unbatched observation -- confirm with callers).
        Inputs of any other rank keep their shape.
        """
        if obs.dim() == 3:
            obs = obs.unsqueeze(0)
        return obs.float() / self.range_size

    def forward(self, obs: torch.Tensor) -> torch.Tensor:
        """Return the preprocessed observation; subclasses extend this."""
        return self.preprocess(obs)

    @property
    @abstractmethod
    def out_dim(self) -> EncoderOutDim:
        """Size of the encoder's output, as an int or a tuple of ints."""
        ...
class FlattenedCnnEncoder(CnnEncoder):
    """CNN encoder whose output is flattened and fed through a linear head.

    Observations go through the inherited preprocessing, then ``cnn``, then a
    flatten -> linear -> activation stack stored as ``fc``. The linear layer
    has ``cnn_flatten_dim`` outputs, which is also what ``out_dim`` reports.
    """

    def __init__(
        self,
        obs_space: gym.Space,
        activation: Type[nn.Module],
        linear_init_layers_orthogonal: bool,
        cnn_flatten_dim: int,
        cnn: nn.Module,
        **kwargs,
    ) -> None:
        """Build the linear head sized to match the CNN's flattened output.

        Args:
            obs_space: Observation space; forwarded to the parent constructor
                and sampled once to probe the CNN's output size.
            activation: Module class instantiated (with no arguments) as the
                last stage of the head.
            linear_init_layers_orthogonal: Passed as the second argument to
                ``layer_init`` for the linear layer.
            cnn_flatten_dim: Number of output features of the linear layer.
            cnn: Convolutional module applied to preprocessed observations.
            **kwargs: Forwarded to the parent constructor.
        """
        super().__init__(obs_space, **kwargs)
        self.cnn = cnn
        self.flattened_dim = cnn_flatten_dim

        # Run one sampled observation through the CNN (without tracking
        # gradients) purely to learn how many features it emits per sample.
        with torch.no_grad():
            probe = self.preprocess(torch.as_tensor(obs_space.sample()))
            cnn_features = torch.flatten(cnn(probe), start_dim=1)
        num_cnn_features = cnn_features.shape[1]

        projection = layer_init(
            nn.Linear(num_cnn_features, cnn_flatten_dim),
            linear_init_layers_orthogonal,
        )
        self.fc = nn.Sequential(nn.Flatten(), projection, activation())

    def forward(self, obs: torch.Tensor) -> torch.Tensor:
        """Preprocess ``obs``, apply the CNN, then the linear head."""
        scaled = super().forward(obs)
        return self.fc(self.cnn(scaled))

    @property
    def out_dim(self) -> EncoderOutDim:
        """The ``cnn_flatten_dim`` value given at construction."""
        return self.flattened_dim
|