from typing import Optional, Tuple, Type

import numpy as np
import torch
import torch.nn as nn
from numpy.typing import NDArray

from rl_algo_impls.shared.actor import Actor, PiForward, pi_forward
from rl_algo_impls.shared.actor.gridnet import GridnetDistribution
from rl_algo_impls.shared.encoder import EncoderOutDim
from rl_algo_impls.shared.module.utils import layer_init


class Transpose(nn.Module):
    """Permutes tensor dimensions so the operation can be used inside nn.Sequential."""

    def __init__(self, permutation: Tuple[int, ...]) -> None:
        super().__init__()
        self.permutation = permutation

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return x.permute(self.permutation)


class GridnetDecoder(Actor):
    """
    Upsamples encoder features back to map resolution with stride-2 transposed
    convolutions and emits per-cell action logits for a Gridnet action space.
    """

    def __init__(
        self,
        map_size: int,
        action_vec: NDArray[np.int64],
        in_dim: EncoderOutDim,
        activation: Type[nn.Module] = nn.ReLU,
        init_layers_orthogonal: bool = True,
    ) -> None:
        super().__init__()
        self.map_size = map_size
        self.action_vec = action_vec
        assert isinstance(in_dim, tuple)
        # Four stride-2 transposed convolutions (each doubling spatial size), ending in
        # a channel-last transpose so logits are (batch, H, W, sum(action_vec)).
        self.deconv = nn.Sequential(
            layer_init(
                nn.ConvTranspose2d(
                    in_dim[0], 128, 3, stride=2, padding=1, output_padding=1
                ),
                init_layers_orthogonal=init_layers_orthogonal,
            ),
            activation(),
            layer_init(
                nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1),
                init_layers_orthogonal=init_layers_orthogonal,
            ),
            activation(),
            layer_init(
                nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),
                init_layers_orthogonal=init_layers_orthogonal,
            ),
            activation(),
            layer_init(
                nn.ConvTranspose2d(
                    32, action_vec.sum(), 3, stride=2, padding=1, output_padding=1
                ),
                init_layers_orthogonal=init_layers_orthogonal,
                std=0.01,
            ),
            Transpose((0, 2, 3, 1)),
        )

    def forward(
        self,
        obs: torch.Tensor,
        actions: Optional[torch.Tensor] = None,
        action_masks: Optional[torch.Tensor] = None,
    ) -> PiForward:
        assert (
            action_masks is not None
        ), f"No mask case unhandled in {self.__class__.__name__}"
        logits = self.deconv(obs)
        pi = GridnetDistribution(self.map_size, self.action_vec, logits, action_masks)
        return pi_forward(pi, actions)

    @property
    def action_shape(self) -> Tuple[int, ...]:
        return (self.map_size, len(self.action_vec))
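

# A minimal shape-check sketch, not part of the original module: the values below are
# illustrative assumptions — a 16x16 map, a per-cell action vector summing to 78, and
# an encoder output of shape (channels, 1, 1) so that the four stride-2 transposed
# convolutions upsample 1x1 -> 16x16. A full forward() call additionally needs
# action_masks shaped for GridnetDistribution, which is omitted here.
if __name__ == "__main__":
    map_height = map_width = 16  # assumed map dimensions
    map_size = map_height * map_width
    # Illustrative per-cell action components (hypothetical values for this sketch)
    action_vec = np.array([6, 4, 4, 4, 4, 7, 49], dtype=np.int64)
    in_dim = (256, 1, 1)  # assumed encoder output dim: (channels, height, width)

    decoder = GridnetDecoder(map_size, action_vec, in_dim)
    obs = torch.zeros(2, *in_dim)
    logits = decoder.deconv(obs)
    # Transpose((0, 2, 3, 1)) makes the logits channel-last: (batch, H, W, sum(action_vec))
    assert logits.shape == (2, map_height, map_width, int(action_vec.sum()))
    assert decoder.action_shape == (map_size, len(action_vec))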