| from __future__ import annotations |
|
|
| import json |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Any |
|
|
| import cv2 |
| import numpy as np |
| import pandas as pd |
| from PIL import Image |
|
|
|
|
| |
| |
| TABLE_CORNERS_PX = np.array( |
| [ |
| [192.0, 138.0], |
| [1635.0, 46.0], |
| [1712.0, 787.0], |
| [185.0, 858.0], |
| ], |
| dtype=np.float64, |
| ) |
|
|
| |
| TABLE_SIZE_M = (1.52, 0.76) |
|
|
| |
| CAMERA_FOCAL_PX = 650.0 |
| ROBOT_BASE_WORLD = np.array([0.50, 0.45, -0.45], dtype=np.float64) |
| ROBOT_BASE_YAW_RAD = -2.0 |
| DEFAULT_ROBOT_WORLD_RVEC = np.array([0.06924932, -0.1712883, -1.95079235], dtype=np.float64) |
| DEFAULT_ROBOT_WORLD_TVEC = np.array([0.50851769, 0.49565435, -0.50654742], dtype=np.float64) |
|
|
| |
| TOOL_OFFSET_M = 0.12 |
|
|
|
|
| @dataclass(frozen=True) |
| class SceneCalibration: |
| session_root: Path |
| sync_row_index: int |
| azure_rgb_seq: int |
| azure_depth_seq: int |
| robot_seq: int |
| rgb_path: Path |
| depth_path: Path |
| rgb: np.ndarray |
| depth: np.ndarray |
| sync_row: dict[str, Any] |
| camera_matrix: np.ndarray |
| rvec: np.ndarray |
| tvec: np.ndarray |
| table_corners_px: np.ndarray |
| table_size_m: tuple[float, float] |
| robot_base_world: np.ndarray |
| robot_base_yaw_rad: float |
| robot_world_rvec: np.ndarray |
| robot_world_tvec: np.ndarray |
| box_mask: np.ndarray |
| teddy_mask: np.ndarray |
| box_world_polygon: np.ndarray |
| teddy_world_center: np.ndarray |
| teddy_world_extent: np.ndarray |
| table_depth_mm: float |
| box_height_m: float |
| teddy_height_m: float |
|
|
|
|
| def _to_rgb(path: Path) -> np.ndarray: |
| return np.array(Image.open(path).convert("RGB")) |
|
|
|
|
| def _to_depth(path: Path) -> np.ndarray: |
| return np.array(Image.open(path)) |
|
|
|
|
| def load_sync_dataframe(session_root: Path) -> pd.DataFrame: |
| return pd.read_csv(session_root / "sync_index.csv") |
|
|
|
|
| def load_scene_calibration(session_root: str | Path, sync_row_index: int = 0) -> SceneCalibration: |
| session_root = Path(session_root) |
| sync = load_sync_dataframe(session_root) |
| row = sync.iloc[sync_row_index] |
|
|
| azure_rgb_seq = int(row["azure_rgb_seq"]) |
| azure_depth_seq = int(row["azure_depth_seq"]) |
| robot_seq = int(row["robot_seq"]) |
|
|
| rgb_path = session_root / row["azure_rgb_file"] |
| depth_path = session_root / row["azure_depth_file"] |
| rgb = _to_rgb(rgb_path) |
| depth = _to_depth(depth_path) |
|
|
| camera_matrix, rvec, tvec = solve_table_camera(TABLE_CORNERS_PX, TABLE_SIZE_M, CAMERA_FOCAL_PX) |
| box_mask = detect_red_box(rgb) |
| teddy_mask = detect_teddy(rgb) |
| box_world_polygon = image_mask_to_world_polygon(box_mask, camera_matrix, rvec, tvec) |
| teddy_world_center, teddy_world_extent = mask_centroid_and_extent_world(teddy_mask, camera_matrix, rvec, tvec) |
|
|
| table_depth_mm = estimate_table_depth_mm(depth) |
| box_height_m = estimate_height_from_depth(rgb, depth, box_mask, table_depth_mm) |
| teddy_height_m = estimate_height_from_depth(rgb, depth, teddy_mask, table_depth_mm) |
|
|
| return SceneCalibration( |
| session_root=session_root, |
| sync_row_index=sync_row_index, |
| azure_rgb_seq=azure_rgb_seq, |
| azure_depth_seq=azure_depth_seq, |
| robot_seq=robot_seq, |
| rgb_path=rgb_path, |
| depth_path=depth_path, |
| rgb=rgb, |
| depth=depth, |
| sync_row=row.to_dict(), |
| camera_matrix=camera_matrix, |
| rvec=rvec, |
| tvec=tvec, |
| table_corners_px=TABLE_CORNERS_PX.copy(), |
| table_size_m=TABLE_SIZE_M, |
| robot_base_world=ROBOT_BASE_WORLD.copy(), |
| robot_base_yaw_rad=float(ROBOT_BASE_YAW_RAD), |
| robot_world_rvec=DEFAULT_ROBOT_WORLD_RVEC.copy(), |
| robot_world_tvec=DEFAULT_ROBOT_WORLD_TVEC.copy(), |
| box_mask=box_mask, |
| teddy_mask=teddy_mask, |
| box_world_polygon=box_world_polygon, |
| teddy_world_center=teddy_world_center, |
| teddy_world_extent=teddy_world_extent, |
| table_depth_mm=table_depth_mm, |
| box_height_m=box_height_m, |
| teddy_height_m=teddy_height_m, |
| ) |
|
|
|
|
| def solve_table_camera( |
| table_corners_px: np.ndarray, table_size_m: tuple[float, float], focal_px: float |
| ) -> tuple[np.ndarray, np.ndarray, np.ndarray]: |
| width_m, height_m = table_size_m |
| object_points = np.array( |
| [ |
| [-width_m / 2, -height_m / 2, 0.0], |
| [width_m / 2, -height_m / 2, 0.0], |
| [width_m / 2, height_m / 2, 0.0], |
| [-width_m / 2, height_m / 2, 0.0], |
| ], |
| dtype=np.float64, |
| ) |
| camera_matrix = np.array( |
| [[focal_px, 0.0, 960.0], [0.0, focal_px, 540.0], [0.0, 0.0, 1.0]], |
| dtype=np.float64, |
| ) |
| ok, rvec, tvec = cv2.solvePnP(object_points, table_corners_px, camera_matrix, None, flags=cv2.SOLVEPNP_IPPE) |
| if not ok: |
| raise RuntimeError("solvePnP failed for table camera fit") |
| return camera_matrix, rvec, tvec |
|
|
|
|
| def detect_red_box(rgb: np.ndarray) -> np.ndarray: |
| hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV) |
| h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
| mask = (((h < 12) | (h > 170)) & (s > 60) & (v > 80)).astype(np.uint8) |
| num, labels, stats, centers = cv2.connectedComponentsWithStats(mask, connectivity=8) |
| best = None |
| best_area = -1 |
| for i in range(1, num): |
| x, y, w, h_box, area = stats[i] |
| cx, cy = centers[i] |
| if area > 50_000 and x > 900 and y < 700 and area > best_area: |
| best_area = int(area) |
| best = labels == i |
| if best is None: |
| raise RuntimeError("failed to detect red box component") |
| return best |
|
|
|
|
| def detect_teddy(rgb: np.ndarray) -> np.ndarray: |
| hsv = cv2.cvtColor(rgb, cv2.COLOR_RGB2HSV) |
| h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2] |
| mask = ((h > 5) & (h < 22) & (s > 15) & (s < 120) & (v > 110) & (v < 245)).astype(np.uint8) |
| num, labels, stats, centers = cv2.connectedComponentsWithStats(mask, connectivity=8) |
| best = None |
| best_area = -1 |
| for i in range(1, num): |
| x, y, w, h_box, area = stats[i] |
| cx, cy = centers[i] |
| if 5_000 < area < 50_000 and 850 < cx < 1250 and 350 < cy < 700 and area > best_area: |
| best_area = int(area) |
| best = labels == i |
| if best is None: |
| raise RuntimeError("failed to detect teddy component") |
| return best |
|
|
|
|
| def estimate_table_depth_mm(depth: np.ndarray) -> float: |
| center_region = depth[150:430, 120:520] |
| valid = center_region[center_region > 0] |
| if len(valid) == 0: |
| raise RuntimeError("no valid table depth values") |
| return float(np.percentile(valid, 50)) |
|
|
|
|
| def estimate_height_from_depth(rgb: np.ndarray, depth: np.ndarray, rgb_mask: np.ndarray, table_depth_mm: float) -> float: |
| ys, xs = np.where(rgb_mask) |
| if len(xs) == 0: |
| return 0.0 |
| xd = np.clip((xs * depth.shape[1] / rgb.shape[1]).astype(int), 0, depth.shape[1] - 1) |
| yd = np.clip((ys * depth.shape[0] / rgb.shape[0]).astype(int), 0, depth.shape[0] - 1) |
| values = depth[yd, xd] |
| values = values[values > 0] |
| if len(values) == 0: |
| return 0.0 |
| |
| object_depth_mm = float(np.percentile(values, 10)) |
| return max(0.0, (table_depth_mm - object_depth_mm) / 1000.0) |
|
|
|
|
| def image_mask_to_world_polygon(mask: np.ndarray, camera_matrix: np.ndarray, rvec: np.ndarray, tvec: np.ndarray) -> np.ndarray: |
| points = np.column_stack(np.where(mask > 0))[:, ::-1].astype(np.float64) |
| rect = cv2.minAreaRect(points.astype(np.float32)) |
| corners = cv2.boxPoints(rect).astype(np.float64) |
| return pixels_to_world_on_table(corners, camera_matrix, rvec, tvec) |
|
|
|
|
| def mask_centroid_and_extent_world( |
| mask: np.ndarray, camera_matrix: np.ndarray, rvec: np.ndarray, tvec: np.ndarray |
| ) -> tuple[np.ndarray, np.ndarray]: |
| points = np.column_stack(np.where(mask > 0))[:, ::-1].astype(np.float64) |
| world_points = pixels_to_world_on_table(points, camera_matrix, rvec, tvec) |
| center = world_points.mean(axis=0) |
| extent = world_points.max(axis=0) - world_points.min(axis=0) |
| return center, extent |
|
|
|
|
| def pixels_to_world_on_table( |
| pixels_uv: np.ndarray, camera_matrix: np.ndarray, rvec: np.ndarray, tvec: np.ndarray |
| ) -> np.ndarray: |
| rotation, _ = cv2.Rodrigues(rvec) |
| camera_position = (-rotation.T @ tvec).reshape(3) |
| inv_camera = np.linalg.inv(camera_matrix) |
| world_points = [] |
| for u, v in pixels_uv: |
| ray_cam = inv_camera @ np.array([u, v, 1.0], dtype=np.float64) |
| ray_cam /= np.linalg.norm(ray_cam) |
| ray_world = rotation.T @ ray_cam |
| if abs(ray_world[2]) < 1e-8: |
| continue |
| scale = -camera_position[2] / ray_world[2] |
| world_points.append(camera_position + scale * ray_world) |
| if not world_points: |
| raise RuntimeError("failed to back-project pixels to the table plane") |
| return np.stack(world_points, axis=0) |
|
|
|
|
| def create_background_inpaint(rgb: np.ndarray) -> np.ndarray: |
| mask = np.zeros(rgb.shape[:2], dtype=np.uint8) |
| polygon = np.array( |
| [[1080, 520], [1919, 520], [1919, 1079], [980, 1079], [980, 850], [1080, 760]], |
| dtype=np.int32, |
| ) |
| cv2.fillPoly(mask, [polygon], 255) |
| cv2.rectangle(mask, (1120, 300), (1450, 680), 0, -1) |
| cv2.rectangle(mask, (860, 350), (1180, 700), 0, -1) |
| return cv2.cvtColor(cv2.inpaint(cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR), mask, 7, cv2.INPAINT_TELEA), cv2.COLOR_BGR2RGB) |
|
|
|
|
| def kinova_fk_points_world(q_deg: np.ndarray, base_world: np.ndarray, base_yaw_rad: float) -> np.ndarray: |
| points, _ = kinova_fk_points_and_tool_pose(q_deg) |
| cy, sy = np.cos(base_yaw_rad), np.sin(base_yaw_rad) |
| base_rotation = np.array([[cy, -sy, 0.0], [sy, cy, 0.0], [0.0, 0.0, 1.0]], dtype=np.float64) |
| return points @ base_rotation.T + base_world.reshape(1, 3) |
|
|
|
|
| def transform_robot_points(points_robot: np.ndarray, robot_world_rvec: np.ndarray, robot_world_tvec: np.ndarray) -> np.ndarray: |
| rotation, _ = cv2.Rodrigues(np.asarray(robot_world_rvec, dtype=np.float64).reshape(3, 1)) |
| translation = np.asarray(robot_world_tvec, dtype=np.float64).reshape(1, 3) |
| return np.asarray(points_robot, dtype=np.float64) @ rotation.T + translation |
|
|
|
|
| def robot_tool_position_world( |
| tool_position_robot: np.ndarray, robot_world_rvec: np.ndarray, robot_world_tvec: np.ndarray |
| ) -> np.ndarray: |
| return transform_robot_points(np.asarray(tool_position_robot, dtype=np.float64).reshape(1, 3), robot_world_rvec, robot_world_tvec)[0] |
|
|
|
|
| def kinova_fk_points_and_tool_pose(q_deg: np.ndarray) -> tuple[np.ndarray, np.ndarray]: |
| q = np.deg2rad(np.asarray(q_deg, dtype=np.float64)) |
| params = [ |
| (np.pi, 0.0, 0.0, 0.0), |
| (np.pi / 2, 0.0, -(0.1564 + 0.1284), q[0]), |
| (np.pi / 2, 0.0, -(0.0054 + 0.0064), q[1] + np.pi), |
| (np.pi / 2, 0.0, -(0.2104 + 0.2104), q[2] + np.pi), |
| (np.pi / 2, 0.0, -(0.0064 + 0.0064), q[3] + np.pi), |
| (np.pi / 2, 0.0, -(0.2084 + 0.1059), q[4] + np.pi), |
| (np.pi / 2, 0.0, 0.0, q[5] + np.pi), |
| (np.pi, 0.0, -(0.1059 + 0.0615), q[6] + np.pi), |
| ] |
|
|
| def dh(alpha: float, a: float, d: float, theta: float) -> np.ndarray: |
| ca, sa = np.cos(alpha), np.sin(alpha) |
| ct, st = np.cos(theta), np.sin(theta) |
| return np.array( |
| [ |
| [ct, -st * ca, st * sa, a * ct], |
| [st, ct * ca, -ct * sa, a * st], |
| [0.0, sa, ca, d], |
| [0.0, 0.0, 0.0, 1.0], |
| ], |
| dtype=np.float64, |
| ) |
|
|
| transform = np.eye(4, dtype=np.float64) |
| points = [] |
| for alpha, a, d, theta in params: |
| transform = transform @ dh(alpha, a, d, theta) |
| points.append(transform[:3, 3].copy()) |
|
|
| tool_transform = transform.copy() |
| tool_transform[:3, 3] = tool_transform[:3, 3] + tool_transform[:3, 2] * TOOL_OFFSET_M |
| points.append(tool_transform[:3, 3].copy()) |
| return np.stack(points, axis=0), tool_transform |
|
|
|
|
| def project_world_points(points_world: np.ndarray, camera_matrix: np.ndarray, rvec: np.ndarray, tvec: np.ndarray) -> tuple[np.ndarray, np.ndarray]: |
| image_points, _ = cv2.projectPoints(points_world.astype(np.float64), rvec, tvec, camera_matrix, None) |
| rotation, _ = cv2.Rodrigues(rvec) |
| camera_points = (rotation @ points_world.T + tvec).T |
| return image_points[:, 0, :], camera_points[:, 2] |
|
|
|
|
| def draw_robot( |
| image: np.ndarray, |
| q_deg: np.ndarray, |
| camera_matrix: np.ndarray, |
| rvec: np.ndarray, |
| tvec: np.ndarray, |
| base_world: np.ndarray | None = None, |
| base_yaw_rad: float | None = None, |
| robot_world_rvec: np.ndarray | None = None, |
| robot_world_tvec: np.ndarray | None = None, |
| color: tuple[int, int, int] = (232, 242, 250), |
| alpha: float = 1.0, |
| ) -> np.ndarray: |
| canvas = image.copy() |
| if robot_world_rvec is not None and robot_world_tvec is not None: |
| points_robot, _ = kinova_fk_points_and_tool_pose(q_deg) |
| points_world = transform_robot_points(points_robot, robot_world_rvec, robot_world_tvec) |
| else: |
| if base_world is None or base_yaw_rad is None: |
| raise ValueError("base_world/base_yaw_rad or robot_world_rvec/robot_world_tvec must be provided") |
| points_world = kinova_fk_points_world(q_deg, base_world, base_yaw_rad) |
| uv, z = project_world_points(points_world, camera_matrix, rvec, tvec) |
|
|
| overlay = canvas.copy() |
| link_palette = [ |
| (214, 224, 232), |
| (222, 232, 239), |
| (214, 224, 232), |
| (206, 220, 229), |
| (198, 214, 224), |
| (190, 208, 220), |
| (182, 202, 215), |
| (172, 196, 212), |
| ] |
| outline = (70, 86, 98) |
| joint_fill = (230, 236, 240) |
|
|
| for idx, (p0, p1, depth_value) in enumerate(zip(uv[:-1], uv[1:], z[:-1], strict=False)): |
| base_thickness = max(14, int(52 / max(depth_value, 0.35))) |
| fill_color = link_palette[min(idx, len(link_palette) - 1)] |
| cv2.line( |
| overlay, |
| tuple(np.round(p0).astype(int)), |
| tuple(np.round(p1).astype(int)), |
| outline, |
| base_thickness + 8, |
| lineType=cv2.LINE_AA, |
| ) |
| cv2.line( |
| overlay, |
| tuple(np.round(p0).astype(int)), |
| tuple(np.round(p1).astype(int)), |
| fill_color, |
| base_thickness, |
| lineType=cv2.LINE_AA, |
| ) |
| for point, depth_value in zip(uv, z, strict=False): |
| radius = max(10, int(24 / max(depth_value, 0.35))) |
| center = tuple(np.round(point).astype(int)) |
| cv2.circle(overlay, center, radius + 4, outline, -1, lineType=cv2.LINE_AA) |
| cv2.circle(overlay, center, radius, joint_fill, -1, lineType=cv2.LINE_AA) |
| if alpha >= 1.0: |
| return overlay |
| return cv2.addWeighted(overlay, alpha, canvas, 1.0 - alpha, 0.0) |
|
|
|
|
| def render_robot_mask( |
| image_shape: tuple[int, int] | tuple[int, int, int], |
| q_deg: np.ndarray, |
| camera_matrix: np.ndarray, |
| rvec: np.ndarray, |
| tvec: np.ndarray, |
| robot_world_rvec: np.ndarray, |
| robot_world_tvec: np.ndarray, |
| extra_dilate: int = 0, |
| ) -> np.ndarray: |
| height, width = image_shape[:2] |
| points_robot, _ = kinova_fk_points_and_tool_pose(q_deg) |
| points_world = transform_robot_points(points_robot, robot_world_rvec, robot_world_tvec) |
| uv, z = project_world_points(points_world, camera_matrix, rvec, tvec) |
| mask = np.zeros((height, width), dtype=np.uint8) |
| for p0, p1, depth_value in zip(uv[:-1], uv[1:], z[:-1], strict=False): |
| thickness = max(18, int(60 / max(depth_value, 0.35))) |
| cv2.line( |
| mask, |
| tuple(np.round(p0).astype(int)), |
| tuple(np.round(p1).astype(int)), |
| 255, |
| thickness, |
| lineType=cv2.LINE_AA, |
| ) |
| for point, depth_value in zip(uv, z, strict=False): |
| radius = max(12, int(26 / max(depth_value, 0.35))) |
| cv2.circle(mask, tuple(np.round(point).astype(int)), radius, 255, -1, lineType=cv2.LINE_AA) |
| if extra_dilate > 0: |
| kernel = np.ones((extra_dilate, extra_dilate), dtype=np.uint8) |
| mask = cv2.dilate(mask, kernel, iterations=1) |
| return mask |
|
|
|
|
| def render_scene( |
| calibration: SceneCalibration, |
| q_deg: np.ndarray, |
| background: np.ndarray | None = None, |
| color: tuple[int, int, int] = (232, 242, 250), |
| alpha: float = 1.0, |
| ) -> np.ndarray: |
| if background is None: |
| background = create_background_inpaint(calibration.rgb) |
| return draw_robot( |
| background, |
| q_deg, |
| calibration.camera_matrix, |
| calibration.rvec, |
| calibration.tvec, |
| base_world=calibration.robot_base_world, |
| base_yaw_rad=calibration.robot_base_yaw_rad, |
| robot_world_rvec=calibration.robot_world_rvec, |
| robot_world_tvec=calibration.robot_world_tvec, |
| color=color, |
| alpha=alpha, |
| ) |
|
|
|
|
| def scene_to_jsonable(calibration: SceneCalibration) -> dict[str, Any]: |
| return { |
| "session_root": str(calibration.session_root), |
| "sync_row_index": calibration.sync_row_index, |
| "azure_rgb_seq": calibration.azure_rgb_seq, |
| "azure_depth_seq": calibration.azure_depth_seq, |
| "robot_seq": calibration.robot_seq, |
| "rgb_path": str(calibration.rgb_path), |
| "depth_path": str(calibration.depth_path), |
| "table_corners_px": calibration.table_corners_px.tolist(), |
| "table_size_m": list(calibration.table_size_m), |
| "camera_focal_px": float(calibration.camera_matrix[0, 0]), |
| "rvec": calibration.rvec.reshape(-1).tolist(), |
| "tvec": calibration.tvec.reshape(-1).tolist(), |
| "robot_base_world": calibration.robot_base_world.tolist(), |
| "robot_base_yaw_rad": float(calibration.robot_base_yaw_rad), |
| "robot_world_rvec": calibration.robot_world_rvec.tolist(), |
| "robot_world_tvec": calibration.robot_world_tvec.tolist(), |
| "sync_row": calibration.sync_row, |
| "table_depth_mm": float(calibration.table_depth_mm), |
| "box_height_m": float(calibration.box_height_m), |
| "teddy_height_m": float(calibration.teddy_height_m), |
| "box_world_polygon": calibration.box_world_polygon.tolist(), |
| "teddy_world_center": calibration.teddy_world_center.tolist(), |
| "teddy_world_extent": calibration.teddy_world_extent.tolist(), |
| } |
|
|
|
|
| def save_scene_json(calibration: SceneCalibration, path: str | Path) -> None: |
| path = Path(path) |
| path.write_text(json.dumps(scene_to_jsonable(calibration), indent=2)) |
|
|