zjowowen's picture
init space
079c32c
from typing import List, Dict, Any, Optional
from collections import deque
import copy
import torch
from ding.utils import list_split, lists_to_dicts
from ding.rl_utils.gae import gae, gae_data
class Adder(object):
"""
Overview:
Adder is a component that handles different transformations and calculations for transitions
in Collector Module(data generation and processing), such as GAE, n-step return, transition sampling etc.
Interface:
__init__, get_gae, get_gae_with_default_last_value, get_nstep_return_data, get_train_sample
"""
@classmethod
def get_gae(cls, data: List[Dict[str, Any]], last_value: torch.Tensor, gamma: float, gae_lambda: float,
cuda: bool) -> List[Dict[str, Any]]:
"""
Overview:
Get GAE advantage for stacked transitions(T timestep, 1 batch). Call ``gae`` for calculation.
Arguments:
- data (:obj:`list`): Transitions list, each element is a transition dict with at least ['value', 'reward']
- last_value (:obj:`torch.Tensor`): The last value(i.e.: the T+1 timestep)
- gamma (:obj:`float`): The future discount factor, should be in [0, 1], defaults to 0.99.
- gae_lambda (:obj:`float`): GAE lambda parameter, should be in [0, 1], defaults to 0.97, \
when lambda -> 0, it induces bias, but when lambda -> 1, it has high variance due to the sum of terms.
- cuda (:obj:`bool`): Whether use cuda in GAE computation
Returns:
- data (:obj:`list`): transitions list like input one, but each element owns extra advantage key 'adv'
Examples:
>>> B, T = 2, 3 # batch_size, timestep
>>> data = [dict(value=torch.randn(B), reward=torch.randn(B)) for _ in range(T)]
>>> last_value = torch.randn(B)
>>> gamma = 0.99
>>> gae_lambda = 0.95
>>> cuda = False
>>> data = Adder.get_gae(data, last_value, gamma, gae_lambda, cuda)
"""
value = torch.stack([d['value'] for d in data])
next_value = torch.stack([d['value'] for d in data][1:] + [last_value])
reward = torch.stack([d['reward'] for d in data])
if cuda:
value = value.cuda()
next_value = next_value.cuda()
reward = reward.cuda()
adv = gae(gae_data(value, next_value, reward, None, None), gamma, gae_lambda)
if cuda:
adv = adv.cpu()
for i in range(len(data)):
data[i]['adv'] = adv[i]
return data
@classmethod
def get_gae_with_default_last_value(cls, data: deque, done: bool, gamma: float, gae_lambda: float,
cuda: bool) -> List[Dict[str, Any]]:
"""
Overview:
Like ``get_gae`` above to get GAE advantage for stacked transitions. However, this function is designed in
case ``last_value`` is not passed. If transition is not done yet, it wouold assign last value in ``data``
as ``last_value``, discard the last element in ``data``(i.e. len(data) would decrease by 1), and then call
``get_gae``. Otherwise it would make ``last_value`` equal to 0.
Arguments:
- data (:obj:`deque`): Transitions list, each element is a transition dict with \
at least['value', 'reward']
- done (:obj:`bool`): Whether the transition reaches the end of an episode(i.e. whether the env is done)
- gamma (:obj:`float`): The future discount factor, should be in [0, 1], defaults to 0.99.
- gae_lambda (:obj:`float`): GAE lambda parameter, should be in [0, 1], defaults to 0.97, \
when lambda -> 0, it induces bias, but when lambda -> 1, it has high variance due to the sum of terms.
- cuda (:obj:`bool`): Whether use cuda in GAE computation
Returns:
- data (:obj:`List[Dict[str, Any]]`): transitions list like input one, but each element owns \
extra advantage key 'adv'
Examples:
>>> B, T = 2, 3 # batch_size, timestep
>>> data = [dict(value=torch.randn(B), reward=torch.randn(B)) for _ in range(T)]
>>> done = False
>>> gamma = 0.99
>>> gae_lambda = 0.95
>>> cuda = False
>>> data = Adder.get_gae_with_default_last_value(data, done, gamma, gae_lambda, cuda)
"""
if done:
last_value = torch.zeros_like(data[-1]['value'])
else:
last_data = data.pop()
last_value = last_data['value']
return cls.get_gae(data, last_value, gamma, gae_lambda, cuda)
@classmethod
def get_nstep_return_data(
cls,
data: deque,
nstep: int,
cum_reward=False,
correct_terminate_gamma=True,
gamma=0.99,
) -> deque:
"""
Overview:
Process raw traj data by updating keys ['next_obs', 'reward', 'done'] in data's dict element.
Arguments:
- data (:obj:`deque`): Transitions list, each element is a transition dict
- nstep (:obj:`int`): Number of steps. If equals to 1, return ``data`` directly; \
Otherwise update with nstep value.
Returns:
- data (:obj:`deque`): Transitions list like input one, but each element updated with nstep value.
Examples:
>>> data = [dict(
>>> obs=torch.randn(B),
>>> reward=torch.randn(1),
>>> next_obs=torch.randn(B),
>>> done=False) for _ in range(T)]
>>> nstep = 2
>>> data = Adder.get_nstep_return_data(data, nstep)
"""
if nstep == 1:
return data
fake_reward = torch.zeros(1)
next_obs_flag = 'next_obs' in data[0]
for i in range(len(data) - nstep):
# update keys ['next_obs', 'reward', 'done'] with their n-step value
if next_obs_flag:
data[i]['next_obs'] = data[i + nstep]['obs'] # do not need deepcopy
if cum_reward:
data[i]['reward'] = sum([data[i + j]['reward'] * (gamma ** j) for j in range(nstep)])
else:
data[i]['reward'] = torch.cat([data[i + j]['reward'] for j in range(nstep)])
data[i]['done'] = data[i + nstep - 1]['done']
if correct_terminate_gamma:
data[i]['value_gamma'] = gamma ** nstep
for i in range(max(0, len(data) - nstep), len(data)):
if next_obs_flag:
data[i]['next_obs'] = data[-1]['next_obs'] # do not need deepcopy
if cum_reward:
data[i]['reward'] = sum([data[i + j]['reward'] * (gamma ** j) for j in range(len(data) - i)])
else:
data[i]['reward'] = torch.cat(
[data[i + j]['reward']
for j in range(len(data) - i)] + [fake_reward for _ in range(nstep - (len(data) - i))]
)
data[i]['done'] = data[-1]['done']
if correct_terminate_gamma:
data[i]['value_gamma'] = gamma ** (len(data) - i - 1)
return data
@classmethod
def get_train_sample(
cls,
data: List[Dict[str, Any]],
unroll_len: int,
last_fn_type: str = 'last',
null_transition: Optional[dict] = None
) -> List[Dict[str, Any]]:
"""
Overview:
Process raw traj data by updating keys ['next_obs', 'reward', 'done'] in data's dict element.
If ``unroll_len`` equals to 1, which means no process is needed, can directly return ``data``.
Otherwise, ``data`` will be splitted according to ``unroll_len``, process residual part according to
``last_fn_type`` and call ``lists_to_dicts`` to form sampled training data.
Arguments:
- data (:obj:`List[Dict[str, Any]]`): Transitions list, each element is a transition dict
- unroll_len (:obj:`int`): Learn training unroll length
- last_fn_type (:obj:`str`): The method type name for dealing with last residual data in a traj \
after splitting, should be in ['last', 'drop', 'null_padding']
- null_transition (:obj:`Optional[dict]`): Dict type null transition, used in ``null_padding``
Returns:
- data (:obj:`List[Dict[str, Any]]`): Transitions list processed after unrolling
"""
if unroll_len == 1:
return data
else:
# cut data into pieces whose length is unroll_len
split_data, residual = list_split(data, step=unroll_len)
def null_padding():
template = copy.deepcopy(residual[0])
template['null'] = True
if isinstance(template['obs'], dict):
template['obs'] = {k: torch.zeros_like(v) for k, v in template['obs'].items()}
else:
template['obs'] = torch.zeros_like(template['obs'])
if 'action' in template:
template['action'] = torch.zeros_like(template['action'])
template['done'] = True
template['reward'] = torch.zeros_like(template['reward'])
if 'value_gamma' in template:
template['value_gamma'] = 0.
null_data = [cls._get_null_transition(template, null_transition) for _ in range(miss_num)]
return null_data
if residual is not None:
miss_num = unroll_len - len(residual)
if last_fn_type == 'drop':
# drop the residual part
pass
elif last_fn_type == 'last':
if len(split_data) > 0:
# copy last datas from split_data's last element, and insert in front of residual
last_data = copy.deepcopy(split_data[-1][-miss_num:])
split_data.append(last_data + residual)
else:
# get null transitions using ``null_padding``, and insert behind residual
null_data = null_padding()
split_data.append(residual + null_data)
elif last_fn_type == 'null_padding':
# same to the case of 'last' type and split_data is empty
null_data = null_padding()
split_data.append(residual + null_data)
# collate unroll_len dicts according to keys
if len(split_data) > 0:
split_data = [lists_to_dicts(d, recursive=True) for d in split_data]
return split_data
@classmethod
def _get_null_transition(cls, template: dict, null_transition: Optional[dict] = None) -> dict:
"""
Overview:
Get null transition for padding. If ``cls._null_transition`` is None, return input ``template`` instead.
Arguments:
- template (:obj:`dict`): The template for null transition.
- null_transition (:obj:`Optional[dict]`): Dict type null transition, used in ``null_padding``
Returns:
- null_transition (:obj:`dict`): The deepcopied null transition.
"""
if null_transition is not None:
return copy.deepcopy(null_transition)
else:
return copy.deepcopy(template)
get_gae = Adder.get_gae
get_gae_with_default_last_value = Adder.get_gae_with_default_last_value
get_nstep_return_data = Adder.get_nstep_return_data
get_train_sample = Adder.get_train_sample