| from __future__ import annotations
|
|
|
| from typing import List, Dict, Any
|
|
|
| import cv2
|
| import numpy as np
|
| import numpy.typing as npt
|
| import torch
|
| from nuplan.common.actor_state.state_representation import StateSE2
|
| from nuplan.common.maps.abstract_map import AbstractMap
|
| from nuplan.common.maps.maps_datatypes import SemanticMapLayer
|
| from shapely import affinity, LineString
|
| from shapely.geometry import Polygon
|
|
|
| from det_map.data.datasets.dataclasses import Scene
|
| from navsim.planning.training.abstract_feature_target_builder import AbstractTargetBuilder
|
| import networkx as nx
|
|
|
| from mmcv.parallel import DataContainer as DC
|
|
|
| class LiDARInstanceLines(object):
|
| """Line instance in LIDAR coordinates"""
|
|
|
| def __init__(self,
|
| instance_line_list,
|
| instance_labels,
|
| sample_dist=1,
|
| num_samples=250,
|
| padding=False,
|
| fixed_num=-1,
|
| padding_value=-10000,
|
| patch_size=None):
|
| assert isinstance(instance_line_list, list)
|
| assert patch_size is not None
|
| if len(instance_line_list) != 0:
|
| assert isinstance(instance_line_list[0], LineString)
|
| self.patch_size = patch_size
|
| self.max_x = self.patch_size / 2
|
| self.max_y = self.patch_size / 2
|
| self.sample_dist = sample_dist
|
| self.num_samples = num_samples
|
| self.padding = padding
|
| self.fixed_num = fixed_num
|
| self.padding_value = padding_value
|
|
|
| self.instance_list = instance_line_list
|
| self.instance_labels = instance_labels
|
|
|
| @property
|
| def fixed_num_sampled_points(self):
|
| """
|
| return torch.Tensor([N,fixed_num,2]), in xmin, ymin, xmax, ymax form
|
| N means the num of instances
|
| """
|
| assert len(self.instance_list) != 0
|
| instance_points_list = []
|
| for instance in self.instance_list:
|
| distances = np.linspace(0, instance.length, self.fixed_num)
|
| sampled_points = np.array([list(instance.interpolate(distance).coords) for distance in distances]).reshape(
|
| -1, 2)
|
| instance_points_list.append(sampled_points)
|
| instance_points_array = np.array(instance_points_list)
|
| instance_points_tensor = torch.tensor(instance_points_array)
|
| instance_points_tensor = instance_points_tensor.to(
|
| dtype=torch.float32)
|
| instance_points_tensor[:, :, 0] = torch.clamp(instance_points_tensor[:, :, 0], min=-self.max_x, max=self.max_x)
|
| instance_points_tensor[:, :, 1] = torch.clamp(instance_points_tensor[:, :, 1], min=-self.max_y, max=self.max_y)
|
| return instance_points_tensor
|
|
|
| @property
|
| def shift_fixed_num_sampled_points_v2(self):
|
| """
|
| return [instances_num, num_shifts, fixed_num, 2]
|
| """
|
| assert len(self.instance_list) != 0
|
| instances_list = []
|
| for idx, instance in enumerate(self.instance_list):
|
|
|
|
|
| distances = np.linspace(0, instance.length, self.fixed_num)
|
| poly_pts = np.array(list(instance.coords))
|
| start_pts = poly_pts[0]
|
| end_pts = poly_pts[-1]
|
| is_poly = np.equal(start_pts, end_pts)
|
| is_poly = is_poly.all()
|
| shift_pts_list = []
|
| pts_num, coords_num = poly_pts.shape
|
| shift_num = pts_num - 1
|
| final_shift_num = self.fixed_num - 1
|
|
|
|
|
|
|
| sampled_points = np.array(
|
| [list(instance.interpolate(distance).coords) for distance in distances]).reshape(-1, 2)
|
| shift_pts_list.append(sampled_points)
|
|
|
|
|
| multi_shifts_pts = np.stack(shift_pts_list, axis=0)
|
| shifts_num, _, _ = multi_shifts_pts.shape
|
|
|
| if shifts_num > final_shift_num:
|
| index = np.random.choice(multi_shifts_pts.shape[0], final_shift_num, replace=False)
|
| multi_shifts_pts = multi_shifts_pts[index]
|
|
|
| multi_shifts_pts_tensor = torch.tensor(multi_shifts_pts)
|
| multi_shifts_pts_tensor = multi_shifts_pts_tensor.to(
|
| dtype=torch.float32)
|
|
|
| multi_shifts_pts_tensor[:, :, 0] = torch.clamp(multi_shifts_pts_tensor[:, :, 0], min=-self.max_x,
|
| max=self.max_x)
|
| multi_shifts_pts_tensor[:, :, 1] = torch.clamp(multi_shifts_pts_tensor[:, :, 1], min=-self.max_y,
|
| max=self.max_y)
|
|
|
| if multi_shifts_pts_tensor.shape[0] < final_shift_num:
|
| padding = torch.full([final_shift_num - multi_shifts_pts_tensor.shape[0], self.fixed_num, 2],
|
| self.padding_value)
|
| multi_shifts_pts_tensor = torch.cat([multi_shifts_pts_tensor, padding], dim=0)
|
| instances_list.append(multi_shifts_pts_tensor)
|
| instances_tensor = torch.stack(instances_list, dim=0)
|
| instances_tensor = instances_tensor.to(
|
| dtype=torch.float32)
|
| return instances_tensor
|
|
|
|
|
| class MapTargetBuilder(AbstractTargetBuilder):
|
| def __init__(self):
|
| super().__init__()
|
| lidar_resolution_width = 256
|
| lidar_resolution_height = 256
|
| self.dense_layers: List[SemanticMapLayer] = [
|
| SemanticMapLayer.DRIVABLE_AREA,
|
| SemanticMapLayer.CROSSWALK
|
| ]
|
| self.dense_layers_labels = [
|
| 1, 2
|
| ]
|
|
|
| self.discrete_layers: List[SemanticMapLayer] = [
|
| SemanticMapLayer.LANE,
|
| SemanticMapLayer.LANE_CONNECTOR,
|
| ]
|
|
|
| self.radius = 32.0
|
| self.bev_pixel_width: int = lidar_resolution_width
|
| self.bev_pixel_height: int = lidar_resolution_height
|
| self.bev_pixel_size: float = 0.25
|
| self.bev_semantic_frame = (self.bev_pixel_height, self.bev_pixel_width)
|
| self.padding_value = -10000
|
| self.sample_dist = 1
|
| self.num_samples = 250
|
| self.padding = False
|
| self.fixed_num = 20
|
|
|
| def _geometry_local_coords(self, geometry: Any, origin: StateSE2) -> Any:
|
| """
|
| Transform shapely geometry in local coordinates of origin.
|
| :param geometry: shapely geometry
|
| :param origin: pose dataclass
|
| :return: shapely geometry
|
| """
|
|
|
| a = np.cos(origin.heading)
|
| b = np.sin(origin.heading)
|
| d = -np.sin(origin.heading)
|
| e = np.cos(origin.heading)
|
| xoff = -origin.x
|
| yoff = -origin.y
|
|
|
| translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
|
| rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])
|
|
|
| return rotated_geometry
|
|
|
| def _coords_to_pixel(self, coords):
|
| """
|
| Transform local coordinates in pixel indices of BEV map
|
| :param coords: _description_
|
| :return: _description_
|
| """
|
|
|
|
|
| pixel_center = np.array([[0, self.bev_pixel_width / 2.0]])
|
| coords_idcs = (coords / self.bev_pixel_size) + pixel_center
|
|
|
| return coords_idcs.astype(np.int32)
|
|
|
| def _compute_map_polygon_mask(
|
| self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
|
| ) -> npt.NDArray[np.bool_]:
|
| """
|
| Compute binary mask given a map layer class
|
| :param map_api: map interface of nuPlan
|
| :param ego_pose: ego pose in global frame
|
| :param layers: map layers
|
| :return: binary mask as numpy array
|
| """
|
|
|
| map_object_dict = map_api.get_proximal_map_objects(
|
| point=ego_pose.point, radius=self.radius, layers=layers
|
| )
|
| map_polygon_mask = np.zeros(self.bev_semantic_frame[::-1], dtype=np.uint8)
|
| for layer in layers:
|
| for map_object in map_object_dict[layer]:
|
| polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
|
| exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
|
| exterior = self._coords_to_pixel(exterior)
|
| cv2.fillPoly(map_polygon_mask, [exterior], color=255)
|
|
|
| map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
|
| return map_polygon_mask > 0
|
|
|
| def _compute_map_linestrings(
|
| self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
|
| ) -> npt.NDArray[np.bool_]:
|
| """
|
| Compute binary of linestring given a map layer class
|
| :param map_api: map interface of nuPlan
|
| :param ego_pose: ego pose in global frame
|
| :param layers: map layers
|
| :return: binary mask as numpy array
|
| """
|
| map_object_dict = map_api.get_proximal_map_objects(
|
| point=ego_pose.point, radius=self.radius, layers=layers
|
| )
|
| something = []
|
| incoming_something = []
|
| outcoming_something = []
|
| for layer in layers:
|
| for map_object in map_object_dict[layer]:
|
| linestring: LineString = self._geometry_local_coords(
|
| map_object.baseline_path.linestring, ego_pose
|
| )
|
| something.append(linestring)
|
| for incoming_edge in map_object.incoming_edges:
|
| incomingstring: LineString = self._geometry_local_coords(
|
| incoming_edge.baseline_path.linestring, ego_pose
|
| )
|
| incoming_something.append(incomingstring)
|
|
|
| for outgoing_edge in map_object.outgoing_edges:
|
| outcomingstring: LineString = self._geometry_local_coords(
|
| outgoing_edge.baseline_path.linestring, ego_pose
|
| )
|
| outcoming_something.append(outcomingstring)
|
|
|
| points = np.array(linestring.coords).reshape((-1, 1, 2))
|
|
|
| return something, incoming_something, outcoming_something
|
|
|
| def union_centerline(self, centerline_list, incoming_list, outcoming_list):
|
| pts_G = nx.DiGraph()
|
| junction_pts_list = []
|
| start_pt = np.array(centerline_list[0].coords).round(3)[0]
|
| end_pt = np.array(centerline_list[-1].coords).round(3)[-1]
|
| for centerline_geom in centerline_list:
|
| centerline_pts = np.array(centerline_geom.coords).round(3)
|
| start_pt = centerline_pts[0]
|
| end_pt = centerline_pts[-1]
|
| for idx, pts in enumerate(centerline_pts[:-1]):
|
| pts_G.add_edge(tuple(centerline_pts[idx]),tuple(centerline_pts[idx+1]))
|
|
|
| valid_incoming_num = 0
|
| for pred_geom in incoming_list:
|
| valid_incoming_num += 1
|
| pred_pt = np.array(pred_geom.coords).round(3)[-1]
|
| pts_G.add_edge(tuple(pred_pt), tuple(start_pt))
|
|
|
| valid_outgoing_num = 0
|
| for succ_geom in outcoming_list:
|
| valid_outgoing_num += 1
|
| succ_pt = np.array(succ_geom.coords).round(3)[0]
|
| pts_G.add_edge(tuple(end_pt), tuple(succ_pt))
|
|
|
| roots = (v for v, d in pts_G.in_degree() if d == 0)
|
| leaves = [v for v, d in pts_G.out_degree() if d == 0]
|
| all_paths = []
|
| for root in roots:
|
| paths = nx.all_simple_paths(pts_G, root, leaves)
|
| all_paths.extend(paths)
|
| final_centerline_paths = []
|
| for path in all_paths:
|
| merged_line = LineString(path)
|
| merged_line = merged_line.simplify(0.2, preserve_topology=True)
|
| final_centerline_paths.append(merged_line)
|
| return final_centerline_paths
|
|
|
| def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
|
| map_api = scene.map_api
|
| ego_statuses = [frame.ego_status for frame in scene.frames]
|
| ego2globals = [frame.ego2global for frame in scene.frames]
|
|
|
| ego_status_curr = StateSE2(*ego_statuses[-1].ego_pose)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| map_dict = {'centerline': []}
|
| line_strings, incoming_line_strings, outcoming_line_strings = self._compute_map_linestrings(map_api, ego_status_curr, list(self.discrete_layers))
|
| centerline_list = self.union_centerline(line_strings, incoming_line_strings, outcoming_line_strings)
|
| for instance in centerline_list:
|
| map_dict['centerline'].append(np.array(instance.coords))
|
|
|
| vectors = []
|
| gt_labels = []
|
| gt_instance = []
|
| instance_list = map_dict['centerline']
|
| for instance in instance_list:
|
| vectors.append(LineString(np.array(instance)))
|
| for instance in vectors:
|
| gt_instance.append(instance)
|
| gt_labels.append(0)
|
| gt_semantic_mask=None
|
| gt_pv_semantic_mask=None
|
| gt_labels = torch.tensor(gt_labels)
|
|
|
|
|
| gt_instance = LiDARInstanceLines(gt_instance, gt_labels, self.sample_dist, self.num_samples,
|
| self.padding, self.fixed_num, self.padding_value, patch_size=self.radius * 2)
|
|
|
|
|
| return {"dense_el": None,
|
| "gt_bboxes_3d": gt_instance,
|
| "gt_labels_3d": gt_labels}
|
|
|