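# NOTE: the lines below are residue of the web file viewer this module was
# copied from (author, commit message, commit hash, file size); they are not
# part of the program.
# grg's picture
# Cleaned old git history
# be5548b
# raw
# history blame contribute delete
# No virus
# 32.3 kB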
import time
import numpy as np
from gym_minigrid.social_ai_envs.socialaigrammar import SocialAIGrammar, SocialAIActions, SocialAIActionSpace
from gym_minigrid.minigrid import *
from gym_minigrid.register import register
import time
from collections import deque
class Partner(NPC):
    """
    Scripted partner NPC for ObjectsCollaborationEnv.

    The partner always occupies the side of the room opposite to the agent.
    On the left side it waits until the environment exposes
    ``env.chosen_left_obj`` and then walks to / toggles / pushes that object;
    on the right side it picks one of the two right boxes at random and
    toggles it. It never speaks: ``step`` always returns ``None`` as reply.
    """
def __init__(self, color, name, env):
    """Create the partner and attach it to ``env``.

    Args:
        color: forwarded unchanged to ``NPC.__init__``.
        name: label stored on the NPC (the environment prefixes replies
            with it).
        env: the owning environment; ``env.agent_side`` and
            ``env.grammar`` are read here.
    """
    super().__init__(color)

    self.env = env
    self.name = name

    self.npc_dir = 1  # NPC initially looks downward
    # todo: this should be id == name
    self.npc_type = 0  # this will be put into the encoding

    # The partner always takes the side the agent is NOT on.
    if self.env.agent_side == "R":
        self.npc_side = "L"
    else:
        self.npc_side = "R"
    assert {self.npc_side, self.env.agent_side} == {"L", "R"}

    # Episode-local bookkeeping.
    self.target_obj = None
    self.was_introduced_to = False
    self.ate_an_apple = False
    self.demo_over = False
    self.demo_over_and_position_safe = False
    self.apple_unlocked_for_agent = False

    # Extend the utterances already on the NPC with the temperature words
    # and every colour name.
    temperature_words = [
        "Hot",  # change to hot -> all with small letters
        "Warm",
        "Medium",
        "Cold",
    ]
    self.list_of_possible_utterances = (
        list(self.list_of_possible_utterances)
        + temperature_words
        + list(COLOR_NAMES)
    )

    assert self.env.grammar.contains_utterance(self.introduction_statement)
def step(self, utterance):
    """Advance the partner by one environment step.

    Args:
        utterance: what the agent said this step (or ``None``); it is not
            used by this NPC.

    Returns:
        ``(reply, info)``. When ``env.hidden_npc`` is set, whatever
        ``NPC.step`` returned is passed through. Otherwise ``reply`` is
        ``None`` and ``info`` holds ``prim_action`` (name of the executed
        action or ``"no_op"``), ``utterance`` (``"no_op"``) and
        ``was_introduced_to``.
    """
    reply, info = super().step()

    if self.env.hidden_npc:
        return reply, info

    env = self.env
    action = None

    if self.npc_side == "L":
        # the npc waits for the agent to open one of the right boxes, and
        # then uses the object of the same color
        if env.chosen_left_obj is None:
            action = self.turn_to_see_agent()
        else:
            self.target_obj = env.chosen_left_obj
            target = self.target_obj
            is_generator = type(target) is AppleGenerator

            # Where to go next: once a switch is on / a generator is
            # pressed, the goal moves to the box / the generator platform.
            if type(target) is Switch and target.is_on:
                destination = env.box.cur_pos
            elif is_generator and target.is_pressed:
                destination = env.left_generator_platform.cur_pos
            else:
                destination = target.cur_pos

            if not (is_generator and not target.is_pressed):
                # toggle all other objects
                action = self.path_to_toggle_pos(destination)
            elif not env.generator.marble_activation:
                # the generator still has to be activated: push it directly
                action = self.path_to_pos(destination)
            elif env.marble.moving_dir is None:
                # Marble-activated generator and the marble is at rest:
                # get behind the marble (the cell on the far side from the
                # generator), then step into the marble to push it.
                # NOTE(review): the source lost its indentation; this
                # assumes the path_to_pos call belongs inside the
                # "aligned on one axis" branch - confirm against upstream.
                offset = np.sign(env.marble.cur_pos - target.cur_pos)
                if sum(abs(offset)) == 1:
                    push_pos = env.marble.cur_pos + offset
                    if all(self.cur_pos == push_pos):
                        destination = env.marble.cur_pos
                    else:
                        destination = push_pos
                    action = self.path_to_pos(destination)
            else:
                # the marble is rolling: wait
                action = None
    elif self.ate_an_apple:
        action = self.turn_to_see_agent()
    else:
        # toggle the chosen box then the apple
        if self.target_obj is None:
            self.target_obj = env._rand_elem([env.right_box1, env.right_box2])
        action = self.path_to_toggle_pos(self.target_obj.cur_pos)

    def own_apple_eaten():
        # Apple belonging to the side the partner stands on.
        if self.npc_side == "R":
            return self.env.right_apple.eaten
        return self.env.left_apple.eaten

    eaten_before = own_apple_eaten()

    if action is not None:
        action()

    if not self.ate_an_apple:
        # check if the NPC ate the apple
        self.ate_an_apple = not eaten_before and own_apple_eaten()

    prim_action_name = "no_op" if action is None else action.__name__
    info = {
        "prim_action": prim_action_name,
        "utterance": "no_op",
        "was_introduced_to": self.was_introduced_to
    }
    reply = None
    return reply, info
def is_point_from_loc(self, pos):
    """Tell whether ``pos`` is a usable spot for pointing at the target.

    A position qualifies when it is not in the marble's way (as reported
    by ``env.is_in_marble_way``), shares a row or column with
    ``self.target_obj`` and, if a distractor object is known, either is
    not on the distractor's line or lies strictly between the distractor
    and the target.

    Fixes over the previous version:
    * ``self.distractor_obj`` is not assigned anywhere in this class, so
      reading it directly raised ``AttributeError``; a missing attribute
      is now treated as "no distractor".
    * positions are converted with ``np.asarray`` so tuples/lists work
      too (``any(tuple == tuple)`` raised ``TypeError``).

    Args:
        pos: candidate grid position, indexable as ``pos[0]``/``pos[1]``.

    Returns:
        bool: ``True`` if the position is acceptable.
    """
    pos = np.asarray(pos)
    target_pos = np.asarray(self.target_obj.cur_pos)

    if self.env.is_in_marble_way(pos):
        return False

    aligned = pos == target_pos
    if not aligned.any():
        return False

    distractor = getattr(self, "distractor_obj", None)
    if distractor is None:
        # Nothing can be confused with the target.
        return True

    distractor_pos = np.asarray(distractor.cur_pos)
    same_ind = int(np.argmax(aligned))  # coordinate shared with the target
    other_ind = 1 - same_ind

    if pos[same_ind] != distractor_pos[same_ind]:
        return True

    # Distractor is on the same line: only accept points strictly between
    # the distractor and the target.
    d, p, t = distractor_pos[other_ind], pos[other_ind], target_pos[other_ind]
    return bool(d < p < t or d > p > t)
def find_point_from_loc(self):
    """Ask the environment for a location accepted by ``is_point_from_loc``.

    The search is delegated to ``env.find_loc`` over an area of size
    ``(env.wall_x, env.wall_y)`` and does not exclude the agent's cell.

    Returns:
        The position returned by ``env.find_loc``; it is asserted to lie
        strictly inside ``(0, 0)`` .. ``(wall_x, wall_y)``.
    """
    env = self.env

    def reject(_env, candidate):
        # find_loc rejects a candidate when this returns True
        return not self.is_point_from_loc(candidate)

    point = env.find_loc(
        size=(env.wall_x, env.wall_y),
        reject_fn=reject,
        reject_agent_pos=False,
    )

    upper = np.array([env.wall_x, env.wall_y])
    lower = np.array([0, 0])
    assert all(point < upper)
    assert all(point > lower)
    return point
class ObjectsCollaborationEnv(MultiModalMiniGridEnv):
    """
    Two-sided collaboration environment.

    A fence splits the room in two halves. The right half holds two boxes
    of different colours that share one apple. The left half holds,
    depending on ``parameters["Problem"]``, an apple, a box, a switch plus
    locked box, or an apple generator (optionally activated by a marble),
    together with a distractor of the other colour. Once a right box has
    been opened, the left object of the same colour is armed so that it
    yields the left apple.

    The agent starts on the left (role "A") or on the right (roles "B"
    and "Meta"); in the "Social" version a ``Partner`` NPC takes the
    opposite side.
    """
def __init__(
    self,
    size=10,
    diminished_reward=True,
    step_penalty=False,
    knowledgeable=False,
    max_steps=80,
    hidden_npc=False,
    switch_no_light=True,
    reward_diminish_factor=0.1,
    see_through_walls=False,
    egocentric_observation=True,
):
    """Store the configuration and initialise the underlying grid env.

    Args:
        size: grid size passed to the parent as ``grid_size`` (>= 5).
        diminished_reward: if true ``_reward`` defers to the parent's
            reward, otherwise success is worth a flat ``1.0``.
        step_penalty: if true ``0.01`` is subtracted from every step's
            reward.
        knowledgeable: stored on the instance; not read in this class.
        max_steps: episode length limit passed to the parent.
        hidden_npc: when set, the partner NPC does not act.
        switch_no_light: forwarded as ``no_light`` to the switches.
        reward_diminish_factor: passed to the parent.
        see_through_walls: passed to the parent.
        egocentric_observation: affects ``encoding_size`` (two more
            dimensions when false).
    """
    assert size >= 5

    # Task options.
    self.empty_symbol = "NA \n"
    self.diminished_reward = diminished_reward
    self.step_penalty = step_penalty
    self.knowledgeable = knowledgeable
    self.hidden_npc = hidden_npc
    self.hear_yourself = False
    self.switch_no_light = switch_no_light
    self.reward_diminish_factor = reward_diminish_factor
    self.egocentric_observation = egocentric_observation

    self.grammar = SocialAIGrammar()

    # Set by the first (argument-less) reset().
    self.init_done = False

    # parameters - to be set in reset
    self.parameters = None

    # Optional NPC-related channels of the object encoding.
    self.add_npc_direction = True
    self.add_npc_point_direction = True
    self.add_npc_last_prim_action = True

    # 3 base dimensions, +2 when the observation is not egocentric,
    # +1 for every enabled NPC channel.
    npc_channels = sum(
        bool(flag)
        for flag in (
            self.add_npc_direction,
            self.add_npc_point_direction,
            self.add_npc_last_prim_action,
        )
    )
    position_dims = 0 if self.egocentric_observation else 2
    self.encoding_size = 3 + position_dims + npc_channels

    super().__init__(
        grid_size=size,
        max_steps=max_steps,
        # Set this to True for maximum speed
        see_through_walls=see_through_walls,
        actions=SocialAIActions,  # primitive actions
        action_space=SocialAIActionSpace,
        add_npc_direction=self.add_npc_direction,
        add_npc_point_direction=self.add_npc_point_direction,
        add_npc_last_prim_action=self.add_npc_last_prim_action,
        reward_diminish_factor=self.reward_diminish_factor,
    )

    self.all_npc_utterance_actions = Partner.get_list_of_possible_utterances()
    self.prim_actions_dict = SocialAINPCActionsDict
def revert(self):
    """Clear the task objects from the grid and put fresh ones back.

    Thin wrapper around ``put_objects_in_env(remove_objects=True)``.
    """
    self.put_objects_in_env(remove_objects=True)
def is_in_marble_way(self, pos):
    """Tell whether ``pos`` lies on a line the marble may roll along.

    In the marble problem, a position is "in the way" when it shares the
    same row (or the same column) with both the marble and a generator,
    either the target generator or the distractor generator. For every
    other problem nothing is in the way.

    Changes over the previous version:
    * both spellings ``"Marbles"`` and ``"Marble"`` are recognised, like
      the other marble checks in this class (only ``"Marble"`` was
      accepted here);
    * positions go through ``np.asarray`` so plain tuples/lists are
      accepted (``tuple == tuple`` is a single bool and made ``any(...)``
      raise ``TypeError``).

    The method is also passed unbound as ``reject_fn`` (called as
    ``reject_fn(env, pos)``), hence the ``(self, pos)`` signature.

    Args:
        pos: grid position ``(x, y)``.

    Returns:
        bool: ``True`` if an object at ``pos`` would block the marble.
    """
    if self.problem not in ("Marbles", "Marble"):
        return False

    pos = np.asarray(pos)
    shares_with_marble = pos == np.asarray(self.marble_current_pos)

    # The generator distractor is in the same row / column as the marble,
    # so both generators have to be checked.
    for generator_pos in (self.generator_current_pos, self.distractor_current_pos):
        if generator_pos is None:
            continue
        # all three (marble, generator, pos) in the same row or column
        if np.any((pos == np.asarray(generator_pos)) & shares_with_marble):
            return True

    # all good
    return False
def _gen_grid(self, width_, height_):
    """Build the grid for a new episode.

    Reads the task description from ``self.parameters`` (falling back to
    problem "Apples", version "Asocial", role "A" when it is unset),
    samples the room size, the colours and the position of every object,
    then places objects, agent and - in the "Social" version - the
    partner NPC, and resets the conversation strings.

    The order of the random draws is kept exactly as before so that
    seeded runs are unaffected.

    Args:
        width_: width of the allocated grid.
        height_: height of the allocated grid.

    Raises:
        ValueError: if the problem name is not one of the known ones.
    """
    # Create the grid
    self.grid = Grid(width_, height_, nb_obj_dims=self.encoding_size)

    # The room actually used is sampled with _rand_int from
    # (min(9, dim), dim + 1) for each dimension.
    self.current_width = self._rand_int(min(9, width_), width_ + 1)
    self.current_height = self._rand_int(min(9, height_), height_ + 1)
    self.wall_x = self.current_width - 1
    self.wall_y = self.current_height - 1

    # Generate the surrounding walls
    self.grid.wall_rect(0, 0, self.current_width, self.current_height)

    # problem: Apples/Boxes/Switches/Generators/Marbles
    params = self.parameters
    if params:
        self.problem = params["Problem"]
        num_of_colors = params.get("Num_of_colors", None)
        self.version = params["Version"]
        self.role = params["Role"]
    else:
        self.problem = "Apples"
        num_of_colors = None
        self.version = "Asocial"
        self.role = "A"
    assert self.role in ["A", "B", "Meta"]

    if self.role in ["B", "Meta"]:
        self.agent_side = "R"  # starts on the right side
    else:
        self.agent_side = "L"  # starts on the left side

    self.add_obstacles()

    marble_problems = ["Marbles", "Marble"]

    def overlaps(pos, *taken):
        # True when pos coincides with one of the already used positions
        return tuple(pos) in [tuple(p) for p in taken]

    if num_of_colors is None:
        palette = COLOR_NAMES.copy()
    else:
        palette = COLOR_NAMES[:int(num_of_colors)].copy()

    half = self.current_width // 2
    self.left_half_size = (half, self.current_height)
    self.left_half_top = (0, 0)
    self.right_half_size = (half - 1, self.current_height)
    self.right_half_top = (self.current_width - half + 1, 0)

    # add fence to grid
    self.grid.vert_wall(
        x=half + 1,  # one column to the right of the center
        y=1,
        length=self.current_height - 2,
        obj_type=Fence
    )

    # Two distinct colours for the right boxes ...
    self.right_box1_color = self._rand_elem(palette)
    palette.remove(self.right_box1_color)
    self.right_box2_color = self._rand_elem(palette)
    assert self.right_box1_color != self.right_box2_color

    # ... and the same two colours, in random order, on the left side.
    left_palette = [self.right_box1_color, self.right_box2_color]
    self.left_color_1 = self._rand_elem(left_palette)
    left_palette.remove(self.left_color_1)
    self.left_color_2 = self._rand_elem(left_palette)

    self.box_color = self.left_color_1

    # find the position for the apple/box/generator_platform
    self.left_apple_current_pos = self.find_loc(
        size=self.left_half_size,
        top=self.left_half_top,
        reject_agent_pos=True
    )

    # right boxes
    self.right_box1_current_pos = self.find_loc(
        size=self.right_half_size,
        top=self.right_half_top,
        reject_agent_pos=True
    )

    def reject_second_box(_, pos):
        return overlaps(pos, self.right_box1_current_pos)

    self.right_box2_current_pos = self.find_loc(
        size=self.right_half_size,
        top=self.right_half_top,
        reject_agent_pos=True,
        reject_fn=reject_second_box,
    )
    inner_limit = np.array([self.current_width - 1, self.current_height - 1])
    assert all(self.left_apple_current_pos < inner_limit)

    # switch
    def reject_apple_cell(_, pos):
        return overlaps(pos, self.left_apple_current_pos)

    self.switch_color = self.left_color_1
    self.switch_current_pos = self.find_loc(
        top=self.left_half_top,
        size=self.left_half_size,
        reject_agent_pos=True,
        reject_fn=reject_apple_cell,
    )

    # generator
    def reject_generator(_, pos):
        if overlaps(pos, self.left_apple_current_pos):
            return True
        if self.problem not in marble_problems:
            return False
        # not in corners
        corners = [
            (1, 1),
            (self.current_width - 2, 1),
            (1, self.current_height - 2),
            (self.current_width - 2, self.current_height - 2),
        ]
        if tuple(pos) in corners:
            return True
        # not in the same row / column as the platform
        return any(pos == self.left_apple_current_pos)

    self.generator_color = self.left_color_1
    self.generator_current_pos = self.find_loc(
        top=self.left_half_top,
        size=self.left_half_size,
        reject_agent_pos=True,
        reject_fn=reject_generator,
    )

    # generator platform
    self.left_generator_platform_color = self._rand_elem(palette)

    # marbles
    def reject_marble(_, pos):
        if self.problem not in marble_problems:
            return False
        return (
            overlaps(pos, self.left_apple_current_pos, self.generator_current_pos)
            # reject if not in row or column as the generator
            or all(pos != self.generator_current_pos)
            # next to a wall
            or any(pos == 1)
            or pos[1] == self.current_height - 2
            or pos[0] == self.current_width - 2
        )

    self.marble_color = self._rand_elem(palette)
    self.marble_current_pos = self.find_loc(
        top=self.left_half_top,
        size=self.left_half_size,
        reject_agent_pos=True,
        reject_fn=reject_marble,
    )

    # distractor: which cells are forbidden depends on the problem
    self.distractor_color = self.left_color_2
    if self.problem in ["Apples", "Boxes"]:
        def distractor_reject_fn(_, pos):
            return overlaps(pos, self.left_apple_current_pos)

    elif self.problem in ["Switches"]:
        def distractor_reject_fn(_, pos):
            return overlaps(pos, self.left_apple_current_pos, self.switch_current_pos)

    elif self.problem in ["Generators"]:
        def distractor_reject_fn(_, pos):
            return overlaps(pos, self.left_apple_current_pos, self.generator_current_pos)

    elif self.problem in marble_problems:
        # The distractor has to sit on the marble's other axis: it shares
        # with the marble the coordinate the generator does not.
        same_dim = (self.generator_current_pos == self.marble_current_pos).argmax()
        distractor_dim = 1 - same_dim

        def distractor_reject_fn(_, pos):
            return overlaps(
                pos,
                self.left_apple_current_pos,
                self.generator_current_pos,
                self.marble_current_pos,
            ) or pos[distractor_dim] != self.marble_current_pos[distractor_dim]
        # todo: not in corners -> but it's not that important

    else:
        raise ValueError("Problem {} indefined.".format(self.problem))

    self.distractor_current_pos = self.find_loc(
        top=self.left_half_top,
        size=self.left_half_size,
        reject_agent_pos=True,
        # todo: reject based on problem
        reject_fn=distractor_reject_fn
    )

    self.put_objects_in_env()

    # place agent
    if self.agent_side == "L":
        self.place_agent(size=self.left_half_size, top=self.left_half_top)
    else:
        self.place_agent(size=self.right_half_size, top=self.right_half_top)

    # NPC: always on the side opposite to the agent
    if self.version == "Social":
        self.npc_color = self._rand_elem(COLOR_NAMES)
        self.caretaker = Partner(self.npc_color, "Partner", self)

        if self.agent_side == "L":
            npc_size, npc_top = self.right_half_size, self.right_half_top
        else:
            npc_size, npc_top = self.left_half_size, self.left_half_top
        self.place_obj(
            self.caretaker,
            size=npc_size,
            top=npc_top,
            reject_fn=ObjectsCollaborationEnv.is_in_marble_way,
        )

    # Generate the mission string
    self.mission = 'lets collaborate'

    # Dummy beginning string
    self.beginning_string = "Conversation: \n"  # todo: go back to "this what you hear?
    self.utterance = self.beginning_string

    # utterance appended at the end of each step
    self.utterance_history = ""

    # used for rendering
    self.full_conversation = self.utterance
    self.outcome_info = None
def put_objects_in_env(self, remove_objects=False):
    """(Re)create all task objects and put them at their sampled positions.

    Called from ``_gen_grid`` on a fresh grid and, with
    ``remove_objects=True``, when the episode is reverted: the objects
    currently on the grid are then cleared first.

    Fixes over the previous version:
    * the problem sanity check was written as
      ``assert a == b if params else "Apples"``, which Python parses as
      ``assert ((a == b) if params else "Apples")`` and can never fail
      without parameters; the expected value is now computed first;
    * ``self.right_apple_current_pos`` is not assigned anywhere in this
      class, so the revert path raised ``AttributeError``; the cell is
      now only cleared when that attribute exists.

    Args:
        remove_objects: clear the previously placed objects before
            creating new ones (used only when reverting).

    Raises:
        ValueError: if ``self.problem`` is not a known problem name.
    """
    assert self.left_apple_current_pos is not None
    assert self.right_box1_current_pos is not None
    assert self.right_box2_current_pos is not None
    assert self.switch_current_pos is not None

    # Fresh block sets: objects of one kind share a list.
    self.switches_block_set = []
    self.boxes_block_set = []
    self.right_boxes_block_set = []
    self.generators_block_set = []

    self.other_box = None
    self.other_switch = None
    self.other_generator = None

    # problem: Apples/Boxes/Switches/Generators
    expected_problem = self.parameters["Problem"] if self.parameters else "Apples"
    assert self.problem == expected_problem

    # move objects (used only in revert), not in gen_grid
    if remove_objects:
        # remove apple (after demo it must be an apple)
        assert type(self.grid.get(*self.left_apple_current_pos)) in [Apple]
        self.grid.set(*self.left_apple_current_pos, None)

        # NOTE(review): no visible code records where the right apple
        # ended up; the right boxes are put back at their positions
        # below - confirm nothing else needs clearing on the right side.
        right_apple_pos = getattr(self, "right_apple_current_pos", None)
        if right_apple_pos is not None:
            self.grid.set(*right_apple_pos, None)

        if self.problem in ["Switches"]:
            # remove switch
            assert type(self.grid.get(*self.switch_current_pos)) in [Switch]
            self.grid.set(*self.switch.cur_pos, None)

        elif self.problem in ["Generators", "Marbles", "Marble"]:
            # remove generator (read from the object: it may have moved)
            assert type(self.grid.get(*self.generator.cur_pos)) in [AppleGenerator]
            self.grid.set(*self.generator.cur_pos, None)

            if self.problem in ["Marbles", "Marble"]:
                # remove marble (and its tee if it has been uncovered)
                assert type(self.grid.get(*self.marble.cur_pos)) in [Marble]
                self.grid.set(*self.marble.cur_pos, None)

                if self.marble.tee_uncovered:
                    self.grid.set(*self.marble.tee.cur_pos, None)

        elif self.problem in ["Apples", "Boxes"]:
            pass

        else:
            raise ValueError("Undefined problem {}".format(self.problem))

        # remove distractor
        if self.problem in ["Boxes", "Switches", "Generators", "Marbles", "Marble"]:
            assert type(self.grid.get(*self.distractor_current_pos)) in [LockableBox, Switch, AppleGenerator]
            self.grid.set(*self.distractor_current_pos, None)

    # apples
    self.left_apple = Apple()
    self.right_apple = Apple()

    # right side: both boxes contain the same right apple
    self.right_box1 = LockableBox(
        self.right_box1_color,
        contains=self.right_apple,
        is_locked=False,
        block_set=self.right_boxes_block_set
    )
    self.right_boxes_block_set.append(self.right_box1)

    self.right_box2 = LockableBox(
        self.right_box2_color,
        contains=self.right_apple,
        is_locked=False,
        block_set=self.right_boxes_block_set
    )
    self.right_boxes_block_set.append(self.right_box2)

    # Box: created empty; step() puts the left apple in once a right box
    # has been chosen.
    locked = self.problem == "Switches"
    self.box = LockableBox(
        self.box_color,
        is_locked=locked,
        block_set=self.boxes_block_set
    )
    self.boxes_block_set.append(self.box)

    # Switch: step() links it to the box later
    self.switch = Switch(
        color=self.switch_color,
        locker_switch=True,
        no_turn_off=True,
        no_light=self.switch_no_light,
        block_set=self.switches_block_set,
    )
    self.switches_block_set.append(self.switch)

    # Generator: its on_push callback is set in step()
    self.generator = AppleGenerator(
        self.generator_color,
        block_set=self.generators_block_set,
        marble_activation=self.problem in ["Marble"],
    )
    self.generators_block_set.append(self.generator)

    self.left_generator_platform = GeneratorPlatform(self.left_generator_platform_color)

    self.marble = Marble(self.marble_color, env=self)

    # right side
    self.put_obj_np(self.right_box1, self.right_box1_current_pos)
    self.put_obj_np(self.right_box2, self.right_box2_current_pos)

    # Left-side objects that a right box colour can select.
    self.candidate_objects = []

    # left side
    if self.problem == "Apples":
        self.put_obj_np(self.left_apple, self.left_apple_current_pos)
        self.candidate_objects.append(self.left_apple)

    elif self.problem in ["Boxes"]:
        self.put_obj_np(self.box, self.left_apple_current_pos)
        self.candidate_objects.append(self.box)

    elif self.problem in ["Switches"]:
        self.put_obj_np(self.box, self.left_apple_current_pos)
        self.put_obj_np(self.switch, self.switch_current_pos)
        self.candidate_objects.append(self.switch)

    elif self.problem in ["Generators", "Marble"]:
        self.put_obj_np(self.generator, self.generator_current_pos)
        self.put_obj_np(self.left_generator_platform, self.left_apple_current_pos)
        self.candidate_objects.append(self.generator)

        if self.problem in ["Marble"]:
            self.put_obj_np(self.marble, self.marble_current_pos)

    else:
        raise ValueError("Problem {} not defined. ".format(self.problem))

    # Distractors: same kind as the target object, other colour
    if self.problem == "Boxes":
        assert not locked
        self.other_box = LockableBox(
            self.left_color_2,
            is_locked=locked,
            block_set=self.boxes_block_set,
        )
        self.boxes_block_set.append(self.other_box)

        self.put_obj_np(self.other_box, self.distractor_current_pos)
        self.candidate_objects.append(self.other_box)

    elif self.problem == "Switches":
        self.other_switch = Switch(
            color=self.left_color_2,
            locker_switch=True,
            no_turn_off=True,
            no_light=self.switch_no_light,
            block_set=self.switches_block_set,
        )
        self.switches_block_set.append(self.other_switch)

        self.put_obj_np(self.other_switch, self.distractor_current_pos)
        self.candidate_objects.append(self.other_switch)

    elif self.problem in ["Generators", "Marble"]:
        self.other_generator = AppleGenerator(
            color=self.left_color_2,
            block_set=self.generators_block_set,
            marble_activation=self.problem in ["Marble"],
        )
        self.generators_block_set.append(self.other_generator)

        self.put_obj_np(self.other_generator, self.distractor_current_pos)
        self.candidate_objects.append(self.other_generator)
def reset(
        self, *args, **kwargs
):
    """Reset the environment, optionally with new task parameters.

    This env must be used inside the parametric env: every regular reset
    passes the task parameters as keyword arguments (``Problem``,
    ``Version``, ``Role``, optionally ``Num_of_colors``). The only
    argument-less call allowed is the one made during construction.

    Args:
        *args: ignored.
        **kwargs: task parameters, stored as ``self.parameters``.

    Returns:
        The observation returned by the parent's ``reset``.
    """
    if kwargs:
        # Regular reset: construction must already have happened.
        assert self.init_done

        self.parameters = dict(kwargs)
        assert self.parameters is not None
        assert len(self.parameters) > 0

        obs = super().reset()

        # Per-episode state read by step() and by the partner NPC.
        self.agent_ate_an_apple = False
        self.chosen_right_box = None
        self.chosen_left_obj = None
        return obs

    # The only place when kwargs can be empty is during the class
    # construction; reset should be called again before using the env
    # (paramenv does it in its constructor).
    assert self.parameters is None
    assert not self.init_done
    self.init_done = True
    return super().reset()
def step(self, action):
    """Run one environment step.

    Fix over the previous version: when the Meta role swaps sides after
    the first half of the episode, ``chosen_right_box`` and
    ``chosen_left_obj`` are now cleared. They used to keep pointing at
    the first half's selection, so the new left object of that colour
    was armed with the apple before the partner had opened any box.

    Args:
        action: sequence whose first element is the primitive action and
            whose remaining elements encode the utterance (all NaN means
            the agent stays silent).

    Returns:
        ``(obs, reward, done, info)``; ``info`` additionally carries the
        partner's info under ``NPC_*`` keys, ``NPC_observed`` and
        ``success``.

    Raises:
        ValueError: if the object selected on the left side is of an
            unsupported type.
    """
    success = False
    p_action = action[0]
    utterance_action = action[1:]

    # Remember the apples' state to detect that one is eaten this step.
    left_apple_had_been_eaten = self.left_apple.eaten
    right_apple_had_been_eaten = self.right_apple.eaten

    # primitive actions
    _, reward, done, info = super().step(p_action)

    if self.problem in ["Marbles", "Marble"]:
        # todo: create objects which can stepped automatically?
        self.marble.step()

    if not self.agent_ate_an_apple:
        if self.agent_side == "L":
            self.agent_ate_an_apple = self.left_apple.eaten and not left_apple_had_been_eaten
        else:
            self.agent_ate_an_apple = self.right_apple.eaten and not right_apple_had_been_eaten

    # An opened right box selects the colour for the left side.
    if self.right_box1.is_open:
        self.chosen_right_box = self.right_box1

    if self.right_box2.is_open:
        self.chosen_right_box = self.right_box2

    if self.chosen_right_box is not None:
        chosen_color = self.chosen_right_box.color
        self.chosen_left_obj = [o for o in self.candidate_objects if o.color == chosen_color][0]

        # Arm the selected left object so that using it yields the apple.
        chosen_type = type(self.chosen_left_obj)
        if chosen_type is LockableBox:
            self.chosen_left_obj.contains = self.left_apple

        elif chosen_type is Switch:
            self.chosen_left_obj.lockable_object = self.box
            self.box.contains = self.left_apple

        elif chosen_type is AppleGenerator:
            self.chosen_left_obj.on_push = lambda: self.grid.set(*self.left_apple_current_pos, self.left_apple)

        else:
            raise ValueError("Unknown target object.")

    # utterances
    agent_spoke = not all(np.isnan(utterance_action))
    if agent_spoke:
        utterance = self.grammar.construct_utterance(utterance_action)
        if self.hear_yourself:
            self.utterance += "YOU: {} \n".format(utterance)
        # NOTE(review): the source lost its indentation; this assumes the
        # rendering transcript records the agent even when hear_yourself
        # is off - confirm against upstream.
        self.full_conversation += "YOU: {} \n".format(utterance)
    else:
        utterance = None

    if self.version == "Social":
        reply, npc_info = self.caretaker.step(utterance)

        if reply:
            self.utterance += "{}: {} \n".format(self.caretaker.name, reply)
            self.full_conversation += "{}: {} \n".format(self.caretaker.name, reply)
    else:
        npc_info = {
            "prim_action": "no_op",
            "utterance": "no_op",
            "was_introduced_to": False,
        }

    # aftermath
    if p_action == self.actions.done:
        done = True

    if (self.role in ["A", "B"] or self.version == "Asocial") and self.agent_ate_an_apple:
        reward = self._reward()
        success = True
        done = True

    elif self.role == "Meta" and self.version == "Social" and self.agent_ate_an_apple and self.caretaker.ate_an_apple:
        if self.agent_side == "L":
            # second half finished
            reward = self._reward() / 2
            success = True
            done = True

        else:
            # first half finished: revert and rotate
            reward = self._reward() / 2
            self.agent_ate_an_apple = False
            self.caretaker.ate_an_apple = False
            self.agent_side = "L"
            self.put_objects_in_env(remove_objects=True)

            # Forget the first half's selection; the new boxes are
            # unopened and nothing on the left may be armed yet.
            self.chosen_right_box = None
            self.chosen_left_obj = None

            # teleport the agent and the NPC
            self.place_agent(size=self.left_half_size, top=self.left_half_top)

            self.grid.set(*self.caretaker.cur_pos, None)

            self.caretaker = Partner(self.npc_color, "Partner", self)
            self.place_obj(self.caretaker, size=self.right_half_size, top=self.right_half_top, reject_fn=ObjectsCollaborationEnv.is_in_marble_way)

    # discount
    if self.step_penalty:
        reward = reward - 0.01

    # update obs with NPC movement
    obs = self.gen_obs(full_obs=self.full_obs)

    # fill observation with text
    self.append_existing_utterance_to_history()
    obs = self.add_utterance_to_observation(obs)
    self.reset_utterance()

    if done:
        if reward > 0:
            self.outcome_info = "SUCCESS: agent got {} reward \n".format(np.round(reward, 1))
        else:
            self.outcome_info = "FAILURE: agent got {} reward \n".format(reward)

    npc_observed = False
    if self.version == "Social":
        # is the npc seen by the agent
        ag_view_npc = self.relative_coords(*self.caretaker.cur_pos)

        if ag_view_npc is not None:
            # in the agent's field of view
            ag_view_npc_x, ag_view_npc_y = ag_view_npc

            n_dims = obs['image'].shape[-1]
            npc_encoding = self.caretaker.encode(n_dims)

            # is it occluded
            npc_observed = all(obs['image'][ag_view_npc_x, ag_view_npc_y] == npc_encoding)

    info = {**info, **{"NPC_" + k: v for k, v in npc_info.items()}}
    info["NPC_observed"] = npc_observed
    info["success"] = success

    return obs, reward, done, info
def _reward(self):
    """Return the reward for solving the task.

    With ``diminished_reward`` enabled the parent class' reward is used;
    otherwise success is worth a flat ``1.0``.
    """
    if not self.diminished_reward:
        return 1.0
    return super()._reward()
# def render(self, *args, **kwargs):
# obs = super().render(*args, **kwargs)
# self.window.clear_text() # erase previous text
# self.window.set_caption(self.full_conversation)
#
# # self.window.ax.set_title("correct color: {}".format(self.box.target_color), loc="left", fontsize=10)
#
# if self.outcome_info:
# color = None
# if "SUCCESS" in self.outcome_info:
# color = "lime"
# elif "FAILURE" in self.outcome_info:
# color = "red"
# self.window.add_text(*(0.01, 0.85, self.outcome_info),
# **{'fontsize': 15, 'color': color, 'weight': "bold"})
#
# self.window.show_img(obs) # re-draw image to add changes to window
# return obs
# Register the environment under its id so it can be created by name.
register(
    entry_point='gym_minigrid.social_ai_envs:ObjectsCollaborationEnv',
    id='SocialAI-ObjectsCollaboration-v0',
)