Map pools
Map pools allow you to instantiate multiple versions of your environment on the backend, which enables higher throughput by parallelizing interaction in simulations and embodied environments. Using map pools is simple with 🤗 Simulate. First, define a function that generates your environment; we call each environment instance a “map”.
import simulate as sm

def generate_map(index):
    root = sm.Asset(name=f"root_{index}")
    root += sm.Box(
        name=f"floor_{index}",
        position=[0, -0.05, 0],
        scaling=[10, 0.1, 10],
        material=sm.Material.BLUE,
        with_collider=True,
    )
    root += sm.Box(
        name=f"wall1_{index}",
        position=[-1, 0.5, 0],
        scaling=[0.1, 1, 5.1],
        material=sm.Material.GRAY75,
        with_collider=True,
    )
    root += sm.Box(
        name=f"wall2_{index}",
        position=[1, 0.5, 0],
        scaling=[0.1, 1, 5.1],
        material=sm.Material.GRAY75,
        with_collider=True,
    )
    root += sm.Box(
        name=f"wall3_{index}",
        position=[0, 0.5, 4.5],
        scaling=[5.9, 1, 0.1],
        material=sm.Material.GRAY75,
        with_collider=True,
    )
    # add actors, sensors, reward functions etc ...
    return root
You can then provide the generate_map function as an argument to the sm.ParallelRLEnv class, which will instantiate n_maps copies of the map. Training with a subset of the maps is possible using the n_show option; at each environment reset, the environment cycles through to the next map.
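For example, the following sketch (assuming the generate_map function defined above) creates a pool of 16 maps and interacts with 4 of them at a time; the exact engine keyword arguments depend on your setup:

import simulate as sm

# Create a pool of 16 maps; 4 are instantiated and shown at a time.
env = sm.ParallelRLEnv(map_fn=generate_map, n_maps=16, n_show=4)
obs = env.reset()
# ... collect experience ...
env.close()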
class simulate.ParallelRLEnv
< source >( map_fn: typing.Union[typing.Callable, simulate.scene.Scene] n_maps: typing.Optional[int] = 1 n_show: typing.Optional[int] = 1 time_step: typing.Optional[float] = 0.03333333333333333 frame_skip: typing.Optional[int] = 4 **engine_kwargs )
RL environment wrapper for a Simulate scene. Uses functionality from the VecEnv in Stable Baselines3. For more information on VecEnv, see https://stable-baselines3.readthedocs.io/en/master/guide/vec_envs.html
close
< source >( )
Close the environment.
env_is_wrapped
< source >( wrapper_class: typing.Type[gym.core.Wrapper] indices: typing.Union[NoneType, int, typing.Iterable[int]] = None )
Check if the environment is wrapped.
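For instance, following the VecEnv API, you might check all shown environments for a Gym wrapper as in the sketch below (whether your Simulate environments carry Gym wrappers depends on your setup; env is the instance created above):

import gym

# indices=None checks every shown environment; returns one bool per env.
is_wrapped = env.env_is_wrapped(gym.wrappers.TimeLimit)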
reset
< source >( ) → obs (Dict)
Resets the actors and the scene of the environment.
Returns
obs (Dict) — the observation of the environment after reset.
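The keys of the returned dict depend on the sensors you added in generate_map. A quick way to inspect them (a sketch, assuming the env created above):

obs = env.reset()
for sensor_name, value in obs.items():
    # Observations are typically arrays; fall back to the type otherwise.
    print(sensor_name, getattr(value, "shape", type(value)))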
sample_action
< source >( ) → action (List[List[List[float]]])
Samples an action from the actors in the environment. This function uses the configuration of maps and actors to return actions of the correct shape across multiple configurations.
Returns
action (List[List[List[float]]]) — lists of the actions; the dimensions are (n_maps, n_actors, action_dim).
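For instance, you can sample a random action and inspect its nested dimensions (a sketch based on the return description above, assuming the env created earlier):

action = env.sample_action()
print(len(action))        # n_maps
print(len(action[0]))     # n_actors in the first map
print(len(action[0][0]))  # action_dim for the first actor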
step
< source >( action: typing.Union[typing.Dict, typing.List, numpy.ndarray] ) → all_observation (List[Dict])
The step function for the environment; follows the API from OpenAI Gym.
Parameters
action (Dict, List, or np.ndarray) — a dict with actuator tags as keys and, as values, a tensor of shape (n_show, n_actors, n_actions) (TODO: verify).
Returns
all_observation (List[Dict]) — a list of dicts of observations for each sensor.
all_reward (List[float]) — all the rewards for the current step.
all_done (List[bool]) — whether each episode is done.
all_info (List[Dict]) — a list of dicts of additional information.
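A typical synchronous rollout loop might look like the following sketch, using sample_action to generate random actions (env as created above):

obs = env.reset()
for _ in range(1000):
    action = env.sample_action()
    # Each returned value is a list with one entry per shown map.
    obs, reward, done, info = env.step(action)
env.close()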
step_recv_async
< source >( ) → obs (Dict)
Receive the response of a step from the environment asynchronously.
Returns
obs (Dict) — a dict of observations for each sensor.
reward (float) — the reward for the current step.
done (bool) — whether the episode is done.
info (Dict) — a dict of additional information.
step_send_async
< source >( action: typing.Union[typing.Dict, typing.List, numpy.ndarray] )
Send a step to the environment asynchronously.
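Together, the two async methods split a step into a non-blocking send and a blocking receive, which lets you overlap simulation with other work. A sketch, assuming env and action as above:

# Send the actions without waiting for the simulation result ...
env.step_send_async(action)
# ... do other work here, e.g. prepare the next policy input ...
obs, reward, done, info = env.step_recv_async()  # blocks until the step completes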