File size: 1,473 Bytes
946448b
 
c2ccc18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from typing import Dict, List, Optional, Tuple, Type, TypeVar, Union

import numpy as np
from gym import Env, Space, Wrapper
from stable_baselines3.common.vec_env import VecEnv as SB3VecEnv

VecEnvObs = Union[np.ndarray, Dict[str, np.ndarray], Tuple[np.ndarray, ...]]
VecEnvStepReturn = Tuple[VecEnvObs, np.ndarray, np.ndarray, List[Dict]]


class VecotarableWrapper(Wrapper):
    def __init__(self, env: Env) -> None:
        super().__init__(env)
        self.num_envs = getattr(env, "num_envs", 1)
        self.is_vector_env = getattr(env, "is_vector_env", False)
        self.single_observation_space = single_observation_space(env)
        self.single_action_space = single_action_space(env)

    def step(self, action) -> VecEnvStepReturn:
        return self.env.step(action)

    def reset(self) -> VecEnvObs:
        return self.env.reset()


VecEnv = Union[VecotarableWrapper, SB3VecEnv]


def single_observation_space(env: Union[VecEnv, Env]) -> Space:
    return getattr(env, "single_observation_space", env.observation_space)


def single_action_space(env: Union[VecEnv, Env]) -> Space:
    return getattr(env, "single_action_space", env.action_space)


W = TypeVar("W", bound=Wrapper)


def find_wrapper(env: VecEnv, wrapper_class: Type[W]) -> Optional[W]:
    current = env
    while current and current != current.unwrapped:
        if isinstance(current, wrapper_class):
            return current
        current = getattr(current, "env")
    return None