from typing import List, Dict, Any, Optional, Callable, Tuple
import copy
import numpy as np
import torch


class HerRewardModel:
    """
    Overview:
        Hindsight Experience Replay model.

    .. note::
        - her_strategy (:obj:`str`): The goal relabeling strategy HER uses, should be one of ['final', 'future', 'episode'].
        - her_replay_k (:obj:`int`): Number of new episodes generated from one original episode. (Not used in episodic HER)
        - episode_size (:obj:`int`): How many episodes to sample in one iteration.
        - sample_per_episode (:obj:`int`): How many new samples are generated from one episode.

    .. note::
        HER needs the whole episode trajectory to relabel goals. However, episode lengths differ and
        may have high variance. As a result, we **recommend** that you only use some of the transitions in
        each complete episode by specifying ``episode_size`` and ``sample_per_episode`` in the config.
        In that case, ``batch_size`` in one iteration is ``episode_size * sample_per_episode``.
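
    .. note::
        A minimal config sketch (values are illustrative, not verified defaults). An attribute-style dict
        such as ``easydict.EasyDict`` is assumed, since ``__init__`` reads ``cfg.her_strategy``::

            cfg = EasyDict(dict(
                her_strategy='future',
                her_replay_k=4,
                episode_size=32,
                sample_per_episode=4,
            ))
            her_model = HerRewardModel(cfg)  # batch_size per iteration: 32 * 4 = 128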
    """

    def __init__(
            self,
            cfg: dict,
            cuda: bool = False,
    ) -> None:
        self._cuda = cuda and torch.cuda.is_available()
        self._device = 'cuda' if self._cuda else 'cpu'
        self._her_strategy = cfg.her_strategy
        assert self._her_strategy in ['final', 'future', 'episode']
        # `her_replay_k` is not used in episodic HER, so it defaults to 1.
        self._her_replay_k = cfg.get('her_replay_k', 1)
        self._episode_size = cfg.get('episode_size', None)
        self._sample_per_episode = cfg.get('sample_per_episode', None)

    def estimate(
            self,
            episode: List[Dict[str, Any]],
            merge_func: Optional[Callable] = None,
            split_func: Optional[Callable] = None,
            goal_reward_func: Optional[Callable] = None
    ) -> List[List[Dict[str, Any]]]:
        """
        Overview:
            Get HER processed episodes from original episodes.
        Arguments:
            - episode (:obj:`List[Dict[str, Any]]`): Episode list, each element is a transition.
            - merge_func (:obj:`Callable`): The merge function to use, default set to None. If None, \
                then use ``__her_default_merge_func``
            - split_func (:obj:`Callable`): The split function to use, default set to None. If None, \
                then use ``__her_default_split_func``
            - goal_reward_func (:obj:`Callable`): The goal_reward function to use, default set to None. If None, \
                then use ``__her_default_goal_reward_func``
        Returns:
            - new_episodes (:obj:`List[List[Dict[str, Any]]]`): The relabeled episodes, one per ``her_replay_k``, \
                each a list of processed transitions.
        """
        if merge_func is None:
            merge_func = HerRewardModel.__her_default_merge_func
        if split_func is None:
            split_func = HerRewardModel.__her_default_split_func
        if goal_reward_func is None:
            goal_reward_func = HerRewardModel.__her_default_goal_reward_func
        new_episodes = [[] for _ in range(self._her_replay_k)]
        if self._sample_per_episode is None:
            # Use complete episode
            indices = range(len(episode))
        else:
            # Use some transitions in one episode
            indices = np.random.randint(0, len(episode), size=self._sample_per_episode)
        for idx in indices:
            obs, _, _ = split_func(episode[idx]['obs'])
            next_obs, _, achieved_goal = split_func(episode[idx]['next_obs'])
            for k in range(self._her_replay_k):
                # Pick the transition whose achieved goal is used as the new desired goal.
                if self._her_strategy == 'final':
                    # 'final': the goal achieved at the last step of the episode.
                    p_idx = -1
                elif self._her_strategy == 'episode':
                    # 'episode': a goal achieved at a random step of the same episode.
                    p_idx = np.random.randint(0, len(episode))
                elif self._her_strategy == 'future':
                    # 'future': a goal achieved at the current step or a later one.
                    p_idx = np.random.randint(idx, len(episode))
                _, _, new_desired_goal = split_func(episode[p_idx]['next_obs'])
                # Copy everything except obs/next_obs/reward, which are relabeled below.
                timestep = {
                    key: copy.deepcopy(value)
                    for key, value in episode[idx].items() if key not in ['obs', 'next_obs', 'reward']
                }
                timestep['obs'] = merge_func(obs, new_desired_goal)
                timestep['next_obs'] = merge_func(next_obs, new_desired_goal)
                timestep['reward'] = goal_reward_func(achieved_goal, new_desired_goal).to(self._device)
                new_episodes[k].append(timestep)
        return new_episodes

    @staticmethod
    def __her_default_merge_func(x: Any, y: Any) -> Any:
        r"""
        Overview:
            Merge the original obs and the goal into one obs, which is used as the relabeled observation.
        Arguments:
            - x (:obj:`Any`): The original obs (without the goal).
            - y (:obj:`Any`): The goal to append to the obs.
        Returns:
            - ret (:obj:`Any`): The merged obs.
        """
        # TODO(nyz) dict/list merge_func
        return torch.cat([x, y], dim=0)

    @staticmethod
    def __her_default_split_func(x: Any) -> Tuple[Any, Any, Any]:
        r"""
        Overview:
            Split the input into obs, desired goal, and achieved goal.
        Arguments:
            - x (:obj:`Any`): The input to split
        Returns:
            - obs (:obj:`torch.Tensor`): The original obs.
            - desired_goal (:obj:`torch.Tensor`): The desired goal contained in the input.
            - achieved_goal (:obj:`torch.Tensor`): The goal actually achieved, derived from the obs.
        """
        # TODO(nyz) dict/list split_func
        # achieved_goal = f(obs); by default f is the identity function.
        obs, desired_goal = torch.chunk(x, 2)
        achieved_goal = obs
        return obs, desired_goal, achieved_goal

    @staticmethod
    def __her_default_goal_reward_func(achieved_goal: torch.Tensor, desired_goal: torch.Tensor) -> torch.Tensor:
        r"""
        Overview:
            Compute the goal reward according to whether ``achieved_goal`` matches ``desired_goal``.
        Arguments:
            - achieved_goal (:obj:`torch.Tensor`): The achieved goal.
            - desired_goal (:obj:`torch.Tensor`): The desired goal.
        Returns:
            - goal_reward (:obj:`torch.Tensor`): 1 if the achieved goal exactly matches the desired goal, \
                otherwise 0.
        """
        if (achieved_goal == desired_goal).all():
            return torch.FloatTensor([1])
        else:
            return torch.FloatTensor([0])

    @property
    def episode_size(self) -> int:
        return self._episode_size

    @property
    def sample_per_episode(self) -> int:
        return self._sample_per_episode
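

# A minimal usage sketch (not part of the original module): it builds a toy episode whose obs layout
# matches the default split/merge convention, i.e. obs == cat([state, desired_goal]) along dim 0, and
# relabels it with HER. ``easydict.EasyDict`` and the 3-dim state/goal sizes are illustrative assumptions.
if __name__ == '__main__':
    from easydict import EasyDict

    cfg = EasyDict(dict(her_strategy='future', her_replay_k=2))
    her_model = HerRewardModel(cfg, cuda=False)

    # Build a toy 5-step episode: each transition stores obs/next_obs as cat([state, goal]).
    goal = torch.zeros(3)
    episode = []
    for t in range(5):
        state, next_state = torch.full((3, ), float(t)), torch.full((3, ), float(t + 1))
        episode.append(
            {
                'obs': torch.cat([state, goal], dim=0),
                'next_obs': torch.cat([next_state, goal], dim=0),
                'action': torch.tensor([0]),
                'reward': torch.FloatTensor([0]),
                'done': t == 4,
            }
        )

    # ``estimate`` returns ``her_replay_k`` relabeled episodes, each with one transition per sampled index.
    new_episodes = her_model.estimate(episode)
    print(len(new_episodes), len(new_episodes[0]))  # 2 5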