Spaces:
Sleeping
Sleeping
import time | |
import random | |
import numpy as np | |
from gym_minigrid.social_ai_envs.socialaigrammar import SocialAIGrammar, SocialAIActions, SocialAIActionSpace | |
from gym_minigrid.minigrid import * | |
from gym_minigrid.register import register | |
import time | |
from collections import deque | |
def next_to(posa, posb):
    """Return True when two grid positions are orthogonally adjacent.

    Adjacent means that the Manhattan distance between the two positions is
    exactly 1 (so diagonal neighbours and identical cells are not adjacent).

    Args:
        posa: first position; a tuple, list or numpy array of coordinates.
        posb: second position; same accepted types as ``posa``.

    Returns:
        bool: True if the positions differ by one step along a single axis.
    """
    # np.asarray makes the subtraction element-wise for every sequence type
    # (the previous tuple-only conversion failed for plain lists).
    offset = np.asarray(posa) - np.asarray(posb)
    return bool(np.abs(offset).sum() == 1)
class Caretaker(NPC): | |
""" | |
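An NPC that, once the agent has introduced itself, helps it reach the target
object: it either gives a cue (a colour word, hot/cold feedback, pointing or
a demonstration) or, with scaffolding, unlocks the apple for the agent.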
""" | |
def __init__(self, color, name, env):
    """Set up the caretaker for the problem currently configured in ``env``.

    Args:
        color: colour forwarded to the ``NPC`` base class.
        name: name stored on the NPC.
        env: the environment the NPC lives in. It is read for ``problem``,
            ``parameters``, ``ja_recursive``, ``grammar`` and for the target
            and distractor objects of the current problem.

    Raises:
        ValueError: if ``env.problem`` is not one of the known problems.
    """
    super().__init__(color)
    self.name = name
    self.env = env
    self.npc_dir = 1  # NPC initially looks downward
    self.npc_type = 0  # this will be put into the encoding

    # interaction state, updated in step()
    self.was_introduced_to = False
    self.decoy_color_given = False

    self.ate_an_apple = False
    self.demo_over = False
    self.demo_over_and_position_safe = False
    self.apple_unlocked_for_agent = False

    self.list_of_possible_utterances = [
        *self.list_of_possible_utterances,
        "Hot",
        "Warm",
        "Medium",
        "Cold",
        *COLOR_NAMES
    ]

    # The conditional must be parenthesised (here: computed first). Written
    # inline, ``assert a == b if c else "Apples"`` asserts the truthy string
    # "Apples" whenever ``c`` is falsy and therefore checks nothing.
    expected_problem = self.env.parameters["Problem"] if self.env.parameters else "Apples"
    assert self.env.problem == expected_problem

    # problem name -> (env attribute holding the target, env attribute holding the distractor)
    problem_objects = {
        "Apples": ("apple", None),
        "Doors": ("door", "distractor_door"),
        "Levers": ("lever", "distractor_lever"),
        "Boxes": ("box", "distractor_box"),
        "Switches": ("switch", "distractor_switch"),
        "Generators": ("generator", "distractor_generator"),
        "Marble": ("generator", "distractor_generator"),
        "Marbles": ("generator", "distractor_generator"),
    }
    if self.env.problem not in problem_objects:
        # previously this fell through silently and failed later with an
        # AttributeError on self.target_obj
        raise ValueError("Problem {} not defined. ".format(self.env.problem))

    target_attr, distractor_attr = problem_objects[self.env.problem]
    self.target_obj = getattr(self.env, target_attr)
    self.distractor_obj = getattr(self.env, distractor_attr) if distractor_attr is not None else None

    if self.env.ja_recursive:
        # how many objects
        if int(self.env.parameters["N"]) == 1:
            self.ja_decoy = self.env._rand_elem([self.target_obj])
        else:
            self.ja_decoy = self.env._rand_elem([self.target_obj, self.distractor_obj])

        # the other object is a decoy distractor
        self.ja_decoy_distractor = list({self.target_obj, self.distractor_obj} - {self.ja_decoy})[0]

        # location from which the NPC points at the decoy
        self.decoy_point_from_loc = self.find_point_from_loc(
            target_pos=self.ja_decoy.cur_pos,
            distractor_pos=self.ja_decoy_distractor.cur_pos if self.ja_decoy_distractor else None
        )

    # location from which the NPC points at the real target
    self.point_from_loc = self.find_point_from_loc()

    assert self.env.grammar.contains_utterance(self.introduction_statement)
def step(self, utterance):
    """Advance the caretaker by one step and return what it says.

    Before the agent has introduced itself the NPC only handles the
    introduction (and, when ``env.ja_recursive`` is set, gives decoy cues).
    Afterwards it gives the configured cue (``Cue_type``: Language_Color,
    Language_Feedback, Pointing or Emulation) or, with ``Scaffolding == "Y"``,
    unlocks the apple for the agent.

    Args:
        utterance: what the agent said; passed to ``handle_introduction``
            while the NPC has not been introduced to.

    Returns:
        tuple: ``(reply, info)``. ``reply`` is the NPC utterance or None and
        ``info`` is the dict built by ``create_info``. When
        ``env.hidden_npc`` is set, the result of ``super().step()`` is
        returned unchanged.

    Raises:
        ValueError: for an unknown ``Cue_type``, or when scaffolding is
            combined with an information-giving parameter.
    """
    # NOTE(review): this file was reviewed with its indentation stripped; the
    # nesting below is reconstructed from the control-flow keywords and the
    # original comments. Confirm it against the upstream source.
    reply, info = super().step()

    if self.env.hidden_npc:
        return reply, info

    params = self.env.parameters
    scaffolding = params.get("Scaffolding", "N") == "Y"

    if scaffolding:
        # there are no cues if scaffolding is used (the peer gives the apples to the agent)
        assert "Cue_type" not in params

        # there is no additional test for joint attention (no cues are given so this wouldn't make sense)
        assert not self.env.ja_recursive
        cue_type = None
    else:
        cue_type = params["Cue_type"]
        if cue_type not in ("Language_Color", "Language_Feedback", "Pointing", "Emulation"):
            raise ValueError(f"Cue_type ({cue_type}) not defined.")

    language_color = cue_type == "Language_Color"
    language_feedback = cue_type == "Language_Feedback"
    pointing = cue_type == "Pointing"
    emulation = cue_type == "Emulation"

    reply, action = None, None
    if not self.was_introduced_to:
        # check introduction, updates was_introduced_to if needed
        reply, action = self.handle_introduction(utterance)

        assert action is None

        if self.env.ja_recursive:
            # look at the center of the room (this makes the cue giving inside and outside JA different)
            action = self.look_at_action([self.env.current_width // 2, self.env.current_height // 2])
        else:
            # look at the agent
            action = self.look_at_action(self.env.agent_pos)

        if self.was_introduced_to:
            # was introduced just now
            if self.is_pointing():
                action = self.stop_point

            if language_color:
                # only say the color once
                reply = self.target_obj.color

        elif self.env.ja_recursive:
            # was not introduced
            if language_feedback:
                # random reply
                reply = self.env._rand_elem([
                    "Hot",
                    "Warm",
                    "Medium",
                    "Cold"
                ])

            if language_color and not self.decoy_color_given:
                # color of a decoy (can be the correct one)
                reply = self.ja_decoy.color
                self.decoy_color_given = True

            if pointing:
                # point to a decoy
                action = self.goto_point_action(
                    point_from_loc=self.decoy_point_from_loc,
                    target_pos=self.ja_decoy.cur_pos,
                    distractor_pos=self.ja_decoy_distractor.cur_pos if self.ja_decoy_distractor else None
                )

                if self.is_pointing():
                    # if it's already pointing, turn to look at the center (to avoid looking at the wall)
                    action = self.look_at_action([self.env.current_width//2, self.env.current_height//2])

    else:
        if self.was_introduced_to and language_color:
            # language only once at introduction
            action = self.look_at_action(self.env.agent_pos)

        if self.was_introduced_to and language_feedback:
            # closeness string (Manhattan distance between agent and target)
            agent_distance_to_target = np.abs(self.target_obj.cur_pos - self.env.agent_pos).sum()
            if agent_distance_to_target <= 1:
                reply = "Hot"
            elif agent_distance_to_target <= 2:
                reply = "Warm"
            elif agent_distance_to_target <= 5:
                reply = "Medium"
            elif agent_distance_to_target >= 5:
                reply = "Cold"

            action = self.look_at_action(self.env.agent_pos)

        # pointing
        if self.was_introduced_to and pointing:
            # int() keeps this consistent with __init__, which also reads N through int()
            if int(params["N"]) == 1:
                distractor_pos = None
            else:
                distractor_pos = self.distractor_obj.cur_pos

            action = self.goto_point_action(
                point_from_loc=self.point_from_loc,
                target_pos=self.target_obj.cur_pos,
                distractor_pos=distractor_pos,
            )

            if self.is_pointing():
                action = self.look_at_action(self.env.agent_pos)

        # emulation or scaffolding
        emulation_demo = self.was_introduced_to and emulation and not self.demo_over
        scaffolding_help = self.was_introduced_to and scaffolding

        # do the demonstration / unlock the apple
        # in both of those two scenarios the NPC in essence solves the task
        # in demonstration - it eats the apple, and reverts the env at the end
        # in scaffolding - it doesn't eat the apple and looks at the agent
        if emulation_demo or scaffolding_help:

            if emulation_demo or (scaffolding_help and not self.apple_unlocked_for_agent):

                if self.is_pointing():
                    # don't point during demonstration
                    action = self.stop_point

                else:
                    # if apple unlocked go pick it up
                    if self.target_obj == self.env.switch and self.env.switch.is_on:
                        assert self.env.parameters["Problem"] == "Switches"
                        next_target_position = self.env.box.cur_pos

                    elif self.target_obj == self.env.generator and self.env.generator.is_pressed:
                        assert self.env.parameters["Problem"] in ["Generators", "Marbles", "Marble"]
                        next_target_position = self.env.generator_platform.cur_pos

                    elif self.target_obj == self.env.door and self.env.door.is_open:
                        next_target_position = self.env.apple.cur_pos

                    elif self.target_obj == self.env.lever and self.env.lever.is_on:
                        next_target_position = self.env.apple.cur_pos

                    else:
                        next_target_position = self.target_obj.cur_pos

                    if self.target_obj == self.env.generator and not self.env.generator.is_pressed:
                        if not self.env.generator.marble_activation:
                            # push generator
                            action = self.path_to_pos(next_target_position)
                        else:
                            # find angle
                            if self.env.marble.moving_dir is None:
                                distance = (self.env.marble.cur_pos - self.env.generator.cur_pos)

                                diff = np.sign(distance)
                                if sum(abs(diff)) == 1:
                                    # if the agent pushed the ball during demo diff can be > 1, then it's unsolvable
                                    push_pos = self.env.marble.cur_pos+diff
                                    if all(self.cur_pos == push_pos):
                                        next_target_position = self.env.marble.cur_pos
                                    else:
                                        next_target_position = push_pos

                                    # go to loc in front of
                                    # push
                                    # NOTE(review): depth of this statement is reconstructed — confirm
                                    action = self.path_to_pos(next_target_position)

                    else:
                        # toggle all other objects
                        action = self.path_to_toggle_pos(next_target_position)

                # for scaffolding check if trying to eat the apple
                # if so, stop - apple is unlocked
                if scaffolding_help:
                    if (
                        self.env.get_cell(*self.front_pos) == self.env.apple and
                        action == self.toggle_action
                    ):
                        # don't eat the apple
                        action = None
                        self.apple_unlocked_for_agent = True

                # for emulation check if trying to toggle the eaten apple
                # if so, stop and revert the env - demo is over
                if emulation_demo:
                    if (
                        self.ate_an_apple and
                        self.env.get_cell(*self.front_pos) == self.env.apple and
                        action == self.toggle_action and
                        self.env.apple.eaten
                    ):
                        # trying to toggle an apple it ate
                        self.env.revert()
                        self.demo_over = True
                        action = None

            # if scaffolding apple unlocked, look at the agent
            if scaffolding_help and self.apple_unlocked_for_agent:
                if all(self.cur_pos == self.initial_pos):
                    # if the apple is unlocked look at the agent
                    wanted_dir = self.compute_wanted_dir(self.env.agent_pos)
                    action = self.compute_turn_action(wanted_dir)
                else:
                    # go to init pos, this removes problems in case the apple is unreachable now
                    action = self.path_to_pos(self.initial_pos)

        if self.was_introduced_to and emulation and self.demo_over and not self.demo_over_and_position_safe:
            if self.env.is_in_marble_way(self.cur_pos):
                action = self.path_to_pos(self.find_point_from_loc())
            else:
                self.demo_over_and_position_safe = True

        if self.demo_over_and_position_safe:
            assert emulation or scaffolding
            # look at the agent after demo is done
            action = self.look_at_action(self.env.agent_pos)

        # Use the flag computed above: "Scaffolding" is optional in the
        # parameters, so indexing it directly raised KeyError when absent.
        if self.was_introduced_to and scaffolding:
            if "Emulation" in params or "Pointing" in params or "Language_grounding" in params:
                raise ValueError(
                    "Scaffolding cannot be used with information giving (Emulation, Pointing, Language_grounding)"
                )

    eaten_before = self.env.apple.eaten

    if action is not None:
        action()

    # check if the NPC ate the apple
    eaten_after = self.env.apple.eaten
    self.ate_an_apple = not eaten_before and eaten_after

    info = self.create_info(
        action=action,
        utterance=reply,
        was_introduced_to=self.was_introduced_to,
    )

    assert (reply or "no_op") in self.list_of_possible_utterances

    return reply, info
def create_info(self, action, utterance, was_introduced_to):
    """Summarise one NPC step as a dict.

    Args:
        action: the callable the NPC executed, or None when it did nothing.
        utterance: what the NPC said; any falsy value is reported as "no_op".
        was_introduced_to: stored in the dict unchanged.

    Returns:
        dict: keys ``prim_action`` (the callable's ``__name__`` or "no_op"),
        ``utterance`` and ``was_introduced_to``.
    """
    prim_action = "no_op" if action is None else action.__name__
    spoken = utterance or "no_op"
    return {
        "prim_action": prim_action,
        "utterance": spoken,
        "was_introduced_to": was_introduced_to,
    }
def is_point_from_loc(self, pos, target_pos=None, distractor_pos=None):
    """Tell whether ``pos`` is a valid cell to point at the target from.

    A cell qualifies when it shares a row or column with the target, no cell
    between it and the target blocks the view, and the distractor cannot be
    confused with the target (it is either off that line, or the cell lies
    strictly between distractor and target).

    Args:
        pos: candidate position; must support ``.copy()`` and item
            assignment (presumably a numpy array — verify against callers).
        target_pos: position pointed at; defaults to the target object's
            current position.
        distractor_pos: position of the distractor; defaults to the
            distractor object's current position, or ``[None, None]`` when
            there is no distractor object.

    Returns:
        bool: True if the NPC may point from ``pos``, False otherwise.
    """
    env = self.env

    if target_pos is None:
        target_pos = self.target_obj.cur_pos

    if distractor_pos is None:
        has_distractor = self.distractor_obj is not None
        distractor_pos = self.distractor_obj.cur_pos if has_distractor else [None, None]

    if env.is_in_marble_way(pos):
        return False

    if env.problem in ["Doors", "Levers"]:
        # must not be in front of a door
        if abs(env.door_current_pos - pos).sum() == 1:
            return False

        if env.problem in ["Doors"] and abs(env.distractor_current_pos - pos).sum() == 1:
            return False

    # pointing only works along a shared row or column
    if not any(pos == target_pos):
        return False

    axis = np.argmax(target_pos == pos)  # coordinate shared with the target
    other = 1 - axis  # coordinate along which the NPC looks

    # is there an occlusion in the way
    start, end = pos[other], target_pos[other]
    stride = 1 if start <= end else -1
    for coord in np.arange(start, end, stride):
        probe = pos.copy()
        probe[other] = coord
        cell = env.grid.get(*probe)
        if cell is not None and not cell.see_behind():
            return False

    # distractor is not on the same line: pointing is unambiguous
    if pos[axis] != distractor_pos[axis]:
        return True

    # distractor is on the same line: only fine when standing in between
    if distractor_pos[other] < pos[other] < target_pos[other]:
        return True
    if distractor_pos[other] > pos[other] > target_pos[other]:
        return True

    return False
def find_point_from_loc(self, target_pos=None, distractor_pos=None):
    """Ask the environment for a location the NPC can point from.

    Args:
        target_pos: forwarded to ``is_point_from_loc``.
        distractor_pos: forwarded to ``is_point_from_loc``.

    Returns:
        Whatever ``env.find_loc`` returns for a search over
        ``(env.wall_x, env.wall_y)`` that rejects every position for which
        ``is_point_from_loc`` is false.
    """
    def _not_a_pointing_spot(env, candidate):
        # find_loc expects a *reject* predicate, hence the negation
        return not self.is_point_from_loc(
            candidate,
            target_pos=target_pos,
            distractor_pos=distractor_pos,
        )

    search_area = (self.env.wall_x, self.env.wall_y)
    return self.env.find_loc(
        size=search_area,
        reject_fn=_not_a_pointing_spot,
        reject_agent_pos=False,
    )
def goto_point_action(self, point_from_loc, target_pos, distractor_pos):
    """Pick the next action that leads to pointing at ``target_pos``.

    Args:
        point_from_loc: location to walk to when the current cell is not a
            valid pointing location.
        target_pos: position to point at.
        distractor_pos: position of the distractor (may be None).

    Returns:
        The pointing action when the NPC already stands on a valid pointing
        location; otherwise ``stop_point`` if it is currently pointing, or
        the result of ``path_to_pos(point_from_loc)``.
    """
    standing_on_valid_spot = self.is_point_from_loc(
        self.cur_pos,
        target_pos=target_pos,
        distractor_pos=distractor_pos,
    )
    if standing_on_valid_spot:
        # point to a direction
        return self.compute_wanted_point_action(target_pos)

    # never point from a location that is not a valid pointing location
    if self.is_pointing():
        return self.stop_point

    # move towards the pointing location
    return self.path_to_pos(point_from_loc)
class InformationSeekingEnv(MultiModalMiniGridEnv): | |
""" | |
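Environment in which the agent has to obtain an apple that may be hidden
behind a problem (Apples, Boxes, Switches, Generators, Marble, Doors or
Levers), optionally next to a distractor object, and in which a Caretaker
NPC can be present.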
""" | |
def __init__(
    self,
    size=10,
    diminished_reward=True,
    step_penalty=False,
    knowledgeable=False,
    max_steps=80,
    hidden_npc=False,
    switch_no_light=True,
    reward_diminish_factor=0.1,
    see_through_walls=False,
    n_colors=None,
    egocentric_observation=True,
):
    """Store the configuration and initialise the underlying grid env.

    Args:
        size: forwarded to the base class as ``grid_size``; must be >= 5.
        diminished_reward, step_penalty, knowledgeable, hidden_npc,
        switch_no_light, reward_diminish_factor, egocentric_observation:
            stored on the instance under the same names.
        max_steps, see_through_walls: forwarded to the base class.
        n_colors: number of colours to use; defaults to ``len(COLOR_NAMES)``.
    """
    assert size >= 5

    self.empty_symbol = "NA \n"

    # plain configuration flags
    self.diminished_reward = diminished_reward
    self.step_penalty = step_penalty
    self.knowledgeable = knowledgeable
    self.hidden_npc = hidden_npc
    self.hear_yourself = False
    self.switch_no_light = switch_no_light
    self.reward_diminish_factor = reward_diminish_factor
    self.egocentric_observation = egocentric_observation

    self.n_colors = len(COLOR_NAMES) if n_colors is None else n_colors

    self.grammar = SocialAIGrammar()

    self.init_done = False
    # parameters - to be set in reset
    self.parameters = None

    # optional NPC channels of the observation encoding
    self.add_npc_direction = True
    self.add_npc_point_direction = True
    self.add_npc_last_prim_action = True

    # 3 base channels, 2 more when the observation is not egocentric,
    # plus one per enabled NPC channel
    npc_channels = (
        self.add_npc_direction,
        self.add_npc_point_direction,
        self.add_npc_last_prim_action,
    )
    position_channels = 0 if self.egocentric_observation else 2
    self.encoding_size = 3 + position_channels + sum(bool(flag) for flag in npc_channels)

    super().__init__(
        grid_size=size,
        max_steps=max_steps,
        # Set this to True for maximum speed
        see_through_walls=see_through_walls,
        actions=SocialAIActions,  # primitive actions
        action_space=SocialAIActionSpace,
        add_npc_direction=self.add_npc_direction,
        add_npc_point_direction=self.add_npc_point_direction,
        add_npc_last_prim_action=self.add_npc_last_prim_action,
        reward_diminish_factor=self.reward_diminish_factor,
    )

    # self.caretaker must exist once the base constructor has returned
    self.all_npc_utterance_actions = self.caretaker.list_of_possible_utterances
    self.prim_actions_dict = SocialAINPCActionsDict
def revert(self):
    """Put the NPC and the problem objects back after a demonstration.

    Clears the cell the caretaker currently occupies, places the caretaker
    again and re-creates the objects (removing the current ones first).
    """
    npc_x, npc_y = self.caretaker.cur_pos
    self.grid.set(npc_x, npc_y, None)

    self.place_npc()
    self.put_objects_in_env(remove_objects=True)
def is_in_marble_way(self, pos):
    """Tell whether ``pos`` lies on the line the marble travels along.

    Only relevant for the marble problems; for every other problem the
    answer is False.

    Args:
        pos: position to test; compared element-wise with the stored
            positions (presumably numpy arrays — verify against callers).

    Returns:
        bool: True if ``pos`` shares a row or column with both the marble
        and the generator, or (when ``N`` > 1) with both the marble and the
        distractor generator.
    """
    generator_pos = self.generator_current_pos

    if self.problem not in ["Marbles", "Marble"]:
        # all good
        return False

    # per coordinate: does pos sit in the marble's column / row?
    on_marble_line = pos == self.marble_current_pos

    # marble, generator and pos are all in the same row or column
    if any((pos == generator_pos) * on_marble_line):
        return True

    # same check for the distractor generator, when there is one
    if int(self.parameters["N"]) > 1:
        if any((pos == self.distractor_current_pos) * on_marble_line):
            return True

    return False
def _gen_grid(self, width_, height_):
    """Generate a new episode layout.

    Samples the room size, then the positions and colours of every object
    that any problem may need (apple, door, lever, switch, generator,
    marble, distractor), places the objects of the current problem, creates
    the caretaker (placing it when ``Peer == "Y"``) and places the agent.

    The order of the random draws is kept as it was, so seeded runs sample
    the same layouts. Both "Marble" and "Marbles" are accepted as the name
    of the marble problem, as in the rest of the file.

    Args:
        width_: width of the grid; upper bound for the sampled room width.
        height_: height of the grid; upper bound for the sampled room height.

    Raises:
        ValueError: if ``self.problem`` is not a known problem.
    """
    # Create the grid
    self.grid = Grid(width_, height_, nb_obj_dims=self.encoding_size)

    # sampled room size; the lower bound is 9 unless the grid is smaller
    min_w = min(9, width_)
    min_h = min(9, height_)
    self.current_width = self._rand_int(min_w, width_+1)
    self.current_height = self._rand_int(min_h, height_+1)

    self.wall_x = self.current_width-1
    self.wall_y = self.current_height-1

    # problem: Apples/Boxes/Switches/Generators/Marbles
    self.problem = self.parameters["Problem"] if self.parameters else "Apples"

    # The rest of the file treats "Marbles" and "Marble" as the same problem;
    # previously only "Marble" was handled here and "Marbles" ended in the
    # ValueError below.
    is_marble_problem = self.problem in ["Marbles", "Marble"]

    num_of_colors = self.parameters.get("Num_of_colors", None) if self.parameters else None
    if num_of_colors is None:
        num_of_colors = self.n_colors

    # additional test for recursivness of joint attention -> cues are given outside of JA
    self.ja_recursive = self.parameters.get("JA_recursive", False) == "Y" if self.parameters else False

    self.add_obstacles()
    if self.obstacles != "No":
        warnings.warn("InformationSeeking should no be using obstacles.")

    # Generate the surrounding walls
    self.grid.wall_rect(0, 0, self.current_width, self.current_height)

    if self.problem in ["Doors", "Levers"]:
        # Add a second wall: this is needed so that an apple cannot be seen diagonally between the wall and the door
        self.grid.wall_rect(1, 1, self.wall_x-1, self.wall_y-1)

    # apple
    self.apple_pos = (self.current_width, self.current_height)

    # box
    locked = self.problem == "Switches"

    if num_of_colors is None:
        POSSIBLE_COLORS = COLOR_NAMES.copy()
    else:
        POSSIBLE_COLORS = COLOR_NAMES[:int(num_of_colors)].copy()

    self.box_color = self._rand_elem(POSSIBLE_COLORS)

    if self.problem in ["Doors", "Levers"]:
        # door

        # wall cells in or right next to a corner cannot hold the apple
        forbidden_wall_cells = [
            (0, 0),
            (0, 1),
            (1, 0),
            (0, self.wall_y),
            (0, self.wall_y-1),
            (1, self.wall_y),
            (self.wall_x, self.wall_y),
            (self.wall_x-1, self.wall_y),
            (self.wall_x, self.wall_y-1),
            (self.wall_x, 0),
            (self.wall_x, 1),
            (self.wall_x-1, 0),
        ]

        # find the position on a wall
        self.apple_current_pos = self.find_loc(
            size=(self.current_width, self.current_height),
            reject_taken_pos=False,  # we will create a gap in the wall
            reject_agent_pos=True,
            reject_fn=lambda _, pos:
            not (pos[0] in [0, self.wall_x] or pos[1] in [0, self.wall_y]) or  # reject not on a wall
            tuple(pos) in forbidden_wall_cells
        )
        self.grid.set(*self.apple_current_pos, None)  # hole in the wall

        # door is in front of the apple: one cell inwards from the outer wall
        door_x = {
            0: 1,
            self.wall_x: self.wall_x - 1,
        }.get(self.apple_current_pos[0], self.apple_current_pos[0])
        door_y = {
            0: 1,
            self.wall_y: self.wall_y - 1,
        }.get(self.apple_current_pos[1], self.apple_current_pos[1])

        self.door_current_pos = np.array([door_x, door_y])
        self.grid.set(*self.door_current_pos, None)  # hole in the wall

        # lever
        if self.problem in ["Levers"]:
            self.lever_current_pos = self.find_loc(
                top=(2, 2),
                size=(self.current_width-4, self.current_height-4),
                reject_agent_pos=True,
                reject_fn=lambda _, pos: next_to(pos, self.door_current_pos)  # reject in front of the door
            )

    else:
        # find the position for the apple/box/generator_platform
        self.apple_current_pos = self.find_loc(size=self.apple_pos, reject_agent_pos=True)
        assert all(self.apple_current_pos < np.array([self.current_width-1, self.current_height-1]))

    # door
    self.door_color = self._rand_elem(POSSIBLE_COLORS)

    # lever
    self.lever_color = self._rand_elem(POSSIBLE_COLORS)

    # switch
    self.switch_pos = (self.current_width, self.current_height)
    self.switch_color = self._rand_elem(POSSIBLE_COLORS)
    self.switch_current_pos = self.find_loc(
        size=self.switch_pos,
        reject_agent_pos=True,
        reject_fn=lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos]),
    )

    # generator
    self.generator_pos = (self.current_width, self.current_height)
    self.generator_color = self._rand_elem(POSSIBLE_COLORS)
    self.generator_current_pos = self.find_loc(
        size=self.generator_pos,
        reject_agent_pos=True,
        reject_fn=lambda _, pos: (
            tuple(pos) in map(tuple, [self.apple_current_pos])
            or
            (is_marble_problem and tuple(pos) in [
                # not in corners
                (1, 1),
                (self.current_width-2, 1),
                (1, self.current_height-2),
                (self.current_width-2, self.current_height-2),
            ])
            or
            # not in the same row/column as the platform
            (is_marble_problem and any(pos == self.apple_current_pos))
        ),
    )

    # generator platform
    self.generator_platform_color = self._rand_elem(POSSIBLE_COLORS)

    # marbles
    self.marble_pos = (self.current_width, self.current_height)
    self.marble_color = self._rand_elem(POSSIBLE_COLORS)
    self.marble_current_pos = self.find_loc(
        size=self.marble_pos,
        reject_agent_pos=True,
        reject_fn=lambda _, pos: is_marble_problem and (
            tuple(pos) in map(tuple, [self.apple_current_pos, self.generator_current_pos])
            or
            all(pos != self.generator_current_pos)  # reject if not in row or column as the generator
            or
            any(pos == 1)  # next to a wall
            or
            pos[1] == self.current_height-2
            or
            pos[0] == self.current_width-2
        ),
    )

    # distractor: must not share the colour of the target object
    if self.problem == "Boxes":
        assert not locked
        POSSIBLE_COLORS.remove(self.box_color)

    elif self.problem == "Doors":
        POSSIBLE_COLORS.remove(self.door_color)

    elif self.problem == "Levers":
        POSSIBLE_COLORS.remove(self.lever_color)

    elif self.problem == "Switches":
        POSSIBLE_COLORS.remove(self.switch_color)

    elif self.problem == "Generators" or is_marble_problem:
        POSSIBLE_COLORS.remove(self.generator_color)

    self.distractor_color = self._rand_elem(POSSIBLE_COLORS)
    self.distractor_pos = (self.current_width, self.current_height)

    # distractor reject function
    if self.problem in ["Apples", "Boxes"]:
        distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos])

    elif self.problem in ["Switches"]:
        distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos, self.switch_current_pos])

    elif self.problem in ["Generators"]:
        distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos, self.generator_current_pos])

    elif is_marble_problem:
        # problem is marbles
        if self.parameters["N"] == "1":
            distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos, self.generator_current_pos, self.marble_current_pos])
        else:
            # generator and marble share one coordinate; the distractor must
            # share the other coordinate with the marble
            same_dim = (self.generator_current_pos == self.marble_current_pos).argmax()
            distactor_same_dim = 1-same_dim
            distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [
                self.apple_current_pos,
                self.generator_current_pos,
                self.marble_current_pos
            ]) or pos[distactor_same_dim] != self.marble_current_pos[distactor_same_dim]

    elif self.problem in ["Doors"]:
        # reject not next to a wall
        distractor_reject_fn = lambda _, pos: (
            not (pos[0] in [1, self.wall_x-1] or pos[1] in [1, self.wall_y-1]) or  # reject not on a wall
            tuple(pos) in [
                (1, 1),
                (self.wall_x-1, self.wall_y - 1),
                (1, self.wall_y-1),
                (self.wall_x-1, 1),
                tuple(self.door_current_pos)
            ]
        )

    elif self.problem in ["Levers"]:
        # not in front of the door
        distractor_reject_fn = lambda _, pos: next_to(pos, self.door_current_pos) or tuple(pos) in list(map(tuple, [self.door_current_pos, self.lever_current_pos]))

    else:
        raise ValueError("Problem {} indefined.".format(self.problem))

    if self.problem == "Doors":
        self.distractor_current_pos = self.find_loc(
            top=(1, 1),
            size=(self.current_width-2, self.current_height-2),
            reject_agent_pos=True,
            reject_fn=distractor_reject_fn,
            reject_taken_pos=False
        )

        if self.parameters["N"] != "1":
            self.grid.set(*self.distractor_current_pos, None)  # hole in the wall
    else:
        self.distractor_current_pos = self.find_loc(
            size=self.distractor_pos,
            reject_agent_pos=True,
            reject_fn=distractor_reject_fn
        )

    self.put_objects_in_env()

    # NPC
    put_peer = self.parameters["Peer"] if self.parameters else "N"
    assert put_peer in ["Y", "N"]

    color = self._rand_elem(COLOR_NAMES)
    self.caretaker = Caretaker(color, "Caretaker", self)

    if put_peer == "Y":
        self.place_npc()

    # Randomize the agent's start position and orientation
    self.place_agent(size=(self.current_width, self.current_height))

    # Generate the mission string
    self.mission = 'lets collaborate'

    # Dummy beginning string
    self.beginning_string = "Conversation: \n"
    self.utterance = self.beginning_string

    # utterance appended at the end of each step
    self.utterance_history = ""

    # used for rendering
    self.full_conversation = self.utterance
    self.outcome_info = None
def place_npc(self):
    """Place the caretaker in the room and remember where it started.

    The caretaker is never placed next to a door (Doors, Levers) or next to
    the distractor door (Doors); for all other problems it is kept out of
    the marble's way. The resulting position is stored as
    ``caretaker.initial_pos``.
    """
    if self.problem in ["Doors"]:
        reject = lambda _, pos: next_to(pos, self.door_current_pos) or next_to(pos, self.distractor_current_pos)
    elif self.problem in ["Levers"]:
        reject = lambda _, pos: next_to(pos, self.door_current_pos)
    else:
        # called as reject_fn(env, pos), so the plain function can be passed
        reject = InformationSeekingEnv.is_in_marble_way

    room_size = (self.current_width, self.current_height)
    self.place_obj(self.caretaker, size=room_size, reject_fn=reject)

    self.caretaker.initial_pos = self.caretaker.cur_pos
def put_objects_in_env(self, remove_objects=False):
    """Create the objects of the current problem and put them on the grid.

    All object types are instantiated (apple, box, door, remote door, lever,
    switch, generator, platform, marble), but only those belonging to
    ``self.problem`` are placed. When ``N`` is 2 a distractor of the
    matching type is created and placed as well.

    Args:
        remove_objects: when True, the objects currently on the grid are
            removed first. Used only by ``revert``, not by ``_gen_grid``.

    Raises:
        ValueError: if ``self.problem`` is unknown, or if a distractor is
            requested for a problem that has none.
    """
    assert self.apple_current_pos is not None
    assert self.switch_current_pos is not None

    self.doors_block_set = []
    self.levers_block_set = []
    self.switches_block_set = []
    self.boxes_block_set = []
    self.generators_block_set = []

    self.distractor_door = None
    self.distractor_lever = None
    self.distractor_box = None
    self.distractor_switch = None
    self.distractor_generator = None

    # problem: Apples/Boxes/Switches/Generators
    # The conditional is computed first: written inline,
    # ``assert a == b if c else "Apples"`` asserts the truthy string "Apples"
    # whenever ``c`` is falsy and therefore checks nothing.
    expected_problem = self.parameters["Problem"] if self.parameters else "Apples"
    assert self.problem == expected_problem

    # move objects (used only in revert), not in gen_grid
    if remove_objects:
        # remove apple (after demo it must be an apple)
        assert type(self.grid.get(*self.apple_current_pos)) in [Apple]
        self.grid.set(*self.apple_current_pos, None)

        if self.problem in ["Doors"]:
            self.grid.set(*self.door.cur_pos, None)

        elif self.problem in ["Levers"]:
            self.grid.set(*self.remote_door.cur_pos, None)
            self.grid.set(*self.lever.cur_pos, None)

        elif self.problem in ["Switches"]:
            # remove switch
            assert type(self.grid.get(*self.switch_current_pos)) in [Switch]
            self.grid.set(*self.switch.cur_pos, None)

        elif self.problem in ["Generators", "Marbles", "Marble"]:
            # remove generator
            assert type(self.grid.get(*self.generator.cur_pos)) in [AppleGenerator]
            self.grid.set(*self.generator.cur_pos, None)

            if self.problem in ["Marbles", "Marble"]:
                # remove marble
                assert type(self.grid.get(*self.marble.cur_pos)) in [Marble]
                self.grid.set(*self.marble.cur_pos, None)

                if self.marble.tee_uncovered:
                    self.grid.set(*self.marble.tee.cur_pos, None)

        elif self.problem in ["Apples", "Boxes"]:
            pass

        else:
            raise ValueError("Undefined problem {}".format(self.problem))

        # remove distractor
        # N is read through int() like in the distractor section below, so
        # "1" and 1 are treated alike
        if self.problem in ["Boxes", "Switches", "Generators", "Marbles", "Marble", "Doors", "Levers"] and int(self.parameters["N"]) != 1:
            assert type(self.grid.get(*self.distractor_current_pos)) in [LockableBox, Switch, AppleGenerator, Door, Lever]
            self.grid.set(*self.distractor_current_pos, None)

    # apple
    self.apple = Apple()

    # Box
    locked = self.problem == "Switches"

    self.box = LockableBox(
        self.box_color,
        contains=self.apple,
        is_locked=locked,
        block_set=self.boxes_block_set
    )
    self.boxes_block_set.append(self.box)

    # Doors
    self.door = Door(
        color=self.door_color,
        is_locked=False,
        block_set=self.doors_block_set,
    )
    self.doors_block_set.append(self.door)

    # Levers
    self.remote_door = RemoteDoor(
        color=self.door_color,
    )

    self.lever = Lever(
        color=self.lever_color,
        object=self.remote_door,
        active_steps=None,
        block_set=self.levers_block_set,
    )
    self.levers_block_set.append(self.lever)

    # Switch
    self.switch = Switch(
        color=self.switch_color,
        lockable_object=self.box,
        locker_switch=True,
        no_turn_off=True,
        no_light=self.switch_no_light,
        block_set=self.switches_block_set,
    )
    self.switches_block_set.append(self.switch)

    # Generator
    self.generator = AppleGenerator(
        self.generator_color,
        block_set=self.generators_block_set,
        # pushing the generator makes the apple appear at the apple position
        on_push=lambda: self.grid.set(*self.apple_current_pos, self.apple),
        marble_activation=self.problem in ["Marbles", "Marble"],
    )
    self.generators_block_set.append(self.generator)

    self.generator_platform = GeneratorPlatform(self.generator_platform_color)

    self.marble = Marble(self.marble_color, env=self)

    # place the objects of the current problem
    if self.problem in ["Apples"]:
        self.put_obj_np(self.apple, self.apple_current_pos)

    elif self.problem in ["Doors"]:
        self.put_obj_np(self.apple, self.apple_current_pos)
        self.put_obj_np(self.door, self.door_current_pos)

    elif self.problem in ["Levers"]:
        self.put_obj_np(self.apple, self.apple_current_pos)
        self.put_obj_np(self.remote_door, self.door_current_pos)
        self.put_obj_np(self.lever, self.lever_current_pos)

    elif self.problem in ["Boxes"]:
        self.put_obj_np(self.box, self.apple_current_pos)

    elif self.problem in ["Switches"]:
        self.put_obj_np(self.box, self.apple_current_pos)
        self.put_obj_np(self.switch, self.switch_current_pos)

    elif self.problem in ["Generators", "Marbles", "Marble"]:
        self.put_obj_np(self.generator, self.generator_current_pos)
        self.put_obj_np(self.generator_platform, self.apple_current_pos)

        if self.problem in ["Marbles", "Marble"]:
            self.put_obj_np(self.marble, self.marble_current_pos)

    else:
        raise ValueError("Problem {} not defined. ".format(self.problem))

    # Distractors
    if self.problem not in ["Apples"]:

        N = int(self.parameters["N"])
        if N > 1:
            assert N == 2

            if self.problem == "Boxes":
                assert not locked

                self.distractor_box = LockableBox(
                    self.distractor_color,
                    is_locked=locked,
                    block_set=self.boxes_block_set,
                )
                self.boxes_block_set.append(self.distractor_box)

                self.put_obj_np(self.distractor_box, self.distractor_current_pos)

            elif self.problem == "Doors":
                self.distractor_door = Door(
                    color=self.distractor_color,
                    is_locked=False,
                    block_set=self.doors_block_set,
                )
                self.doors_block_set.append(self.distractor_door)

                self.put_obj_np(self.distractor_door, self.distractor_current_pos)

            elif self.problem == "Levers":
                self.distractor_lever = Lever(
                    color=self.distractor_color,
                    active_steps=None,
                    block_set=self.levers_block_set,
                )
                self.levers_block_set.append(self.distractor_lever)
                self.put_obj_np(self.distractor_lever, self.distractor_current_pos)

            elif self.problem == "Switches":
                self.distractor_switch = Switch(
                    color=self.distractor_color,
                    locker_switch=True,
                    no_turn_off=True,
                    no_light=self.switch_no_light,
                    block_set=self.switches_block_set,
                )
                self.switches_block_set.append(self.distractor_switch)

                self.put_obj_np(self.distractor_switch, self.distractor_current_pos)

            elif self.problem in ["Generators", "Marbles", "Marble"]:
                self.distractor_generator = AppleGenerator(
                    color=self.distractor_color,
                    block_set=self.generators_block_set,
                    marble_activation=self.problem in ["Marbles", "Marble"],
                )
                self.generators_block_set.append(self.distractor_generator)

                self.put_obj_np(self.distractor_generator, self.distractor_current_pos)

            else:
                raise ValueError("Undefined N for problem {}".format(self.problem))
def reset(self, *args, **kwargs):
    """Reset the environment, taking the episode parameters from ``kwargs``.

    A call without keyword arguments is only accepted while
    ``self.init_done`` is still false (i.e. once, during construction): it
    marks initialisation as done and returns the parent's observation
    without touching ``self.parameters``.

    Every later call must pass the parameters as keyword arguments. They
    are stored as a dict in ``self.parameters`` before the parent reset
    runs, and the per-episode ``agent_*`` flags are cleared afterwards.

    Positional arguments are accepted but not used.

    Returns:
        The observation produced by ``super().reset()``.
    """
    if not kwargs:
        # This env must be used inside the parametric env: kwargs can only
        # be empty during class construction, and reset has to be called
        # again before the env is used (paramenv does it in its constructor).
        assert self.parameters is None
        assert not self.init_done
        self.init_done = True
        return super().reset()

    assert self.init_done

    self.parameters = dict(kwargs)
    assert self.parameters is not None
    assert len(self.parameters) > 0

    obs = super().reset()

    # A fresh episode starts with none of the agent's achievements recorded.
    for flag_name in (
        "agent_ate_the_apple",
        "agent_opened_the_box",
        "agent_opened_the_door",
        "agent_pulled_the_lever",
        "agent_turned_on_the_switch",
        "agent_pressed_the_generator",
        "agent_pushed_the_marble",
    ):
        setattr(self, flag_name, False)

    return obs
def step(self, action):
    """Advance the environment by one step.

    The primitive action is executed by the parent class, then the agent's
    utterance (if any) is passed to the caretaker NPC, and finally the
    reward, termination flag, observation and info dict are assembled.

    Args:
        action: Sequence whose first element is the primitive action and
            whose remaining elements encode the utterance. The agent is
            treated as silent when all utterance elements are NaN.

    Returns:
        ``(obs, reward, done, info)``. ``info`` is the parent's info dict
        extended with the caretaker's info (keys prefixed with ``NPC_``),
        ``NPC_observed`` and ``success``.

    Raises:
        AssertionError: If the apple was eaten by something other than the
            agent's toggle on the cell in front of it, or if ``success``
            disagrees with the sign of the final reward.
    """
    success = False
    p_action = action[0]
    utterance_action = action[1:]

    # Object states before the primitive action; comparing against them
    # afterwards tells what changed during this very step.
    apple_had_been_eaten = self.apple.eaten
    box_had_been_opened = self.box.is_open
    door_had_been_opened = self.door.is_open
    lever_had_been_pulled = self.lever.is_on
    switch_had_been_turned_on = self.switch.is_on
    generator_had_been_pressed = self.generator.is_pressed
    marble_had_been_pushed = self.marble.was_pushed

    # primitive actions
    _, reward, done, info = super().step(p_action)

    if self.problem in ["Marbles", "Marble"]:
        # todo: create stepable objects which are stepped automatically?
        self.marble.step()

    # Each flag latches: once set it stays set for the rest of the episode.
    # It is set when the object changed state just now, i.e. before the
    # caretaker gets to act below.
    if not self.agent_ate_the_apple:
        self.agent_ate_the_apple = self.apple.eaten and not apple_had_been_eaten

    if not self.agent_opened_the_box:
        self.agent_opened_the_box = self.box.is_open and not box_had_been_opened

    if not self.agent_opened_the_door:
        self.agent_opened_the_door = self.door.is_open and not door_had_been_opened

    if not self.agent_pulled_the_lever:
        self.agent_pulled_the_lever = self.lever.is_on and not lever_had_been_pulled

    if not self.agent_turned_on_the_switch:
        self.agent_turned_on_the_switch = self.switch.is_on and not switch_had_been_turned_on

    if not self.agent_pressed_the_generator:
        self.agent_pressed_the_generator = self.generator.is_pressed and not generator_had_been_pressed

    if not self.agent_pushed_the_marble:
        self.agent_pushed_the_marble = self.marble.was_pushed and not marble_had_been_pushed

    # utterances
    agent_spoke = not all(np.isnan(utterance_action))
    if agent_spoke:
        utterance = self.grammar.construct_utterance(utterance_action)

        # The agent's own words reach its observation only when it can hear
        # itself; the full conversation log records them in either case.
        if self.hear_yourself:
            self.utterance += "YOU: {} \n".format(utterance)
        self.full_conversation += "YOU: {} \n".format(utterance)
    else:
        utterance = None

    if self.parameters["Peer"] == "Y":
        reply, npc_info = self.caretaker.step(utterance)
    else:
        # No peer in this episode: the caretaker is not stepped and an
        # empty info record is reported for it.
        reply = None
        npc_info = self.caretaker.create_info(
            action=None,
            utterance=None,
            was_introduced_to=False
        )

    if reply:
        self.utterance += "{}: {} \n".format(self.caretaker.name, reply)
        self.full_conversation += "{}: {} \n".format(self.caretaker.name, reply)

    # aftermath
    if p_action == self.actions.done:
        done = True

    elif self.agent_ate_the_apple:
        # check that it is the agent who ate it
        assert self.actions(p_action) == self.actions.toggle
        assert self.get_cell(*self.front_pos) == self.apple

        if self.parameters.get("Cue_type", "nan") == "Emulation":
            # During emulation it can be the NPC who eats the apple, opens
            # the box, and turns on the switch. The agent is rewarded only
            # if the caretaker unlocked the apple for it (scaffolding) or
            # if the agent performed the problem's required interaction
            # itself.
            # NOTE(review): "Peer" is compared against "Y" above, while
            # "Scaffolding" is only tested for truthiness - confirm that a
            # non-scaffolded run never stores a non-empty string here.
            agent_earned_it = (
                (self.parameters["Scaffolding"] and self.caretaker.apple_unlocked_for_agent)
                or self.problem == "Apples"
                or (self.problem == "Doors" and self.agent_opened_the_door)
                or (self.problem == "Levers" and self.agent_pulled_the_lever)
                or (self.problem == "Boxes" and self.agent_opened_the_box)
                or (
                    self.problem == "Switches"
                    and self.agent_opened_the_box
                    and self.agent_turned_on_the_switch
                )
                or (self.problem == "Generators" and self.agent_pressed_the_generator)
                # Both spellings are accepted, matching every other marble
                # check in this class.
                or (self.problem in ["Marbles", "Marble"] and self.agent_pushed_the_marble)
            )
            if agent_earned_it:
                reward = self._reward()
                success = True

        else:
            reward = self._reward()
            success = True

        # Eating the apple ends the episode whether or not it was rewarded.
        done = True

    # discount
    if self.step_penalty:
        reward = reward - 0.01

    # update obs with NPC movement
    obs = self.gen_obs(full_obs=self.full_obs)

    # fill observation with text
    self.append_existing_utterance_to_history()
    obs = self.add_utterance_to_observation(obs)
    self.reset_utterance()

    if done:
        if reward > 0:
            self.outcome_info = "SUCCESS: agent got {} reward \n".format(np.round(reward, 1))
        else:
            self.outcome_info = "FAILURE: agent got {} reward \n".format(reward)

    # is the npc seen by the agent
    ag_view_npc = self.relative_coords(*self.caretaker.cur_pos)

    if ag_view_npc is not None:
        # in the agent's field of view
        ag_view_npc_x, ag_view_npc_y = ag_view_npc

        n_dims = obs['image'].shape[-1]
        npc_encoding = self.caretaker.encode(n_dims)

        # is it occluded: the NPC counts as observed only if the image cell
        # at its position actually holds the NPC's encoding
        npc_observed = all(obs['image'][ag_view_npc_x, ag_view_npc_y] == npc_encoding)
    else:
        npc_observed = False

    info = {**info, **{"NPC_"+k: v for k, v in npc_info.items()}}

    info["NPC_observed"] = npc_observed
    info["success"] = success

    assert success == (reward > 0)

    return obs, reward, done, info
def _reward(self): | |
if self.diminished_reward: | |
return super()._reward() | |
else: | |
return 1.0 | |
def render(self, *args, **kwargs):
    """Render the environment and, in 'human' mode, overlay the dialogue.

    All arguments are forwarded unchanged to the parent's ``render``. The
    mode is taken from the first positional argument or, when no positional
    argument is given, from a ``mode`` keyword argument, so that
    ``render('human')`` and ``render(mode='human')`` behave the same.

    In 'human' mode the window caption is set to the full conversation and,
    once ``self.outcome_info`` is non-empty, that text is drawn on the
    window (lime for "SUCCESS", red for "FAILURE").

    Returns:
        Whatever the parent's ``render`` returned.
    """
    obs = super().render(*args, **kwargs)

    mode = args[0] if args else kwargs.get("mode")
    if mode == 'human':
        self.window.clear_text()  # erase previous text
        self.window.set_caption(self.full_conversation)

        if self.outcome_info:
            color = None
            if "SUCCESS" in self.outcome_info:
                color = "lime"
            elif "FAILURE" in self.outcome_info:
                color = "red"
            self.window.add_text(
                0.01, 0.85, self.outcome_info,
                fontsize=15, color=color, weight="bold",
            )

        self.window.show_img(obs)  # re-draw image to add changes to window

    return obs
# Make the environment available under its public id; the entry point names
# the module path and the class to instantiate.
register(
    entry_point='gym_minigrid.social_ai_envs:InformationSeekingEnv',
    id='SocialAI-InformationSeeking-v0',
)