DQN playing SpaceInvadersNoFrameskip-v4 from https://github.com/sgoodfriend/rl-algo-impls/tree/983cb75e43e51cf4ef57f177194ab9a4a1a8808b
271ebe6
import numpy as np | |
import torch | |
from typing import NamedTuple, Sequence | |
from rl_algo_impls.shared.policy.actor_critic import OnPolicy | |
from rl_algo_impls.shared.trajectory import Trajectory | |
from rl_algo_impls.wrappers.vectorable_wrapper import VecEnvObs | |
class RtgAdvantage(NamedTuple): | |
rewards_to_go: torch.Tensor | |
advantage: torch.Tensor | |
def discounted_cumsum(x: np.ndarray, gamma: float) -> np.ndarray: | |
dc = x.copy() | |
for i in reversed(range(len(x) - 1)): | |
dc[i] += gamma * dc[i + 1] | |
return dc | |
def compute_advantage_from_trajectories( | |
trajectories: Sequence[Trajectory], | |
policy: OnPolicy, | |
gamma: float, | |
gae_lambda: float, | |
device: torch.device, | |
) -> torch.Tensor: | |
advantage = [] | |
for traj in trajectories: | |
last_val = 0 | |
if not traj.terminated and traj.next_obs is not None: | |
last_val = policy.value(traj.next_obs) | |
rew = np.append(np.array(traj.rew), last_val) | |
v = np.append(np.array(traj.v), last_val) | |
deltas = rew[:-1] + gamma * v[1:] - v[:-1] | |
advantage.append(discounted_cumsum(deltas, gamma * gae_lambda)) | |
return torch.as_tensor( | |
np.concatenate(advantage), dtype=torch.float32, device=device | |
) | |
def compute_rtg_and_advantage_from_trajectories( | |
trajectories: Sequence[Trajectory], | |
policy: OnPolicy, | |
gamma: float, | |
gae_lambda: float, | |
device: torch.device, | |
) -> RtgAdvantage: | |
rewards_to_go = [] | |
advantages = [] | |
for traj in trajectories: | |
last_val = 0 | |
if not traj.terminated and traj.next_obs is not None: | |
last_val = policy.value(traj.next_obs) | |
rew = np.append(np.array(traj.rew), last_val) | |
v = np.append(np.array(traj.v), last_val) | |
deltas = rew[:-1] + gamma * v[1:] - v[:-1] | |
adv = discounted_cumsum(deltas, gamma * gae_lambda) | |
advantages.append(adv) | |
rewards_to_go.append(v[:-1] + adv) | |
return RtgAdvantage( | |
torch.as_tensor( | |
np.concatenate(rewards_to_go), dtype=torch.float32, device=device | |
), | |
torch.as_tensor(np.concatenate(advantages), dtype=torch.float32, device=device), | |
) | |
def compute_advantages( | |
rewards: np.ndarray, | |
values: np.ndarray, | |
episode_starts: np.ndarray, | |
next_episode_starts: np.ndarray, | |
next_obs: VecEnvObs, | |
policy: OnPolicy, | |
gamma: float, | |
gae_lambda: float, | |
) -> np.ndarray: | |
advantages = np.zeros_like(rewards) | |
last_gae_lam = 0 | |
n_steps = advantages.shape[0] | |
for t in reversed(range(n_steps)): | |
if t == n_steps - 1: | |
next_nonterminal = 1.0 - next_episode_starts | |
next_value = policy.value(next_obs) | |
else: | |
next_nonterminal = 1.0 - episode_starts[t + 1] | |
next_value = values[t + 1] | |
delta = rewards[t] + gamma * next_value * next_nonterminal - values[t] | |
last_gae_lam = delta + gamma * gae_lambda * next_nonterminal * last_gae_lam | |
advantages[t] = last_gae_lam | |
return advantages | |