Spaces:
Running
Running
from typing import Any, Dict, List, Optional, Type | |
import gym | |
import torch as th | |
from torch import nn | |
from stable_baselines3.common.policies import BasePolicy, register_policy | |
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor, FlattenExtractor, NatureCNN, create_mlp | |
from stable_baselines3.common.type_aliases import Schedule | |
class QNetwork(BasePolicy): | |
""" | |
Action-Value (Q-Value) network for DQN | |
:param observation_space: Observation space | |
:param action_space: Action space | |
:param net_arch: The specification of the policy and value networks. | |
:param activation_fn: Activation function | |
:param normalize_images: Whether to normalize images or not, | |
dividing by 255.0 (True by default) | |
""" | |
def __init__( | |
self, | |
observation_space: gym.spaces.Space, | |
action_space: gym.spaces.Space, | |
features_extractor: nn.Module, | |
features_dim: int, | |
net_arch: Optional[List[int]] = None, | |
activation_fn: Type[nn.Module] = nn.ReLU, | |
normalize_images: bool = True, | |
): | |
super(QNetwork, self).__init__( | |
observation_space, | |
action_space, | |
features_extractor=features_extractor, | |
normalize_images=normalize_images, | |
) | |
if net_arch is None: | |
net_arch = [64, 64] | |
self.net_arch = net_arch | |
self.activation_fn = activation_fn | |
self.features_extractor = features_extractor | |
self.features_dim = features_dim | |
self.normalize_images = normalize_images | |
action_dim = self.action_space.n # number of actions | |
q_net = create_mlp(self.features_dim, action_dim, self.net_arch, self.activation_fn) | |
self.q_net = nn.Sequential(*q_net) | |
def forward(self, obs: th.Tensor) -> th.Tensor: | |
""" | |
Predict the q-values. | |
:param obs: Observation | |
:return: The estimated Q-Value for each action. | |
""" | |
return self.q_net(self.extract_features(obs)) | |
def _predict(self, observation: th.Tensor, deterministic: bool = True) -> th.Tensor: | |
q_values = self.forward(observation) | |
# Greedy action | |
action = q_values.argmax(dim=1).reshape(-1) | |
return action | |
def _get_constructor_parameters(self) -> Dict[str, Any]: | |
data = super()._get_constructor_parameters() | |
data.update( | |
dict( | |
net_arch=self.net_arch, | |
features_dim=self.features_dim, | |
activation_fn=self.activation_fn, | |
features_extractor=self.features_extractor, | |
) | |
) | |
return data | |
class DQNPolicy(BasePolicy): | |
""" | |
Policy class with Q-Value Net and target net for DQN | |
:param observation_space: Observation space | |
:param action_space: Action space | |
:param lr_schedule: Learning rate schedule (could be constant) | |
:param net_arch: The specification of the policy and value networks. | |
:param activation_fn: Activation function | |
:param features_extractor_class: Features extractor to use. | |
:param features_extractor_kwargs: Keyword arguments | |
to pass to the features extractor. | |
:param normalize_images: Whether to normalize images or not, | |
dividing by 255.0 (True by default) | |
:param optimizer_class: The optimizer to use, | |
``th.optim.Adam`` by default | |
:param optimizer_kwargs: Additional keyword arguments, | |
excluding the learning rate, to pass to the optimizer | |
""" | |
def __init__( | |
self, | |
observation_space: gym.spaces.Space, | |
action_space: gym.spaces.Space, | |
lr_schedule: Schedule, | |
net_arch: Optional[List[int]] = None, | |
activation_fn: Type[nn.Module] = nn.ReLU, | |
features_extractor_class: Type[BaseFeaturesExtractor] = FlattenExtractor, | |
features_extractor_kwargs: Optional[Dict[str, Any]] = None, | |
normalize_images: bool = True, | |
optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam, | |
optimizer_kwargs: Optional[Dict[str, Any]] = None, | |
): | |
super(DQNPolicy, self).__init__( | |
observation_space, | |
action_space, | |
features_extractor_class, | |
features_extractor_kwargs, | |
optimizer_class=optimizer_class, | |
optimizer_kwargs=optimizer_kwargs, | |
) | |
if net_arch is None: | |
if features_extractor_class == FlattenExtractor: | |
net_arch = [64, 64] | |
else: | |
net_arch = [] | |
self.net_arch = net_arch | |
self.activation_fn = activation_fn | |
self.normalize_images = normalize_images | |
self.net_args = { | |
"observation_space": self.observation_space, | |
"action_space": self.action_space, | |
"net_arch": self.net_arch, | |
"activation_fn": self.activation_fn, | |
"normalize_images": normalize_images, | |
} | |
self.q_net, self.q_net_target = None, None | |
self._build(lr_schedule) | |
def _build(self, lr_schedule: Schedule) -> None: | |
""" | |
Create the network and the optimizer. | |
:param lr_schedule: Learning rate schedule | |
lr_schedule(1) is the initial learning rate | |
""" | |
self.q_net = self.make_q_net() | |
self.q_net_target = self.make_q_net() | |
self.q_net_target.load_state_dict(self.q_net.state_dict()) | |
# Setup optimizer with initial learning rate | |
self.optimizer = self.optimizer_class(self.parameters(), lr=lr_schedule(1), **self.optimizer_kwargs) | |
def make_q_net(self) -> QNetwork: | |
# Make sure we always have separate networks for features extractors etc | |
net_args = self._update_features_extractor(self.net_args, features_extractor=None) | |
return QNetwork(**net_args).to(self.device) | |
def forward(self, obs: th.Tensor, deterministic: bool = True) -> th.Tensor: | |
return self._predict(obs, deterministic=deterministic) | |
def _predict(self, obs: th.Tensor, deterministic: bool = True) -> th.Tensor: | |
return self.q_net._predict(obs, deterministic=deterministic) | |
def _get_constructor_parameters(self) -> Dict[str, Any]: | |
data = super()._get_constructor_parameters() | |
data.update( | |
dict( | |
net_arch=self.net_args["net_arch"], | |
activation_fn=self.net_args["activation_fn"], | |
lr_schedule=self._dummy_schedule, # dummy lr schedule, not needed for loading policy alone | |
optimizer_class=self.optimizer_class, | |
optimizer_kwargs=self.optimizer_kwargs, | |
features_extractor_class=self.features_extractor_class, | |
features_extractor_kwargs=self.features_extractor_kwargs, | |
) | |
) | |
return data | |
MlpPolicy = DQNPolicy | |
class CnnPolicy(DQNPolicy): | |
""" | |
Policy class for DQN when using images as input. | |
:param observation_space: Observation space | |
:param action_space: Action space | |
:param lr_schedule: Learning rate schedule (could be constant) | |
:param net_arch: The specification of the policy and value networks. | |
:param activation_fn: Activation function | |
:param features_extractor_class: Features extractor to use. | |
:param normalize_images: Whether to normalize images or not, | |
dividing by 255.0 (True by default) | |
:param optimizer_class: The optimizer to use, | |
``th.optim.Adam`` by default | |
:param optimizer_kwargs: Additional keyword arguments, | |
excluding the learning rate, to pass to the optimizer | |
""" | |
def __init__( | |
self, | |
observation_space: gym.spaces.Space, | |
action_space: gym.spaces.Space, | |
lr_schedule: Schedule, | |
net_arch: Optional[List[int]] = None, | |
activation_fn: Type[nn.Module] = nn.ReLU, | |
features_extractor_class: Type[BaseFeaturesExtractor] = NatureCNN, | |
features_extractor_kwargs: Optional[Dict[str, Any]] = None, | |
normalize_images: bool = True, | |
optimizer_class: Type[th.optim.Optimizer] = th.optim.Adam, | |
optimizer_kwargs: Optional[Dict[str, Any]] = None, | |
): | |
super(CnnPolicy, self).__init__( | |
observation_space, | |
action_space, | |
lr_schedule, | |
net_arch, | |
activation_fn, | |
features_extractor_class, | |
features_extractor_kwargs, | |
normalize_images, | |
optimizer_class, | |
optimizer_kwargs, | |
) | |
register_policy("MlpPolicy", MlpPolicy) | |
register_policy("CnnPolicy", CnnPolicy) | |