# Adapted from prepare.py

import asyncio
import argparse
from collections import defaultdict
import json
import shutil
from pathlib import Path
from typing import List, Dict, Optional

import numpy as np
import cv2
from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map
from omegaconf import DictConfig, OmegaConf
from opensfm.pygeometry import Camera
from opensfm.pymap import Shot
from opensfm.undistort import (
    perspective_camera_from_fisheye,
    perspective_camera_from_perspective,
)

from .. import logger

# from ...osm.tiling import TileManager
# from ...osm.viz import GeoPlotter
from .geo import BoundaryBox, Projection
from .utils import decompose_rotmat
from .utils_sfm import (
    keyframe_selection,
    perspective_camera_from_pano,
    scale_camera,
    CameraUndistorter,
    PanoramaUndistorter,
    undistort_shot,
)
from .download import (
    opensfm_shot_from_info,
    image_filename,
)


default_cfg = OmegaConf.create(
    {
        "max_image_size": 512,
        "do_legacy_pano_offset": True,
        "min_dist_between_keyframes": 4,
        "tiling": {
            "tile_size": 128,
            "margin": 128,
            "ppm": 2,
        },
    }
)


def _sfm_cluster_id(image_info: dict) -> int:
    """Return the SfM cluster id of an image as an int.

    Accepts both the flattened layout (``info["sfm_cluster.id"]``, the one
    used by ``pack_shot_dict``) and the nested layout
    (``info["sfm_cluster"]["id"]``).
    """
    try:
        cluster_id = image_info["sfm_cluster.id"]
    except KeyError:
        cluster_id = image_info["sfm_cluster"]["id"]
    return int(cluster_id)


def get_pano_offset(image_info: dict, do_legacy: bool = False) -> float:
    """Draw a pseudo-random offset in [-45, 45) for a panorama.

    The random generator is seeded per image group so that every image
    sharing the same seed source gets the same offset.

    Args:
        image_info: Metadata record of the image.
        do_legacy: If True, seed from the SfM cluster id; otherwise seed
            from the hash of ``image_info["sequence"]``.

    Returns:
        A value sampled uniformly from [-45, 45).
    """
    if do_legacy:
        seed = _sfm_cluster_id(image_info)
    else:
        # NOTE(review): if the sequence id is a str, its hash is salted per
        # interpreter process unless PYTHONHASHSEED is fixed, so this offset
        # may differ between runs. Left unchanged to preserve behavior.
        seed = image_info["sequence"].__hash__()
    # Fold the seed into the range accepted by RandomState.
    seed = seed % (2**32 - 1)
    return np.random.RandomState(seed).uniform(-45, 45)


def process_shot(
    shot: Shot, info: dict, image_path: Path, output_dir: Path, cfg: DictConfig
) -> Optional[List[Shot]]:
    """Undistort one image and write the resulting view(s) to disk.

    Args:
        shot: Shot whose camera describes the raw image.
        info: Metadata record of the image (used for the panorama offset).
        image_path: Path of the raw image file.
        output_dir: Directory receiving one ``<shot.id>.jpg`` per output view.
        cfg: Configuration providing ``max_image_size`` and
            ``do_legacy_pano_offset``.

    Returns:
        The undistorted shots, or None if the image is missing or cannot be
        decoded.

    Raises:
        NotImplementedError: If the camera projection type is neither a
            panorama, ``"fisheye"`` nor ``"perspective"``.
    """
    if not image_path.exists():
        logger.warning(f"Image {image_path} does not exist !")
        return None

    image_orig = cv2.imread(str(image_path))
    if image_orig is None:
        # cv2.imread signals an unreadable/corrupt file by returning None
        # instead of raising; treat it like a missing image.
        logger.warning(f"Image {image_path} could not be read !")
        return None

    max_size = cfg.max_image_size
    pano_offset = None

    camera = shot.camera
    # The array shape is (height, width, ...); the camera wants (width, height).
    camera.width, camera.height = image_orig.shape[:2][::-1]
    if camera.is_panorama(camera.projection_type):
        camera_new = perspective_camera_from_pano(camera, max_size)
        undistorter = PanoramaUndistorter(camera, camera_new)
        pano_offset = get_pano_offset(info, cfg.do_legacy_pano_offset)
    elif camera.projection_type in ["fisheye", "perspective"]:
        if camera.projection_type == "fisheye":
            camera_new = perspective_camera_from_fisheye(camera)
        else:
            camera_new = perspective_camera_from_perspective(camera)
        camera_new = scale_camera(camera_new, max_size)
        camera_new.id = camera.id + "_undistorted"
        undistorter = CameraUndistorter(camera, camera_new)
    else:
        raise NotImplementedError(camera.projection_type)

    shots_undist, images_undist = undistort_shot(
        image_orig, shot, undistorter, pano_offset
    )
    for shot_undist, image_undist in zip(shots_undist, images_undist):
        cv2.imwrite(str(output_dir / f"{shot_undist.id}.jpg"), image_undist)

    return shots_undist


def pack_shot_dict(shot: Shot, info: dict) -> dict:
    """Collect the pose and metadata of a shot into a plain dict.

    Args:
        shot: Undistorted shot providing the camera id and pose.
        info: Metadata record of the source image, using flattened keys such
            as ``"computed_geometry.coordinates"`` and ``"sfm_cluster.id"``.

    Returns:
        A dict with the camera id, reversed coordinates, camera-to-world
        translation and rotation, roll/pitch/yaw, capture time, GPS position,
        compass angle and chunk id.
    """
    # Coordinates are reversed on read; presumably stored as (lon, lat) --
    # TODO confirm against the downloader.
    latlong = info["computed_geometry.coordinates"][::-1]
    latlong_gps = info["geometry.coordinates"][::-1]
    w_p_c = shot.pose.get_origin()
    w_r_c = shot.pose.get_R_cam_to_world()
    rpy = decompose_rotmat(w_r_c)
    return dict(
        camera_id=shot.camera.id,
        latlong=latlong,
        t_c2w=w_p_c,
        R_c2w=w_r_c,
        roll_pitch_yaw=rpy,
        capture_time=info["captured_at"],
        gps_position=np.r_[latlong_gps, info["altitude"]],
        compass_angle=info["compass_angle"],
        chunk_id=int(info["sfm_cluster.id"]),
    )


def pack_camera_dict(camera: Camera) -> dict:
    """Describe a perspective camera as a ``PINHOLE`` model dict.

    Args:
        camera: Camera whose projection type must be ``"perspective"``.

    Returns:
        A dict with the camera id, model name, image size and ``params``,
        the entries K[0, 0], K[1, 1], K[0, 2], K[1, 2] of the pixel-space
        calibration matrix.

    Raises:
        AssertionError: If the camera is not a perspective camera.
    """
    assert camera.projection_type == "perspective"
    K = camera.get_K_in_pixel_coordinates(camera.width, camera.height)
    return dict(
        id=camera.id,
        model="PINHOLE",
        width=camera.width,
        height=camera.height,
        # Fancy indexing picks the two diagonal entries, then the last column.
        params=K[[0, 1, 0, 1], [0, 1, 2, 2]],
    )


def process_sequence(
    image_ids: List[int],
    image_infos: dict,
    projection: Projection,
    cfg: DictConfig,
    raw_image_dir: Path,
    out_image_dir: Path,
):
    """Select keyframes of a sequence, undistort them and pack their metadata.

    Args:
        image_ids: Ids of the images of the sequence.
        image_infos: Mapping from image id to its metadata record.
        projection: Projection passed to ``opensfm_shot_from_info``.
        cfg: Configuration providing ``min_dist_between_keyframes`` plus the
            options read by ``process_shot``.
        raw_image_dir: Directory containing the raw images.
        out_image_dir: Directory receiving the undistorted images.

    Returns:
        A tuple ``(dump, processed_ids)``. ``dump`` maps a sequence id to
        ``{"views": {...}, "cameras": {...}}``. ``processed_ids`` holds the
        image id of every output view, so an image that yields several views
        appears several times.
    """
    dump = {}
    processed_ids = list()
    if len(image_ids) == 0:
        return dump, processed_ids

    # Order by capture time before selecting keyframes.
    image_ids = sorted(image_ids, key=lambda i: image_infos[i]["captured_at"])
    shots = []
    for i in image_ids:
        _, shot = opensfm_shot_from_info(image_infos[i], projection)
        shots.append(shot)
    shot_idxs = keyframe_selection(shots, min_dist=cfg.min_dist_between_keyframes)
    shots = [shots[i] for i in shot_idxs]

    shots_out = thread_map(
        lambda shot: process_shot(
            shot,
            image_infos[shot.id],
            raw_image_dir / image_filename.format(image_id=shot.id),
            out_image_dir,
            cfg,
        ),
        shots,
        disable=True,
    )
    # Drop shots whose image could not be processed; the view index below
    # counts only the remaining ones.
    shots_out = [views for views in shots_out if views is not None]
    indexed_views = [
        (index, view_shot)
        for index, views in enumerate(shots_out)
        for view_shot in views
    ]

    for index, shot in indexed_views:
        # Output shot ids have the form "<image id>_<suffix>".
        i, suffix = shot.id.rsplit("_", 1)
        processed_ids.append(i)
        info = image_infos[i]
        seq_id = info["sequence"]
        is_pano = not suffix.endswith("undistorted")
        if is_pano:
            # Each panorama view goes to its own sub-sequence.
            seq_id += f"_{suffix}"
        if seq_id not in dump:
            dump[seq_id] = dict(views={}, cameras={})

        view = pack_shot_dict(shot, info)
        view["index"] = index
        dump[seq_id]["views"][shot.id] = view
        dump[seq_id]["cameras"][shot.camera.id] = pack_camera_dict(shot.camera)
    return dump, processed_ids