|
from autogen import GroupChatManager |
|
import json |
|
import re |
|
import networkx as nx |
|
|
|
from agents import create_agents |
|
from agents import is_termination_msg, gpt4_config |
|
from corrector_agents import get_corrector_agents |
|
from refiner_agents import get_refiner_agents |
|
|
|
from chats import GroupChat, ChatWithEngineer, LayoutCorrectorGroupChat, ObjectDeletionGroupChat, LayoutRefinerGroupChat |
|
|
|
from utils import get_room_priors, extract_list_from_json |
|
from utils import preprocess_scene_graph, build_graph, remove_unnecessary_edges, handle_under_prepositions, get_conflicts, get_size_conflicts, get_object_from_scene_graph |
|
from utils import get_object_from_scene_graph, get_rotation, get_cluster_objects, clean_and_extract_edges |
|
from utils import get_cluster_size |
|
from utils import get_possible_positions, is_point_bbox, calculate_overlap, get_topological_ordering, place_object, get_depth, get_visualization |
|
|
|
class Designer:
    """Drive an LLM-agent pipeline that builds and places a room's scene graph.

    The stages share state through ``self.scene_graph``:

    * ``create_initial_design`` stores a dict with an ``"objects_in_room"`` list.
    * ``correct_design``, ``refine_design`` and ``create_object_clusters`` read
      and update that list.
    * ``backtrack`` replaces the dict with a flat list (objects followed by the
      room priors), so the dict-based stages cannot run after it.
    * ``to_json`` dumps whatever ``self.scene_graph`` currently holds.
    """

    # Identifiers of the fixed room elements; these are skipped when clustering
    # and when assigning positions.
    _ROOM_LAYOUT_ELEMENTS = (
        "south_wall",
        "north_wall",
        "west_wall",
        "east_wall",
        "ceiling",
        "middle of the room",
    )

    # Opposite of each relative preposition, used when a refined relationship
    # is stored on the other endpoint of the edge.
    _OPPOSITE_PREPOSITION = {
        "left of": "right of",
        "right of": "left of",
        "in front": "behind",
        "behind": "in front",
    }

    # Captures the body of a ```json fenced block inside an agent message.
    _JSON_FENCE_PATTERN = r'```json\s*([^`]+)\s*```'

    def __init__(self, no_of_objects, user_input, room_dimensions):
        """Store the request and look up the room priors.

        Args:
            no_of_objects: Forwarded to ``create_agents`` when the initial
                design is created.
            user_input: The user's preference text; embedded in the first
                prompt and passed to ``get_size_conflicts``.
            room_dimensions: Indexable with at least three entries; the first
                prompt presents them as ``{0}m x {1}m x {2}m``.
        """
        self.no_of_objects = no_of_objects
        self.user_input = user_input
        self.room_dimensions = room_dimensions
        self.room_priors = get_room_priors(self.room_dimensions)
        self.scene_graph = None

    @staticmethod
    def _extract_fenced_json(text):
        """Parse the first ```json fenced block found in ``text``.

        Raises:
            ValueError: If ``text`` contains no such block, or if the block is
                not valid JSON (``json.JSONDecodeError`` is a ``ValueError``).
        """
        match = re.search(Designer._JSON_FENCE_PATTERN, text, re.DOTALL)
        if match is None:
            raise ValueError("The agent reply does not contain a ```json fenced block.")
        return json.loads(match.group(1))

    @staticmethod
    def _cluster_parent_and_prep(cluster_key):
        """Return ``(parent_id, preposition)`` encoded in a cluster key.

        The first element of ``cluster_key`` is a sequence of pairs. With two
        pairs the parent id and the preposition are the second items of pairs
        0 and 1; with three pairs they are the second items of pairs 1 and 2.

        Raises:
            ValueError: For any other number of pairs. Without this check the
                values of the previous cluster would be reused silently.
        """
        entry = list(cluster_key)[0]
        if len(entry) == 2:
            return entry[0][1], entry[1][1]
        if len(entry) == 3:
            return entry[1][1], entry[2][1]
        raise ValueError(f"Unexpected cluster key with {len(entry)} fields: {entry!r}")

    @staticmethod
    def _expects_front_back(rotation_diff, prep):
        """Tell whether siblings must be related by ``in front``/``behind``.

        ``rotation_diff`` is the child's rotation minus the parent's rotation
        and ``prep`` is the preposition linking the cluster to its parent.
        A ``False`` result means siblings must use ``left of``/``right of``.
        """
        if rotation_diff % 180 == 0:
            return prep in ("left of", "right of")
        return prep in ("in front", "behind", "on")

    def create_initial_design(self):
        """Run the designer/architect chat, then the engineer chat per block.

        The last two messages of the first chat are parsed as the designer's
        and the architect's JSON replies. Each designer/architect block pair is
        then sent to the engineer chat, and the ``"objects_in_room"`` lists of
        the engineer replies are concatenated into ``self.scene_graph``.

        If the replies contain no blocks, ``self.scene_graph`` is left as
        ``None``.

        Raises:
            ValueError: If the designer and architect return a different number
                of blocks.
        """
        user_proxy, json_schema_debugger, interior_designer, interior_architect, engineer = create_agents(self.no_of_objects)

        groupchat = GroupChat(
            agents=[user_proxy, interior_designer, interior_architect],
            messages=[],
            max_round=3,
        )

        chat_with_engineer = ChatWithEngineer(
            agents=[user_proxy, engineer, json_schema_debugger],
            messages=[],
            max_round=15,
        )

        manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
        user_proxy.initiate_chat(
            manager,
            message=f"""
            The room has the size {self.room_dimensions[0]}m x {self.room_dimensions[1]}m x {self.room_dimensions[2]}m
            User Preference (in triple backquotes):
            ```
            {self.user_input}
            ```
            Room layout elements in the room (in triple backquotes):
            ```
            ['south_wall', 'north_wall', 'west_wall', 'east_wall', 'middle of the room', 'ceiling']
            ```
            json
            """,
        )

        designer_response = json.loads(groupchat.messages[-2]["content"])
        architect_response = json.loads(groupchat.messages[-1]["content"])

        blocks_designer = extract_list_from_json(designer_response)
        blocks_architect = extract_list_from_json(architect_response)
        if len(blocks_designer) != len(blocks_architect):
            print("Lengths: ", len(blocks_designer), len(blocks_architect))
            raise ValueError("The number of blocks from the designer and architect should be the same! Please generate again.")

        json_data = None

        for d_block, a_block in zip(blocks_designer, blocks_architect):
            engineer.reset()
            json_schema_debugger.reset()
            prompt = str(d_block) + "\n" + str(a_block)

            # Ids of the objects placed by the previous iterations.
            object_ids = [item["new_object_id"] for item in json_data["objects_in_room"]] if json_data is not None else []

            manager = GroupChatManager(
                groupchat=chat_with_engineer,
                llm_config=gpt4_config,
                human_input_mode="NEVER",
                is_termination_msg=is_termination_msg,
            )
            # The layout-element list uses 'middle of the room', the same
            # identifier as the first prompt and as _ROOM_LAYOUT_ELEMENTS.
            user_proxy.initiate_chat(
                manager,
                message=f"""
                Room layout elements in the room (in triple backquotes):
                ```
                ['south_wall', 'north_wall', 'west_wall', 'east_wall', 'middle of the room', 'ceiling']
                ```
                Array of objects in the room (in triple backquotes):
                ```
                {object_ids}
                ```
                Objects to be placed in the room (in triple backquotes):
                ```
                {prompt}
                ```
                json
                """,
            )

            engineer_reply = json.loads(chat_with_engineer.messages[-2]["content"])
            if json_data is None:
                json_data = engineer_reply
            else:
                json_data["objects_in_room"] += engineer_reply["objects_in_room"]

        self.scene_graph = json_data

    def correct_design(self, verbose=False, auto_prune=False):
        """Resolve the conflicts reported for the current scene graph.

        While ``get_conflicts`` reports anything, the first conflict is sent to
        the corrector chat; the reply's ``corrected_object`` overwrites the
        ``is_on_the_floor``, ``facing`` and ``placement`` fields of the matching
        object, and the conflicts are recomputed. There is no iteration cap.

        With ``auto_prune`` the same loop runs over ``get_size_conflicts``: the
        deletion agent names an object, which is removed from the scene graph
        and from the graph together with its graph descendants.

        Args:
            verbose: Print the conflict lists.
            auto_prune: Also resolve size conflicts by deleting objects.

        Raises:
            ValueError: If a corrector reply has no ```json block, or names an
                object that is not in the scene graph.
        """
        scene_graph = preprocess_scene_graph(self.scene_graph["objects_in_room"])
        G = build_graph(scene_graph)
        G = remove_unnecessary_edges(G)
        G, scene_graph = handle_under_prepositions(G, scene_graph)

        conflicts = get_conflicts(G, scene_graph)

        if verbose:
            print("-------------------CONFLICTS-------------------")
            for conflict in conflicts:
                print(conflict)
                print("\n\n")

        user_proxy, spatial_corrector_agent, json_schema_debugger, object_deletion_agent = get_corrector_agents()

        while len(conflicts) > 0:
            spatial_corrector_agent.reset()
            json_schema_debugger.reset()
            groupchat = LayoutCorrectorGroupChat(
                agents=[user_proxy, spatial_corrector_agent, json_schema_debugger],
                messages=[],
                max_round=15,
            )
            manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager,
                message=f"""
                {conflicts[0]}
                """,
            )

            correction = groupchat.messages[-2]
            corrected = self._extract_fenced_json(correction["content"])["corrected_object"]

            corr_obj = get_object_from_scene_graph(corrected["new_object_id"], scene_graph)
            if corr_obj is None:
                raise ValueError(f"Corrected object {corrected['new_object_id']!r} is not in the scene graph.")
            corr_obj["is_on_the_floor"] = corrected["is_on_the_floor"]
            corr_obj["facing"] = corrected["facing"]
            corr_obj["placement"] = corrected["placement"]

            G = build_graph(scene_graph)
            conflicts = get_conflicts(G, scene_graph)

        if auto_prune:
            size_conflicts = get_size_conflicts(G, scene_graph, self.user_input, self.room_priors, verbose)

            if verbose:
                print("-------------------SIZE CONFLICTS-------------------")
                for conflict in size_conflicts:
                    print(conflict)
                    print("\n\n")

            while len(size_conflicts) > 0:
                object_deletion_agent.reset()
                groupchat = ObjectDeletionGroupChat(
                    agents=[user_proxy, object_deletion_agent],
                    messages=[],
                    max_round=2,
                )
                manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
                user_proxy.initiate_chat(
                    manager,
                    message=f"""
                    {size_conflicts[0]}
                    """,
                )

                correction = groupchat.messages[-1]
                correction_json = json.loads(correction["content"])
                object_to_delete = correction_json["object_to_delete"]

                # Everything reachable from the deleted object goes with it.
                descendants = nx.descendants(G, object_to_delete)
                objs_to_delete = descendants.union({object_to_delete})
                print("Objs to Delete: ", objs_to_delete)

                scene_graph = [x for x in scene_graph if x["new_object_id"] not in objs_to_delete]
                for obj in objs_to_delete:
                    G.remove_node(obj)

                size_conflicts = get_size_conflicts(G, scene_graph, self.user_input, self.room_priors, verbose)

        self.scene_graph["objects_in_room"] = scene_graph

    def refine_design(self, verbose=False):
        """Ask the refiner chat to relate the children of each cluster.

        For every cluster from ``get_cluster_objects`` the refiner is given the
        parent, the children and one constraint per child. Children whose
        proposed sibling relations violate the constraint are dropped; the
        remaining relations are appended to the ``placement["objects_in_room"]``
        list of the child, or of the sibling with the opposite preposition when
        ``clean_and_extract_edges`` marks the edge as flipped.

        Args:
            verbose: Print the clusters and the rejected children.

        Raises:
            ValueError: If a cluster key does not have two or three fields.
        """
        cluster_dict = get_cluster_objects(self.scene_graph["objects_in_room"])

        inputs = []
        for key, value in cluster_dict.items():
            parent_id, prep = self._cluster_parent_and_prep(key)
            inputs.append((parent_id, prep, value))

        if verbose:
            if not inputs:
                print("No clusters found")
            for parent_id, prep, objs in inputs:
                print(f"Parent Object : {parent_id}")
                print(f"Children Objects : {objs}")
                print(f"The children objects are '{prep}' the parent object")
                print("\n")

        for parent_id, prep, obj_names in inputs:
            objects_in_room = self.scene_graph["objects_in_room"]

            objs = [get_object_from_scene_graph(obj, objects_in_room) for obj in obj_names]
            objs_rot = [get_rotation(obj, objects_in_room) for obj in objs]

            parent_obj = get_object_from_scene_graph(parent_id, objects_in_room)
            if parent_obj is None:
                # The parent is not an object, so look it up among the room priors.
                parent_obj = [prior for prior in self.room_priors if prior.get("new_object_id") == parent_id][0]
            parent_obj_rot = get_rotation(parent_obj, objects_in_room)

            rot_diffs = [obj_rot - parent_obj_rot for obj_rot in objs_rot]

            constraint_lines = []
            for name, diff in zip(obj_names, rot_diffs):
                allowed = "`behind` or `in front`" if self._expects_front_back(diff, prep) else "`left of` or `right of`"
                constraint_lines.append("\t" + f"Place objects {allowed} of {name}!")
            possibilities_str = "Constraints:\n" + "\n".join(constraint_lines)

            user_proxy, layout_refiner, json_schema_debugger = get_refiner_agents()

            layout_refiner.reset()
            json_schema_debugger.reset()
            groupchat = LayoutRefinerGroupChat(
                agents=[user_proxy, layout_refiner, json_schema_debugger],
                messages=[],
                max_round=15,
            )
            manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager,
                message=f"""
                Parent Object : {parent_id}
                Children Objects : {obj_names}

                {possibilities_str}

                The children objects are '{prep}' the parent object
                """,
            )

            new_relationships = json.loads(groupchat.messages[-2]["content"])
            # Some replies wrap the list as {"items": [...]}; unwrap it.
            if "items" in new_relationships["children_objects"]:
                new_relationships = {"children_objects": new_relationships["children_objects"]["items"]}

            # A child is rejected as soon as one of its sibling relations uses a
            # preposition outside the pair allowed for that sibling.
            invalid_name_ids = []
            for child in new_relationships["children_objects"]:
                for other_child in child["placement"]["children_objects"]:
                    other_child_obj = get_object_from_scene_graph(other_child["name_id"], objects_in_room)
                    other_child_rot = get_rotation(other_child_obj, objects_in_room)
                    if self._expects_front_back(other_child_rot - parent_obj_rot, prep):
                        allowed_preps = ("in front", "behind")
                    else:
                        allowed_preps = ("left of", "right of")
                    if other_child["preposition"] not in allowed_preps:
                        invalid_name_ids.append(child["name_id"])

            if verbose:
                print("Invalid name IDs: ", invalid_name_ids)
            new_relationships["children_objects"] = [
                child for child in new_relationships["children_objects"] if child["name_id"] not in invalid_name_ids
            ]

            if not new_relationships["children_objects"]:
                continue

            edges, edges_to_flip = clean_and_extract_edges(new_relationships, parent_id, verbose=verbose)

            for obj in new_relationships["children_objects"]:
                name_id = obj["name_id"]
                for r in obj["placement"]["children_objects"]:
                    edge = (name_id, r["name_id"])
                    if edge not in edges:
                        continue
                    if edges_to_flip[edge]:
                        # Store the relation on the sibling, seen from its side.
                        corr_obj = get_object_from_scene_graph(r["name_id"], objects_in_room)
                        corr_prep = self._OPPOSITE_PREPOSITION[r["preposition"]]
                        corr_obj["placement"]["objects_in_room"].append(
                            {"object_id": name_id, "preposition": corr_prep, "is_adjacent": r["is_adjacent"]}
                        )
                    else:
                        corr_obj = get_object_from_scene_graph(name_id, objects_in_room)
                        corr_obj["placement"]["objects_in_room"].append(
                            {"object_id": r["name_id"], "preposition": r["preposition"], "is_adjacent": r["is_adjacent"]}
                        )

    def create_object_clusters(self, verbose=False):
        """Attach a rotation and a cluster constraint area to every object.

        Each object gets ``rotation = {"z_angle": ...}`` from ``get_rotation``.
        Every graph node that is not a room layout element then gets
        ``cluster = {"constraint_area": ...}``, where the sizes returned by
        ``get_cluster_size`` for ``left of``/``right of``/``behind``/``in front``
        are stored as ``x_neg``/``x_pos``/``y_neg``/``y_pos``.

        Args:
            verbose: Print the cluster size and children of each node.
        """
        objects_in_room = self.scene_graph["objects_in_room"]

        for obj in objects_in_room:
            obj["rotation"] = {"z_angle": get_rotation(obj, objects_in_room)}

        G = build_graph(objects_in_room)

        for node in G.nodes():
            if node in self._ROOM_LAYOUT_ELEMENTS:
                continue

            cluster_size, children_objs = get_cluster_size(node, G, objects_in_room)
            if verbose:
                print("Node: ", node)
                print("Cluster size: ", cluster_size)
                print("Children: ", children_objs)
                print("\n")

            node_obj = get_object_from_scene_graph(node, objects_in_room)
            constraint_area = {
                "x_neg": cluster_size["left of"],
                "x_pos": cluster_size["right of"],
                "y_neg": cluster_size["behind"],
                "y_pos": cluster_size["in front"],
            }
            node_obj["cluster"] = {"constraint_area": constraint_area}

    def backtrack(self, verbose=False):
        """Assign positions depth by depth, stepping back when placement fails.

        ``self.scene_graph`` is first replaced by a flat list: the objects
        followed by the room priors. Objects whose possible positions collapse
        to a point box are positioned directly and never moved afterwards. The
        remaining objects are placed with ``place_object`` one depth level at a
        time; when a placement reports errors, the positions of all non-fixed
        objects at or below the (possibly decremented) depth are removed and
        that depth is tried again.

        NOTE(review): the retry loop has no attempt limit; it only ends if
        ``place_object`` eventually succeeds — confirm that is intended.

        Args:
            verbose: Print progress and show the visualizations.
        """
        self.scene_graph = self.scene_graph["objects_in_room"] + self.room_priors
        prior_ids = self._ROOM_LAYOUT_ELEMENTS

        # True for objects whose position is fully determined up front.
        point_bbox = dict.fromkeys([item["new_object_id"] for item in self.scene_graph], False)

        for item in self.scene_graph:
            if item["new_object_id"] in prior_ids:
                continue
            possible_pos = get_possible_positions(item["new_object_id"], self.scene_graph, self.room_dimensions)

            # Intersect all candidate boxes, left to right.
            overlap = None
            if len(possible_pos) > 0:
                overlap = possible_pos[0]
                for pos in possible_pos[1:]:
                    overlap = calculate_overlap(overlap, pos)

            if overlap is not None and is_point_bbox(overlap):
                item["position"] = {"x": overlap[0], "y": overlap[2], "z": overlap[4]}
                point_bbox[item["new_object_id"]] = True

        scene_graph_wo_layout = [item for item in self.scene_graph if item["new_object_id"] not in prior_ids]

        depth_scene_graph = get_depth(scene_graph_wo_layout)
        # default=0 keeps a scene without objects from raising; the placement
        # loop below is then simply skipped.
        max_depth = max(depth_scene_graph.values(), default=0)

        if verbose:
            print("Max depth: ", max_depth)
            print("Depth scene graph: ", depth_scene_graph)
            print("Point BBox: ", [key for key, value in point_bbox.items() if value])

            # NOTE(review): assumed to belong to the verbose-only diagnostics,
            # like the visualization at the end of this method — confirm.
            get_visualization(self.scene_graph, self.room_priors)
            for obj in scene_graph_wo_layout:
                if "position" in obj.keys():
                    print(obj["new_object_id"], obj["position"])

        topological_order = get_topological_ordering(scene_graph_wo_layout)
        topological_order = [item for item in topological_order if item not in prior_ids]
        if verbose:
            print("Topological order: ", topological_order)

        d = 1
        while d <= max_depth:
            if verbose:
                print("Depth: ", d)
            error_flag = False

            nodes = [node for node in topological_order if depth_scene_graph[node] == d]
            if verbose:
                print(f"Nodes at depth {d}:", nodes)

            for node in nodes:
                if point_bbox[node]:
                    continue

                obj = next(item for item in scene_graph_wo_layout if item["new_object_id"] == node)
                errors = place_object(obj, self.scene_graph, self.room_dimensions, errors={}, verbose=verbose)
                if verbose:
                    print(f"Errors for {obj['new_object_id']}:", errors)

                if errors:
                    # Step back one level, but never above the first depth.
                    if d > 1:
                        d -= 1
                        if verbose:
                            print("Reducing depth to: ", d)

                    error_flag = True

                    # Forget every non-fixed position from the retry depth on.
                    for del_item in scene_graph_wo_layout:
                        if depth_scene_graph[del_item["new_object_id"]] >= d:
                            if "position" in del_item.keys() and not point_bbox[del_item["new_object_id"]]:
                                if verbose:
                                    print("Deleting position for: ", del_item["new_object_id"])
                                del del_item["position"]
                    break

            if not error_flag:
                d += 1

        if verbose:
            get_visualization(self.scene_graph, self.room_priors)

    def to_json(self, filename="scene_graph.json"):
        """Write ``self.scene_graph`` to ``filename`` as indented JSON."""
        with open(filename, "w", encoding="utf-8") as file:
            json.dump(self.scene_graph, file, indent=4)
|
|