# File: gomoku/DI-engine/dizoo/smac/envs/smac_reward.py
# Web-viewer header residue: zjowowen - "init space" - commit 079c32c - 9.1 kB
from collections import namedtuple
import numpy as np
# Keys of the reward dict built by SMACReward.get_reward (one entry per side).
# ORIGINAL_AGENT holds the reward computed with is_opponent=False,
# OPPONENT_AGENT the reward computed with is_opponent=True.
ORIGINAL_AGENT = "me"
OPPONENT_AGENT = "opponent"
class SMACReward:
    """Reward calculator for a SMAC battle, evaluated for both sides of the fight.

    Three ``reward_type`` values are accepted by the constructor:

    * ``'sparse'``   - only the terminal +1 / -1 outcome is rewarded.
    * ``'original'`` - dense reward built from damage dealt, kills and the
      terminal win bonus.
    * ``'new'``      - accepted for backward compatibility but not implemented;
      the reward and info methods raise for it.
    """

    info_template = namedtuple('EnvElementInfo', ['shape', 'value', 'to_agent_processor', 'from_agent_processor'])

    def __init__(
            self,
            n_agents,
            n_enemies,
            two_player,
            reward_type,
            max_reward,
            reward_scale=True,
            reduce_agent=True,
            reward_only_positive=True
    ):
        """Store the reward configuration and allocate the death trackers.

        Args:
            n_agents: number of units on the controlled side.
            n_enemies: number of units on the opposing side.
            two_player: whether both sides are rewarded; if so every death is
                counted once per side (see ``reward_battle_split``).
            reward_type: one of ``'sparse'``, ``'original'``, ``'new'``.
            max_reward: upper bound used to derive the 'original' reward range.
            reward_scale: if true, ``get_reward`` normalises rewards with the
                range reported by ``info``.
            reduce_agent: selects scalar vs. per-agent shape for the sparse
                reward and for the shape reported by ``info``.
            reward_only_positive: if true, losses on the own side never lower
                the dense reward.
        """
        self.reward_only_positive = reward_only_positive
        self.reward_scale = reward_scale
        self.max_reward = max_reward
        self.reward_death_value = 10
        self.reward_win = 200
        self.reward_defeat = 0
        self.reward_negative_scale = 0.5
        self.reward_scale_rate = 20
        self.reduce_agent = reduce_agent
        self.reward_type = reward_type
        assert self.reward_type in ['sparse', 'original', 'new']
        self.n_agents = n_agents
        self.n_enemies = n_enemies
        # Per-unit counters of how many times a death has already been rewarded.
        self.death_tracker_ally = np.zeros(n_agents)
        self.death_tracker_enemy = np.zeros(n_enemies)
        self.two_player = two_player

    def reset(self, max_reward):
        """Start a new episode: update ``max_reward`` and clear the death trackers.

        ``info`` reads ``self.max_reward`` on every call, so the reported reward
        range follows the new value without any further bookkeeping.
        """
        self.max_reward = max_reward
        self.death_tracker_ally.fill(0)
        self.death_tracker_enemy.fill(0)

    def get_reward(self, engine, action, game_end_code, win_counted, defeat_counted):
        """Compute this step's reward for both sides.

        Args:
            engine: object exposing the current and previous unit states (see
                ``reward_battle_split`` for the attributes that are read).
            action: forwarded to ``reward_battle_split``.
            game_end_code: ``1`` when ORIGINAL_AGENT won, ``-1`` when it lost,
                ``None`` otherwise (a draw may also leave it as ``None``).
            win_counted: if true, the win bonus is not added again.
            defeat_counted: if true, the defeat bonus is not added again.

        Returns:
            Dict keyed by ``ORIGINAL_AGENT`` and ``OPPONENT_AGENT`` whose values
            are numpy arrays with at least one dimension.
        """
        # The ORIGINAL_AGENT pass must run first: in single-player mode it is the
        # pass that consumes the death-tracker slots.
        reward = {
            ORIGINAL_AGENT: np.asarray(self.reward_battle_split(engine, action, is_opponent=False)),
            OPPONENT_AGENT: np.asarray(self.reward_battle_split(engine, action, is_opponent=True)),
        }
        for side in reward:
            if reward[side].shape == ():
                reward[side] = np.expand_dims(reward[side], 0)

        # Terminal bonus: +-1 for the sparse reward, win/defeat constants otherwise.
        if self.reward_type == 'sparse':
            winner_bonus, loser_bonus = 1, -1
        else:
            winner_bonus, loser_bonus = self.reward_win, self.reward_defeat
        if game_end_code == 1 and not win_counted:
            reward[ORIGINAL_AGENT] += winner_bonus
            reward[OPPONENT_AGENT] += loser_bonus
        elif game_end_code == -1 and not defeat_counted:
            reward[ORIGINAL_AGENT] += loser_bonus
            reward[OPPONENT_AGENT] += winner_bonus

        if self.reward_scale:
            # Normalise with the range reported by info().
            bounds = self.info().value
            min_val, max_val = bounds['min'], bounds['max']
            span = max_val - min_val
            reward[ORIGINAL_AGENT] = (reward[ORIGINAL_AGENT] - min_val) / span
            reward[OPPONENT_AGENT] = (reward[OPPONENT_AGENT] - min_val) / span
        return reward

    def _collect_losses(self, units, previous_units, death_tracker, size, max_count):
        """Return per-unit ``(health_lost, newly_died)`` arrays for one side.

        Units whose tracker entry already reached ``max_count`` are skipped. A
        unit whose health is 0 gets its tracker entry incremented and counts its
        whole previous health + shield as lost.
        """
        health_lost = np.zeros([size])
        newly_died = np.zeros([size])
        for unit_id, unit in units.items():
            if death_tracker[unit_id] >= max_count:
                continue
            prev_health = previous_units[unit_id].health + previous_units[unit_id].shield
            if unit.health == 0:
                death_tracker[unit_id] += 1
                newly_died[unit_id] = 1
                health_lost[unit_id] = prev_health
            else:
                health_lost[unit_id] = prev_health - unit.health - unit.shield
        return health_lost, newly_died

    def reward_battle_split(self, engine, action, is_opponent=False):
        """Dense battle reward for one side (zero for the sparse reward type).

        The reward is the health + shield damage dealt to the other side plus
        ``reward_death_value`` per unit of the other side that just died. Only
        when ``reward_only_positive`` is false, the own side's losses are
        subtracted as well: (damage taken + ``reward_death_value`` per own unit
        that just died) * ``reward_negative_scale``.

        Args:
            engine: must expose ``n_agents``, ``n_enemies``, the dicts ``agents``
                and ``enemies`` (id -> unit with ``health`` / ``shield``) and the
                id-indexed ``previous_ally_units`` / ``previous_enemy_units``.
            action: currently unused.
            is_opponent: compute the reward from the enemy side's perspective.

        Returns:
            For ``'sparse'``: ``0.`` if ``reduce_agent`` else a zero array with
            one entry per own unit. For ``'original'``: a scalar summed over all
            units. NOTE: this is a scalar even when ``reduce_agent`` is false.

        Raises:
            AssertionError: if ``reward_type`` is neither ``'sparse'`` nor
                ``'original'``.
        """
        num_own = engine.n_enemies if is_opponent else engine.n_agents
        num_other = engine.n_agents if is_opponent else engine.n_enemies

        if self.reward_type == 'sparse':
            return 0. if self.reduce_agent else np.zeros(num_own)
        assert self.reward_type == 'original', 'reward_type={} is not supported!'.format(self.reward_type)

        ally_side = (engine.agents, engine.previous_ally_units, self.death_tracker_ally)
        enemy_side = (engine.enemies, engine.previous_enemy_units, self.death_tracker_enemy)
        own_side, other_side = (enemy_side, ally_side) if is_opponent else (ally_side, enemy_side)

        # With two players each death has to be seen once from either
        # perspective, so a tracker entry may be incremented twice.
        max_count = 2 if self.two_player else 1
        own_lost, own_died = self._collect_losses(*own_side, num_own, max_count)
        other_lost, other_died = self._collect_losses(*other_side, num_other, max_count)

        kill_bonus = (other_died * self.reward_death_value).sum()
        damage_dealt = other_lost.sum()

        if self.reward_only_positive:
            # Own losses are ignored here. abs() covers a negative damage total,
            # which happens when the other side's health + shield went up.
            return abs(kill_bonus + damage_dealt)

        neg_scale = self.reward_negative_scale
        death_penalty = (own_died * (self.reward_death_value * neg_scale)).sum()
        damage_taken = (own_lost * neg_scale).sum()
        return -death_penalty + kill_bonus + damage_dealt - damage_taken

    def info(self):
        """Describe the reward element: its shape and value range.

        Returns:
            ``info_template`` with ``shape`` (``(1,)`` if ``reduce_agent`` else
            ``(n_agents,)``), ``value`` (dict with ``'min'`` / ``'max'``) and both
            processor fields set to ``None``.

        Raises:
            NotImplementedError: if no value range is defined for ``reward_type``.
        """
        if self.reward_type == 'sparse':
            value = {'min': -1, 'max': 1}
        elif self.reward_type == 'original':
            value = {'min': 0, 'max': self.max_reward / self.reward_scale_rate}
        else:
            # 'new' passes the constructor check but has no range defined.
            raise NotImplementedError('reward_type={} is not supported!'.format(self.reward_type))
        shape = (1, ) if self.reduce_agent else (self.n_agents, )
        return SMACReward.info_template(shape, value, None, None)