from enum import IntEnum
from typing import Any, Dict, List, Tuple

import cv2
import numpy as np
import numpy.typing as npt

import torch
from torchvision import transforms

from shapely import affinity
from shapely.geometry import Polygon, LineString

from nuplan.common.maps.abstract_map import AbstractMap, SemanticMapLayer, MapObject
from nuplan.common.actor_state.oriented_box import OrientedBox
from nuplan.common.actor_state.state_representation import StateSE2
from nuplan.common.actor_state.tracked_objects_types import TrackedObjectType

from navsim.agents.transfuser.transfuser_config import TransfuserConfig
from navsim.common.dataclasses import AgentInput, Scene, Annotations
from navsim.common.enums import BoundingBoxIndex, LidarIndex
from navsim.planning.scenario_builder.navsim_scenario_utils import tracked_object_types
from navsim.planning.training.abstract_feature_target_builder import (
    AbstractFeatureBuilder,
    AbstractTargetBuilder,
)


class TransfuserFeatureBuilder(AbstractFeatureBuilder):
    """Input feature builder of TransFuser."""

    def __init__(self, config: TransfuserConfig):
        self._config = config

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "transfuser_feature"

    def compute_features(self, agent_input: AgentInput) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        features = {}

        features["camera_feature"] = self._get_camera_feature(agent_input)
        features["lidar_feature"] = self._get_lidar_feature(agent_input)
        # status vector: concatenated driving command, velocity, and acceleration
        features["status_feature"] = torch.concatenate(
            [
                torch.tensor(agent_input.ego_statuses[-1].driving_command, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[-1].ego_velocity, dtype=torch.float32),
                torch.tensor(agent_input.ego_statuses[-1].ego_acceleration, dtype=torch.float32),
            ],
        )

        return features

    def _get_camera_feature(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Extract stitched camera image from AgentInput
        :param agent_input: input dataclass
        :return: stitched front view image as torch tensor
        """
        cameras = agent_input.cameras[-1]

        # crop the views to a common height and trim the side views laterally
        l0 = cameras.cam_l0.image[28:-28, 416:-416]
        f0 = cameras.cam_f0.image[28:-28]
        r0 = cameras.cam_r0.image[28:-28, 416:-416]

        # stitch l0, f0, r0 horizontally and resize to a 4:1 input image
        stitched_image = np.concatenate([l0, f0, r0], axis=1)
        resized_image = cv2.resize(stitched_image, (1024, 256))
        tensor_image = transforms.ToTensor()(resized_image)

        return tensor_image

    def _get_lidar_feature(self, agent_input: AgentInput) -> torch.Tensor:
        """
        Compute LiDAR feature as 2D histogram, according to Transfuser
        :param agent_input: input dataclass
        :return: LiDAR histogram as torch tensor
        """
        # keep only the (x, y, z) positions, transposed to an (N, 3) array
        lidar_pc = agent_input.lidars[-1].lidar_pc[LidarIndex.POSITION].T

        def splat_points(point_cloud):
            # bin edges at pixels_per_meter resolution over the configured BEV range
            xbins = np.linspace(
                self._config.lidar_min_x,
                self._config.lidar_max_x,
                (self._config.lidar_max_x - self._config.lidar_min_x)
                * int(self._config.pixels_per_meter)
                + 1,
            )
            ybins = np.linspace(
                self._config.lidar_min_y,
                self._config.lidar_max_y,
                (self._config.lidar_max_y - self._config.lidar_min_y)
                * int(self._config.pixels_per_meter)
                + 1,
            )
            # clip per-pixel point counts and normalize to [0, 1]
            hist = np.histogramdd(point_cloud[:, :2], bins=(xbins, ybins))[0]
            hist[hist > self._config.hist_max_per_pixel] = self._config.hist_max_per_pixel
            overhead_splat = hist / self._config.hist_max_per_pixel
            return overhead_splat

        # drop points above the height limit and split at the ground-plane threshold
        lidar_pc = lidar_pc[lidar_pc[..., 2] < self._config.max_height_lidar]
        below = lidar_pc[lidar_pc[..., 2] <= self._config.lidar_split_height]
        above = lidar_pc[lidar_pc[..., 2] > self._config.lidar_split_height]
        above_features = splat_points(above)
        if self._config.use_ground_plane:
            below_features = splat_points(below)
            features = np.stack([below_features, above_features], axis=-1)
        else:
            features = np.stack([above_features], axis=-1)
        features = np.transpose(features, (2, 0, 1)).astype(np.float32)

        return torch.tensor(features)
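

# The following is an illustrative, self-contained sketch of what
# `_get_camera_feature` above computes, run on synthetic images; the
# 1080 x 1920 raw camera resolution is an assumption for this example,
# not a value taken from the dataset or TransfuserConfig.
def _demo_camera_stitch() -> torch.Tensor:
    l0 = np.zeros((1080 - 2 * 28, 1920 - 2 * 416, 3), dtype=np.uint8)  # cropped left view
    f0 = np.zeros((1080 - 2 * 28, 1920, 3), dtype=np.uint8)            # cropped front view
    r0 = np.zeros((1080 - 2 * 28, 1920 - 2 * 416, 3), dtype=np.uint8)  # cropped right view
    stitched = np.concatenate([l0, f0, r0], axis=1)  # (1024, 4096, 3), a 4:1 panorama
    resized = cv2.resize(stitched, (1024, 256))      # cv2.resize takes (width, height)
    return transforms.ToTensor()(resized)            # (3, 256, 1024) float tensor in [0, 1]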
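

# Likewise, a minimal sketch of the `splat_points` histogram above on synthetic
# points; `pixels_per_meter`, `hist_max_per_pixel`, and the +/- 32 m range are
# assumed example values, not the TransfuserConfig defaults.
def _demo_lidar_splat() -> np.ndarray:
    rng = np.random.default_rng(seed=0)
    points = rng.uniform(-32.0, 32.0, size=(1000, 3))  # synthetic (x, y, z) points
    pixels_per_meter, hist_max_per_pixel = 4, 5
    xbins = np.linspace(-32.0, 32.0, 64 * pixels_per_meter + 1)
    ybins = np.linspace(-32.0, 32.0, 64 * pixels_per_meter + 1)
    hist = np.histogramdd(points[:, :2], bins=(xbins, ybins))[0]
    hist[hist > hist_max_per_pixel] = hist_max_per_pixel  # clip dense pixels
    return hist / hist_max_per_pixel                      # (256, 256) grid in [0, 1]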


class TransfuserTargetBuilder(AbstractTargetBuilder):
    """Target builder of TransFuser."""

    def __init__(self, config: TransfuserConfig):
        self._config = config

    def get_unique_name(self) -> str:
        """Inherited, see superclass."""
        return "transfuser_target"

    def compute_targets(self, scene: Scene) -> Dict[str, torch.Tensor]:
        """Inherited, see superclass."""
        trajectory = torch.tensor(
            scene.get_future_trajectory(
                num_trajectory_frames=self._config.trajectory_sampling.num_poses
            ).poses
        )
        frame_idx = scene.scene_metadata.num_history_frames - 1
        annotations = scene.frames[frame_idx].annotations
        ego_pose = StateSE2(*scene.frames[frame_idx].ego_status.ego_pose)

        agent_states, agent_labels = self._compute_agent_targets(annotations)
        bev_semantic_map = self._compute_bev_semantic_map(annotations, scene.map_api, ego_pose)

        return {
            "trajectory": trajectory,
            "agent_states": agent_states,
            "agent_labels": agent_labels,
            "bev_semantic_map": bev_semantic_map,
        }

    def _compute_agent_targets(self, annotations: Annotations) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Extracts 2D agent bounding boxes in ego coordinates
        :param annotations: annotation dataclass
        :return: tuple of bounding box values and labels (binary)
        """
        max_agents = self._config.num_bounding_boxes
        agent_states_list: List[npt.NDArray[np.float32]] = []

        def _xy_in_lidar(x: float, y: float, config: TransfuserConfig) -> bool:
            return (config.lidar_min_x <= x <= config.lidar_max_x) and (
                config.lidar_min_y <= y <= config.lidar_max_y
            )

        for box, name in zip(annotations.boxes, annotations.names):
            box_x, box_y, box_heading, box_length, box_width = (
                box[BoundingBoxIndex.X],
                box[BoundingBoxIndex.Y],
                box[BoundingBoxIndex.HEADING],
                box[BoundingBoxIndex.LENGTH],
                box[BoundingBoxIndex.WIDTH],
            )

            if name == "vehicle" and _xy_in_lidar(box_x, box_y, self._config):
                agent_states_list.append(
                    np.array([box_x, box_y, box_heading, box_length, box_width], dtype=np.float32)
                )

        agents_states_arr = np.array(agent_states_list)

        # fixed-size targets: pad with zeros and mark valid agents in the labels
        agent_states = np.zeros((max_agents, BoundingBox2DIndex.size()), dtype=np.float32)
        agent_labels = np.zeros(max_agents, dtype=bool)

        if len(agents_states_arr) > 0:
            # keep the `max_agents` boxes closest to the ego vehicle
            distances = np.linalg.norm(agents_states_arr[..., BoundingBox2DIndex.POINT], axis=-1)
            argsort = np.argsort(distances)[:max_agents]

            agents_states_arr = agents_states_arr[argsort]
            agent_states[: len(agents_states_arr)] = agents_states_arr
            agent_labels[: len(agents_states_arr)] = True

        return torch.tensor(agent_states), torch.tensor(agent_labels)

    def _compute_bev_semantic_map(
        self, annotations: Annotations, map_api: AbstractMap, ego_pose: StateSE2
    ) -> torch.Tensor:
        """
        Creates semantic map in BEV
        :param annotations: annotation dataclass
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :return: 2D torch tensor of semantic labels
        """
        bev_semantic_map = np.zeros(self._config.bev_semantic_frame, dtype=np.int64)
        # later classes overwrite earlier ones where masks overlap
        for label, (entity_type, layers) in self._config.bev_semantic_classes.items():
            if entity_type == "polygon":
                entity_mask = self._compute_map_polygon_mask(map_api, ego_pose, layers)
            elif entity_type == "linestring":
                entity_mask = self._compute_map_linestring_mask(map_api, ego_pose, layers)
            else:
                entity_mask = self._compute_box_mask(annotations, layers)
            bev_semantic_map[entity_mask] = label

        return torch.tensor(bev_semantic_map)

    def _compute_map_polygon_mask(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute binary mask of map polygons given map layer classes
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: binary mask as numpy array
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        map_polygon_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
        for layer in layers:
            for map_object in map_object_dict[layer]:
                polygon: Polygon = self._geometry_local_coords(map_object.polygon, ego_pose)
                exterior = np.array(polygon.exterior.coords).reshape((-1, 1, 2))
                exterior = self._coords_to_pixel(exterior)
                cv2.fillPoly(map_polygon_mask, [exterior], color=255)
        # OpenCV indexes (row, col), so rotate and flip to match the BEV frame
        map_polygon_mask = np.rot90(map_polygon_mask)[::-1]
        return map_polygon_mask > 0

    def _compute_map_linestring_mask(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute binary mask of linestrings given map layer classes
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: binary mask as numpy array
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        map_linestring_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
        for layer in layers:
            for map_object in map_object_dict[layer]:
                linestring: LineString = self._geometry_local_coords(
                    map_object.baseline_path.linestring, ego_pose
                )
                points = np.array(linestring.coords).reshape((-1, 1, 2))
                points = self._coords_to_pixel(points)
                cv2.polylines(map_linestring_mask, [points], isClosed=False, color=255, thickness=2)
        # OpenCV indexes (row, col), so rotate and flip to match the BEV frame
        map_linestring_mask = np.rot90(map_linestring_mask)[::-1]
        return map_linestring_mask > 0

    def _compute_box_mask(
        self, annotations: Annotations, layers: List[TrackedObjectType]
    ) -> npt.NDArray[np.bool_]:
        """
        Compute binary mask of bounding boxes in BEV space
        :param annotations: annotation dataclass
        :param layers: bounding box labels to include
        :return: binary mask as numpy array
        """
        box_polygon_mask = np.zeros(self._config.bev_semantic_frame[::-1], dtype=np.uint8)
        for name_value, box_value in zip(annotations.names, annotations.boxes):
            agent_type = tracked_object_types[name_value]
            if agent_type in layers:
                # box_value layout (see BoundingBoxIndex): x, y, z, length, width, height, heading
                x, y, heading = box_value[0], box_value[1], box_value[-1]
                box_length, box_width, box_height = box_value[3], box_value[4], box_value[5]
                agent_box = OrientedBox(StateSE2(x, y, heading), box_length, box_width, box_height)
                exterior = np.array(agent_box.geometry.exterior.coords).reshape((-1, 1, 2))
                exterior = self._coords_to_pixel(exterior)
                cv2.fillPoly(box_polygon_mask, [exterior], color=255)
        # OpenCV indexes (row, col), so rotate and flip to match the BEV frame
        box_polygon_mask = np.rot90(box_polygon_mask)[::-1]
        return box_polygon_mask > 0

    def _query_map_objects(
        self, map_api: AbstractMap, ego_pose: StateSE2, layers: List[SemanticMapLayer]
    ) -> List[MapObject]:
        """
        Queries map objects
        :param map_api: map interface of nuPlan
        :param ego_pose: ego pose in global frame
        :param layers: map layers
        :return: list of map objects
        """
        map_object_dict = map_api.get_proximal_map_objects(
            point=ego_pose.point, radius=self._config.bev_radius, layers=layers
        )
        map_objects: List[MapObject] = []
        for layer in layers:
            map_objects += map_object_dict[layer]
        return map_objects

    @staticmethod
    def _geometry_local_coords(geometry: Any, origin: StateSE2) -> Any:
        """
        Transform shapely geometry in local coordinates of origin.
        :param geometry: shapely geometry
        :param origin: pose dataclass
        :return: shapely geometry
        """
        # translate the origin to (0, 0), then rotate by the negated heading
        a = np.cos(origin.heading)
        b = np.sin(origin.heading)
        d = -np.sin(origin.heading)
        e = np.cos(origin.heading)
        xoff = -origin.x
        yoff = -origin.y

        translated_geometry = affinity.affine_transform(geometry, [1, 0, 0, 1, xoff, yoff])
        rotated_geometry = affinity.affine_transform(translated_geometry, [a, b, d, e, 0, 0])

        return rotated_geometry

    def _coords_to_pixel(self, coords):
        """
        Transform local coordinates in pixel indices of BEV map
        :param coords: local (x, y) coordinates as numpy array
        :return: pixel indices of the BEV map as numpy array
        """
        # shift the lateral axis so the ego vehicle sits at the left-center of the frame
        pixel_center = np.array([[0, self._config.bev_pixel_width / 2.0]])
        coords_idcs = (coords / self._config.bev_pixel_size) + pixel_center

        return coords_idcs.astype(np.int32)
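

# Illustrative sketch of the fixed-size padding scheme in
# `_compute_agent_targets` above: sort synthetic boxes by distance to the ego
# vehicle, keep the nearest `max_agents`, and leave the remaining rows zeroed
# with False labels. All values here are made up for the example.
def _demo_agent_padding() -> Tuple[np.ndarray, np.ndarray]:
    max_agents = 4
    boxes = np.array(
        [[10.0, 0.0, 0.0, 4.5, 1.8], [2.0, 1.0, 0.0, 4.5, 1.8]], dtype=np.float32
    )  # rows of (x, y, heading, length, width)
    states = np.zeros((max_agents, boxes.shape[1]), dtype=np.float32)
    labels = np.zeros(max_agents, dtype=bool)
    order = np.argsort(np.linalg.norm(boxes[:, :2], axis=-1))[:max_agents]
    states[: len(order)] = boxes[order]  # nearest agent first
    labels[: len(order)] = True
    return states, labels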
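

# A short sanity check of the transform in `_geometry_local_coords` above:
# translate by the negated origin, then rotate by the negated heading. The
# pose and point below are hypothetical values for this example.
def _demo_local_coords() -> None:
    from shapely.geometry import Point

    origin = StateSE2(10.0, 5.0, np.pi / 2)  # ego at (10, 5), heading +90 degrees
    a, b = np.cos(origin.heading), np.sin(origin.heading)
    translated = affinity.affine_transform(Point(11.0, 5.0), [1, 0, 0, 1, -origin.x, -origin.y])
    rotated = affinity.affine_transform(translated, [a, b, -b, a, 0, 0])
    # a point 1 m east of the ego ends up 1 m to the ego's right: (0, -1)
    assert abs(rotated.x) < 1e-9 and abs(rotated.y + 1.0) < 1e-9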
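

# A numeric sketch of `_coords_to_pixel` above: scale by the metric pixel
# size, then shift the lateral axis so the ego sits at the left-center of the
# frame. `bev_pixel_size` and `bev_pixel_width` are assumed example values,
# not the TransfuserConfig defaults.
def _demo_coords_to_pixel() -> np.ndarray:
    bev_pixel_size, bev_pixel_width = 0.25, 256
    coords = np.array([[[8.0, -4.0]]])  # one point: 8 m ahead, 4 m to the right
    pixel_center = np.array([[0, bev_pixel_width / 2.0]])
    return ((coords / bev_pixel_size) + pixel_center).astype(np.int32)  # [[[32, 112]]]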


class BoundingBox2DIndex(IntEnum):
    """Index enum of 2D agent bounding boxes (x, y, heading, length, width)."""

    _X = 0
    _Y = 1
    _HEADING = 2
    _LENGTH = 3
    _WIDTH = 4

    @classmethod
    def size(cls):
        # number of index fields (_X, _Y, ...), excluding dunder and callable attributes
        valid_attributes = [
            attribute
            for attribute in dir(cls)
            if attribute.startswith("_")
            and not attribute.startswith("__")
            and not callable(getattr(cls, attribute))
        ]
        return len(valid_attributes)

    @classmethod
    @property
    def X(cls):
        return cls._X

    @classmethod
    @property
    def Y(cls):
        return cls._Y

    @classmethod
    @property
    def HEADING(cls):
        return cls._HEADING

    @classmethod
    @property
    def LENGTH(cls):
        return cls._LENGTH

    @classmethod
    @property
    def WIDTH(cls):
        return cls._WIDTH

    @classmethod
    @property
    def POINT(cls):
        # slice over the consecutive (x, y) indices
        return slice(cls._X, cls._Y + 1)

    @classmethod
    @property
    def STATE_SE2(cls):
        # slice over the consecutive (x, y, heading) indices
        return slice(cls._X, cls._HEADING + 1)
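

# Illustrative usage of the enum's slices on a packed agent-state row, as
# produced by `_compute_agent_targets`; the state values are made up.
if __name__ == "__main__":
    state = np.array([1.0, 2.0, 0.5, 4.5, 1.8], dtype=np.float32)  # x, y, heading, length, width
    print(state[BoundingBox2DIndex.POINT])      # [1. 2.]
    print(state[BoundingBox2DIndex.STATE_SE2])  # [1.  2.  0.5]
    print(BoundingBox2DIndex.size())            # 5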
|