import asyncio
import argparse
from collections import defaultdict
import json
import shutil
from pathlib import Path
from typing import Dict, List, Optional

import numpy as np
import cv2
from tqdm import tqdm
from tqdm.contrib.concurrent import thread_map
from omegaconf import DictConfig, OmegaConf
from opensfm.pygeometry import Camera
from opensfm.pymap import Shot
from opensfm.undistort import (
    perspective_camera_from_fisheye,
    perspective_camera_from_perspective,
)

from .. import logger
from .geo import BoundaryBox, Projection
from .utils import decompose_rotmat
from .utils_sfm import (
    keyframe_selection,
    perspective_camera_from_pano,
    scale_camera,
    CameraUndistorter,
    PanoramaUndistorter,
    undistort_shot,
)
from .download import (
    opensfm_shot_from_info,
    image_filename,
)


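# Default preparation settings; "ppm" is presumably the tile rasterization
# resolution in pixels per meter.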
default_cfg = OmegaConf.create(
    {
        "max_image_size": 512,
        "do_legacy_pano_offset": True,
        "min_dist_between_keyframes": 4,
        "tiling": {
            "tile_size": 128,
            "margin": 128,
            "ppm": 2,
        },
    }
)


def get_pano_offset(image_info: dict, do_legacy: bool = False) -> float:
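    """Pseudo-random offset, in degrees, applied when re-rendering a panorama.

    Seeded from the SfM cluster id (legacy behavior) or from the Python hash of
    the sequence id.
    """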
    if do_legacy:
        seed = int(image_info["sfm_cluster"]["id"])
    else:
        seed = hash(image_info["sequence"])
    seed = seed % (2**32 - 1)
    return np.random.RandomState(seed).uniform(-45, 45)


def process_shot(
    shot: Shot, info: dict, image_path: Path, output_dir: Path, cfg: DictConfig
) -> Optional[List[Shot]]:
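    """Undistort a single shot and write the resulting image(s) to output_dir.

    Panoramas are re-rendered into perspective crops with a pseudo-random offset;
    fisheye and perspective images are undistorted and resized based on
    cfg.max_image_size. Returns the undistorted shots, or None if the raw image
    is missing on disk.
    """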
    if not image_path.exists():
        logger.warning(f"Image {image_path} does not exist!")
        return None

    image_orig = cv2.imread(str(image_path))
    max_size = cfg.max_image_size
    pano_offset = None

    camera = shot.camera
    camera.width, camera.height = image_orig.shape[:2][::-1]
    if camera.is_panorama(camera.projection_type):
        camera_new = perspective_camera_from_pano(camera, max_size)
        undistorter = PanoramaUndistorter(camera, camera_new)
        pano_offset = get_pano_offset(info, cfg.do_legacy_pano_offset)
    elif camera.projection_type in ["fisheye", "perspective"]:
        if camera.projection_type == "fisheye":
            camera_new = perspective_camera_from_fisheye(camera)
        else:
            camera_new = perspective_camera_from_perspective(camera)
        camera_new = scale_camera(camera_new, max_size)
        camera_new.id = camera.id + "_undistorted"
        undistorter = CameraUndistorter(camera, camera_new)
    else:
        raise NotImplementedError(camera.projection_type)

    shots_undist, images_undist = undistort_shot(
        image_orig, shot, undistorter, pano_offset
    )
    for shot_undist, image in zip(shots_undist, images_undist):
        cv2.imwrite(str(output_dir / f"{shot_undist.id}.jpg"), image)

    return shots_undist


def pack_shot_dict(shot: Shot, info: dict) -> dict:
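    """Serialize the pose and metadata of a shot into a plain dict.

    Coordinates are reversed into (latitude, longitude) order; the pose is stored
    as camera-to-world translation, rotation matrix, and roll/pitch/yaw angles.
    """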
    latlong = info["computed_geometry.coordinates"][::-1]
    latlong_gps = info["geometry.coordinates"][::-1]
    w_p_c = shot.pose.get_origin()
    w_r_c = shot.pose.get_R_cam_to_world()
    rpy = decompose_rotmat(w_r_c)
    return dict(
        camera_id=shot.camera.id,
        latlong=latlong,
        t_c2w=w_p_c,
        R_c2w=w_r_c,
        roll_pitch_yaw=rpy,
        capture_time=info["captured_at"],
        gps_position=np.r_[latlong_gps, info["altitude"]],
        compass_angle=info["compass_angle"],
        chunk_id=int(info["sfm_cluster.id"]),
    )


def pack_camera_dict(camera: Camera) -> dict:
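    """Convert an undistorted perspective camera into a PINHOLE camera dict with
    params ordered as (fx, fy, cx, cy), in pixels."""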
    assert camera.projection_type == "perspective"
    K = camera.get_K_in_pixel_coordinates(camera.width, camera.height)
    return dict(
        id=camera.id,
        model="PINHOLE",
        width=camera.width,
        height=camera.height,
        params=K[[0, 1, 0, 1], [0, 1, 2, 2]],
    )


def process_sequence(
    image_ids: List[int],
    image_infos: dict,
    projection: Projection,
    cfg: DictConfig,
    raw_image_dir: Path,
    out_image_dir: Path,
):
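    """Prepare all images of a single capture sequence.

    Images are sorted by capture time, subsampled into keyframes that are at
    least cfg.min_dist_between_keyframes apart, undistorted in parallel, and
    grouped into per-sequence view and camera dictionaries. Returns the dump
    dict and the list of image ids that were actually processed.
    """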
    shots = []
    dump = {}
    processed_ids = []
    if len(image_ids) == 0:
        return dump, processed_ids

    image_ids = sorted(image_ids, key=lambda i: image_infos[i]["captured_at"])
    for i in image_ids:
        _, shot = opensfm_shot_from_info(image_infos[i], projection)
        shots.append(shot)
    shot_idxs = keyframe_selection(shots, min_dist=cfg.min_dist_between_keyframes)
    shots = [shots[i] for i in shot_idxs]

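    # Undistort all keyframes in parallel threads; a panorama expands into several
    # perspective crops, and shots whose raw image is missing come back as None.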
    shots_out = thread_map(
        lambda shot: process_shot(
            shot,
            image_infos[shot.id],
            raw_image_dir / image_filename.format(image_id=shot.id),
            out_image_dir,
            cfg,
        ),
        shots,
        disable=True,
    )
    shots_out = [s for s in shots_out if s is not None]
    shots_out = [(i, s) for i, ss in enumerate(shots_out) for s in ss if ss is not None]

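    # Group the undistorted views by sequence; crops of a panorama are assigned to
    # a dedicated sub-sequence keyed by their crop suffix.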
    for index, shot in shots_out:
        i, suffix = shot.id.rsplit("_", 1)
        processed_ids.append(i)
        info = image_infos[i]
        seq_id = info["sequence"]
        is_pano = not suffix.endswith("undistorted")
        if is_pano:
            seq_id += f"_{suffix}"
        if seq_id not in dump:
            dump[seq_id] = dict(views={}, cameras={})

        view = pack_shot_dict(shot, info)
        view["index"] = index
        dump[seq_id]["views"][shot.id] = view
        dump[seq_id]["cameras"][shot.camera.id] = pack_camera_dict(shot.camera)
    return dump, processed_ids