zjowowen's picture
init space
079c32c
import itertools
import numpy as np
from copy import deepcopy
class Node():
def __init__(self, label, qpos_ids, qvel_ids, act_ids, body_fn=None, bodies=None, extra_obs=None, tendons=None):
self.label = label
self.qpos_ids = qpos_ids
self.qvel_ids = qvel_ids
self.act_ids = act_ids
self.bodies = bodies
self.extra_obs = {} if extra_obs is None else extra_obs
self.body_fn = body_fn
self.tendons = tendons
pass
def __str__(self):
return self.label
def __repr__(self):
return self.label
class HyperEdge():
def __init__(self, *edges):
self.edges = set(edges)
def __contains__(self, item):
return item in self.edges
def __str__(self):
return "HyperEdge({})".format(self.edges)
def __repr__(self):
return "HyperEdge({})".format(self.edges)
def get_joints_at_kdist(
agent_id,
agent_partitions,
hyperedges,
k=0,
kagents=False,
):
""" Identify all joints at distance <= k from agent agent_id
:param agent_id: id of agent to be considered
:param agent_partitions: list of joint tuples in order of agentids
:param edges: list of tuples (joint1, joint2)
:param k: kth degree
:param kagents: True (observe all joints of an agent if a single one is) or False (individual joint granularity)
:return:
dict with k as key, and list of joints at that distance
"""
assert not kagents, "kagents not implemented!"
agent_joints = agent_partitions[agent_id]
def _adjacent(lst, kagents=False):
# return all sets adjacent to any element in lst
ret = set([])
for l in lst:
ret = ret.union(set(itertools.chain(*[e.edges.difference({l}) for e in hyperedges if l in e])))
return ret
seen = set([])
new = set([])
k_dict = {}
for _k in range(k + 1):
if not _k:
new = set(agent_joints)
else:
print(hyperedges)
new = _adjacent(new) - seen
seen = seen.union(new)
k_dict[_k] = sorted(list(new), key=lambda x: x.label)
return k_dict
def build_obs(env, k_dict, k_categories, global_dict, global_categories, vec_len=None):
"""Given a k_dict from get_joints_at_kdist, extract observation vector.
:param k_dict: k_dict
:param qpos: qpos numpy array
:param qvel: qvel numpy array
:param vec_len: if None no padding, else zero-pad to vec_len
:return:
observation vector
"""
# TODO: This needs to be fixed, it was designed for half-cheetah only!
#if add_global_pos:
# obs_qpos_lst.append(global_qpos)
# obs_qvel_lst.append(global_qvel)
body_set_dict = {}
obs_lst = []
# Add parts attributes
for k in sorted(list(k_dict.keys())):
cats = k_categories[k]
for _t in k_dict[k]:
for c in cats:
if c in _t.extra_obs:
items = _t.extra_obs[c](env).tolist()
obs_lst.extend(items if isinstance(items, list) else [items])
else:
if c in ["qvel", "qpos"]: # this is a "joint position/velocity" item
items = getattr(env.sim.data, c)[getattr(_t, "{}_ids".format(c))]
obs_lst.extend(items if isinstance(items, list) else [items])
elif c in ["qfrc_actuator"]: # this is a "vel position" item
items = getattr(env.sim.data, c)[getattr(_t, "{}_ids".format("qvel"))]
obs_lst.extend(items if isinstance(items, list) else [items])
elif c in ["cvel", "cinert", "cfrc_ext"]: # this is a "body position" item
if _t.bodies is not None:
for b in _t.bodies:
if c not in body_set_dict:
body_set_dict[c] = set()
if b not in body_set_dict[c]:
items = getattr(env.sim.data, c)[b].tolist()
items = getattr(_t, "body_fn", lambda _id, x: x)(b, items)
obs_lst.extend(items if isinstance(items, list) else [items])
body_set_dict[c].add(b)
# Add global attributes
body_set_dict = {}
for c in global_categories:
if c in ["qvel", "qpos"]: # this is a "joint position" item
for j in global_dict.get("joints", []):
items = getattr(env.sim.data, c)[getattr(j, "{}_ids".format(c))]
obs_lst.extend(items if isinstance(items, list) else [items])
else:
for b in global_dict.get("bodies", []):
if c not in body_set_dict:
body_set_dict[c] = set()
if b not in body_set_dict[c]:
obs_lst.extend(getattr(env.sim.data, c)[b].tolist())
body_set_dict[c].add(b)
if vec_len is not None:
pad = np.array((vec_len - len(obs_lst)) * [0])
if len(pad):
return np.concatenate([np.array(obs_lst), pad])
return np.array(obs_lst)
def build_actions(agent_partitions, k_dict):
# Composes agent actions output from networks
# into coherent joint action vector to be sent to the env.
pass
def get_parts_and_edges(label, partitioning):
if label in ["half_cheetah", "HalfCheetah-v2"]:
# define Mujoco graph
bthigh = Node("bthigh", -6, -6, 0)
bshin = Node("bshin", -5, -5, 1)
bfoot = Node("bfoot", -4, -4, 2)
fthigh = Node("fthigh", -3, -3, 3)
fshin = Node("fshin", -2, -2, 4)
ffoot = Node("ffoot", -1, -1, 5)
edges = [
HyperEdge(bfoot, bshin),
HyperEdge(bshin, bthigh),
HyperEdge(bthigh, fthigh),
HyperEdge(fthigh, fshin),
HyperEdge(fshin, ffoot)
]
root_x = Node("root_x", 0, 0, -1, extra_obs={"qpos": lambda env: np.array([])})
root_z = Node("root_z", 1, 1, -1)
root_y = Node("root_y", 2, 2, -1)
globals = {"joints": [root_x, root_y, root_z]}
if partitioning == "2x3":
parts = [(bfoot, bshin, bthigh), (ffoot, fshin, fthigh)]
elif partitioning == "6x1":
parts = [(bfoot, ), (bshin, ), (bthigh, ), (ffoot, ), (fshin, ), (fthigh, )]
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["Ant-v2"]:
# define Mujoco graph
torso = 1
front_left_leg = 2
aux_1 = 3
ankle_1 = 4
front_right_leg = 5
aux_2 = 6
ankle_2 = 7
back_leg = 8
aux_3 = 9
ankle_3 = 10
right_back_leg = 11
aux_4 = 12
ankle_4 = 13
hip1 = Node(
"hip1", -8, -8, 2, bodies=[torso, front_left_leg], body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #
ankle1 = Node(
"ankle1",
-7,
-7,
3,
bodies=[front_left_leg, aux_1, ankle_1],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #,
hip2 = Node(
"hip2", -6, -6, 4, bodies=[torso, front_right_leg], body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #,
ankle2 = Node(
"ankle2",
-5,
-5,
5,
bodies=[front_right_leg, aux_2, ankle_2],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #,
hip3 = Node("hip3", -4, -4, 6, bodies=[torso, back_leg], body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()) #,
ankle3 = Node(
"ankle3", -3, -3, 7, bodies=[back_leg, aux_3, ankle_3], body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #,
hip4 = Node(
"hip4", -2, -2, 0, bodies=[torso, right_back_leg], body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #,
ankle4 = Node(
"ankle4",
-1,
-1,
1,
bodies=[right_back_leg, aux_4, ankle_4],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
) #,
edges = [
HyperEdge(ankle4, hip4),
HyperEdge(ankle1, hip1),
HyperEdge(ankle2, hip2),
HyperEdge(ankle3, hip3),
HyperEdge(hip4, hip1, hip2, hip3),
]
free_joint = Node(
"free",
0,
0,
-1,
extra_obs={
"qpos": lambda env: env.sim.data.qpos[:7],
"qvel": lambda env: env.sim.data.qvel[:6],
"cfrc_ext": lambda env: np.clip(env.sim.data.cfrc_ext[0:1], -1, 1)
}
)
globals = {"joints": [free_joint]}
if partitioning == "2x4": # neighbouring legs together
parts = [(hip1, ankle1, hip2, ankle2), (hip3, ankle3, hip4, ankle4)]
elif partitioning == "2x4d": # diagonal legs together
parts = [(hip1, ankle1, hip3, ankle3), (hip2, ankle2, hip4, ankle4)]
elif partitioning == "4x2":
parts = [(hip1, ankle1), (hip2, ankle2), (hip3, ankle3), (hip4, ankle4)]
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["Hopper-v2"]:
# define Mujoco-Graph
thigh_joint = Node(
"thigh_joint",
-3,
-3,
0,
extra_obs={"qvel": lambda env: np.clip(np.array([env.sim.data.qvel[-3]]), -10, 10)}
)
leg_joint = Node(
"leg_joint", -2, -2, 1, extra_obs={"qvel": lambda env: np.clip(np.array([env.sim.data.qvel[-2]]), -10, 10)}
)
foot_joint = Node(
"foot_joint",
-1,
-1,
2,
extra_obs={"qvel": lambda env: np.clip(np.array([env.sim.data.qvel[-1]]), -10, 10)}
)
edges = [HyperEdge(foot_joint, leg_joint), HyperEdge(leg_joint, thigh_joint)]
root_x = Node(
"root_x",
0,
0,
-1,
extra_obs={
"qpos": lambda env: np.array([]),
"qvel": lambda env: np.clip(np.array([env.sim.data.qvel[1]]), -10, 10)
}
)
root_z = Node(
"root_z", 1, 1, -1, extra_obs={"qvel": lambda env: np.clip(np.array([env.sim.data.qvel[1]]), -10, 10)}
)
root_y = Node(
"root_y", 2, 2, -1, extra_obs={"qvel": lambda env: np.clip(np.array([env.sim.data.qvel[2]]), -10, 10)}
)
globals = {"joints": [root_x, root_y, root_z]}
if partitioning == "3x1":
parts = [(thigh_joint, ), (leg_joint, ), (foot_joint, )]
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["Humanoid-v2", "HumanoidStandup-v2"]:
# define Mujoco-Graph
abdomen_y = Node("abdomen_y", -16, -16, 0) # act ordering bug in env -- double check!
abdomen_z = Node("abdomen_z", -17, -17, 1)
abdomen_x = Node("abdomen_x", -15, -15, 2)
right_hip_x = Node("right_hip_x", -14, -14, 3)
right_hip_z = Node("right_hip_z", -13, -13, 4)
right_hip_y = Node("right_hip_y", -12, -12, 5)
right_knee = Node("right_knee", -11, -11, 6)
left_hip_x = Node("left_hip_x", -10, -10, 7)
left_hip_z = Node("left_hip_z", -9, -9, 8)
left_hip_y = Node("left_hip_y", -8, -8, 9)
left_knee = Node("left_knee", -7, -7, 10)
right_shoulder1 = Node("right_shoulder1", -6, -6, 11)
right_shoulder2 = Node("right_shoulder2", -5, -5, 12)
right_elbow = Node("right_elbow", -4, -4, 13)
left_shoulder1 = Node("left_shoulder1", -3, -3, 14)
left_shoulder2 = Node("left_shoulder2", -2, -2, 15)
left_elbow = Node("left_elbow", -1, -1, 16)
edges = [
HyperEdge(abdomen_x, abdomen_y, abdomen_z),
HyperEdge(right_hip_x, right_hip_y, right_hip_z),
HyperEdge(left_hip_x, left_hip_y, left_hip_z),
HyperEdge(left_elbow, left_shoulder1, left_shoulder2),
HyperEdge(right_elbow, right_shoulder1, right_shoulder2),
HyperEdge(left_knee, left_hip_x, left_hip_y, left_hip_z),
HyperEdge(right_knee, right_hip_x, right_hip_y, right_hip_z),
HyperEdge(left_shoulder1, left_shoulder2, abdomen_x, abdomen_y, abdomen_z),
HyperEdge(right_shoulder1, right_shoulder2, abdomen_x, abdomen_y, abdomen_z),
HyperEdge(abdomen_x, abdomen_y, abdomen_z, left_hip_x, left_hip_y, left_hip_z),
HyperEdge(abdomen_x, abdomen_y, abdomen_z, right_hip_x, right_hip_y, right_hip_z),
]
globals = {}
if partitioning == "9|8": # 17 in total, so one action is a dummy (to be handled by pymarl)
# isolate upper and lower body
parts = [
(
left_shoulder1, left_shoulder2, abdomen_x, abdomen_y, abdomen_z, right_shoulder1, right_shoulder2,
right_elbow, left_elbow
), (left_hip_x, left_hip_y, left_hip_z, right_hip_x, right_hip_y, right_hip_z, right_knee, left_knee)
]
# TODO: There could be tons of decompositions here
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["Reacher-v2"]:
# define Mujoco-Graph
body0 = 1
body1 = 2
fingertip = 3
joint0 = Node(
"joint0",
-4,
-4,
0,
bodies=[body0, body1],
extra_obs={"qpos": (lambda env: np.array([np.sin(env.sim.data.qpos[-4]),
np.cos(env.sim.data.qpos[-4])]))}
)
joint1 = Node(
"joint1",
-3,
-3,
1,
bodies=[body1, fingertip],
extra_obs={
"fingertip_dist": (lambda env: env.get_body_com("fingertip") - env.get_body_com("target")),
"qpos": (lambda env: np.array([np.sin(env.sim.data.qpos[-3]),
np.cos(env.sim.data.qpos[-3])]))
}
)
edges = [HyperEdge(joint0, joint1)]
worldbody = 0
target = 4
target_x = Node("target_x", -2, -2, -1, extra_obs={"qvel": (lambda env: np.array([]))})
target_y = Node("target_y", -1, -1, -1, extra_obs={"qvel": (lambda env: np.array([]))})
globals = {"bodies": [worldbody, target], "joints": [target_x, target_y]}
if partitioning == "2x1":
# isolate upper and lower arms
parts = [(joint0, ), (joint1, )]
# TODO: There could be tons of decompositions here
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["Swimmer-v2"]:
# define Mujoco-Graph
joint0 = Node("rot2", -2, -2, 0) # TODO: double-check ids
joint1 = Node("rot3", -1, -1, 1)
edges = [HyperEdge(joint0, joint1)]
globals = {}
if partitioning == "2x1":
# isolate upper and lower body
parts = [(joint0, ), (joint1, )]
# TODO: There could be tons of decompositions here
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["Walker2d-v2"]:
# define Mujoco-Graph
thigh_joint = Node("thigh_joint", -6, -6, 0)
leg_joint = Node("leg_joint", -5, -5, 1)
foot_joint = Node("foot_joint", -4, -4, 2)
thigh_left_joint = Node("thigh_left_joint", -3, -3, 3)
leg_left_joint = Node("leg_left_joint", -2, -2, 4)
foot_left_joint = Node("foot_left_joint", -1, -1, 5)
edges = [
HyperEdge(foot_joint, leg_joint),
HyperEdge(leg_joint, thigh_joint),
HyperEdge(foot_left_joint, leg_left_joint),
HyperEdge(leg_left_joint, thigh_left_joint),
HyperEdge(thigh_joint, thigh_left_joint)
]
globals = {}
if partitioning == "2x3":
# isolate upper and lower body
parts = [(foot_joint, leg_joint, thigh_joint), (
foot_left_joint,
leg_left_joint,
thigh_left_joint,
)]
# TODO: There could be tons of decompositions here
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["coupled_half_cheetah"]:
# define Mujoco graph
tendon = 0
bthigh = Node(
"bthigh",
-6,
-6,
0,
tendons=[tendon],
extra_obs={
"ten_J": lambda env: env.sim.data.ten_J[tendon],
"ten_length": lambda env: env.sim.data.ten_length,
"ten_velocity": lambda env: env.sim.data.ten_velocity
}
)
bshin = Node("bshin", -5, -5, 1)
bfoot = Node("bfoot", -4, -4, 2)
fthigh = Node("fthigh", -3, -3, 3)
fshin = Node("fshin", -2, -2, 4)
ffoot = Node("ffoot", -1, -1, 5)
bthigh2 = Node(
"bthigh2",
-6,
-6,
0,
tendons=[tendon],
extra_obs={
"ten_J": lambda env: env.sim.data.ten_J[tendon],
"ten_length": lambda env: env.sim.data.ten_length,
"ten_velocity": lambda env: env.sim.data.ten_velocity
}
)
bshin2 = Node("bshin2", -5, -5, 1)
bfoot2 = Node("bfoot2", -4, -4, 2)
fthigh2 = Node("fthigh2", -3, -3, 3)
fshin2 = Node("fshin2", -2, -2, 4)
ffoot2 = Node("ffoot2", -1, -1, 5)
edges = [
HyperEdge(bfoot, bshin),
HyperEdge(bshin, bthigh),
HyperEdge(bthigh, fthigh),
HyperEdge(fthigh, fshin),
HyperEdge(fshin, ffoot),
HyperEdge(bfoot2, bshin2),
HyperEdge(bshin2, bthigh2),
HyperEdge(bthigh2, fthigh2),
HyperEdge(fthigh2, fshin2),
HyperEdge(fshin2, ffoot2)
]
globals = {}
root_x = Node("root_x", 0, 0, -1, extra_obs={"qpos": lambda env: np.array([])})
root_z = Node("root_z", 1, 1, -1)
root_y = Node("root_y", 2, 2, -1)
globals = {"joints": [root_x, root_y, root_z]}
if partitioning == "1p1":
parts = [(bfoot, bshin, bthigh, ffoot, fshin, fthigh), (bfoot2, bshin2, bthigh2, ffoot2, fshin2, fthigh2)]
else:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
return parts, edges, globals
elif label in ["manyagent_swimmer"]:
# Generate asset file
try:
n_agents = int(partitioning.split("x")[0])
n_segs_per_agents = int(partitioning.split("x")[1])
n_segs = n_agents * n_segs_per_agents
except Exception as e:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
# Note: Default Swimmer corresponds to n_segs = 3
# define Mujoco-Graph
joints = [Node("rot{:d}".format(i), -n_segs + i, -n_segs + i, i) for i in range(0, n_segs)]
edges = [HyperEdge(joints[i], joints[i + 1]) for i in range(n_segs - 1)]
globals = {}
parts = [tuple(joints[i * n_segs_per_agents:(i + 1) * n_segs_per_agents]) for i in range(n_agents)]
return parts, edges, globals
elif label in ["manyagent_ant"]: # TODO: FIX!
# Generate asset file
try:
n_agents = int(partitioning.split("x")[0])
n_segs_per_agents = int(partitioning.split("x")[1])
n_segs = n_agents * n_segs_per_agents
except Exception as e:
raise Exception("UNKNOWN partitioning config: {}".format(partitioning))
# # define Mujoco graph
# torso = 1
# front_left_leg = 2
# aux_1 = 3
# ankle_1 = 4
# right_back_leg = 11
# aux_4 = 12
# ankle_4 = 13
#
# off = -4*(n_segs-1)
# hip1 = Node("hip1", -4-off, -4-off, 2, bodies=[torso, front_left_leg], body_fn=lambda _id, x:np.clip(x, -1, 1).tolist()) #
# ankle1 = Node("ankle1", -3-off, -3-off, 3, bodies=[front_left_leg, aux_1, ankle_1], body_fn=lambda _id, x:np.clip(x, -1, 1).tolist())#,
# hip4 = Node("hip4", -2-off, -2-off, 0, bodies=[torso, right_back_leg], body_fn=lambda _id, x:np.clip(x, -1, 1).tolist())#,
# ankle4 = Node("ankle4", -1-off, -1-off, 1, bodies=[right_back_leg, aux_4, ankle_4], body_fn=lambda _id, x:np.clip(x, -1, 1).tolist())#,
#
# edges = [HyperEdge(ankle4, hip4),
# HyperEdge(ankle1, hip1),
# HyperEdge(hip4, hip1),
# ]
edges = []
joints = []
for si in range(n_segs):
torso = 1 + si * 7
front_right_leg = 2 + si * 7
aux1 = 3 + si * 7
ankle1 = 4 + si * 7
back_leg = 5 + si * 7
aux2 = 6 + si * 7
ankle2 = 7 + si * 7
off = -4 * (n_segs - 1 - si)
hip1n = Node(
"hip1_{:d}".format(si),
-4 - off,
-4 - off,
2 + 4 * si,
bodies=[torso, front_right_leg],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
)
ankle1n = Node(
"ankle1_{:d}".format(si),
-3 - off,
-3 - off,
3 + 4 * si,
bodies=[front_right_leg, aux1, ankle1],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
)
hip2n = Node(
"hip2_{:d}".format(si),
-2 - off,
-2 - off,
0 + 4 * si,
bodies=[torso, back_leg],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
)
ankle2n = Node(
"ankle2_{:d}".format(si),
-1 - off,
-1 - off,
1 + 4 * si,
bodies=[back_leg, aux2, ankle2],
body_fn=lambda _id, x: np.clip(x, -1, 1).tolist()
)
edges += [HyperEdge(ankle1n, hip1n), HyperEdge(ankle2n, hip2n), HyperEdge(hip1n, hip2n)]
if si:
edges += [HyperEdge(hip1m, hip2m, hip1n, hip2n)]
hip1m = deepcopy(hip1n)
hip2m = deepcopy(hip2n)
joints.append([hip1n, ankle1n, hip2n, ankle2n])
free_joint = Node(
"free",
0,
0,
-1,
extra_obs={
"qpos": lambda env: env.sim.data.qpos[:7],
"qvel": lambda env: env.sim.data.qvel[:6],
"cfrc_ext": lambda env: np.clip(env.sim.data.cfrc_ext[0:1], -1, 1)
}
)
globals = {"joints": [free_joint]}
parts = [
[x for sublist in joints[i * n_segs_per_agents:(i + 1) * n_segs_per_agents] for x in sublist]
for i in range(n_agents)
]
return parts, edges, globals