# sgoodfriend's picture
# PPO playing starpilot from https://github.com/sgoodfriend/rl-algo-impls/tree/227aa2fbde36e688a09d8ad309b0947721eef160
# 80e385e
import itertools
import numpy as np
import os
from copy import deepcopy
from stable_baselines3.common.vec_env.base_vec_env import VecEnv, VecEnvWrapper
from stable_baselines3.common.vec_env.vec_normalize import VecNormalize
from torch.utils.tensorboard.writer import SummaryWriter
from typing import List, Optional, Union
from shared.callbacks.callback import Callback
from shared.policy.policy import Policy
from shared.stats import Episode, EpisodeAccumulator, EpisodesStats
from wrappers.vec_episode_recorder import VecEpisodeRecorder
class EvaluateAccumulator(EpisodeAccumulator):
    """Collect a fixed quota of finished episodes from each env of a vectorized env.

    The requested total ``goal_episodes`` is split evenly across ``num_envs``
    envs, rounding up, so the accumulator can end up holding slightly more
    than ``goal_episodes`` episodes when the total is not a multiple of
    ``num_envs``.
    """

    def __init__(
        self,
        num_envs: int,
        goal_episodes: int,
        print_returns: bool = True,
        ignore_first_episode: bool = False,
    ):
        """Set up per-env episode buckets.

        Args:
            num_envs: Number of parallel envs; one bucket is kept per env.
            goal_episodes: Total number of episodes wanted across all envs.
            print_returns: Print score and length of each recorded episode.
            ignore_first_episode: When true, the first episode completed by
                each env is discarded and does not count towards the quota.
        """
        super().__init__(num_envs)
        self.completed_episodes_by_env_idx: List[List[Episode]] = [
            [] for _ in range(num_envs)
        ]
        # Round up so that every env contributes the same number of episodes.
        self.goal_episodes_per_env = int(np.ceil(goal_episodes / num_envs))
        self.print_returns = print_returns
        if ignore_first_episode:
            # Env indices that have already completed (and dropped) one episode.
            first_done = set()

            def should_record_done(idx: int) -> bool:
                # False exactly once per env: on its first completed episode.
                has_done_first_episode = idx in first_done
                first_done.add(idx)
                return has_done_first_episode

            self.should_record_done = should_record_done
        else:
            self.should_record_done = lambda idx: True

    def on_done(self, ep_idx: int, episode: Episode) -> None:
        """Record a finished episode for env ``ep_idx`` unless it must be skipped.

        An episode is skipped when it is the env's ignored first episode or
        when the env has already filled its quota.
        NOTE(review): presumably invoked by ``EpisodeAccumulator.step`` when an
        env reports done -- confirm against the base class.
        """
        # Consult should_record_done unconditionally and first: it is stateful
        # and must observe every completed episode. Previously a False result
        # fell through to the append below, so the "ignored" first episode
        # was recorded anyway.
        if not self.should_record_done(ep_idx):
            return
        completed = self.completed_episodes_by_env_idx[ep_idx]
        if len(completed) >= self.goal_episodes_per_env:
            return
        completed.append(episode)
        if self.print_returns:
            print(
                f"Episode {len(self)} | "
                f"Score {episode.score} | "
                f"Length {episode.length}"
            )

    def __len__(self) -> int:
        """Return the total number of episodes recorded across all envs."""
        return sum(len(ce) for ce in self.completed_episodes_by_env_idx)

    @property
    def episodes(self) -> List[Episode]:
        """All recorded episodes, flattened in env-index order."""
        return list(itertools.chain.from_iterable(self.completed_episodes_by_env_idx))

    def is_done(self) -> bool:
        """Return True once every env has filled its per-env quota."""
        # on_done caps each bucket at the quota; ">=" is only a defensive
        # guard so an overfull bucket can never stall the evaluation loop.
        return all(
            len(ce) >= self.goal_episodes_per_env
            for ce in self.completed_episodes_by_env_idx
        )
def evaluate(
    env: VecEnv,
    policy: Policy,
    n_episodes: int,
    render: bool = False,
    deterministic: bool = True,
    print_returns: bool = True,
    ignore_first_episode: bool = False,
) -> EpisodesStats:
    """Roll out ``policy`` in ``env`` until enough episodes finish, then summarize.

    The policy is switched to eval mode and is NOT switched back here; callers
    that keep training afterwards are responsible for restoring train mode.

    Args:
        env: Vectorized env to step; it is reset before the rollout starts.
        policy: Policy queried for actions via ``policy.act``.
        n_episodes: Total number of episodes requested across all envs.
        render: Call ``env.render()`` after every step.
        deterministic: Forwarded to ``policy.act``.
        print_returns: Print per-episode results and the final stats.
        ignore_first_episode: Forwarded to ``EvaluateAccumulator``.

    Returns:
        ``EpisodesStats`` built from the accumulated episodes.
    """
    policy.eval()
    accumulator = EvaluateAccumulator(
        num_envs=env.num_envs,
        goal_episodes=n_episodes,
        print_returns=print_returns,
        ignore_first_episode=ignore_first_episode,
    )

    observations = env.reset()
    while True:
        if accumulator.is_done():
            break
        actions = policy.act(observations, deterministic=deterministic)
        observations, rewards, dones, _infos = env.step(actions)
        accumulator.step(rewards, dones)
        if render:
            env.render()

    summary = EpisodesStats(accumulator.episodes)
    if print_returns:
        print(summary)
    return summary
class EvalCallback(Callback):
    """Training callback that periodically evaluates a policy and tracks the best result.

    Every ``step_freq`` elapsed timesteps the policy is evaluated on ``env``.
    The stats of every evaluation are kept in ``self.stats`` and written to
    TensorBoard; when an evaluation is at least as good as the best so far,
    the model is optionally saved, and when it is strictly better a video of
    one episode is optionally recorded.
    """

    def __init__(
        self,
        policy: Policy,
        env: VecEnv,
        tb_writer: SummaryWriter,
        best_model_path: Optional[str] = None,
        step_freq: Union[int, float] = 50_000,
        n_episodes: int = 10,
        save_best: bool = True,
        deterministic: bool = True,
        record_best_videos: bool = True,
        video_env: Optional[VecEnv] = None,
        best_video_dir: Optional[str] = None,
        max_video_length: int = 3600,
        ignore_first_episode: bool = False,
    ) -> None:
        """Store the evaluation configuration.

        Args:
            policy: Policy to evaluate (and to save when it is the best).
            env: Vectorized env used for the periodic evaluations.
            tb_writer: TensorBoard writer receiving "eval" and "best_eval" stats.
            best_model_path: Where to save the best model; required (asserted
                at save time) when ``save_best`` is true.
            step_freq: Timesteps between evaluations; truncated to ``int``.
            n_episodes: Default number of episodes per evaluation.
            save_best: Save the policy whenever it matches or beats the best.
            deterministic: Forwarded to the evaluation rollouts.
            record_best_videos: Record a video when a strictly better result
                is found; requires ``video_env`` and ``best_video_dir``.
            video_env: Env used only for recording best-model videos.
            best_video_dir: Directory for recorded videos; created if given.
            max_video_length: Forwarded to ``VecEpisodeRecorder``.
            ignore_first_episode: Forwarded to the periodic evaluation rollouts.
        """
        super().__init__()
        self.policy = policy
        self.env = env
        self.tb_writer = tb_writer
        self.best_model_path = best_model_path
        self.step_freq = int(step_freq)
        self.n_episodes = n_episodes
        self.save_best = save_best
        self.deterministic = deterministic
        # One entry per evaluation performed so far, in order.
        self.stats: List[EpisodesStats] = []
        self.best: Optional[EpisodesStats] = None

        self.record_best_videos = record_best_videos
        assert video_env or not record_best_videos
        self.video_env = video_env
        assert best_video_dir or not record_best_videos
        self.best_video_dir = best_video_dir
        if best_video_dir:
            os.makedirs(best_video_dir, exist_ok=True)
        self.max_video_length = max_video_length
        # Base path of the most recently recorded best video, if any.
        self.best_video_base_path: Optional[str] = None

        self.ignore_first_episode = ignore_first_episode

    def on_step(self, timesteps_elapsed: int = 1) -> bool:
        """Advance the step counter and evaluate when an evaluation is due.

        An evaluation is due whenever the number of completed ``step_freq``
        intervals has caught up with the number of evaluations already run.
        Always returns True.
        """
        super().on_step(timesteps_elapsed)
        if self.timesteps_elapsed // self.step_freq >= len(self.stats):
            # Bring the eval env's normalization stats in line with the policy's.
            sync_vec_normalize(self.policy.vec_normalize, self.env)
            self.evaluate()
        return True

    def evaluate(
        self, n_episodes: Optional[int] = None, print_returns: Optional[bool] = None
    ) -> EpisodesStats:
        """Run one evaluation, update the best result, and log to TensorBoard.

        Args:
            n_episodes: Episodes to run; falls back to ``self.n_episodes``
                when None or 0.
            print_returns: Print per-episode results; treated as False when None.

        Returns:
            The stats of this evaluation.
        """
        eval_stat = evaluate(
            self.env,
            self.policy,
            n_episodes or self.n_episodes,
            deterministic=self.deterministic,
            print_returns=print_returns or False,
            ignore_first_episode=self.ignore_first_episode,
        )
        # evaluate() leaves the policy in eval mode; go back to training mode.
        self.policy.train(True)
        print(f"Eval Timesteps: {self.timesteps_elapsed} | {eval_stat}")

        self.stats.append(eval_stat)

        # Explicit None checks: "no best yet" must not depend on the
        # truthiness of a stats object.
        if self.best is None or eval_stat >= self.best:
            # Decide before self.best is overwritten below.
            strictly_better = self.best is None or eval_stat > self.best
            self.best = eval_stat
            if self.save_best:
                assert self.best_model_path
                self.policy.save(self.best_model_path)
                print("Saved best model")
            self.best.write_to_tensorboard(
                self.tb_writer, "best_eval", self.timesteps_elapsed
            )
            if strictly_better and self.record_best_videos:
                self._record_best_video()

        eval_stat.write_to_tensorboard(self.tb_writer, "eval", self.timesteps_elapsed)

        return eval_stat

    def _record_best_video(self) -> None:
        """Record one episode of the current policy on ``video_env``."""
        assert self.video_env and self.best_video_dir
        sync_vec_normalize(self.policy.vec_normalize, self.video_env)
        # Videos are named after the timestep at which the best was found.
        self.best_video_base_path = os.path.join(
            self.best_video_dir, str(self.timesteps_elapsed)
        )
        video_wrapped = VecEpisodeRecorder(
            self.video_env,
            self.best_video_base_path,
            max_video_length=self.max_video_length,
        )
        video_stats = evaluate(
            video_wrapped,
            self.policy,
            1,
            deterministic=self.deterministic,
            print_returns=False,
        )
        # The video rollout put the policy back into eval mode; without this
        # the policy would stay in eval mode once evaluation returns.
        self.policy.train(True)
        print(f"Saved best video: {video_stats}")
def sync_vec_normalize(
    origin_vec_normalize: Optional[VecNormalize], destination_env: VecEnv
) -> None:
    """Copy normalization statistics onto the ``VecNormalize`` layers of an env.

    Walks the wrapper chain of ``destination_env`` from the outside in and,
    for every ``VecNormalize`` wrapper found, overwrites its ``ret_rms`` (and
    ``obs_rms``, when the origin has one) with deep copies of the origin's.
    Does nothing when ``origin_vec_normalize`` is None.
    """
    if origin_vec_normalize is None:
        return

    layer = destination_env
    while isinstance(layer, VecEnvWrapper):
        if isinstance(layer, VecNormalize):
            # The origin may not carry observation statistics at all.
            if hasattr(origin_vec_normalize, "obs_rms"):
                layer.obs_rms = deepcopy(origin_vec_normalize.obs_rms)
            layer.ret_rms = deepcopy(origin_vec_normalize.ret_rms)
        # Descend one level regardless of the wrapper's type.
        layer = layer.venv