import copy
import random
from collections import deque
from typing import Any, Deque, Dict, List, Optional

import numpy as np

from rl_algo_impls.runner.config import Config
from rl_algo_impls.shared.policy.policy import Policy
from rl_algo_impls.wrappers.action_mask_wrapper import find_action_masker
from rl_algo_impls.wrappers.vectorable_wrapper import (
    VecEnvObs,
    VecEnvStepReturn,
    VecotarableWrapper,
)


class SelfPlayWrapper(VecotarableWrapper):
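    """Vectorized-env wrapper for self-play: a subset of sub-envs is driven by
    frozen checkpoints of the learner (and, optionally, policies loaded from
    selfplay_bots), while only the remaining sub-envs are exposed to the
    learner via num_envs, step, and reset."""
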
    next_obs: VecEnvObs
    next_action_masks: Optional[np.ndarray]

    def __init__(
        self,
        env,
        config: Config,
        num_old_policies: int = 0,
        save_steps: int = 20_000,
        swap_steps: int = 10_000,
        window: int = 10,
        swap_window_size: int = 2,
        selfplay_bots: Optional[Dict[str, Any]] = None,
        bot_always_player_2: bool = False,
    ) -> None:
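        """Reserve num_old_policies sub-envs for frozen checkpoints of the
        learner; selfplay_bots maps a model load path to the number of
        sub-envs that bot controls. window bounds how many checkpoints are
        kept, while swap_steps and swap_window_size control how often and in
        what block size old policies are re-sampled."""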
        super().__init__(env)
        assert num_old_policies % 2 == 0, "num_old_policies must be even"
        assert (
            num_old_policies % swap_window_size == 0
        ), "num_old_policies must be a multiple of swap_window_size"
        self.config = config
        self.num_old_policies = num_old_policies
        self.save_steps = save_steps
        self.swap_steps = swap_steps
        self.swap_window_size = swap_window_size
        self.selfplay_bots = selfplay_bots
        self.bot_always_player_2 = bot_always_player_2

        self.policies: Deque[Policy] = deque(maxlen=window)
        self.policy_assignments: List[Optional[Policy]] = [None] * env.num_envs
        self.steps_since_swap = np.zeros(env.num_envs)

        self.selfplay_policies: Dict[str, Policy] = {}

        # Only sub-envs without an assigned policy are exposed to the learner.
        self.num_envs = env.num_envs - num_old_policies
        if self.selfplay_bots:
            self.num_envs -= sum(self.selfplay_bots.values())
            self.initialize_selfplay_bots()

    def get_action_mask(self) -> Optional[np.ndarray]:
        # Only return masks for the learner-controlled sub-envs.
        return self.env.get_action_mask()[self.learner_indexes()]

    def learner_indexes(self) -> List[bool]:
        # Boolean mask that is True where a sub-env is controlled by the learner.
        return [p is None for p in self.policy_assignments]

    def checkpoint_policy(self, copied_policy: Policy) -> None:
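        """Store copied_policy (in eval mode) in the checkpoint pool; if no
        old-policy slots are assigned yet, seed them all with this policy."""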
        copied_policy.train(False)
        self.policies.append(copied_policy)

        if all(p is None for p in self.policy_assignments[: 2 * self.num_old_policies]):
            for i in range(self.num_old_policies):
                # Switch between player 1 and 2
                self.policy_assignments[
                    2 * i + (i % 2 if not self.bot_always_player_2 else 1)
                ] = copied_policy

    def swap_policy(self, idx: int, swap_window_size: int = 1) -> None:
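        """Replace the old policy for the block of swap_window_size * 2
        sub-envs starting at the pair-aligned idx with a randomly chosen
        checkpoint and reset that block's steps-since-swap counters."""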
        policy = random.choice(self.policies)
        idx = idx // 2 * 2  # Align to the start of a player-1/player-2 pair.
        for j in range(swap_window_size * 2):
            # Only replace slots that already have an assigned policy.
            if self.policy_assignments[idx + j]:
                self.policy_assignments[idx + j] = policy
        self.steps_since_swap[idx : idx + swap_window_size * 2] = np.zeros(
            swap_window_size * 2
        )

    def initialize_selfplay_bots(self) -> None:
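        """Load each selfplay_bots policy from its model path and assign it to
        sub-envs following the old-policy slots."""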
        if not self.selfplay_bots:
            return
        # Local import defers loading runner utilities until they are needed.
        from rl_algo_impls.runner.running_utils import get_device, make_policy

        env = self.env  # type: ignore
        device = get_device(self.config, env)
        start_idx = 2 * self.num_old_policies
        for model_path, n in self.selfplay_bots.items():
            policy = make_policy(
                self.config.algo,
                env,
                device,
                load_path=model_path,
                **self.config.policy_hyperparams,
            ).eval()
            self.selfplay_policies[model_path] = policy
            for idx in range(start_idx, start_idx + 2 * n, 2):
                bot_idx = (
                    (idx + 1) if self.bot_always_player_2 else (idx + idx // 2 % 2)
                )
                self.policy_assignments[bot_idx] = policy
            start_idx += 2 * n

    def step(self, actions: np.ndarray) -> VecEnvStepReturn:
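        """Fill in actions for sub-envs controlled by old/bot policies, step
        the wrapped env, periodically re-sample old policies, and return only
        the learner-controlled sub-envs' results."""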
        env = self.env  # type: ignore
        all_actions = np.zeros(
            (env.num_envs,) + actions.shape[1:], dtype=actions.dtype
        )
        orig_learner_indexes = self.learner_indexes()

        all_actions[orig_learner_indexes] = actions
        for policy in set(p for p in self.policy_assignments if p):
            policy_indexes = [policy == p for p in self.policy_assignments]
            if any(policy_indexes):
                all_actions[policy_indexes] = policy.act(
                    self.next_obs[policy_indexes],
                    deterministic=False,
                    action_masks=self.next_action_masks[policy_indexes]
                    if self.next_action_masks is not None
                    else None,
                )
        self.next_obs, rew, done, info = env.step(all_actions)
        self.next_action_masks = self.env.get_action_mask()

        rew = rew[orig_learner_indexes]
        info = [i for i, b in zip(info, orig_learner_indexes) if b]

        self.steps_since_swap += 1
        # Periodically re-sample the old policy used by each swap window.
        for idx in range(0, self.num_old_policies * 2, 2 * self.swap_window_size):
            if self.steps_since_swap[idx] > self.swap_steps:
                self.swap_policy(idx, self.swap_window_size)

        new_learner_indexes = self.learner_indexes()
        return (
            self.next_obs[new_learner_indexes],
            rew,
            done[new_learner_indexes],
            info,
        )

    def reset(self) -> VecEnvObs:
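        """Reset the wrapped env, cache full observations and action masks for
        the assigned policies, and return observations for learner-controlled
        sub-envs only."""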
        self.next_obs = super().reset()
        self.next_action_masks = self.env.get_action_mask()
        return self.next_obs[self.learner_indexes()]