| import copy |
| import numpy as np |
| from torch.utils.data import DataLoader |
| from pfp import DATA_DIRS |
| from pfp.data.dataset_pcd import RobotDatasetPcd, augment_pcd_data |
| from pfp.common.visualization import RerunViewer as RV |
| from pfp.common.visualization import RerunTraj |
| import rerun as rr |
|
|
| rr_traj = { |
| "original_robot_obs": RerunTraj(), |
| "augmented_robot_obs": RerunTraj(), |
| "original_prediction": RerunTraj(), |
| "augmented_prediction": RerunTraj(), |
| } |
|
|
|
|
| def vis_batch(name, batch): |
| pcd, robot_state_obs, robot_state_pred = batch |
| pcd = pcd[0, -1].cpu().numpy() |
| robot_state_obs = robot_state_obs[0].cpu().numpy() |
| robot_state_pred = robot_state_pred[0].cpu().numpy() |
| RV.add_np_pointcloud( |
| f"vis/{name}_pcd", points=pcd[:, :3], colors_uint8=(pcd[:, 3:6] * 255).astype(np.uint8) |
| ) |
| rr_traj[f"{name}_robot_obs"].add_traj(f"{name}_robot_obs", robot_state_obs, size=0.008) |
| rr_traj[f"{name}_prediction"].add_traj(f"{name}_prediction", robot_state_pred) |
| return |
|
|
|
|
| RV("augmentation_vis") |
| RV.add_axis("vis/origin", np.eye(4), timeless=True) |
|
|
| task_name = "sponge_on_plate" |
|
|
| data_path_train = DATA_DIRS.PFP_REAL / task_name / "train" |
| dataset_train = RobotDatasetPcd( |
| data_path_train, |
| n_obs_steps=2, |
| n_pred_steps=32, |
| subs_factor=3, |
| use_pc_color=False, |
| n_points=4096, |
| ) |
| dataloader_train = DataLoader( |
| dataset_train, |
| shuffle=False, |
| batch_size=1, |
| persistent_workers=False, |
| ) |
|
|
| for i, batch in enumerate(dataloader_train): |
| rr.set_time_sequence("step", i) |
| original_batch = copy.deepcopy(batch) |
| vis_batch("original", original_batch) |
|
|
| augmented_batch = copy.deepcopy(batch) |
| augmented_batch = augment_pcd_data(augmented_batch) |
| vis_batch("augmented", augmented_batch) |
|
|
| if i > 500: |
| break |
|
|