dynamo_ssl / envs /libero /utils /task_generation_utils.py
jeffacce
initial commit
393d3de
import os
from collections import namedtuple
from libero.utils.mu_utils import get_scene_class
from libero.utils.bddl_generation_utils import *
# Module-level registry of task descriptions, keyed by scene name. Each value
# is a list of TaskInfoTuple records appended by register_task_info().
TASK_INFO = dict()

# Immutable record describing a single registered task.
TaskInfoTuple = namedtuple(
    "TaskInfoTuple",
    ["scene_name", "language", "objects_of_interest", "goal_states"],
)
def register_task_info(language, scene_name, objects_of_interest=None, goal_states=None):
    """Register a task description for ``scene_name`` in ``TASK_INFO``.

    Args:
        language: Language instruction stored with the task.
        scene_name: Name used to look up the scene class via
            ``get_scene_class`` and as the key in ``TASK_INFO``.
        objects_of_interest: Iterable of object names; each must appear in the
            scene's ``possible_objects_of_interest``. Defaults to no objects.
        goal_states: Iterable of goal predicates. They are wrapped into a
            single ``("And", *goal_states)`` conjunction. Defaults to none.

    Raises:
        ValueError: If any entry of ``objects_of_interest`` is not listed in
            the scene's ``possible_objects_of_interest``.
    """
    # Use None defaults and copy the input so that registry entries never
    # alias a shared default list or a list the caller may mutate later.
    objects_of_interest = (
        [] if objects_of_interest is None else list(objects_of_interest)
    )
    if goal_states is None:
        goal_states = []

    if scene_name not in TASK_INFO:
        TASK_INFO[scene_name] = []

    scene = get_scene_class(scene_name)()
    possible_objects_of_interest = scene.possible_objects_of_interest
    for object_name in objects_of_interest:
        if object_name not in possible_objects_of_interest:
            # Carry the diagnostics in the exception itself so they are not
            # lost when stdout is not captured.
            raise ValueError(
                f"Error!! {scene_name} not having valid objects: {object_name}; "
                f"possible objects of interest: {possible_objects_of_interest}"
            )

    # All goal predicates are combined into one top-level conjunction.
    task_goal = [("And", *goal_states)]
    TASK_INFO[scene_name].append(
        TaskInfoTuple(scene_name, language, objects_of_interest, task_goal)
    )
def get_task_info(scene_name=None):
    """Return registered task info.

    With ``scene_name`` given, returns the entry stored in ``TASK_INFO`` under
    that key (a ``KeyError`` propagates if the key is absent). Without it,
    returns the whole ``TASK_INFO`` registry object itself (not a copy).
    """
    if scene_name is not None:
        return TASK_INFO[scene_name]
    return TASK_INFO
def get_suite_generator_func(workspace_name):
    """Pick the task-suite generator function for ``workspace_name``.

    Recognised names are ``"main_table"``, ``"kitchen_table"``,
    ``"living_room_table"``, ``"study_table"`` and ``"coffee_table"``; any
    other value falls back to the floor generator.
    """
    # Sequential guard clauses: only the generator that is actually returned
    # gets looked up, exactly as in an if/elif chain.
    if workspace_name == "main_table":
        return tabletop_task_suites_generator
    if workspace_name == "kitchen_table":
        return kitchen_table_task_suites_generator
    if workspace_name == "living_room_table":
        return living_room_table_task_suites_generator
    if workspace_name == "study_table":
        return study_table_task_suites_generator
    if workspace_name == "coffee_table":
        return coffee_table_task_suites_generator
    # Fallback for every unrecognised workspace name.
    return floor_task_suites_generator
def generate_bddl_from_task_info(folder="/tmp/pddl"):
    """Generate one BDDL file per registered task and save it under ``folder``.

    Iterates over every ``TaskInfoTuple`` returned by ``get_task_info()``,
    builds the task description with the generator chosen by the scene's
    ``workspace_name``, and writes it out with ``save_to_file``.

    Args:
        folder: Output directory; created if it does not exist.

    Returns:
        A ``(bddl_file_names, failures)`` tuple. ``bddl_file_names`` lists the
        values returned by ``save_to_file`` for each successful task, and
        ``failures`` lists ``(scene_name, language)`` pairs for tasks whose
        generation raised an exception.
    """
    bddl_file_names = []
    failures = []
    os.makedirs(folder, exist_ok=True)

    for task_info_tuples in get_task_info().values():
        for task_info_tuple in task_info_tuples:
            scene_name = task_info_tuple.scene_name
            language = task_info_tuple.language

            # Scene construction stays outside the try block: an error here
            # propagates to the caller instead of being recorded as a failure.
            scene = get_scene_class(scene_name)()

            try:
                suite_generator = get_suite_generator_func(scene.workspace_name)
                result = suite_generator(
                    language=language,
                    xy_region_kwargs_list=scene.xy_region_kwargs_list,
                    affordance_region_kwargs_list=scene.affordance_region_kwargs_list,
                    fixture_object_dict=scene.fixture_object_dict,
                    movable_object_dict=scene.movable_object_dict,
                    objects_of_interest=task_info_tuple.objects_of_interest,
                    init_states=scene.init_states,
                    goal_states=task_info_tuple.goal_states,
                )
                result = get_result(result)
                bddl_file_name = save_to_file(
                    result, scene_name=scene_name, language=language, folder=folder
                )
            except Exception as exc:
                # Best-effort: record the failure and continue with the next
                # task. `Exception` (not a bare except) lets KeyboardInterrupt
                # and SystemExit propagate.
                print(f"Failed to generate BDDL for ({scene_name}, {language}): {exc!r}")
                failures.append((scene_name, language))
                continue

            # Report a file name that an earlier task already produced
            # (presumably a collision/overwrite; the name is still recorded).
            if bddl_file_name in bddl_file_names:
                print(bddl_file_name)
            bddl_file_names.append(bddl_file_name)

    print(f"Successfully generated: {len(bddl_file_names)}")
    return bddl_file_names, failures