# code / generate.py
# MSheng-Lee: "Upload folder using huggingface_hub" (commit f20b100, verified)
from autogen import GroupChatManager
import json
import re, os
import networkx as nx
from agents import create_parse_agents, create_graph_agents, language_summary_agents, calculation_summary_agents
from agents import is_termination_msg, is_termination_require, gpt4_config
from corrector_agents import get_corrector_agents
from refiner_agents import get_refiner_agents
from chats import InputParserGroupChat, RequirementGroupChat, LanguageGroupChat, CalculationGroupChat, SceneGraphGroupChat, SchemaGroupChat, LayoutCorrectorGroupChat, ObjectDeletionGroupChat, LayoutRefinerGroupChat
from utils import get_room_priors, extract_list_from_json
from utils import preprocess_scene_graph, build_graph, remove_unnecessary_edges, handle_under_prepositions, get_conflicts, get_size_conflicts, get_object_from_scene_graph
from utils import get_object_from_scene_graph, get_rotation, get_cluster_objects, clean_and_extract_edges
from utils import get_cluster_size
from utils import get_possible_positions, is_point_bbox, calculate_overlap, get_topological_ordering, place_object, get_depth, get_visualization
import openshape
import torch
import numpy as np
import transformers
import threading
import multiprocessing
import sys, shutil
import pandas as pd
from torch.nn import functional as F
import objaverse
import trimesh
import certifi
import ssl
# NOTE(review): security - this replaces the stdlib's default HTTPS context
# factory with the *unverified* one, so HTTPS clients that use the default
# context (e.g. urllib) skip TLS certificate verification for the whole
# process. Presumably a workaround for certificate errors while downloading
# assets; confirm it is still required before shipping.
ssl._create_default_https_context = ssl._create_unverified_context
# Expose certifi's CA bundle path through the SSL_CERT_FILE environment variable.
os.environ['SSL_CERT_FILE'] = certifi.where()
class Generator:
    """Drive the multi-agent pipeline that turns a text request into a room layout.

    The methods communicate through instance attributes, so they are meant to
    be called in sequence: ``parse_input`` -> ``retrieve_local_assets`` ->
    ``create_scene_graph`` -> ``summary_language`` -> ``create_layout`` ->
    ``summary_calculation``.

    Prompt text inside the triple-quoted f-strings is kept flush-left on
    purpose: any leading whitespace would become part of the message sent to
    the agents.
    """

    # Immutable defaults; ``__init__`` builds a fresh list for every instance.
    DEFAULT_LAYOUT_ELEMENTS = ('south_wall', 'north_wall', 'west_wall', 'east_wall', 'middle of the room', 'ceiling')
    DEFAULT_ROOM_DIMENSIONS = (5.0, 5.0, 3.0)
    # Upper bound on iterations of the placement/backtracking loop in ``create_layout``.
    MAX_PLACEMENT_ATTEMPTS = 20
    # Minimum CLIP similarity for accepting a local asset / an Objaverse asset,
    # and for trusting an Objaverse mesh's measured dimensions.
    LOCAL_SIM_THRESHOLD = 0.5
    OBJAVERSE_SIM_THRESHOLD = 0.1
    OBJAVERSE_SIZE_SIM_THRESHOLD = 0.18
    # Matches a ```json fenced block inside an agent reply.
    _JSON_FENCE = r'```json\s*([^`]+)\s*```'

    def __init__(self, layout_elements=None, room_dimensions=None, result_file="./results/layout_w_cot.json"):
        """Load asset metadata, embeddings and the CLIP model from disk.

        Args:
            layout_elements: Names of the fixed room elements. ``None`` selects
                ``DEFAULT_LAYOUT_ELEMENTS``. A copy is stored.
            room_dimensions: Three room extents in metres, interpolated into
                the prompts as ``[0] x [1] x [2]``. ``None`` selects
                ``DEFAULT_ROOM_DIMENSIONS``.
            result_file: Initial value of ``self.result_file``. Note that
                ``create_layout`` overwrites it with a path derived from the
                user input.
        """
        if layout_elements is None:
            layout_elements = self.DEFAULT_LAYOUT_ELEMENTS
        if room_dimensions is None:
            room_dimensions = list(self.DEFAULT_ROOM_DIMENSIONS)

        self.room_dimensions = room_dimensions
        self.room_priors = get_room_priors(self.room_dimensions)
        self.layout_elements = list(layout_elements)
        self.result_file = result_file
        self.scene_graph = None
        self.cot_info = {}
        # State produced by later pipeline steps; initialised here so that a
        # skipped step yields an empty summary instead of an AttributeError.
        self.conflict_data = []
        self.calculation_data = []
        self.language_sum = ""
        self.calculation_sum = ""

        os.environ["TOKENIZERS_PARALLELISM"] = "false"

        # Objaverse metadata, indexed by the 'u' field of each entry.
        with open('./embeddings/objaverse_meta.json') as meta_file:
            meta = json.load(meta_file)
        self.meta = {x['u']: x for x in meta['entries']}

        # NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
        deser = torch.load('./embeddings/objaverse.pt')
        self.us = deser['us']
        self.feats = deser['feats']

        # Local asset catalogue: one record per spreadsheet row, in row order,
        # so that record i lines up with row i of ``self.local_embeddings``
        # (assumed - TODO confirm how local.pt was produced).
        local_assets = pd.read_excel("./assets/copy.xlsx", skiprows=2)
        self.caption_to_file = [
            {
                "caption": caption,
                "file_path": os.path.join("./assets/lvm_2032fbx", f"{model_name}.fbx"),
                "bbx": bbx,
            }
            for caption, model_name, bbx in zip(
                local_assets["caption_clip"].tolist(),
                local_assets["name_en"].tolist(),
                local_assets["bbx"].tolist(),
            )
        ]

        clip_dir = "./ckpts/CLIP-ViT-bigG-14-laion2B-39B-b160k"
        self.clip_model = transformers.CLIPModel.from_pretrained(
            clip_dir,
            low_cpu_mem_usage=True, torch_dtype=torch.float16,
            offload_state_dict=True,
        )
        self.clip_prep = transformers.CLIPProcessor.from_pretrained(clip_dir)
        self.local_embeddings = torch.load("./embeddings/local.pt")

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _load_json_reply(content, fenced_first=False):
        """Parse an agent reply that is either raw JSON or wraps JSON in a ```json fence.

        Args:
            content: The message text.
            fenced_first: When true, a fenced block takes precedence over
                parsing the whole text; otherwise the whole text is tried
                first and the fenced block is the fallback.

        Raises:
            json.JSONDecodeError: If neither form yields valid JSON.
        """
        fenced = re.search(Generator._JSON_FENCE, content, re.DOTALL)
        if fenced_first and fenced:
            return json.loads(fenced.group(1))
        try:
            return json.loads(content)
        except json.JSONDecodeError:
            if fenced is None:
                raise
            return json.loads(fenced.group(1))

    @staticmethod
    def _clean_object_id(input_string):
        """Drop every digit and turn underscores into spaces."""
        without_digits = re.sub(r'\d', '', input_string)
        return without_digits.replace("_", " ")

    def _result_stem(self):
        """Return the user input with every non-alphanumeric character replaced by '_'."""
        return re.sub(r'[^a-zA-Z0-9]', '_', self.user_input)

    def _retrieve_local(self, query_embedding, top=1, sim_th=0.5):
        """Return up to ``top`` local assets whose cosine similarity exceeds ``sim_th``.

        Results are ordered by decreasing similarity; each is a dict with the
        keys 'caption', 'file_path', 'bbx' and 'sim' (a Python float).
        """
        query_embedding = F.normalize(query_embedding.detach().cpu(), dim=-1).squeeze()
        sims = torch.cat([
            query_embedding @ F.normalize(chunk.float(), dim=-1).T
            for chunk in torch.split(self.local_embeddings, 10240)
        ])
        sims, indices = torch.sort(sims, descending=True)
        keep = sims > sim_th
        results = []
        for i, sim in zip(indices[keep], sims[keep]):
            entry = self.caption_to_file[i]
            results.append({
                "caption": entry["caption"],
                "file_path": entry["file_path"],
                "bbx": entry["bbx"],
                "sim": sim.item()
            })
            if len(results) >= top:
                break
        return results

    def _retrieve_objaverse(self, embedding, top=1, sim_th=0.1, filter_fn=None):
        """Return up to ``top`` Objaverse metadata entries with similarity above ``sim_th``.

        Entries missing from ``self.meta`` or rejected by ``filter_fn`` are
        skipped. Each result is a copy of the metadata dict with an extra
        'sim' key (a 0-dim tensor).
        """
        embedding = F.normalize(embedding.detach().cpu(), dim=-1).squeeze()
        sims = torch.cat([
            embedding @ F.normalize(chunk.float(), dim=-1).T
            for chunk in torch.split(self.feats, 10240)
        ])
        sims, idx = torch.sort(sims, descending=True)
        sim_mask = sims > sim_th
        results = []
        for i, sim in zip(idx[sim_mask], sims[sim_mask]):
            uid = self.us[i]
            if uid not in self.meta:
                continue
            if filter_fn is None or filter_fn(self.meta[uid]):
                results.append(dict(self.meta[uid], sim=sim))
                if len(results) >= top:
                    break
        return results

    @staticmethod
    def _objaverse_filter(face_min=0, face_max=34985808, anim_min=0, anim_max=563):
        """Build a predicate over metadata entries based on their 'faces' and 'anims' counts.

        A range check is skipped entirely while its bounds are at the default
        extremes, so the default predicate accepts every entry.
        """
        anim_n = not (anim_min > 0 or anim_max < 563)
        face_n = not (face_min > 0 or face_max < 34985808)

        def accept(x):
            return (
                (anim_n or anim_min <= x['anims'] <= anim_max)
                and (face_n or face_min <= x['faces'] <= face_max)
            )

        return accept

    @staticmethod
    def _mesh_dimensions(file_path):
        """Return (length, width, height) from the mesh's bounding-box extents.

        Extents are divided by 100 (presumably centimetres -> metres; TODO
        confirm) and axis 1 is used as height (presumably a Y-up mesh).
        """
        extents = trimesh.load(file_path).bounding_box.extents
        return extents[0] / 100, extents[2] / 100, extents[1] / 100

    # ----------------------------------------------------------------- pipeline

    def parse_input(self, user_input, max_number_of_objects):
        """Run the requirement-analysis group chat and store the designer's JSON reply.

        Sets ``self.user_input``, ``self.max_number_of_objects``,
        ``self.designer_response`` (parsed from the second-to-last chat
        message) and ``self.cot_info["parse_cot"]``.
        """
        self.user_input = user_input
        self.max_number_of_objects = max_number_of_objects
        # The substructure checker agent is created by the factory but, as in
        # the original flow, does not take part in the chat.
        (user_proxy, requirements_analyzer, substructure_analyzer,
         _substructure_analyzer_checker, interior_designer, designer_checker) = create_parse_agents(self.max_number_of_objects)
        init_groupchat = RequirementGroupChat(
            agents=[user_proxy, requirements_analyzer, substructure_analyzer, interior_designer, designer_checker],
            messages=[],
            max_round=16
        )
        manager = GroupChatManager(groupchat=init_groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_require)
        user_proxy.initiate_chat(
            manager,
            message=f"""
The room has the size {self.room_dimensions[0]}m x {self.room_dimensions[1]}m x {self.room_dimensions[2]}m
User Input (in triple backquotes):
```
{self.user_input}
```
Room layout elements in the room (in triple backquotes):
```
{self.layout_elements}
```
json
""",
        )
        self.designer_response = self._load_json_reply(init_groupchat.messages[-2]["content"])
        self.cot_info["parse_cot"] = self.designer_response["chain_of_thought"]

    def retrieve_local_assets(self):
        """Attach a measured ``bounding_box_size`` to each designed object when an asset matches.

        For every object in ``self.designer_response`` a CLIP text embedding
        is matched first against the local catalogue and, failing that,
        against Objaverse (which downloads the matched model). Objects without
        a sufficiently similar asset are left unchanged.
        """
        # The lock lives on ``sys`` so it is process-wide; create it only once,
        # otherwise every call would get its own lock and exclude nobody.
        print("Locking...")
        if not hasattr(sys, "clip_move_lock"):
            sys.clip_move_lock = threading.Lock()
        print("Locked.")
        if torch.cuda.is_available():
            with sys.clip_move_lock:
                self.clip_model.cuda()
        # NOTE: disables autograd for the whole process, not just this method.
        torch.set_grad_enabled(False)

        objects = extract_list_from_json(self.designer_response, 'objects')
        device = self.clip_model.device
        for obj in objects:
            text = self._clean_object_id("A high-poly " + obj['object_id']) + f" with {obj['material']} material and in {obj['style']} style, high quality"
            tokens = self.clip_prep(
                text=[text], return_tensors='pt', truncation=True, max_length=76
            ).to(device)
            enc = self.clip_model.get_text_features(**tokens).float().cpu()

            local_hits = self._retrieve_local(enc, top=1, sim_th=self.LOCAL_SIM_THRESHOLD)
            if local_hits:
                hit = local_hits[0]
                print("Retrieved object: ", hit["file_path"])
                # 'bbx' is a comma-separated "length,width,height" string.
                length, width, height = map(float, hit["bbx"].split(','))
                obj['bounding_box_size'] = {'Length': length, 'Width': width, 'Height': height}
                continue

            remote_hits = self._retrieve_objaverse(
                enc, top=1, sim_th=self.OBJAVERSE_SIM_THRESHOLD, filter_fn=self._objaverse_filter()
            )
            if not remote_hits:
                print(f"No asset matched {obj['object_id']}; skipping.")
                continue
            hit = remote_hits[0]
            print(f"Retrieved object from Objaverse: {hit['u']}")
            objaverse_objects = objaverse.load_objects(
                uids=[hit['u']],
                download_processes=multiprocessing.cpu_count()
            )
            # Only trust the mesh's measurements when the match is strong enough.
            if hit["sim"] > self.OBJAVERSE_SIZE_SIM_THRESHOLD:
                for file_path in objaverse_objects.values():
                    length, width, height = self._mesh_dimensions(file_path)
                    obj['bounding_box_size'] = {'Length': length, 'Width': width, 'Height': height}

        self.designer_response['objects'] = objects
        print(self.designer_response)

    def create_scene_graph(self):
        """Place the designed objects one by one, then resolve conflicts.

        Steps:
          1. For each object, run the scene-graph chat with the previously
             placed objects as context and collect its JSON and chain of thought.
          2. While ``get_conflicts`` reports spatial conflicts, ask the
             corrector agents to re-place the first conflicting object.
          3. While ``get_size_conflicts`` reports size conflicts, ask the
             deletion agent which object to drop and remove it together with
             its graph descendants.

        Sets ``self.scene_graph`` (``{"objects_in_room": [...]}``),
        ``self.cot_info["scene_graph_cot"]`` and ``self.conflict_data``.

        NOTE(review): both correction loops run until the agents resolve every
        conflict; there is no iteration cap.
        """
        # Scratch log handed to the utils helpers; its contents are not kept.
        cot_data_1 = []
        user_proxy, interior_architect, schema_engineer = create_graph_agents()
        scene_graph_groupchat = SceneGraphGroupChat(
            agents=[user_proxy, interior_architect, schema_engineer],
            messages=[],
            max_round=10
        )
        cot_data, json_data = {}, {}
        # Created up front so an empty object list does not raise KeyError below.
        json_info = {"objects_in_room": []}
        blocks_designer = extract_list_from_json(self.designer_response, 'objects')
        for d_block in blocks_designer:
            object_id = d_block["object_id"]
            prompt = str(d_block)
            manager_scene_graph = GroupChatManager(groupchat=scene_graph_groupchat,
                                                   llm_config=gpt4_config,
                                                   human_input_mode="NEVER",
                                                   is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager_scene_graph,
                message=f"""
The room has the size {self.room_dimensions[0]}m x {self.room_dimensions[1]}m x {self.room_dimensions[2]}m
User Input (in triple backquotes):
```
{self.user_input}
```
Room layout elements in the room (in triple backquotes):
```
{self.layout_elements}
```
Previously placed objects in the room (in triple backquotes):
```
{json_data}
```
Object to be placed (in triple backticks):
```
{prompt}
```
""",
            )
            messages = scene_graph_groupchat.messages
            placed = self._load_json_reply(messages[-2]["content"])["objects_in_room"]
            json_info["objects_in_room"] += placed
            # Context for the next prompt: a copy without 'new_object_id', so
            # the entry stored in json_info keeps its id.
            json_data[str(object_id)] = {
                key: value for key, value in placed[0].items() if key != 'new_object_id'
            }
            # Every second message (indices 1, 3, ...) carries a chain of thought.
            thoughts = cot_data.setdefault(str(object_id), [])
            for message in messages[1::2]:
                thoughts.append(self._load_json_reply(message["content"])["chain_of_thought"])
            user_proxy.reset()
            interior_architect.reset()
            schema_engineer.reset()
            scene_graph_groupchat.reset()

        self.cot_info["scene_graph_cot"] = cot_data
        self.scene_graph = json_info
        self.conflict_data = []
        # TODO: Modify
        scene_graph = preprocess_scene_graph(json_info["objects_in_room"], cot_data_1)
        G = build_graph(scene_graph)
        G = remove_unnecessary_edges(G, cot_data_1)
        G, scene_graph = handle_under_prepositions(G, scene_graph, cot_data_1)
        conflicts = get_conflicts(G, scene_graph, cot_data_1)
        print("-------------------CONFLICTS-------------------")
        for conflict in conflicts:
            print(conflict)
            print("\n\n")
        self.conflict_data.append(conflicts)

        user_proxy, spatial_corrector_agent, json_schema_debugger, object_deletion_agent = get_corrector_agents()
        while len(conflicts) > 0:
            spatial_corrector_agent.reset()
            json_schema_debugger.reset()
            groupchat = LayoutCorrectorGroupChat(
                agents=[user_proxy, spatial_corrector_agent, json_schema_debugger],
                messages=[],
                max_round=15
            )
            manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager,
                message=f"""
{conflicts[0]}
""",
            )
            correction_json = self._load_json_reply(groupchat.messages[-2]["content"], fenced_first=True)
            self.conflict_data.append(correction_json)
            corrected = correction_json["corrected_object"]
            corr_obj = get_object_from_scene_graph(corrected["new_object_id"], scene_graph)
            for field in ("is_on_the_floor", "facing", "placement"):
                corr_obj[field] = corrected[field]
            G = build_graph(scene_graph)
            conflicts = get_conflicts(G, scene_graph, cot_data_1)

        size_conflicts = get_size_conflicts(G, scene_graph, cot_data_1, self.user_input, self.room_priors)
        print("-------------------SIZE CONFLICTS-------------------")
        for conflict in size_conflicts:
            print(conflict)
            print("\n\n")
        self.conflict_data.append(size_conflicts)
        while len(size_conflicts) > 0:
            object_deletion_agent.reset()
            groupchat = ObjectDeletionGroupChat(
                agents=[user_proxy, object_deletion_agent],
                messages=[],
                max_round=2
            )
            manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
            user_proxy.initiate_chat(
                manager,
                message=f"""
{size_conflicts[0]}
""",
            )
            correction_json = self._load_json_reply(groupchat.messages[-1]["content"])
            object_to_delete = correction_json["object_to_delete"]
            # Anything reachable from the deleted object goes with it.
            descendants = nx.descendants(G, object_to_delete)
            objs_to_delete = descendants.union({object_to_delete})
            print("Objs to Delete: ", objs_to_delete)
            self.conflict_data.append(f"Objs to Delete: {objs_to_delete}")
            scene_graph = [x for x in scene_graph if x["new_object_id"] not in objs_to_delete]
            for obj in objs_to_delete:
                G.remove_node(obj)
            size_conflicts = get_size_conflicts(G, scene_graph, cot_data_1, self.user_input, self.room_priors)
        self.scene_graph["objects_in_room"] = scene_graph

    def summary_language(self):
        """Ask the language agent to summarise the reasoning so far.

        Sends the parse and scene-graph chains of thought, the conflict log
        and the scene graph, and stores the last chat message in
        ``self.language_sum``.
        """
        user_proxy, language_architect = language_summary_agents()
        groupchat = LanguageGroupChat(
            agents=[user_proxy, language_architect],
            messages=[],
            max_round=2
        )
        manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
        user_proxy.initiate_chat(
            manager,
            message=f"""
The room has the size {self.room_dimensions[0]}m x {self.room_dimensions[1]}m x {self.room_dimensions[2]}m
User Input (in triple backquotes):
```
**chain of thought for requirements_analyzer, substructure_analyzer and interior_designer**
{self.cot_info["parse_cot"]}
```
**chain of thought for object placement**
{self.cot_info["scene_graph_cot"]}
```
**conflict data**
{self.conflict_data}
```
**scene graph**
{self.scene_graph}
```
Room layout elements in the room (in triple backquotes):
```
{self.layout_elements}
```
json
""",
        )
        self.language_sum = groupchat.messages[-1]["content"]

    def create_layout(self, debug=False):
        """Compute positions for all objects and write the layout to ``./results``.

        Objects are placed depth by depth. When ``place_object`` reports
        errors at depth > 1 the algorithm steps back one depth, clears the
        positions of every non-fixed object at or beyond that depth and
        retries; the loop runs at most ``MAX_PLACEMENT_ATTEMPTS`` times.

        Side effects: ``self.scene_graph`` is replaced by a flat list (the
        objects followed by ``self.room_priors``), ``self.calculation_data``
        receives the step log and ``self.result_file`` is overwritten with
        the output path derived from the user input.

        Args:
            debug: Print per-depth progress and is forwarded to ``place_object``.

        Returns:
            The error dict from ``place_object`` when an object at depth 1
            cannot be placed (nothing is written in that case); otherwise
            ``None``.
        """
        cot_data = []
        objects_in_room = self.scene_graph["objects_in_room"]
        G = build_graph(objects_in_room)
        cot_data.append("Calculate constraint area for non-layout objects only.")
        for node in G.nodes():
            if node in self.layout_elements:
                continue
            cluster_size, _ = get_cluster_size(node, G, objects_in_room, cot_data)
            node_obj = get_object_from_scene_graph(node, objects_in_room)
            cluster_size = {
                "x_neg": cluster_size["left of"],
                "x_pos": cluster_size["right of"],
                "y_neg": cluster_size["behind"],
                "y_pos": cluster_size["in front"],
            }
            node_obj["cluster"] = {"constraint_area": cluster_size}
            cot_data.append(f"The constraint area for {node} is {cluster_size}.")

        self.scene_graph = objects_in_room + self.room_priors
        prior_ids = ["south_wall", "north_wall", "east_wall", "west_wall", "ceiling", "middle of the room"]
        # True for objects whose position is fully determined up front; those
        # are never re-placed or cleared by the backtracking below.
        point_bbox = dict.fromkeys([item["new_object_id"] for item in self.scene_graph], False)

        # Place the objects that have an absolute position
        for item in self.scene_graph:
            if item["new_object_id"] in prior_ids:
                continue
            possible_pos = get_possible_positions(item["new_object_id"], self.scene_graph, self.room_dimensions, cot_data)
            # Intersect all candidate regions.
            overlap = None
            if possible_pos:
                overlap = possible_pos[0]
                for pos in possible_pos[1:]:
                    overlap = calculate_overlap(overlap, pos)
            # If the overlap is a point bbox, assign the position
            if overlap is not None and is_point_bbox(overlap):
                item["position"] = {"x": overlap[0], "y": overlap[2], "z": overlap[4]}
                point_bbox[item["new_object_id"]] = True

        scene_graph_wo_layout = [item for item in self.scene_graph if item["new_object_id"] not in self.layout_elements]
        depth_scene_graph = get_depth(scene_graph_wo_layout)
        # default=0 keeps an empty scene graph from raising; the loop is then skipped.
        max_depth = max(depth_scene_graph.values(), default=0)
        topological_order = get_topological_ordering(scene_graph_wo_layout)
        topological_order = [item for item in topological_order if item not in self.layout_elements]

        d = 1
        count = 0
        while d <= max_depth and count < self.MAX_PLACEMENT_ATTEMPTS:
            count += 1
            error_flag = False
            nodes = [node for node in topological_order if depth_scene_graph[node] == d]
            if debug:
                print(f"Nodes at depth {d}: ", nodes)
            cot_data.append(f"Place objects: {nodes}.")
            for node in nodes:
                if point_bbox[node]:
                    continue
                obj = next(item for item in scene_graph_wo_layout if item["new_object_id"] == node)
                cot_data.append(f"Place the object {obj['new_object_id']} at the depth {d}.")
                errors = place_object(obj, self.scene_graph, self.room_dimensions, cot_data, errors={}, debug=debug)
                if debug:
                    print(f"Errors for {obj['new_object_id']}: ", errors)
                if not errors:
                    continue

                if d == 1:
                    # Nothing shallower to backtrack to: give up.
                    cot_data.append(f"Errors occur for {obj['new_object_id']} with depth 1: {errors}. The layout creation failed.")
                    print(f"Errors occur for {obj['new_object_id']} with depth 1: {errors}. The layout creation failed.")
                    self.calculation_data = []
                    return errors

                d -= 1
                cot_data.append(f"Errors occur for {obj['new_object_id']}: {errors}. Reduce depth to {d}.")
                if debug:
                    print("Reducing depth to: ", d)
                error_flag = True
                cot_data.append(f"Delete positions for objects at or beyond the current depth {d} in order to reposition the objects.")
                for del_item in scene_graph_wo_layout:
                    del_id = del_item["new_object_id"]
                    if depth_scene_graph[del_id] >= d and "position" in del_item and not point_bbox[del_id]:
                        if debug:
                            print("Deleting position for: ", del_id)
                        del del_item["position"]
                break
            if not error_flag:
                d += 1

        cot_data.append("Save the scene graph.")
        self.calculation_data = cot_data
        print(cot_data)
        print("\n")
        os.makedirs("./results", exist_ok=True)
        self.result_file = os.path.join("./results", self._result_stem() + '.json')
        with open(self.result_file, "w") as file:
            json.dump(self.scene_graph, file, indent=4)

    def summary_calculation(self):
        """Summarise the placement log and write the combined Markdown report.

        Does nothing when ``self.calculation_data`` is empty (for example
        after a failed ``create_layout``). Otherwise stores the agent's reply
        in ``self.calculation_sum`` and writes ``self.language_sum`` followed
        by it to ``./Results_data/<sanitised user input>.md``.
        """
        if not self.calculation_data:
            return
        user_proxy, calculation_architect = calculation_summary_agents()
        groupchat = CalculationGroupChat(
            agents=[user_proxy, calculation_architect],
            messages=[],
            max_round=2
        )
        manager = GroupChatManager(groupchat=groupchat, llm_config=gpt4_config, is_termination_msg=is_termination_msg)
        user_proxy.initiate_chat(
            manager,
            message=f"""
The room has the size {self.room_dimensions[0]}m x {self.room_dimensions[1]}m x {self.room_dimensions[2]}m
User Input (in triple backquotes):
```
{self.calculation_data}
```
Room layout elements in the room (in triple backquotes):
```
{self.layout_elements}
```
json
""",
        )
        self.calculation_sum = groupchat.messages[-1]["content"]
        os.makedirs("./Results_data", exist_ok=True)
        full_path = os.path.join("./Results_data", self._result_stem() + '.md')
        with open(full_path, 'w', encoding='utf-8') as file:
            file.write(self.language_sum)
            file.write('\n\n## 6. **Object Placement**\n')
            file.write(self.calculation_sum)