import time
import numpy as np
from gym_minigrid.minigrid import *
from gym_minigrid.register import register
from gym_minigrid.social_ai_envs.socialaigrammar import SocialAIGrammar, SocialAIActions, SocialAIActionSpace
import random
from collections import deque
class AppleGuardingNPC(NPC):
"""
A simple NPC that knows who is telling the truth
"""
def __init__(self, color, name, env):
super().__init__(color)
self.name = name
self.env = env
        self.npc_dir = np.random.randint(0, 4)  # NPC initially faces a random direction
self.npc_type = 1 # this will be put into the encoding
self.was_introduced_to = False
self.ate_an_apple = False
self.demo_over = False
self.demo_over_and_position_safe = False
self.apple_unlocked_for_agent = False
self.target_obj = self.env.apple
self.waiting_counter = 0
self.wait_steps = 4
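        # the NPC makes at most one move every wait_steps environment steps (see step())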
assert self.env.grammar.contains_utterance(self.introduction_statement)
def draw_npc_face(self, c):
assert self.npc_type == 1
assert all(COLORS[self.color] == c)
shapes = []
shapes_colors = []
# Draw eyes
shapes.append(point_in_circle(cx=0.70, cy=0.50, r=0.10))
shapes_colors.append(c)
shapes.append(point_in_circle(cx=0.30, cy=0.50, r=0.10))
shapes_colors.append(c)
# Draw mouth
shapes.append(point_in_rect(0.20, 0.80, 0.72, 0.81))
shapes_colors.append(c)
# Draw eyebrows
shapes.append(point_in_triangle((0.15, 0.20),
(0.85, 0.20),
(0.50, 0.35)))
shapes_colors.append(c)
shapes.append(point_in_triangle((0.30, 0.20),
(0.70, 0.20),
(0.5, 0.35)))
shapes_colors.append((0,0,0))
return shapes, shapes_colors
    def can_see_pos(self, obj_pos):
        # can the NPC see the given position (e.g. the agent's position)?
        npc_view_obj = self.relative_coords(*obj_pos)
        grid, vis_mask = self.gen_obs_grid()
        if npc_view_obj is not None:
            # the position is inside the NPC's field of view
            npc_view_obj_x, npc_view_obj_y = npc_view_obj
            # check whether it is occluded
            object_observed = vis_mask[npc_view_obj_x, npc_view_obj_y]
        else:
            object_observed = False
        return object_observed, grid, vis_mask
def step(self, utterance):
reply, info = super().step()
if self.env.hidden_npc:
return reply, info
# reply, action = self.handle_introduction(utterance) # revert this?
reply, action = None, None
NPC_movement = self.env.parameters.get("NPC_movement", "Rotating")
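        # once the waiting period is over, make a single move according to the NPC_movement parameter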
if self.waiting_counter >= self.wait_steps:
self.waiting_counter = 0
if NPC_movement == "Rotating":
action = random.choice([self.rotate_left, self.rotate_right])
elif NPC_movement == "Walking":
action = random.choice([
random.choice([
self.rotate_left, # 25 %
self.rotate_right # 25 %
]),
self.go_forward # 50%
])
else:
                raise ValueError(f"Undefined movement option {NPC_movement}")
else:
self.waiting_counter += 1
if action is not None:
action()
info = {
"prim_action": action.__name__ if action is not None else "no_op",
"utterance": reply or "no_op",
"was_introduced_to": self.was_introduced_to
}
assert (reply or "no_op") in self.list_of_possible_utterances
return reply, info
class AppleStealingEnv(MultiModalMiniGridEnv):
"""
Environment in which the agent is instructed to go to a given object
named using an English text string
"""
def __init__(
self,
size=10,
diminished_reward=True,
step_penalty=False,
knowledgeable=False,
max_steps=80,
hidden_npc=False,
switch_no_light=False,
reward_diminish_factor=0.1,
see_through_walls=False,
egocentric_observation=True,
tagged_apple=False,
):
        assert size >= 7  # _gen_grid samples the room size from [7, size]
self.empty_symbol = "NA \n"
self.diminished_reward = diminished_reward
self.step_penalty = step_penalty
self.knowledgeable = knowledgeable
self.hidden_npc = hidden_npc
self.hear_yourself = False
self.switch_no_light = switch_no_light
self.grammar = SocialAIGrammar()
self.init_done = False
# parameters - to be set in reset
self.parameters = None
        # per-cell encoding: 3 base channels (+2 when the observation is not egocentric),
        # plus one channel each for the NPC's direction, pointing direction, and last primitive action
self.add_npc_direction = True
self.add_npc_point_direction = True
self.add_npc_last_prim_action = True
self.reward_diminish_factor = reward_diminish_factor
self.egocentric_observation = egocentric_observation
self.encoding_size = 3 + 2*bool(not self.egocentric_observation) + bool(self.add_npc_direction) + bool(self.add_npc_point_direction) + bool(self.add_npc_last_prim_action)
super().__init__(
grid_size=size,
max_steps=max_steps,
# Set this to True for maximum speed
see_through_walls=see_through_walls,
actions=SocialAIActions, # primitive actions
action_space=SocialAIActionSpace,
add_npc_direction=self.add_npc_direction,
add_npc_point_direction=self.add_npc_point_direction,
add_npc_last_prim_action=self.add_npc_last_prim_action,
reward_diminish_factor=self.reward_diminish_factor,
)
self.all_npc_utterance_actions = AppleGuardingNPC.get_list_of_possible_utterances()
self.prim_actions_dict = SocialAINPCActionsDict
self.tagged_apple = tagged_apple
def _gen_grid(self, width_, height_):
# Create the grid
self.grid = Grid(width_, height_, nb_obj_dims=self.encoding_size)
        # sample the actual room size for this episode (at least 7x7, at most the full grid)
self.current_width = self._rand_int(7, width_+1)
self.current_height = self._rand_int(7, height_+1)
# print("Room size: {}x{}".format(self.current_width, self.current_height))
self.wall_x = self.current_width-1
self.wall_y = self.current_height-1
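        # the "Social" version spawns the guarding NPC; otherwise the room is NPC-free ("Asocial")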
self.version = self.parameters["Version"] if self.parameters else "Asocial"
# Generate the surrounding walls
self.grid.wall_rect(0, 0, self.current_width, self.current_height)
self.add_obstacles()
        # apple
        self.apple_pos = (self.current_width, self.current_height)  # sampling area size, not a position
        # find a free position for the apple
self.apple_current_pos = self.find_loc(size=self.apple_pos, reject_agent_pos=True, reject_taken_pos=True)
assert all(self.apple_current_pos < np.array([self.current_width-1, self.current_height-1]))
self.apple = Apple()
self.put_obj_np(self.apple, self.apple_current_pos)
# NPC
color = self._rand_elem(COLOR_NAMES)
self.caretaker = AppleGuardingNPC(color, "Peer", self)
if self.version == "Social":
self.place_obj(self.caretaker, size=(self.current_width, self.current_height))
# Randomize the agent's start position and orientation
self.place_agent(size=(self.current_width, self.current_height))
# Generate the mission string
self.mission = 'undefined'
# Dummy beginning string
# self.beginning_string = "This is what you hear. \n"
self.beginning_string = "Conversation: \n"
self.utterance = self.beginning_string
# utterance appended at the end of each step
self.utterance_history = ""
# used for rendering
self.full_conversation = self.utterance
self.outcome_info = None
def reset(
self, *args, **kwargs
):
        # This env must be used inside the parametric env
        if not kwargs:
            # The only time kwargs can be empty is during class construction;
            # reset must be called again before using the env (the parametric env does this in its constructor)
assert self.parameters is None
assert not self.init_done
self.init_done = True
obs = super().reset()
return obs
else:
assert self.init_done
self.parameters = dict(kwargs)
assert self.parameters is not None
assert len(self.parameters) > 0
obs = super().reset()
self.agent_ate_the_apple = False
return obs
def step(self, action):
success = False
p_action = action[0]
utterance_action = action[1:]
apple_had_been_eaten = self.apple.eaten
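        # in the social version, check whether the NPC currently sees the agent;
        # being seen while eating the apple voids the reward (see below)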
if self.version == "Social":
agent_seen_by_npc, _, _ = self.caretaker.can_see_pos(self.agent_pos)
else:
agent_seen_by_npc = False
# primitive actions
_, reward, done, info = super().step(p_action)
if not self.agent_ate_the_apple:
self.agent_ate_the_apple = self.apple.eaten and not apple_had_been_eaten
# utterances
agent_spoke = not all(np.isnan(utterance_action))
if agent_spoke:
utterance = self.grammar.construct_utterance(utterance_action)
if self.hear_yourself:
self.utterance += "YOU: {} \n".format(utterance)
self.full_conversation += "YOU: {} \n".format(utterance)
else:
utterance = None
if self.version == "Social":
reply, npc_info = self.caretaker.step(utterance)
if reply:
self.utterance += "{}: {} \n".format(self.caretaker.name, reply)
self.full_conversation += "{}: {} \n".format(self.caretaker.name, reply)
else:
npc_info = {
"prim_action": "no_op",
"utterance": "no_op",
"was_introduced_to": False,
}
# aftermath
if p_action == self.actions.done:
done = True
        elif self.agent_ate_the_apple:
            # check that it is the agent who ate it
            assert self.actions(p_action) == self.actions.toggle
            assert self.get_cell(*self.front_pos) == self.apple
            if agent_seen_by_npc:
                reward = 0
                success = False
            else:
                reward = self._reward()
                success = True
            done = True
        # step penalty
if self.step_penalty:
reward = reward - 0.01
# update obs with NPC movement
obs = self.gen_obs(full_obs=self.full_obs)
# fill observation with text
self.append_existing_utterance_to_history()
obs = self.add_utterance_to_observation(obs)
self.reset_utterance()
# for rendering
if done:
if reward > 0:
self.outcome_info = "SUCCESS: agent got {} reward \n".format(np.round(reward, 1))
else:
self.outcome_info = "FAILURE: agent got {} reward \n".format(reward)
if self.version == "Social":
# is the npc seen by the agent
ag_view_npc = self.relative_coords(*self.caretaker.cur_pos)
if ag_view_npc is not None:
# in the agent's field of view
ag_view_npc_x, ag_view_npc_y = ag_view_npc
n_dims = obs['image'].shape[-1]
npc_encoding = self.caretaker.encode(n_dims)
                # check that the NPC is actually visible there (not occluded)
                npc_observed = all(obs['image'][ag_view_npc_x, ag_view_npc_y] == npc_encoding)
else:
npc_observed = False
else:
npc_observed = False
info = {**info, **{"NPC_"+k: v for k, v in npc_info.items()}}
info["NPC_observed"] = npc_observed
info["success"] = success
assert success == (reward > 0)
return obs, reward, done, info
def _reward(self):
if self.diminished_reward:
return super()._reward()
else:
return 1.0
def render(self, *args, **kwargs):
obs = super().render(*args, show_dialogue=False, **kwargs)
return obs
register(
id='SocialAI-AppleStealingEnv-v0',
entry_point='gym_minigrid.social_ai_envs:AppleStealingEnv'
)
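

# --- Minimal usage sketch (not part of the original module). ---
# Normally this env is driven by the SocialAI parametric wrapper, which calls reset(**params)
# with sampled parameters. The sketch below bypasses the wrapper and passes by hand only the two
# parameters read in this file ("Version" in _gen_grid, "NPC_movement" in AppleGuardingNPC.step);
# the wrapper may supply additional ones (e.g. obstacle settings). The action layout
# (1 primitive action followed by NaN utterance slots) mirrors step() above; the number of
# utterance slots is assumed to match the grammar's (template, word) indices.
if __name__ == "__main__":
    env = AppleStealingEnv(size=10, max_steps=80)
    # the constructor already performed the parameter-less reset; now set the episode parameters
    obs = env.reset(Version="Social", NPC_movement="Rotating")
    done = False
    while not done:
        prim = np.random.randint(0, len(SocialAIActions))  # random primitive action
        action = np.array([prim, np.nan, np.nan])          # stay silent (all utterance slots NaN)
        obs, reward, done, info = env.step(action)
    print("success:", info["success"], "reward:", reward)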