| import numpy as np |
| from nuscenes.prediction import ( |
| PredictHelper, |
| convert_local_coords_to_global, |
| convert_global_coords_to_local, |
| ) |
| from mmdet3d.core.bbox import Box3DMode, Coord3DMode, LiDARInstance3DBoxes |
| from nuscenes.eval.common.utils import quaternion_yaw, Quaternion |
|
|
| from mmcv.parallel import DataContainer as DC |
| from mmdet.datasets.pipelines import to_tensor |
|
|
class NuScenesTraj(object):
    """Extracts trajectory supervision from a nuScenes dataset.

    Produces per-agent past/future trajectory labels, ego (SDC) future
    trajectories, planning targets, and pseudo ego boxes for training.
    """

    def __init__(
        self,
        nusc,
        predict_steps,
        planning_steps,
        past_steps,
        fut_steps,
        with_velocity,
        CLASSES,
        box_mode_3d,
        use_nonlinear_optimizer=False,
    ):
        """Initialize the trajectory helper.

        Args:
            nusc: initialized ``NuScenes`` API instance.
            predict_steps: number of future motion-prediction timesteps.
            planning_steps: number of future planning timesteps.
            past_steps: number of past timesteps kept per agent.
            fut_steps: number of future timesteps appended to the past track.
            with_velocity: if True, the pseudo SDC box gets (vx, vy) appended.
            CLASSES: ordered class-name list used to map names to label ids.
            box_mode_3d: target box mode for ``LiDARInstance3DBoxes.convert_to``.
            use_nonlinear_optimizer: if True, agent-local trajectories are
                translated by the box center when mapped to the scene frame.
        """
        super().__init__()
        self.nusc = nusc
        # Pre-compute ego velocities for every sample; this only needs
        # self.nusc, which is already set above.
        self.prepare_sdc_vel_info()
        self.predict_steps = predict_steps
        self.planning_steps = planning_steps
        self.past_steps = past_steps
        self.fut_steps = fut_steps
        self.with_velocity = with_velocity
        self.CLASSES = CLASSES
        self.box_mode_3d = box_mode_3d
        self.predict_helper = PredictHelper(self.nusc)
        self.use_nonlinear_optimizer = use_nonlinear_optimizer
|
|
| def get_traj_label(self, sample_token, ann_tokens): |
| sd_rec = self.nusc.get("sample", sample_token) |
| fut_traj_all = [] |
| fut_traj_valid_mask_all = [] |
| past_traj_all = [] |
| past_traj_valid_mask_all = [] |
| _, boxes, _ = self.nusc.get_sample_data( |
| sd_rec["data"]["LIDAR_TOP"], selected_anntokens=ann_tokens |
| ) |
| for i, ann_token in enumerate(ann_tokens): |
| box = boxes[i] |
| instance_token = self.nusc.get("sample_annotation", ann_token)[ |
| "instance_token" |
| ] |
| fut_traj_local = self.predict_helper.get_future_for_agent( |
| instance_token, sample_token, seconds=6, in_agent_frame=True |
| ) |
| past_traj_local = self.predict_helper.get_past_for_agent( |
| instance_token, sample_token, seconds=2, in_agent_frame=True |
| ) |
|
|
| fut_traj = np.zeros((self.predict_steps, 2)) |
| fut_traj_valid_mask = np.zeros((self.predict_steps, 2)) |
| past_traj = np.zeros((self.past_steps + self.fut_steps, 2)) |
| past_traj_valid_mask = np.zeros((self.past_steps + self.fut_steps, 2)) |
| if fut_traj_local.shape[0] > 0: |
| if self.use_nonlinear_optimizer: |
| trans = box.center |
| else: |
| trans = np.array([0, 0, 0]) |
| rot = Quaternion(matrix=box.rotation_matrix) |
| fut_traj_scence_centric = convert_local_coords_to_global( |
| fut_traj_local, trans, rot |
| ) |
| fut_traj[ |
| : fut_traj_scence_centric.shape[0], : |
| ] = fut_traj_scence_centric |
| fut_traj_valid_mask[: fut_traj_scence_centric.shape[0], :] = 1 |
| if past_traj_local.shape[0] > 0: |
| |
| |
| if self.use_nonlinear_optimizer: |
| trans = box.center |
| else: |
| trans = np.array([0, 0, 0]) |
| rot = Quaternion(matrix=box.rotation_matrix) |
| past_traj_scence_centric = convert_local_coords_to_global( |
| past_traj_local, trans, rot |
| ) |
| past_traj[ |
| : past_traj_scence_centric.shape[0], : |
| ] = past_traj_scence_centric |
| past_traj_valid_mask[: past_traj_scence_centric.shape[0], :] = 1 |
|
|
| if fut_traj_local.shape[0] > 0: |
| fut_steps = min(self.fut_steps, fut_traj_scence_centric.shape[0]) |
| past_traj[ |
| self.past_steps : self.past_steps + fut_steps, : |
| ] = fut_traj_scence_centric[:fut_steps] |
| past_traj_valid_mask[ |
| self.past_steps : self.past_steps + fut_steps, : |
| ] = 1 |
|
|
| fut_traj_all.append(fut_traj) |
| fut_traj_valid_mask_all.append(fut_traj_valid_mask) |
| past_traj_all.append(past_traj) |
| past_traj_valid_mask_all.append(past_traj_valid_mask) |
| if len(ann_tokens) > 0: |
| fut_traj_all = np.stack(fut_traj_all, axis=0) |
| fut_traj_valid_mask_all = np.stack(fut_traj_valid_mask_all, axis=0) |
| past_traj_all = np.stack(past_traj_all, axis=0) |
| past_traj_valid_mask_all = np.stack(past_traj_valid_mask_all, axis=0) |
| else: |
| fut_traj_all = np.zeros((0, self.predict_steps, 2)) |
| fut_traj_valid_mask_all = np.zeros((0, self.predict_steps, 2)) |
| past_traj_all = np.zeros((0, self.predict_steps, 2)) |
| past_traj_valid_mask_all = np.zeros((0, self.predict_steps, 2)) |
| return ( |
| fut_traj_all, |
| fut_traj_valid_mask_all, |
| past_traj_all, |
| past_traj_valid_mask_all, |
| ) |
|
|
| def get_vel_transform_mats(self, sample): |
| sd_rec = self.nusc.get("sample_data", sample["data"]["LIDAR_TOP"]) |
| cs_record = self.nusc.get( |
| "calibrated_sensor", sd_rec["calibrated_sensor_token"] |
| ) |
| pose_record = self.nusc.get("ego_pose", sd_rec["ego_pose_token"]) |
|
|
| l2e_r = cs_record["rotation"] |
| l2e_t = cs_record["translation"] |
| e2g_r = pose_record["rotation"] |
| e2g_t = pose_record["translation"] |
| l2e_r_mat = Quaternion(l2e_r).rotation_matrix |
| e2g_r_mat = Quaternion(e2g_r).rotation_matrix |
|
|
| return l2e_r_mat, e2g_r_mat |
|
|
| def get_vel_and_time(self, sample): |
| lidar_token = sample["data"]["LIDAR_TOP"] |
| lidar_top = self.nusc.get("sample_data", lidar_token) |
| pose = self.nusc.get("ego_pose", lidar_top["ego_pose_token"]) |
| xyz = pose["translation"] |
| timestamp = sample["timestamp"] |
| return xyz, timestamp |
|
|
| def prepare_sdc_vel_info(self): |
| |
| |
| |
| |
|
|
| self.sdc_vel_info = {} |
| for scene in self.nusc.scene: |
| scene_token = scene["token"] |
|
|
| |
| last_sample_token = scene["last_sample_token"] |
| sample_token = scene["first_sample_token"] |
| sample = self.nusc.get("sample", sample_token) |
| xyz, time = self.get_vel_and_time(sample) |
| while sample["token"] != last_sample_token: |
| next_sample_token = sample["next"] |
| next_sample = self.nusc.get("sample", next_sample_token) |
| next_xyz, next_time = self.get_vel_and_time(next_sample) |
| |
| |
| dc = np.array(next_xyz) - np.array(xyz) |
| dt = (next_time - time) / 1e6 |
| vel = dc / dt |
|
|
| |
| l2e_r_mat, e2g_r_mat = self.get_vel_transform_mats(sample) |
| vel = vel @ np.linalg.inv(e2g_r_mat).T @ np.linalg.inv(l2e_r_mat).T |
| |
| |
| |
| |
| |
|
|
| |
| |
| |
| |
| |
| vel = vel[:2] |
|
|
| self.sdc_vel_info[sample["token"]] = vel |
| xyz, time = next_xyz, next_time |
| sample = next_sample |
|
|
| |
| last_sample = self.nusc.get("sample", last_sample_token) |
| second_last_sample_token = last_sample["prev"] |
| self.sdc_vel_info[last_sample_token] = self.sdc_vel_info[ |
| second_last_sample_token |
| ] |
|
|
| def generate_sdc_info(self, sdc_vel, as_lidar_instance3d_box=False, old_canbus=True): |
| |
| |
| |
| if old_canbus: |
| psudo_sdc_bbox = np.array([0.0, 0.0, 0.0, 1.73, 4.08, 1.56, -np.pi]) |
| |
|
|
| |
| else: |
| psudo_sdc_bbox = np.array([0.0, 0.0, 0.0, 4.08, 1.73, 1.56, np.pi/2]) |
|
|
| |
| |
| |
|
|
| if self.with_velocity: |
| psudo_sdc_bbox = np.concatenate([psudo_sdc_bbox, sdc_vel], axis=-1) |
| gt_bboxes_3d = np.array([psudo_sdc_bbox]).astype(np.float32) |
| |
|
|
| gt_names_3d = ["car"] |
| gt_labels_3d = [] |
| for cat in gt_names_3d: |
| if cat in self.CLASSES: |
| gt_labels_3d.append(self.CLASSES.index(cat)) |
| else: |
| gt_labels_3d.append(-1) |
| gt_labels_3d = np.array(gt_labels_3d) |
|
|
| |
| |
| gt_bboxes_3d = LiDARInstance3DBoxes( |
| gt_bboxes_3d, box_dim=gt_bboxes_3d.shape[-1], origin=(0.5, 0.5, 0.5) |
| ) |
| |
| |
| gt_bboxes_3d = gt_bboxes_3d.convert_to(self.box_mode_3d) |
| |
|
|
| if as_lidar_instance3d_box: |
| |
| return gt_bboxes_3d |
|
|
| gt_labels_3d = DC(to_tensor(gt_labels_3d)) |
| gt_bboxes_3d = DC(gt_bboxes_3d, cpu_only=True) |
|
|
| return gt_bboxes_3d, gt_labels_3d |
|
|
| def get_sdc_traj_label(self, sample_token): |
| sd_rec = self.nusc.get("sample", sample_token) |
| lidar_top_data_start = self.nusc.get("sample_data", sd_rec["data"]["LIDAR_TOP"]) |
| ego_pose_start = self.nusc.get( |
| "ego_pose", lidar_top_data_start["ego_pose_token"] |
| ) |
|
|
| sdc_fut_traj = [] |
| for _ in range(self.predict_steps): |
| next_annotation_token = sd_rec["next"] |
| if next_annotation_token == "": |
| break |
| sd_rec = self.nusc.get("sample", next_annotation_token) |
| lidar_top_data_next = self.nusc.get( |
| "sample_data", sd_rec["data"]["LIDAR_TOP"] |
| ) |
| ego_pose_next = self.nusc.get( |
| "ego_pose", lidar_top_data_next["ego_pose_token"] |
| ) |
| sdc_fut_traj.append( |
| ego_pose_next["translation"][:2] |
| ) |
|
|
| sdc_fut_traj_all = np.zeros((1, self.predict_steps, 2)) |
| sdc_fut_traj_valid_mask_all = np.zeros((1, self.predict_steps, 2)) |
| n_valid_timestep = len(sdc_fut_traj) |
| if n_valid_timestep > 0: |
| sdc_fut_traj = np.stack(sdc_fut_traj, axis=0) |
| sdc_fut_traj = convert_global_coords_to_local( |
| coordinates=sdc_fut_traj, |
| translation=ego_pose_start["translation"], |
| rotation=ego_pose_start["rotation"], |
| ) |
| sdc_fut_traj_all[:, :n_valid_timestep, :] = sdc_fut_traj |
| sdc_fut_traj_valid_mask_all[:, :n_valid_timestep, :] = 1 |
|
|
| return sdc_fut_traj_all, sdc_fut_traj_valid_mask_all |
|
|
| def get_l2g_transform(self, sample): |
| sd_rec = self.nusc.get("sample_data", sample["data"]["LIDAR_TOP"]) |
| cs_record = self.nusc.get( |
| "calibrated_sensor", sd_rec["calibrated_sensor_token"] |
| ) |
| pose_record = self.nusc.get("ego_pose", sd_rec["ego_pose_token"]) |
|
|
| l2e_r = cs_record["rotation"] |
| l2e_t = np.array(cs_record["translation"]) |
| e2g_r = pose_record["rotation"] |
| e2g_t = np.array(pose_record["translation"]) |
| l2e_r_mat = Quaternion(l2e_r).rotation_matrix |
| e2g_r_mat = Quaternion(e2g_r).rotation_matrix |
|
|
| return l2e_r_mat, l2e_t, e2g_r_mat, e2g_t |
|
|
    def get_sdc_planning_label(self, sample_token):
        """Build ego (SDC) planning supervision for ``sample_token``.

        Returns:
            tuple:
                planning_all (np.ndarray): (1, planning_steps, 3) future
                    (x, y, yaw) of the ego box in the current LiDAR frame.
                planning_mask_all (np.ndarray): (1, planning_steps, 2)
                    validity mask, 1 where a future sample exists.
                command (int): derived from the first coordinate of the last
                    valid future step (thresholds at +/-2 m); 2 when no future
                    step exists.  NOTE(review): presumably 0/1/2 encode
                    right/left/forward — verify against the consumer.
                planning_all_past (np.ndarray): (1, past_steps, 3) past
                    (x, y, yaw), flipped so the oldest step comes first.
                planning_mask_all_past (np.ndarray): (1, past_steps, 2).
                planning_world_all (np.ndarray): (1, planning_steps, 3) future
                    (x, y, z) of the ego box in the global frame.
        """
        # Transforms anchoring everything to the LiDAR frame of the query sample.
        sd_rec = self.nusc.get("sample", sample_token)
        l2e_r_mat_init, l2e_t_init, e2g_r_mat_init, e2g_t_init = self.get_l2g_transform(
            sd_rec
        )

        # ---- future part: walk forward up to planning_steps samples ----
        planning = []
        planning_world = []
        for _ in range(self.planning_steps):
            next_annotation_token = sd_rec["next"]
            if next_annotation_token == "":
                # Reached the end of the scene.
                break
            sd_rec = self.nusc.get("sample", next_annotation_token)
            (
                l2e_r_mat_curr,
                l2e_t_curr,
                e2g_r_mat_curr,
                e2g_t_curr,
            ) = self.get_l2g_transform(
                sd_rec
            )

            # Pseudo ego box in the future sample's own LiDAR frame.
            next_bbox3d = self.generate_sdc_info(
                self.sdc_vel_info[next_annotation_token], as_lidar_instance3d_box=True
            )

            # lidar(curr) -> ego(curr)
            next_bbox3d.rotate(l2e_r_mat_curr.T)
            next_bbox3d.translate(l2e_t_curr)
            # ego(curr) -> global; snapshot the global-frame box tensor.
            next_bbox3d.rotate(e2g_r_mat_curr.T)
            next_bbox3d.translate(e2g_t_curr)
            planning_world.append(next_bbox3d.tensor.clone())

            # global -> ego(init)
            next_bbox3d.translate(-e2g_t_init)
            m1 = np.linalg.inv(e2g_r_mat_init)
            next_bbox3d.rotate(m1.T)
            # ego(init) -> lidar(init)
            next_bbox3d.translate(-l2e_t_init)
            m2 = np.linalg.inv(l2e_r_mat_init)
            next_bbox3d.rotate(m2.T)

            planning.append(next_bbox3d)

        planning_all = np.zeros((1, self.planning_steps, 3))
        planning_world_all = np.zeros((1, self.planning_steps, 3))
        planning_mask_all = np.zeros((1, self.planning_steps, 2))
        n_valid_timestep = len(planning)
        if n_valid_timestep > 0:
            # NOTE(review): box tensors appear to be torch tensors; np.stack
            # relies on their implicit array conversion — confirm .numpy()
            # is not required here.
            planning = [p.tensor.squeeze(0) for p in planning]
            planning = np.stack(planning, axis=0)
            # Box tensor layout assumed (x, y, z, dx, dy, dz, yaw, ...):
            # keep x, y, yaw — TODO confirm against the box class used.
            planning = planning[:, [0, 1, 6]]
            planning_all[:, :n_valid_timestep, :] = planning
            planning_mask_all[:, :n_valid_timestep, :] = 1

            planning_world = [p.squeeze(0) for p in planning_world]
            planning_world = np.stack(planning_world, axis=0)
            # Keep the global (x, y, z) position.
            planning_world = planning_world[:, [0, 1, 2]]
            planning_world_all[:, :n_valid_timestep, :] = planning_world

        # Command from the final valid future position; an all-zero mask
        # (no future samples) defaults to 2.
        mask = planning_mask_all[0].any(axis=1)
        if mask.sum() == 0:
            command = 2
        elif planning_all[0, mask][-1][0] >= 2:
            command = 0
        elif planning_all[0, mask][-1][0] <= -2:
            command = 1
        else:
            command = 2

        # ---- past part: walk backward up to past_steps samples ----
        # Re-anchor to the query sample (sd_rec was advanced above).
        sd_rec = self.nusc.get("sample", sample_token)
        l2e_r_mat_init, l2e_t_init, e2g_r_mat_init, e2g_t_init = self.get_l2g_transform(
            sd_rec
        )

        planning_past = []
        for _ in range(self.past_steps):
            prev_annotation_token = sd_rec["prev"]
            if prev_annotation_token == "":
                # Reached the start of the scene.
                break
            sd_rec = self.nusc.get("sample", prev_annotation_token)
            (
                l2e_r_mat_curr,
                l2e_t_curr,
                e2g_r_mat_curr,
                e2g_t_curr,
            ) = self.get_l2g_transform(
                sd_rec
            )

            # Pseudo ego box in the past sample's own LiDAR frame.
            prev_bbox3d = self.generate_sdc_info(
                self.sdc_vel_info[prev_annotation_token], as_lidar_instance3d_box=True
            )

            # lidar(curr) -> ego(curr)
            prev_bbox3d.rotate(l2e_r_mat_curr.T)
            prev_bbox3d.translate(l2e_t_curr)

            # ego(curr) -> global
            prev_bbox3d.rotate(e2g_r_mat_curr.T)
            prev_bbox3d.translate(e2g_t_curr)

            # global -> ego(init)
            prev_bbox3d.translate(-e2g_t_init)
            m1 = np.linalg.inv(e2g_r_mat_init)
            prev_bbox3d.rotate(m1.T)

            # ego(init) -> lidar(init)
            prev_bbox3d.translate(-l2e_t_init)
            m2 = np.linalg.inv(l2e_r_mat_init)
            prev_bbox3d.rotate(m2.T)

            planning_past.append(prev_bbox3d)

        planning_all_past = np.zeros((1, self.past_steps, 3))
        planning_mask_all_past = np.zeros((1, self.past_steps, 2))
        n_valid_timestep = len(planning_past)
        if n_valid_timestep > 0:
            planning_past = [p.tensor.squeeze(0) for p in planning_past]
            planning_past = np.stack(planning_past, axis=0)
            # Same assumed layout as above: keep x, y, yaw.
            planning_past = planning_past[:, [0, 1, 6]]
            planning_all_past[:, :n_valid_timestep, :] = planning_past
            planning_mask_all_past[:, :n_valid_timestep, :] = 1

        # The backward walk produced newest-first order; flip to oldest-first.
        planning_all_past = np.flip(planning_all_past, axis=1)
        planning_mask_all_past = np.flip(planning_mask_all_past, axis=1)

        return planning_all, planning_mask_all, command, planning_all_past, planning_mask_all_past, planning_world_all
|
|