Spaces:
Running
Running
"""Tests for common.rollout."""

# The docstring has to be the first statement in the file to become the
# module's __doc__; `from __future__` imports are allowed to follow it.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow as tf

from common import rollout as rollout_lib  # brain coder
class RolloutTest(tf.test.TestCase):
    """Unit tests for the helpers exposed by `common.rollout`."""

    def _assert_array_equal(self, actual, expected):
        """Requires exact element-wise equality, reporting any mismatches."""
        np.testing.assert_array_equal(actual, expected)

    def _assert_all_close(self, actual, expected):
        """Requires approximate equality, reporting any mismatches.

        The tolerances are spelled out because they are the `np.allclose`
        defaults; `np.testing.assert_allclose` would otherwise apply a
        stricter relative tolerance.
        """
        np.testing.assert_allclose(actual, expected, rtol=1e-5, atol=1e-8)

    def MakeRollout(self, states, actions, rewards, values=None,
                    terminated=True):
        """Builds a `Rollout` filled through a single `add_many` call.

        Args:
          states: Forwarded to `Rollout.add_many` as `states`.
          actions: Forwarded to `Rollout.add_many` as `actions`.
          rewards: Forwarded to `Rollout.add_many` as `rewards`.
          values: Forwarded to `Rollout.add_many` as `values`.
          terminated: Forwarded to `Rollout.add_many` as `terminated`.

        Returns:
          The populated `rollout_lib.Rollout` instance.
        """
        rollout = rollout_lib.Rollout()
        rollout.add_many(
            states=states, actions=actions, rewards=rewards, values=values,
            terminated=terminated)
        return rollout

    def testDiscount(self):
        """Checks `discount` with factor 0.5 on list and ndarray inputs."""
        # Share of the last reward (index 4) as seen from every time step.
        discounted = np.array([1.0 / 2 ** n for n in range(4, -1, -1)])
        # Share of the reward at index 1 as seen from steps 0 and 1.
        discounted[:2] += [1.0 / 2 ** n for n in range(1, -1, -1)]

        self._assert_array_equal(
            rollout_lib.discount([0.0, 1.0, 0.0, 0.0, 1.0], 0.50),
            discounted)
        self._assert_array_equal(
            rollout_lib.discount(np.array([0.0, 1.0, 0.0, 0.0, 1.0]), 0.50),
            discounted)

    def testDiscountedAdvantageAndRewards(self):
        """Checks `discounted_advantage_and_rewards` in three settings.

        When `values` has one more entry than the rewards, the test expects
        the extra trailing value to be discounted back into the returns
        (the "bootstrapping" cases below).
        """
        # lambda=1, No bootstrapping.
        values = [0.1, 0.5, 0.5, 0.25]
        empirical_values, generalized_advantage = (
            rollout_lib.discounted_advantage_and_rewards(
                [0.0, 0.0, 0.0, 1.0], values, gamma=0.75, lambda_=1.0))
        expected_discounted_r = np.array(
            [1.0 * 0.75 ** n for n in range(3, -1, -1)])
        expected_adv = expected_discounted_r - values
        self._assert_array_equal(empirical_values, expected_discounted_r)
        self._assert_all_close(generalized_advantage, expected_adv)

        # lambda=1, With bootstrapping.
        values = [0.1, 0.5, 0.5, 0.25, 0.75]
        empirical_values, generalized_advantage = (
            rollout_lib.discounted_advantage_and_rewards(
                [0.0, 0.0, 0.0, 1.0], values, gamma=0.75, lambda_=1.0))
        expected_discounted_r = (
            np.array([0.75 * 0.75 ** n for n in range(4, 0, -1)])
            + np.array([1.0 * 0.75 ** n for n in range(3, -1, -1)]))
        expected_adv = expected_discounted_r - values[:-1]
        self._assert_array_equal(empirical_values, expected_discounted_r)
        self._assert_all_close(generalized_advantage, expected_adv)

        # lambda=0.5, With bootstrapping.
        values = [0.1, 0.5, 0.5, 0.25, 0.75]
        rewards = [0.0, 0.0, 0.0, 1.0]
        lam = 0.5  # lambda
        g = 0.75  # gamma
        empirical_values, generalized_advantage = (
            rollout_lib.discounted_advantage_and_rewards(
                rewards, values, gamma=g, lambda_=lam))
        expected_discounted_r = (
            np.array([0.75 * g ** n for n in range(4, 0, -1)])
            + np.array([1.0 * g ** n for n in range(3, -1, -1)]))
        # Backward recursion A_t = delta_t + g * lam * A_{t+1}. The extra
        # trailing slot holds the zero advantage past the last step and is
        # dropped once the recursion is done.
        expected_adv = [0.0] * len(values)
        for t in reversed(range(len(rewards))):
            delta_t = rewards[t] + g * values[t + 1] - values[t]
            expected_adv[t] = delta_t + g * lam * expected_adv[t + 1]
        expected_adv = expected_adv[:-1]
        self._assert_array_equal(empirical_values, expected_discounted_r)
        self._assert_all_close(generalized_advantage, expected_adv)

    def testProcessRollouts(self):
        """Checks the batch built by `process_rollouts` from two rollouts.

        The rollouts have lengths 3 and 1; the test expects the shorter one
        to be zero-padded up to the longer one in every per-step field.
        """
        g = 0.95
        rollouts = [
            self.MakeRollout(
                states=[3, 6, 9],
                actions=[1, 2, 3],
                rewards=[1.0, -1.0, 0.5],
                values=[0.5, 0.5, 0.1]),
            self.MakeRollout(
                states=[10],
                actions=[5],
                rewards=[1.0],
                values=[0.5])]
        batch = rollout_lib.process_rollouts(rollouts, gamma=g)

        self.assertEqual(2, batch.batch_size)
        self.assertEqual(3, batch.max_time)
        self.assertEqual([3, 1], batch.episode_lengths)
        self.assertEqual([0.5, 1.0], batch.total_rewards)
        self.assertEqual(
            [[3, 6, 9], [10, 0, 0]],
            batch.states.tolist())
        self.assertEqual(
            [[1, 2, 3], [5, 0, 0]],
            batch.actions.tolist())

        rew1, rew2 = rollouts[0].rewards, rollouts[1].rewards
        expected_discounted_rewards = [
            [rew1[0] + g * rew1[1] + g * g * rew1[2],
             rew1[1] + g * rew1[2],
             rew1[2]],
            [rew2[0], 0.0, 0.0]]
        expected_advantages = [
            [dr - v
             for dr, v
             in zip(expected_discounted_rewards[0], rollouts[0].values)],
            [expected_discounted_rewards[1][0] - rollouts[1].values[0],
             0.0, 0.0]]
        self._assert_all_close(
            batch.discounted_r, expected_discounted_rewards)
        self._assert_all_close(
            batch.discounted_adv, expected_advantages)
# Run the suite through TensorFlow's test runner, but only when this file is
# executed as a script rather than imported.
if __name__ == '__main__':
    tf.test.main()