|
import argparse |
|
import sqlite3 |
|
from tqdm import tqdm |
|
from collections import defaultdict |
|
import numpy as np |
|
from pathlib import Path |
|
import logging |
|
|
|
from ...colmap_from_nvm import ( |
|
recover_database_images_and_ids, |
|
camera_center_to_translation, |
|
) |
|
from ...utils.read_write_model import Camera, Image, Point3D, CAMERA_MODEL_IDS |
|
from ...utils.read_write_model import write_model |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
def read_nvm_model(nvm_path, database_path, image_ids, camera_ids, skip_points=False): |
|
|
|
|
|
db = sqlite3.connect(str(database_path)) |
|
ret = db.execute("SELECT camera_id, model, width, height, params FROM cameras;") |
|
cameras = {} |
|
for camera_id, camera_model, width, height, params in ret: |
|
params = np.fromstring(params, dtype=np.double).reshape(-1) |
|
camera_model = CAMERA_MODEL_IDS[camera_model] |
|
assert len(params) == camera_model.num_params, ( |
|
len(params), |
|
camera_model.num_params, |
|
) |
|
camera = Camera( |
|
id=camera_id, |
|
model=camera_model.model_name, |
|
width=int(width), |
|
height=int(height), |
|
params=params, |
|
) |
|
cameras[camera_id] = camera |
|
|
|
nvm_f = open(nvm_path, "r") |
|
line = nvm_f.readline() |
|
while line == "\n" or line.startswith("NVM_V3"): |
|
line = nvm_f.readline() |
|
num_images = int(line) |
|
|
|
|
|
logger.info(f"Reading {num_images} images...") |
|
image_idx_to_db_image_id = [] |
|
image_data = [] |
|
i = 0 |
|
while i < num_images: |
|
line = nvm_f.readline() |
|
if line == "\n": |
|
continue |
|
data = line.strip("\n").lstrip("./").split(" ") |
|
image_data.append(data) |
|
image_idx_to_db_image_id.append(image_ids[data[0]]) |
|
i += 1 |
|
|
|
line = nvm_f.readline() |
|
while line == "\n": |
|
line = nvm_f.readline() |
|
num_points = int(line) |
|
|
|
if skip_points: |
|
logger.info(f"Skipping {num_points} points.") |
|
num_points = 0 |
|
else: |
|
logger.info(f"Reading {num_points} points...") |
|
points3D = {} |
|
image_idx_to_keypoints = defaultdict(list) |
|
i = 0 |
|
pbar = tqdm(total=num_points, unit="pts") |
|
while i < num_points: |
|
line = nvm_f.readline() |
|
if line == "\n": |
|
continue |
|
|
|
data = line.strip("\n").split(" ") |
|
x, y, z, r, g, b, num_observations = data[:7] |
|
obs_image_ids, point2D_idxs = [], [] |
|
for j in range(int(num_observations)): |
|
s = 7 + 4 * j |
|
img_index, kp_index, kx, ky = data[s : s + 4] |
|
image_idx_to_keypoints[int(img_index)].append( |
|
(int(kp_index), float(kx), float(ky), i) |
|
) |
|
db_image_id = image_idx_to_db_image_id[int(img_index)] |
|
obs_image_ids.append(db_image_id) |
|
point2D_idxs.append(kp_index) |
|
|
|
point = Point3D( |
|
id=i, |
|
xyz=np.array([x, y, z], float), |
|
rgb=np.array([r, g, b], int), |
|
error=1.0, |
|
image_ids=np.array(obs_image_ids, int), |
|
point2D_idxs=np.array(point2D_idxs, int), |
|
) |
|
points3D[i] = point |
|
|
|
i += 1 |
|
pbar.update(1) |
|
pbar.close() |
|
|
|
logger.info("Parsing image data...") |
|
images = {} |
|
for i, data in enumerate(image_data): |
|
|
|
name, _, qw, qx, qy, qz, cx, cy, cz, _, _ = data |
|
qvec = np.array([qw, qx, qy, qz], float) |
|
c = np.array([cx, cy, cz], float) |
|
t = camera_center_to_translation(c, qvec) |
|
|
|
if i in image_idx_to_keypoints: |
|
|
|
keypoints = image_idx_to_keypoints[i] |
|
point2D_idxs = np.array([d[0] for d in keypoints]) |
|
tri_xys = np.array([[x, y] for _, x, y, _ in keypoints]) |
|
tri_ids = np.array([i for _, _, _, i in keypoints]) |
|
|
|
num_2Dpoints = max(point2D_idxs) + 1 |
|
xys = np.zeros((num_2Dpoints, 2), float) |
|
point3D_ids = np.full(num_2Dpoints, -1, int) |
|
xys[point2D_idxs] = tri_xys |
|
point3D_ids[point2D_idxs] = tri_ids |
|
else: |
|
xys = np.zeros((0, 2), float) |
|
point3D_ids = np.full(0, -1, int) |
|
|
|
image_id = image_ids[name] |
|
image = Image( |
|
id=image_id, |
|
qvec=qvec, |
|
tvec=t, |
|
camera_id=camera_ids[name], |
|
name=name.replace("png", "jpg"), |
|
xys=xys, |
|
point3D_ids=point3D_ids, |
|
) |
|
images[image_id] = image |
|
|
|
return cameras, images, points3D |
|
|
|
|
|
def main(nvm, database, output, skip_points=False): |
|
assert nvm.exists(), nvm |
|
assert database.exists(), database |
|
|
|
image_ids, camera_ids = recover_database_images_and_ids(database) |
|
|
|
logger.info("Reading the NVM model...") |
|
model = read_nvm_model( |
|
nvm, database, image_ids, camera_ids, skip_points=skip_points |
|
) |
|
|
|
logger.info("Writing the COLMAP model...") |
|
output.mkdir(exist_ok=True, parents=True) |
|
write_model(*model, path=str(output), ext=".bin") |
|
logger.info("Done.") |
|
|
|
|
|
if __name__ == "__main__": |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument("--nvm", required=True, type=Path) |
|
parser.add_argument("--database", required=True, type=Path) |
|
parser.add_argument("--output", required=True, type=Path) |
|
parser.add_argument("--skip_points", action="store_true") |
|
args = parser.parse_args() |
|
main(**args.__dict__) |
|
|