"""Information-seeking SocialAI environment.

Defines the ``Caretaker`` NPC and the ``InformationSeekingEnv`` environment in
which an agent has to obtain an apple, optionally helped by cues (or direct
help) from the caretaker.
"""
import time
import random

import numpy as np
from gym_minigrid.social_ai_envs.socialaigrammar import SocialAIGrammar, SocialAIActions, SocialAIActionSpace
from gym_minigrid.minigrid import *
from gym_minigrid.register import register
# NOTE(review): the imports below intentionally stay after the star import so
# that the stdlib names win over anything the star import might re-export.
import time
import warnings
from collections import deque


def next_to(posa, posb):
    """Return True when the two grid positions are at Manhattan distance 1.

    Both positions may be given as tuples or as numpy arrays.
    """
    if isinstance(posa, tuple):
        posa = np.array(posa)

    if isinstance(posb, tuple):
        posb = np.array(posb)

    return abs(posa-posb).sum() == 1


class Caretaker(NPC):
    """
    NPC that helps the agent once it has been introduced to.

    Depending on the environment parameters it gives a cue about the target
    object (its color, a closeness feedback word, pointing, or a full
    demonstration) or, with scaffolding, unlocks the apple for the agent.
    """

    def __init__(self, color, name, env):
        """Create the caretaker and select the target/distractor objects.

        color: color passed to the NPC base class.
        name: name used as the speaker prefix in the conversation.
        env: the environment this NPC lives in; its objects must already exist.
        """
        super().__init__(color)
        self.name = name
        self.env = env
        self.npc_dir = 1  # NPC initially looks downward
        self.npc_type = 0  # this will be put into the encoding

        self.was_introduced_to = False
        self.decoy_color_given = False

        self.ate_an_apple = False
        self.demo_over = False
        self.demo_over_and_position_safe = False
        self.apple_unlocked_for_agent = False

        self.list_of_possible_utterances = [
            *self.list_of_possible_utterances,
            "Hot", "Warm", "Medium", "Cold",
            *COLOR_NAMES
        ]

        # target obj
        # (parenthesized: without the parentheses the conditional expression
        # wraps the whole comparison and the assert can never fail when the
        # parameters are unset)
        assert self.env.problem == (self.env.parameters["Problem"] if self.env.parameters else "Apples")

        if self.env.problem in ["Apples"]:
            self.target_obj = self.env.apple
            self.distractor_obj = None

        elif self.env.problem == "Doors":
            self.target_obj = self.env.door
            self.distractor_obj = self.env.distractor_door

        elif self.env.problem == "Levers":
            self.target_obj = self.env.lever
            self.distractor_obj = self.env.distractor_lever

        elif self.env.problem == "Boxes":
            self.target_obj = self.env.box
            self.distractor_obj = self.env.distractor_box

        elif self.env.problem == "Switches":
            self.target_obj = self.env.switch
            self.distractor_obj = self.env.distractor_switch

        elif self.env.problem == "Generators":
            self.target_obj = self.env.generator
            self.distractor_obj = self.env.distractor_generator

        elif self.env.problem in ["Marble", "Marbles"]:
            self.target_obj = self.env.generator
            self.distractor_obj = self.env.distractor_generator

        if self.env.ja_recursive:
            # how many objects
            if int(self.env.parameters["N"]) == 1:
                self.ja_decoy = self.env._rand_elem([self.target_obj])
            else:
                self.ja_decoy = self.env._rand_elem([self.target_obj, self.distractor_obj])

            # the other object is a decoy distractor
            # (this is None when there is no distractor object)
            self.ja_decoy_distractor = list({self.target_obj, self.distractor_obj} - {self.ja_decoy})[0]

            self.decoy_point_from_loc = self.find_point_from_loc(
                target_pos=self.ja_decoy.cur_pos,
                distractor_pos=self.ja_decoy_distractor.cur_pos if self.ja_decoy_distractor else None
            )

        self.point_from_loc = self.find_point_from_loc()

        assert self.env.grammar.contains_utterance(self.introduction_statement)

    def step(self, utterance):
        """Advance the NPC by one step.

        utterance: what the agent said this step (None if it did not speak);
            it is only used to handle the introduction.

        Returns a ``(reply, info)`` pair where ``reply`` is the NPC utterance
        (or None) and ``info`` is the dict built by ``create_info``.

        Raises ValueError for an unknown ``Cue_type`` or when scaffolding is
        combined with information-giving parameters.
        """
        reply, info = super().step()

        if self.env.hidden_npc:
            return reply, info

        scaffolding = self.env.parameters.get("Scaffolding", "N") == "Y"
        language_color = False
        language_feedback = False
        pointing = False
        emulation = False

        if not scaffolding:
            cue_type = self.env.parameters["Cue_type"]

            if cue_type == "Language_Color":
                language_color = True
            elif cue_type == "Language_Feedback":
                language_feedback = True
            elif cue_type == "Pointing":
                pointing = True
            elif cue_type == "Emulation":
                emulation = True
            else:
                raise ValueError(f"Cue_type ({cue_type}) not defined.")
        else:
            # there are no cues if scaffolding is used (the peer gives the apples to the agent)
            assert "Cue_type" not in self.env.parameters

            # there is no additional test for joint attention (no cues are given so this wouldn't make sense)
            assert not self.env.ja_recursive

        reply, action = None, None
        if not self.was_introduced_to:
            # check introduction, updates was_introduced_to if needed
            reply, action = self.handle_introduction(utterance)

            assert action is None

            if self.env.ja_recursive:
                # look at the center of the room (this makes the cue giving inside and outside JA different)
                action = self.look_at_action([self.env.current_width // 2, self.env.current_height // 2])
            else:
                # look at the agent
                action = self.look_at_action(self.env.agent_pos)

            if self.was_introduced_to:
                # was introduced just now
                if self.is_pointing():
                    action = self.stop_point

                if language_color:
                    # only say the color once
                    reply = self.target_obj.color

            elif self.env.ja_recursive:
                # was not introduced
                if language_feedback:
                    # random reply
                    reply = self.env._rand_elem([
                        "Hot",
                        "Warm",
                        "Medium",
                        "Cold"
                    ])

                if language_color and not self.decoy_color_given:
                    # color of a decoy (can be the correct one)
                    reply = self.ja_decoy.color
                    self.decoy_color_given = True

                if pointing:
                    # point to a decoy
                    action = self.goto_point_action(
                        point_from_loc=self.decoy_point_from_loc,
                        target_pos=self.ja_decoy.cur_pos,
                        distractor_pos=self.ja_decoy_distractor.cur_pos if self.ja_decoy_distractor else None
                    )

                    if self.is_pointing():
                        # if it's already pointing, turn to look at the center (to avoid looking at the wall)
                        action = self.look_at_action([self.env.current_width//2, self.env.current_height//2])

        else:
            # NOTE(review): this layout was reconstructed from a source with
            # collapsed whitespace; the whole cue-giving section below is
            # assumed to belong to this `else` branch - confirm.
            if self.was_introduced_to and language_color:
                # language only once at introduction
                action = self.look_at_action(self.env.agent_pos)

            if self.was_introduced_to and language_feedback:
                # closeness string
                agent_distance_to_target = np.abs(self.target_obj.cur_pos - self.env.agent_pos).sum()
                if agent_distance_to_target <= 1:
                    reply = "Hot"
                elif agent_distance_to_target <= 2:
                    reply = "Warm"
                elif agent_distance_to_target <= 5:
                    reply = "Medium"
                elif agent_distance_to_target >= 5:
                    reply = "Cold"

                action = self.look_at_action(self.env.agent_pos)

            # pointing
            if self.was_introduced_to and pointing:
                if self.env.parameters["N"] == "1":
                    distractor_pos = None
                else:
                    distractor_pos = self.distractor_obj.cur_pos

                action = self.goto_point_action(
                    point_from_loc=self.point_from_loc,
                    target_pos=self.target_obj.cur_pos,
                    distractor_pos=distractor_pos,
                )

                if self.is_pointing():
                    action = self.look_at_action(self.env.agent_pos)

            # emulation or scaffolding
            emulation_demo = self.was_introduced_to and emulation and not self.demo_over
            scaffolding_help = self.was_introduced_to and scaffolding

            # do the demonstration / unlock the apple
            # in both of those two scenarios the NPC in essence solves the task
            # in demonstration - it eats the apple, and reverts the env at the end
            # in scaffolding - it doesn't eat the apple and looks at the agent
            if emulation_demo or scaffolding_help:

                if emulation_demo or (scaffolding_help and not self.apple_unlocked_for_agent):

                    if self.is_pointing():
                        # don't point during demonstration
                        action = self.stop_point

                    else:
                        # if apple unlocked go pick it up
                        if self.target_obj == self.env.switch and self.env.switch.is_on:
                            assert self.env.parameters["Problem"] == "Switches"
                            next_target_position = self.env.box.cur_pos

                        elif self.target_obj == self.env.generator and self.env.generator.is_pressed:
                            assert self.env.parameters["Problem"] in ["Generators", "Marbles", "Marble"]
                            next_target_position = self.env.generator_platform.cur_pos

                        elif self.target_obj == self.env.door and self.env.door.is_open:
                            next_target_position = self.env.apple.cur_pos

                        elif self.target_obj == self.env.lever and self.env.lever.is_on:
                            next_target_position = self.env.apple.cur_pos

                        else:
                            next_target_position = self.target_obj.cur_pos

                        if self.target_obj == self.env.generator and not self.env.generator.is_pressed:
                            if not self.env.generator.marble_activation:
                                # push generator
                                action = self.path_to_pos(next_target_position)
                            else:
                                # find angle
                                if self.env.marble.moving_dir is None:
                                    distance = (self.env.marble.cur_pos - self.env.generator.cur_pos)

                                    diff = np.sign(distance)
                                    if sum(abs(diff)) == 1:
                                        # if the agent pushed the ball during demo diff can be > 1, then it's unsolvable
                                        push_pos = self.env.marble.cur_pos+diff
                                        if all(self.cur_pos == push_pos):
                                            next_target_position = self.env.marble.cur_pos
                                        else:
                                            next_target_position = push_pos

                                        # go to loc in front of
                                        # push
                                        # NOTE(review): nesting of this call under the
                                        # `sum(abs(diff)) == 1` check is reconstructed - confirm.
                                        action = self.path_to_pos(next_target_position)

                        else:
                            # toggle all other objects
                            action = self.path_to_toggle_pos(next_target_position)

                    # for scaffolding check if trying to eat the apple
                    # if so, stop - apple is unlocked
                    if scaffolding_help:
                        if (
                            self.env.get_cell(*self.front_pos) == self.env.apple and
                            action == self.toggle_action
                        ):
                            # don't eat the apple
                            action = None
                            self.apple_unlocked_for_agent = True

                    # for emulation check if trying to toggle the eaten apple
                    # if so, stop and revert the env - demo is over
                    if emulation_demo:
                        if (
                            self.ate_an_apple and
                            self.env.get_cell(*self.front_pos) == self.env.apple and
                            action == self.toggle_action and
                            self.env.apple.eaten
                        ):
                            # trying to toggle an apple it ate
                            self.env.revert()
                            self.demo_over = True
                            action = None

                # if scaffolding apple unlocked, look at the agent
                if scaffolding_help and self.apple_unlocked_for_agent:
                    if all(self.cur_pos == self.initial_pos):
                        # if the apple is unlocked look at the agent
                        wanted_dir = self.compute_wanted_dir(self.env.agent_pos)
                        action = self.compute_turn_action(wanted_dir)
                    else:
                        # go to init pos, this removes problems in case the apple is unreachable now
                        action = self.path_to_pos(self.initial_pos)

            if self.was_introduced_to and emulation and self.demo_over and not self.demo_over_and_position_safe:
                if self.env.is_in_marble_way(self.cur_pos):
                    action = self.path_to_pos(self.find_point_from_loc())
                else:
                    self.demo_over_and_position_safe = True

            if self.demo_over_and_position_safe:
                assert emulation or scaffolding
                # look at the agent after demo is done
                action = self.look_at_action(self.env.agent_pos)

            # reuse the flag computed above: indexing parameters["Scaffolding"]
            # directly raised KeyError when the key was absent
            if self.was_introduced_to and scaffolding:
                if "Emulation" in self.env.parameters or "Pointing" in self.env.parameters or "Language_grounding" in self.env.parameters:
                    raise ValueError(
                        "Scaffolding cannot be used with information giving (Emulation, Pointing, Language_grounding)"
                    )

        eaten_before = self.env.apple.eaten

        if action is not None:
            action()

        # check if the NPC ate the apple
        eaten_after = self.env.apple.eaten
        self.ate_an_apple = not eaten_before and eaten_after

        info = self.create_info(
            action=action,
            utterance=reply,
            was_introduced_to=self.was_introduced_to,
        )

        assert (reply or "no_op") in self.list_of_possible_utterances

        return reply, info

    def create_info(self, action, utterance, was_introduced_to):
        """Build the NPC info dict; missing action/utterance become "no_op"."""
        info = {
            "prim_action": action.__name__ if action is not None else "no_op",
            "utterance": utterance or "no_op",
            "was_introduced_to": was_introduced_to
        }
        return info

    def is_point_from_loc(self, pos, target_pos=None, distractor_pos=None):
        """Return True if the target can be unambiguously pointed at from pos.

        pos: candidate location (indexable, compared element-wise with the
            target position).
        target_pos: position to point at; defaults to the target object's.
        distractor_pos: position of the distractor; defaults to the distractor
            object's position, or ``[None, None]`` when there is none.
        """
        if target_pos is None:
            target_pos = self.target_obj.cur_pos

        if distractor_pos is None:
            if self.distractor_obj is not None:
                distractor_pos = self.distractor_obj.cur_pos
            else:
                distractor_pos = [None, None]

        if self.env.is_in_marble_way(pos):
            return False

        if self.env.problem in ["Doors", "Levers"]:
            # must not be in front of a door
            if abs(self.env.door_current_pos - pos).sum() == 1:
                return False

            if self.env.problem in ["Doors"]:
                if abs(self.env.distractor_current_pos - pos).sum() == 1:
                    return False

        if any(pos == target_pos):
            # index of the coordinate shared with the target
            same_ind = np.argmax(target_pos == pos)

            # is there an occlusion in the way
            start = pos[1-same_ind]
            end = target_pos[1-same_ind]
            step = 1 if start <= end else -1
            for i in np.arange(start, end, step):
                p = pos.copy()
                p[1-same_ind] = i
                cell = self.env.grid.get(*p)

                if cell is not None:
                    if not cell.see_behind():
                        return False

            if pos[same_ind] != distractor_pos[same_ind]:
                return True

            if pos[same_ind] == distractor_pos[same_ind]:
                # if in between
                if distractor_pos[1-same_ind] < pos[1-same_ind] < target_pos[1-same_ind]:
                    return True

                if distractor_pos[1-same_ind] > pos[1-same_ind] > target_pos[1-same_ind]:
                    return True

        return False

    def find_point_from_loc(self, target_pos=None, distractor_pos=None):
        """Find a location accepted by ``is_point_from_loc`` via ``env.find_loc``."""
        reject_fn = lambda env, p: not self.is_point_from_loc(p, target_pos=target_pos, distractor_pos=distractor_pos)

        point = self.env.find_loc(size=(self.env.wall_x, self.env.wall_y), reject_fn=reject_fn, reject_agent_pos=False)

        return point

    def goto_point_action(self, point_from_loc, target_pos, distractor_pos):
        """Return the action that points at target_pos or moves towards doing so.

        If the current position is valid for pointing the pointing action is
        returned; otherwise the NPC first stops pointing and then walks to
        ``point_from_loc``.
        """
        if self.is_point_from_loc(self.cur_pos, target_pos=target_pos, distractor_pos=distractor_pos):
            # point to a direction
            action = self.compute_wanted_point_action(target_pos)

        else:
            # do not point if not is_point_from_loc
            if self.is_pointing():
                # stop pointing
                action = self.stop_point

            else:
                # move
                action = self.path_to_pos(point_from_loc)

        return action


class InformationSeekingEnv(MultiModalMiniGridEnv):
    """
    Environment in which the agent has to get an apple.

    Depending on the ``Problem`` parameter the apple is directly accessible or
    has to be unlocked through a door, lever, box, switch, generator or marble.
    A ``Caretaker`` NPC can be present and give cues or help.
    """

    def __init__(
        self,
        size=10,
        diminished_reward=True,
        step_penalty=False,
        knowledgeable=False,
        max_steps=80,
        hidden_npc=False,
        switch_no_light=True,
        reward_diminish_factor=0.1,
        see_through_walls=False,
        n_colors=None,
        egocentric_observation=True,
    ):
        """Store the configuration and initialise the base environment.

        ``n_colors`` defaults to the number of entries in ``COLOR_NAMES``.
        The environment parameters are left unset here; they are supplied
        through ``reset(**kwargs)``.
        """
        assert size >= 5
        self.empty_symbol = "NA \n"
        self.diminished_reward = diminished_reward
        self.step_penalty = step_penalty
        self.knowledgeable = knowledgeable
        self.hidden_npc = hidden_npc
        self.hear_yourself = False
        self.switch_no_light = switch_no_light

        if n_colors is None:
            self.n_colors = len(COLOR_NAMES)
        else:
            self.n_colors = n_colors

        self.grammar = SocialAIGrammar()

        self.init_done = False
        # parameters - to be set in reset
        self.parameters = None

        self.add_npc_direction = True
        self.add_npc_point_direction = True
        self.add_npc_last_prim_action = True

        self.reward_diminish_factor = reward_diminish_factor

        self.egocentric_observation = egocentric_observation
        self.encoding_size = 3 + 2*bool(not self.egocentric_observation) + bool(self.add_npc_direction) + bool(self.add_npc_point_direction) + bool(self.add_npc_last_prim_action)

        super().__init__(
            grid_size=size,
            max_steps=max_steps,
            # Set this to True for maximum speed
            see_through_walls=see_through_walls,
            actions=SocialAIActions,  # primitive actions
            action_space=SocialAIActionSpace,
            add_npc_direction=self.add_npc_direction,
            add_npc_point_direction=self.add_npc_point_direction,
            add_npc_last_prim_action=self.add_npc_last_prim_action,
            reward_diminish_factor=self.reward_diminish_factor,
        )
        self.all_npc_utterance_actions = self.caretaker.list_of_possible_utterances
        self.prim_actions_dict = SocialAINPCActionsDict

    def revert(self):
        """Restore the environment after the NPC's demonstration.

        Removes the caretaker from the grid, places it again and re-creates
        the task objects at their stored positions.
        """
        self.grid.set(*self.caretaker.cur_pos, None)
        self.place_npc()
        self.put_objects_in_env(remove_objects=True)

    def is_in_marble_way(self, pos):
        """Return True if pos shares a row/column with both marble and a generator.

        Only relevant for the marble problems; always False otherwise.
        """
        target_pos = self.generator_current_pos

        # generator distractor is in the same row / column as the marble and the generator

        if self.problem in ["Marbles", "Marble"]:
            # point can't be in the same row or column as both the marble and the generator
            # all three: marble, generator, loc are in the same row or column
            if any((pos == target_pos) * (pos == self.marble_current_pos)):
                # all three: marble, generator, loc are in the same row or column -> is in its way
                return True

            if int(self.parameters["N"]) > 1:
                # is it in the way for the distractor generator
                if any((pos == self.distractor_current_pos) * (pos == self.marble_current_pos)):
                    # all three: marble, distractor generator, loc are in the same row or column -> is in its way
                    return True

        # all good
        return False

    def _gen_grid(self, width_, height_):
        """Generate the room, sample all object positions/colors and place them.

        The room size is sampled up to (width_, height_). Positions and colors
        are sampled for every object type regardless of the problem; only the
        objects relevant for the problem are put into the grid.

        Raises ValueError if the problem has no distractor rule defined.
        """
        # Create the grid
        self.grid = Grid(width_, height_, nb_obj_dims=self.encoding_size)

        # new
        min_w = min(9, width_)
        min_h = min(9, height_)
        self.current_width = self._rand_int(min_w, width_+1)
        self.current_height = self._rand_int(min_h, height_+1)

        self.wall_x = self.current_width-1
        self.wall_y = self.current_height-1

        # problem: Apples/Boxes/Switches/Generators/Marbles
        self.problem = self.parameters["Problem"] if self.parameters else "Apples"
        num_of_colors = self.parameters.get("Num_of_colors", None) if self.parameters else None
        if num_of_colors is None:
            num_of_colors = self.n_colors

        # additional test for recursiveness of joint attention -> cues are given outside of JA
        self.ja_recursive = self.parameters.get("JA_recursive", False) == "Y" if self.parameters else False

        self.add_obstacles()
        if self.obstacles != "No":
            warnings.warn("InformationSeeking should no be using obstacles.")

        # Generate the surrounding walls
        self.grid.wall_rect(0, 0, self.current_width, self.current_height)

        if self.problem in ["Doors", "Levers"]:
            # Add a second wall: this is needed so that an apple cannot be seen diagonally between the wall and the door
            self.grid.wall_rect(1, 1, self.wall_x-1, self.wall_y-1)

        # apple
        self.apple_pos = (self.current_width, self.current_height)

        # box
        locked = self.problem == "Switches"

        if num_of_colors is None:
            POSSIBLE_COLORS = COLOR_NAMES.copy()
        else:
            POSSIBLE_COLORS = COLOR_NAMES[:int(num_of_colors)].copy()

        self.box_color = self._rand_elem(POSSIBLE_COLORS)

        if self.problem in ["Doors", "Levers"]:
            # door

            # find the position on a wall
            self.apple_current_pos = self.find_loc(
                size=(self.current_width, self.current_height),
                reject_taken_pos=False,  # we will create a gap in the wall
                reject_agent_pos=True,
                reject_fn=lambda _, pos:
                    not (pos[0] in [0, self.wall_x] or pos[1] in [0, self.wall_y]) or  # reject not on a wall
                    tuple(pos) in [
                        (0, 0),
                        (0, 1),
                        (1, 0),

                        (0, self.wall_y),
                        (0, self.wall_y-1),
                        (1, self.wall_y),

                        (self.wall_x, self.wall_y),
                        (self.wall_x-1, self.wall_y),
                        (self.wall_x, self.wall_y-1),

                        (self.wall_x, 0),
                        (self.wall_x, 1),
                        (self.wall_x-1, 0),
                    ]
            )
            self.grid.set(*self.apple_current_pos, None)  # hole in the wall

            # door is in front of the apple
            door_x = {
                0: 1,
                self.wall_x: self.wall_x - 1,
            }.get(self.apple_current_pos[0], self.apple_current_pos[0])
            door_y = {
                0: 1,
                self.wall_y: self.wall_y - 1,
            }.get(self.apple_current_pos[1], self.apple_current_pos[1])

            self.door_current_pos = np.array([door_x, door_y])
            self.grid.set(*self.door_current_pos, None)  # hole in the wall

            # lever
            if self.problem in ["Levers"]:
                self.lever_current_pos = self.find_loc(
                    top=(2, 2),
                    size=(self.current_width-4, self.current_height-4),
                    reject_agent_pos=True,
                    reject_fn=lambda _, pos: next_to(pos, self.door_current_pos)  # reject in front of the door
                )

        else:
            # find the position for the apple/box/generator_platform
            self.apple_current_pos = self.find_loc(size=self.apple_pos, reject_agent_pos=True)
            assert all(self.apple_current_pos < np.array([self.current_width-1, self.current_height-1]))

        # door
        self.door_color = self._rand_elem(POSSIBLE_COLORS)

        # lever
        self.lever_color = self._rand_elem(POSSIBLE_COLORS)

        # switch
        self.switch_pos = (self.current_width, self.current_height)
        self.switch_color = self._rand_elem(POSSIBLE_COLORS)
        self.switch_current_pos = self.find_loc(
            size=self.switch_pos,
            reject_agent_pos=True,
            reject_fn=lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos]),
        )

        # generator
        self.generator_pos = (self.current_width, self.current_height)
        self.generator_color = self._rand_elem(POSSIBLE_COLORS)
        self.generator_current_pos = self.find_loc(
            size=self.generator_pos,
            reject_agent_pos=True,
            reject_fn=lambda _, pos: (
                tuple(pos) in map(tuple, [self.apple_current_pos])
                or
                (self.problem in ["Marble"] and tuple(pos) in [
                    # not in corners
                    (1, 1),
                    (self.current_width-2, 1),
                    (1, self.current_height-2),
                    (self.current_width-2, self.current_height-2),
                ])
                or
                # not in the same row column as the platform
                (self.problem in ["Marble"] and any(pos == self.apple_current_pos))
            ),
        )

        # generator platform
        self.generator_platform_color = self._rand_elem(POSSIBLE_COLORS)

        # marbles
        self.marble_pos = (self.current_width, self.current_height)
        self.marble_color = self._rand_elem(POSSIBLE_COLORS)
        self.marble_current_pos = self.find_loc(
            size=self.marble_pos,
            reject_agent_pos=True,
            reject_fn=lambda _, pos: self.problem in ["Marbles", "Marble"] and (
                tuple(pos) in map(tuple, [self.apple_current_pos, self.generator_current_pos])
                or
                all(pos != self.generator_current_pos)  # reject if not in row or column as the generator
                or
                any(pos == 1)  # next to a wall
                or
                pos[1] == self.current_height-2
                or
                pos[0] == self.current_width-2
            ),
        )

        # distractor
        if self.problem == "Boxes":
            assert not locked
            POSSIBLE_COLORS.remove(self.box_color)

        elif self.problem == "Doors":
            POSSIBLE_COLORS.remove(self.door_color)

        elif self.problem == "Levers":
            POSSIBLE_COLORS.remove(self.lever_color)

        elif self.problem == "Switches":
            POSSIBLE_COLORS.remove(self.switch_color)

        elif self.problem in ["Generators", "Marble"]:
            POSSIBLE_COLORS.remove(self.generator_color)

        self.distractor_color = self._rand_elem(POSSIBLE_COLORS)

        self.distractor_pos = (self.current_width, self.current_height)

        # distractor reject function
        if self.problem in ["Apples", "Boxes"]:
            distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos])

        elif self.problem in ["Switches"]:
            distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos, self.switch_current_pos])

        elif self.problem in ["Generators"]:
            distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos, self.generator_current_pos])

        elif self.problem in ["Marble"]:
            # problem is marbles
            if self.parameters["N"] == "1":
                distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [self.apple_current_pos, self.generator_current_pos, self.marble_current_pos])
            else:
                same_dim = (self.generator_current_pos == self.marble_current_pos).argmax()
                distractor_same_dim = 1-same_dim
                distractor_reject_fn = lambda _, pos: tuple(pos) in map(tuple, [
                    self.apple_current_pos,
                    self.generator_current_pos,
                    self.marble_current_pos
                ]) or pos[distractor_same_dim] != self.marble_current_pos[distractor_same_dim]

        elif self.problem in ["Doors"]:
            # reject not next to a wall
            distractor_reject_fn = lambda _, pos: (
                not (pos[0] in [1, self.wall_x-1] or pos[1] in [1, self.wall_y-1]) or  # reject not on a wall
                tuple(pos) in [
                    (1, 1),
                    (self.wall_x-1, self.wall_y - 1),
                    (1, self.wall_y-1),
                    (self.wall_x-1, 1),
                    tuple(self.door_current_pos)
                ]
            )

        elif self.problem in ["Levers"]:
            # not in front of the door
            distractor_reject_fn = lambda _, pos: next_to(pos, self.door_current_pos) or tuple(pos) in list(map(tuple, [self.door_current_pos, self.lever_current_pos]))

        else:
            raise ValueError("Problem {} indefined.".format(self.problem))

        if self.problem == "Doors":
            self.distractor_current_pos = self.find_loc(
                top=(1, 1),
                size=(self.current_width-2, self.current_height-2),
                reject_agent_pos=True,
                reject_fn=distractor_reject_fn,
                reject_taken_pos=False
            )

            if self.parameters["N"] != "1":
                self.grid.set(*self.distractor_current_pos, None)  # hole in the wall
        else:
            self.distractor_current_pos = self.find_loc(
                size=self.distractor_pos,
                reject_agent_pos=True,
                reject_fn=distractor_reject_fn
            )

        self.put_objects_in_env()

        # NPC
        put_peer = self.parameters["Peer"] if self.parameters else "N"
        assert put_peer in ["Y", "N"]

        color = self._rand_elem(COLOR_NAMES)
        self.caretaker = Caretaker(color, "Caretaker", self)

        if put_peer == "Y":
            self.place_npc()

        # Randomize the agent's start position and orientation
        self.place_agent(size=(self.current_width, self.current_height))

        # Generate the mission string
        self.mission = 'lets collaborate'

        # Dummy beginning string
        self.beginning_string = "Conversation: \n"
        self.utterance = self.beginning_string

        # utterance appended at the end of each step
        self.utterance_history = ""

        # used for rendering
        self.full_conversation = self.utterance
        self.outcome_info = None

    def place_npc(self):
        """Place the caretaker in the room and remember its initial position.

        The caretaker is never placed next to a door (Doors/Levers) or in the
        marble's way (all other problems).
        """
        if self.problem in ["Doors"]:
            self.place_obj(
                self.caretaker,
                size=(self.current_width, self.current_height),
                reject_fn=lambda _, pos: next_to(pos, self.door_current_pos) or next_to(pos, self.distractor_current_pos)
            )

        elif self.problem in ["Levers"]:
            self.place_obj(
                self.caretaker,
                size=(self.current_width, self.current_height),
                reject_fn=lambda _, pos: next_to(pos, self.door_current_pos)
            )

        else:
            # the unbound method is called as reject_fn(env, pos)
            self.place_obj(self.caretaker, size=(self.current_width, self.current_height), reject_fn=InformationSeekingEnv.is_in_marble_way)

        self.caretaker.initial_pos = self.caretaker.cur_pos

    def put_objects_in_env(self, remove_objects=False):
        """Create all task objects and put the relevant ones into the grid.

        remove_objects: when True (used only by ``revert``) the objects
            currently in the grid are removed first.

        Raises ValueError for an undefined problem.
        """
        assert self.apple_current_pos is not None
        assert self.switch_current_pos is not None

        self.doors_block_set = []
        self.levers_block_set = []
        self.switches_block_set = []
        self.boxes_block_set = []
        self.generators_block_set = []

        self.distractor_door = None
        self.distractor_lever = None
        self.distractor_box = None
        self.distractor_switch = None
        self.distractor_generator = None

        # problem: Apples/Boxes/Switches/Generators
        # (parenthesized so that the comparison is actually asserted when the
        # parameters are unset)
        assert self.problem == (self.parameters["Problem"] if self.parameters else "Apples")

        # move objects (used only in revert), not in gen_grid
        if remove_objects:
            # remove apple (after demo it must be an apple)
            assert type(self.grid.get(*self.apple_current_pos)) in [Apple]
            self.grid.set(*self.apple_current_pos, None)

            if self.problem in ["Doors"]:
                self.grid.set(*self.door.cur_pos, None)

            elif self.problem in ["Levers"]:
                self.grid.set(*self.remote_door.cur_pos, None)
                self.grid.set(*self.lever.cur_pos, None)

            elif self.problem in ["Switches"]:
                # remove switch
                assert type(self.grid.get(*self.switch_current_pos)) in [Switch]
                self.grid.set(*self.switch.cur_pos, None)

            elif self.problem in ["Generators", "Marbles", "Marble"]:
                # remove generator
                assert type(self.grid.get(*self.generator.cur_pos)) in [AppleGenerator]
                self.grid.set(*self.generator.cur_pos, None)

                if self.problem in ["Marbles", "Marble"]:
                    # remove marble
                    assert type(self.grid.get(*self.marble.cur_pos)) in [Marble]
                    self.grid.set(*self.marble.cur_pos, None)

                    if self.marble.tee_uncovered:
                        self.grid.set(*self.marble.tee.cur_pos, None)

            elif self.problem in ["Apples", "Boxes"]:
                pass

            else:
                raise ValueError("Undefined problem {}".format(self.problem))

            # remove distractor
            if self.problem in ["Boxes", "Switches", "Generators", "Marbles", "Marble", "Doors", "Levers"] and self.parameters["N"] != "1":
                assert type(self.grid.get(*self.distractor_current_pos)) in [LockableBox, Switch, AppleGenerator, Door, Lever]
                self.grid.set(*self.distractor_current_pos, None)

        # apple
        self.apple = Apple()

        # Box
        locked = self.problem == "Switches"

        self.box = LockableBox(
            self.box_color,
            contains=self.apple,
            is_locked=locked,
            block_set=self.boxes_block_set
        )
        self.boxes_block_set.append(self.box)

        # Doors
        self.door = Door(
            color=self.door_color,
            is_locked=False,
            block_set=self.doors_block_set,
        )
        self.doors_block_set.append(self.door)

        # Levers
        self.remote_door = RemoteDoor(
            color=self.door_color,
        )

        self.lever = Lever(
            color=self.lever_color,
            object=self.remote_door,
            active_steps=None,
            block_set=self.levers_block_set,
        )
        self.levers_block_set.append(self.lever)

        # Switch
        self.switch = Switch(
            color=self.switch_color,
            lockable_object=self.box,
            locker_switch=True,
            no_turn_off=True,
            no_light=self.switch_no_light,
            block_set=self.switches_block_set,
        )
        self.switches_block_set.append(self.switch)

        # Generator
        self.generator = AppleGenerator(
            self.generator_color,
            block_set=self.generators_block_set,
            on_push=lambda: self.grid.set(*self.apple_current_pos, self.apple),
            marble_activation=self.problem in ["Marbles", "Marble"],
        )
        self.generators_block_set.append(self.generator)

        self.generator_platform = GeneratorPlatform(self.generator_platform_color)

        self.marble = Marble(self.marble_color, env=self)

        if self.problem in ["Apples"]:
            self.put_obj_np(self.apple, self.apple_current_pos)

        elif self.problem in ["Doors"]:
            self.put_obj_np(self.apple, self.apple_current_pos)
            self.put_obj_np(self.door, self.door_current_pos)

        elif self.problem in ["Levers"]:
            self.put_obj_np(self.apple, self.apple_current_pos)
            self.put_obj_np(self.remote_door, self.door_current_pos)
            self.put_obj_np(self.lever, self.lever_current_pos)

        elif self.problem in ["Boxes"]:
            self.put_obj_np(self.box, self.apple_current_pos)

        elif self.problem in ["Switches"]:
            self.put_obj_np(self.box, self.apple_current_pos)
            self.put_obj_np(self.switch, self.switch_current_pos)

        elif self.problem in ["Generators", "Marbles", "Marble"]:
            self.put_obj_np(self.generator, self.generator_current_pos)
            self.put_obj_np(self.generator_platform, self.apple_current_pos)

            if self.problem in ["Marbles", "Marble"]:
                self.put_obj_np(self.marble, self.marble_current_pos)

        else:
            raise ValueError("Problem {} not defined. ".format(self.problem))

        # Distractors
        if self.problem not in ["Apples"]:

            N = int(self.parameters["N"])
            if N > 1:
                assert N == 2

                if self.problem == "Boxes":
                    assert not locked

                    self.distractor_box = LockableBox(
                        self.distractor_color,
                        is_locked=locked,
                        block_set=self.boxes_block_set,
                    )
                    self.boxes_block_set.append(self.distractor_box)

                    self.put_obj_np(self.distractor_box, self.distractor_current_pos)

                elif self.problem == "Doors":
                    self.distractor_door = Door(
                        color=self.distractor_color,
                        is_locked=False,
                        block_set=self.doors_block_set,
                    )
                    self.doors_block_set.append(self.distractor_door)

                    self.put_obj_np(self.distractor_door, self.distractor_current_pos)

                elif self.problem == "Levers":
                    self.distractor_lever = Lever(
                        color=self.distractor_color,
                        active_steps=None,
                        block_set=self.levers_block_set,
                    )
                    self.levers_block_set.append(self.distractor_lever)

                    self.put_obj_np(self.distractor_lever, self.distractor_current_pos)

                elif self.problem == "Switches":
                    self.distractor_switch = Switch(
                        color=self.distractor_color,
                        locker_switch=True,
                        no_turn_off=True,
                        no_light=self.switch_no_light,
                        block_set=self.switches_block_set,
                    )
                    self.switches_block_set.append(self.distractor_switch)

                    self.put_obj_np(self.distractor_switch, self.distractor_current_pos)

                elif self.problem in ["Generators", "Marbles", "Marble"]:
                    self.distractor_generator = AppleGenerator(
                        color=self.distractor_color,
                        block_set=self.generators_block_set,
                        marble_activation=self.problem in ["Marbles", "Marble"],
                    )
                    self.generators_block_set.append(self.distractor_generator)

                    self.put_obj_np(self.distractor_generator, self.distractor_current_pos)

                else:
                    raise ValueError("Undefined N for problem {}".format(self.problem))

    def reset(
        self, *args, **kwargs
    ):
        """Reset the environment with the parameters given as keyword arguments.

        Calling without keyword arguments is only allowed once, during class
        construction; every later call must pass the parameters.
        """
        # This env must be used inside the parametric env
        if not kwargs:
            # The only place when kwargs can empty is during the class construction
            # reset should be called again before using the env (paramenv does it in its constructor)
            assert self.parameters is None
            assert not self.init_done
            self.init_done = True

            obs = super().reset()
            return obs

        else:
            assert self.init_done

        self.parameters = dict(kwargs)

        assert self.parameters is not None
        assert len(self.parameters) > 0

        obs = super().reset()

        self.agent_ate_the_apple = False
        self.agent_opened_the_box = False
        self.agent_opened_the_door = False
        self.agent_pulled_the_lever = False
        self.agent_turned_on_the_switch = False
        self.agent_pressed_the_generator = False
        self.agent_pushed_the_marble = False

        return obs

    def step(self, action):
        """Run one environment step.

        action: sequence whose first element is the primitive action and whose
            remaining elements encode the utterance (all NaN = no utterance).

        Returns ``(obs, reward, done, info)``; ``info`` additionally contains
        the NPC info (prefixed with "NPC_"), "NPC_observed" and "success".
        """
        success = False
        p_action = action[0]
        utterance_action = action[1:]

        # object states before the agent acts, used to attribute changes to the agent
        apple_had_been_eaten = self.apple.eaten
        box_had_been_opened = self.box.is_open
        door_had_been_opened = self.door.is_open
        lever_had_been_pulled = self.lever.is_on
        switch_had_been_turned_on = self.switch.is_on
        generator_had_been_pressed = self.generator.is_pressed
        marble_had_been_pushed = self.marble.was_pushed

        # primitive actions
        _, reward, done, info = super().step(p_action)

        if self.problem in ["Marbles", "Marble"]:
            # todo: create stepable objects which are stepped automatically?
            self.marble.step()

        # eaten just now by primitive actions of the agent
        if not self.agent_ate_the_apple:
            self.agent_ate_the_apple = self.apple.eaten and not apple_had_been_eaten

        if not self.agent_opened_the_box:
            self.agent_opened_the_box = self.box.is_open and not box_had_been_opened

        if not self.agent_opened_the_door:
            self.agent_opened_the_door = self.door.is_open and not door_had_been_opened

        if not self.agent_pulled_the_lever:
            self.agent_pulled_the_lever = self.lever.is_on and not lever_had_been_pulled

        if not self.agent_turned_on_the_switch:
            self.agent_turned_on_the_switch = self.switch.is_on and not switch_had_been_turned_on

        if not self.agent_pressed_the_generator:
            self.agent_pressed_the_generator = self.generator.is_pressed and not generator_had_been_pressed

        if not self.agent_pushed_the_marble:
            self.agent_pushed_the_marble = self.marble.was_pushed and not marble_had_been_pushed

        # utterances
        agent_spoke = not all(np.isnan(utterance_action))
        if agent_spoke:
            utterance = self.grammar.construct_utterance(utterance_action)
            if self.hear_yourself:
                self.utterance += "YOU: {} \n".format(utterance)
            self.full_conversation += "YOU: {} \n".format(utterance)
        else:
            utterance = None

        if self.parameters["Peer"] == "Y":
            reply, npc_info = self.caretaker.step(utterance)
        else:
            reply = None
            npc_info = self.caretaker.create_info(
                action=None,
                utterance=None,
                was_introduced_to=False
            )

        if reply:
            self.utterance += "{}: {} \n".format(self.caretaker.name, reply)
            self.full_conversation += "{}: {} \n".format(self.caretaker.name, reply)

        # aftermath
        if p_action == self.actions.done:
            done = True

        elif self.agent_ate_the_apple:
            # check that it is the agent who ate it
            assert self.actions(p_action) == self.actions.toggle
            assert self.get_cell(*self.front_pos) == self.apple

            if self.parameters.get("Cue_type", "nan") == "Emulation":

                # during emulation it can be the NPC who eats the apple, opens the box, and turns on the switch
                # ("Scaffolding" is a "Y"/"N" string, so it is compared
                # explicitly instead of relying on truthiness)
                if self.parameters.get("Scaffolding", "N") == "Y" and self.caretaker.apple_unlocked_for_agent:
                    # if the caretaker unlocked the apple the agent gets reward upon eating it
                    reward = self._reward()
                    success = True

                elif self.problem == "Apples":
                    reward = self._reward()
                    success = True

                elif self.problem == "Doors" and self.agent_opened_the_door:
                    reward = self._reward()
                    success = True

                elif self.problem == "Levers" and self.agent_pulled_the_lever:
                    reward = self._reward()
                    success = True

                elif self.problem == "Boxes" and self.agent_opened_the_box:
                    reward = self._reward()
                    success = True

                elif self.problem == "Switches" and self.agent_opened_the_box and self.agent_turned_on_the_switch:
                    reward = self._reward()
                    success = True

                elif self.problem == "Generators" and self.agent_pressed_the_generator:
                    reward = self._reward()
                    success = True

                elif self.problem in ["Marble"] and self.agent_pushed_the_marble:
                    reward = self._reward()
                    success = True

            else:
                reward = self._reward()
                success = True

            done = True

        # discount
        if self.step_penalty:
            reward = reward - 0.01

        # update obs with NPC movement
        obs = self.gen_obs(full_obs=self.full_obs)

        # fill observation with text
        self.append_existing_utterance_to_history()
        obs = self.add_utterance_to_observation(obs)
        self.reset_utterance()

        if done:
            if reward > 0:
                self.outcome_info = "SUCCESS: agent got {} reward \n".format(np.round(reward, 1))
            else:
                self.outcome_info = "FAILURE: agent got {} reward \n".format(reward)

        # is the npc seen by the agent
        ag_view_npc = self.relative_coords(*self.caretaker.cur_pos)

        if ag_view_npc is not None:
            # in the agent's field of view
            ag_view_npc_x, ag_view_npc_y = ag_view_npc

            n_dims = obs['image'].shape[-1]
            npc_encoding = self.caretaker.encode(n_dims)

            # is it occluded
            npc_observed = all(obs['image'][ag_view_npc_x, ag_view_npc_y] == npc_encoding)
        else:
            npc_observed = False

        info = {**info, **{"NPC_"+k: v for k, v in npc_info.items()}}

        info["NPC_observed"] = npc_observed
        info["success"] = success

        assert success == (reward > 0)

        return obs, reward, done, info

    def _reward(self):
        """Return the base class reward if diminished rewards are on, else 1.0."""
        if self.diminished_reward:
            return super()._reward()
        else:
            return 1.0

    def render(self, *args, **kwargs):
        """Render the environment; in 'human' mode also show the conversation and outcome."""
        obs = super().render(*args, **kwargs)
        if args and args[0] == 'human':
            self.window.clear_text()  # erase previous text
            self.window.set_caption(self.full_conversation)

            if self.outcome_info:
                color = None
                if "SUCCESS" in self.outcome_info:
                    color = "lime"
                elif "FAILURE" in self.outcome_info:
                    color = "red"
                self.window.add_text(*(0.01, 0.85, self.outcome_info),
                                     **{'fontsize': 15, 'color': color, 'weight': "bold"})

            self.window.show_img(obs)  # re-draw image to add changes to window
        return obs


register(
    id='SocialAI-InformationSeeking-v0',
    entry_point='gym_minigrid.social_ai_envs:InformationSeekingEnv'
)