Spaces:
Runtime error
Runtime error
| import os | |
| import random | |
| import torch | |
| import hydra | |
| import numpy as np | |
| import zipfile | |
| import time | |
| import uuid | |
| from typing import Any | |
| from hydra import compose, initialize | |
| from omegaconf import DictConfig, OmegaConf | |
| from huggingface_hub import hf_hub_download | |
| from utils.misc import compute_model_dim | |
| from datasets.base import create_dataset | |
| from datasets.misc import collate_fn_general, collate_fn_squeeze_pcd_batch | |
| from models.base import create_model | |
| from models.visualizer import create_visualizer | |
| from models.environment import create_enviroment | |
def pretrain_pointtrans_weight_path():
    """ fetch the pretrained PointTransformer scene-model weights

    Return:
        The value returned by `hf_hub_download` for the weight file
        (presumably a local cache path -- confirm against huggingface_hub).
    """
    repo_id = 'SceneDiffuser/SceneDiffuser'
    weight_file = 'weights/POINTTRANS_C_32768/model.pth'
    return hf_hub_download(repo_id, weight_file)
def model_weight_path(task, has_observation=False):
    """ download the diffusion-model checkpoint that matches a task

    Args:
        task: task name; one of 'pose_gen', 'motion_gen', 'path_planning'
        has_observation: only consulted for 'motion_gen'; any truthy value
            selects the checkpoint whose directory name ends with '_obser'

    Return:
        The value returned by `hf_hub_download` for the selected checkpoint.

    Raises:
        ValueError: if `task` is not one of the supported task names.
    """
    repo_id = 'SceneDiffuser/SceneDiffuser'

    if task == 'pose_gen':
        ckpt_file = 'weights/2022-11-09_11-22-52_PoseGen_ddm4_lr1e-4_ep100/ckpts/model.pth'
    elif task == 'motion_gen':
        ## use truthiness rather than `== True` / `== False`, so that a non-bool
        ## flag (e.g. None) does not fall through to the 'unexpected task' error
        if has_observation:
            ckpt_file = 'weights/2022-11-09_14-28-12_MotionGen_ddm_T200_lr1e-4_ep300_obser/ckpts/model.pth'
        else:
            ckpt_file = 'weights/2022-11-09_12-54-50_MotionGen_ddm_T200_lr1e-4_ep300/ckpts/model.pth'
    elif task == 'path_planning':
        ckpt_file = 'weights/2022-11-25_20-57-28_Path_ddm4_LR1e-4_E100_REL/ckpts/model.pth'
    else:
        ## ValueError is still an Exception, so existing handlers keep working
        raise ValueError(f'Unexpected task: {task!r}.')

    return hf_hub_download(repo_id, ckpt_file)
def pose_motion_data_path():
    """ download and unpack the data archive used by pose / motion generation

    The archive is extracted next to the downloaded zip file on every call.

    Return:
        A 4-tuple of paths under the extracted 'pose_motion' folder, in the
        order: 'PROXD_temp', 'models_smplx_v1_1/models/', 'PROX', 'PROX/V02_05'.
    """
    archive = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/pose_motion.zip')
    extract_root = os.path.dirname(archive)

    with zipfile.ZipFile(archive, 'r') as zf:
        zf.extractall(extract_root)

    data_root = os.path.join(extract_root, 'pose_motion')
    sub_dirs = ('PROXD_temp', 'models_smplx_v1_1/models/', 'PROX', 'PROX/V02_05')
    return tuple(os.path.join(data_root, sub) for sub in sub_dirs)
def path_planning_data_path():
    """ download and unpack the data archive used by path planning

    The archive is extracted next to the downloaded zip file on every call.

    Return:
        Path of the 'path_planning' folder inside the extraction directory.
    """
    archive = hf_hub_download('SceneDiffuser/SceneDiffuser', 'hf_data/path_planning.zip')
    extract_root = os.path.dirname(archive)

    with zipfile.ZipFile(archive, 'r') as zf:
        zf.extractall(extract_root)

    return os.path.join(extract_root, 'path_planning')
def load_ckpt(model: torch.nn.Module, path: str) -> None:
    """ load ckpt for current model

    The checkpoint file must contain a dict with a 'model' entry holding the
    saved state dict. Only keys that exist in the current model are taken from
    it; keys missing from the checkpoint keep the model's current values.

    Args:
        model: current model, updated in place
        path: save path

    Raises:
        AssertionError: if `path` does not exist.
    """
    assert os.path.exists(path), 'Can\'t find provided ckpt.'

    ## always deserialize onto CPU: a checkpoint saved from CUDA tensors would
    ## otherwise fail to load on a host without a GPU. `load_state_dict` below
    ## copies the values into the model's own parameters.
    saved_state_dict = torch.load(path, map_location='cpu')['model']
    model_state_dict = model.state_dict()

    for key in model_state_dict:
        if key in saved_state_dict:
            model_state_dict[key] = saved_state_dict[key]

        ## model is trained with ddm: its keys carry a 'module.' prefix.
        ## When both spellings are present the prefixed one wins.
        prefixed_key = 'module.' + key
        if prefixed_key in saved_state_dict:
            model_state_dict[key] = saved_state_dict[prefixed_key]

    model.load_state_dict(model_state_dict)
def _sampling(cfg: DictConfig, scene: str) -> Any:
    """ build dataset, model and visualizer from the config and run sampling

    Args:
        cfg: composed config; `cfg.model.d_x` is overwritten here
        scene: passed to `create_dataset` as `specific_scene`

    Return:
        Whatever the visualizer's `visualize(model, dataloader)` returns.
    """
    ## compute modeling dimension according to task
    cfg.model.d_x = compute_model_dim(cfg.task)

    device = 'cpu' if cfg.gpu is None else f'cuda:{cfg.gpu}'

    test_set = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene)

    ## the PointTransformer scene model uses its own collate function
    is_point_transformer = cfg.model.scene_model.name == 'PointTransformer'
    batch_collate = collate_fn_squeeze_pcd_batch if is_point_transformer else collate_fn_general
    loader = test_set.get_dataloader(batch_size=1, collate_fn=batch_collate, shuffle=True)

    ## create model and load ckpt
    net = create_model(cfg, slurm=cfg.slurm, device=device)
    net.to(device=device)
    if 'has_observation' in cfg.task:
        with_observation = cfg.task.has_observation
    else:
        with_observation = False
    load_ckpt(net, path=model_weight_path(cfg.task.name, with_observation))

    ## create visualizer and visualize
    return create_visualizer(cfg.task.visualizer).visualize(net, loader)
def _planning(cfg: DictConfig, scene: str) -> Any:
    """ build dataset, model and environment from the config and run planning

    Args:
        cfg: composed config; `cfg.model.d_x` is overwritten here
        scene: passed to `create_dataset` as `specific_scene`

    Return:
        Whatever the environment's `run(model, dataloader)` returns.
    """
    ## compute modeling dimension according to task
    cfg.model.d_x = compute_model_dim(cfg.task)

    device = 'cpu'
    if cfg.gpu is not None:
        device = f'cuda:{cfg.gpu}'

    test_set = create_dataset(cfg.task.dataset, 'test', cfg.slurm, case_only=True, specific_scene=scene)

    ## the PointTransformer scene model uses its own collate function
    batch_collate = collate_fn_general
    if cfg.model.scene_model.name == 'PointTransformer':
        batch_collate = collate_fn_squeeze_pcd_batch
    loader = test_set.get_dataloader(batch_size=1, collate_fn=batch_collate, shuffle=True)

    ## create model and load ckpt
    net = create_model(cfg, slurm=cfg.slurm, device=device)
    net.to(device=device)
    with_observation = cfg.task.has_observation if 'has_observation' in cfg.task else False
    ckpt_path = model_weight_path(cfg.task.name, with_observation)
    load_ckpt(net, path=ckpt_path)

    ## create environment for planning task and run
    planning_env = create_enviroment(cfg.task.env)
    return planning_env.run(net, loader)
| ## interface for five tasks | |
| ## real-time model: | |
| ## - pose generation | |
| ## - motion generation | |
| ## - path planning | |
def pose_generation(scene, count, seed, opt, scale) -> Any:
    """ sample poses in a scene with the pose-generation model

    Args:
        scene: scene identifier forwarded to `_sampling`
        count: written to the `task.visualizer.ksample` override
        seed: seed applied to `random`, `numpy` and `torch` before sampling
        opt: when truthy, the 'pose_in_scene' optimizer overrides are added
        scale: written to the `optimizer.scale` override (only used with `opt`)

    Return:
        Whatever `_sampling` returns for the composed config.
    """
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir, smpl_dir, prox_dir, vposer_dir = pose_motion_data_path()

    override_config = [
        "diffuser=ddpm",
        "model=unet",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=pose_gen",
        "task.visualizer.name=PoseGenVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        f"task.dataset.smpl_dir={smpl_dir}",
        f"task.dataset.prox_dir={prox_dir}",
        f"task.dataset.vposer_dir={vposer_dir}",
    ]
    if opt:
        override_config += [
            "optimizer=pose_in_scene",
            "optimizer.scale_type=div_var",
            f"optimizer.scale={scale}",
            "optimizer.vposer=false",
            "optimizer.contact_weight=0.02",
            "optimizer.collision_weight=1.0",
        ]

    initialize(config_path="./scenediffuser/configs", version_base=None)
    try:
        config = compose(config_name="default", overrides=override_config)

        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

        return _sampling(config, scene)
    finally:
        ## always reset hydra's global state, even when sampling fails;
        ## otherwise the next request cannot call `initialize` again
        hydra.core.global_hydra.GlobalHydra.instance().clear()
def motion_generation(scene, count, seed, withstart, opt, scale) -> Any:
    """ sample motions in a scene and save each one as a .gif file

    Args:
        scene: scene identifier forwarded to `_sampling`
        count: written to the `task.visualizer.ksample` override
        seed: seed applied to `random`, `numpy` and `torch` before sampling
        withstart: written to the `task.has_observation` override
        opt: when truthy, the 'motion_in_scene' optimizer overrides are added
        scale: written to the `optimizer.scale` override (only used with `opt`)

    Return:
        List of paths of the saved .gif files, one per sampled sequence,
        located under './results/motion_generation/d-<date>/'.
    """
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir, smpl_dir, prox_dir, vposer_dir = pose_motion_data_path()

    override_config = [
        "diffuser=ddpm",
        "diffuser.steps=200",
        "model=unet",
        "model.use_position_embedding=true",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=motion_gen",
        f"task.has_observation={withstart}",
        "task.dataset.repr_type=absolute",
        "task.dataset.frame_interval_test=20",
        "task.visualizer.name=MotionGenVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        f"task.dataset.smpl_dir={smpl_dir}",
        f"task.dataset.prox_dir={prox_dir}",
        f"task.dataset.vposer_dir={vposer_dir}",
    ]
    if opt:
        override_config += [
            "optimizer=motion_in_scene",
            "optimizer.scale_type=div_var",
            f"optimizer.scale={scale}",
            "optimizer.vposer=false",
            "optimizer.contact_weight=0.02",
            "optimizer.collision_weight=1.0",
            "optimizer.smoothness_weight=0.001",
            "optimizer.frame_interval=1",
        ]

    initialize(config_path="./scenediffuser/configs", version_base=None)
    try:
        config = compose(config_name="default", overrides=override_config)

        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

        res_gifs = _sampling(config, scene)

        ## save sampled motion as .gif file
        datestr = time.strftime("%Y-%m-%d", time.localtime(time.time()))
        target_dir = os.path.join('./results/motion_generation/', f'd-{datestr}')
        os.makedirs(target_dir, exist_ok=True)

        res = []
        uuid_str = uuid.uuid4()  # one id per call; files are told apart by index
        for i, imgs in enumerate(res_gifs):
            target_path = os.path.join(target_dir, f'{uuid_str}--{i}.gif')
            imgs = [im.resize((720, 405)) for im in imgs]  # resize image for low resolution to save space
            img, *img_rest = imgs
            img.save(fp=target_path, format='GIF', append_images=img_rest, save_all=True, duration=33.33, loop=0)
            res.append(target_path)

        return res
    finally:
        ## always reset hydra's global state, even when sampling or saving
        ## fails; otherwise the next request cannot call `initialize` again
        hydra.core.global_hydra.GlobalHydra.instance().clear()
def grasp_generation(case_id):
    """ pick one precomputed grasp result (.glb) for the given case

    A random index in [0, 19] selects the file. If that file is not present
    locally, the results archive is downloaded and extracted into
    './results/grasp_generation/' before returning.

    Args:
        case_id: case name, must be a str

    Return:
        Path of the selected .glb file (returned even if it still does not
        exist after extraction).
    """
    assert isinstance(case_id, str)

    sample_idx = random.randint(0, 19)
    glb_path = f"./results/grasp_generation/results/{case_id}/{sample_idx}.glb"
    if os.path.exists(glb_path):
        return glb_path

    archive = hf_hub_download('SceneDiffuser/SceneDiffuser', 'results/grasp_generation/results.zip')
    extract_root = './results/grasp_generation/'
    os.makedirs(extract_root, exist_ok=True)
    with zipfile.ZipFile(archive, 'r') as zf:
        zf.extractall(extract_root)

    return glb_path
def path_planning(scene, mode, count, seed, opt, scale_opt, pla, scale_pla):
    """ run the path-planning model in either sampling or planning mode

    Args:
        scene: scene identifier forwarded to `_sampling` / `_planning`
        mode: 'Sampling' or 'Planning'; any other value yields `(None, 0)`
        count: written to the `task.visualizer.ksample` override
        seed: seed applied to `random`, `numpy` and `torch` before running
        opt: when truthy, the 'path_in_scene' optimizer overrides are added
        scale_opt: written to the `optimizer.scale` override (only used with `opt`)
        pla: when truthy, the 'greedy_path_planning' planner overrides are added
        scale_pla: written to the `planner.scale` override (only used with `pla`)

    Return:
        `(img, 0)` in 'Sampling' mode, where `img` is the `_sampling` result;
        the `_planning` result in 'Planning' mode; `(None, 0)` otherwise.
    """
    scene_model_weight_path = pretrain_pointtrans_weight_path()
    data_dir = path_planning_data_path()

    override_config = [
        "diffuser=ddpm",
        "model=unet",
        "model.use_position_embedding=true",
        f"model.scene_model.pretrained_weights={scene_model_weight_path}",
        "task=path_planning",
        "task.visualizer.name=PathPlanningRenderingVisualizerHF",
        f"task.visualizer.ksample={count}",
        f"task.dataset.data_dir={data_dir}",
        "task.dataset.repr_type=relative",
        "task.env.name=PathPlanningEnvWrapperHF",
        "task.env.inpainting_horizon=16",
        "task.env.robot_top=3.0",
        "task.env.env_adaption=false",
    ]
    if opt:
        override_config += [
            "optimizer=path_in_scene",
            "optimizer.scale_type=div_var",
            "optimizer.continuity=false",
            f"optimizer.scale={scale_opt}",
        ]
    if pla:
        override_config += [
            "planner=greedy_path_planning",
            f"planner.scale={scale_pla}",
            "planner.scale_type=div_var",
            "planner.greedy_type=all_frame_exp",
        ]

    initialize(config_path="./scenediffuser/configs", version_base=None)
    try:
        config = compose(config_name="default", overrides=override_config)

        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)

        if mode == 'Sampling':
            img = _sampling(config, scene)
            res = (img, 0)
        elif mode == 'Planning':
            res = _planning(config, scene)
        else:
            res = (None, 0)

        return res
    finally:
        ## always reset hydra's global state, even when the run fails;
        ## otherwise the next request cannot call `initialize` again
        hydra.core.global_hydra.GlobalHydra.instance().clear()