| import argparse |
| import os |
| import subprocess |
| import sys |
| import urllib.request |
|
|
|
|
| def download_file(url, file_path, max_retries=3): |
| if os.path.exists(file_path): |
| return |
| print(f"Downloading {url} to {file_path}") |
| for retry in range(max_retries): |
| try: |
| urllib.request.urlretrieve(url, file_path) |
| return |
| except Exception as exc: |
| print( |
| f"Failed to download {url} (trial={retry + 1}) to {file_path} due to {exc}" |
| ) |
|
|
|
|
| def check_small_errors_or_exit( |
| dataset_name, |
| max_rotation_error, |
| max_proj_center_error, |
| expected_num_images, |
| errors_csv_path, |
| ): |
| print(f"Evaluating errors for {dataset_name}") |
|
|
| error = False |
| with open(errors_csv_path, "r") as fid: |
| num_images = 0 |
| for line in fid: |
| line = line.strip() |
| if len(line) == 0 or line.startswith("#"): |
| continue |
| rotation_error, proj_center_error = map(float, line.split(",")) |
| num_images += 1 |
| if rotation_error > max_rotation_error: |
| print("Exceeded rotation error threshold:", rotation_error) |
| error = True |
| if proj_center_error > max_proj_center_error: |
| print( |
| "Exceeded projection center error threshold:", |
| proj_center_error, |
| ) |
| error = True |
|
|
| if num_images != expected_num_images: |
| print("Unexpected number of images:", num_images) |
| error = True |
|
|
| if error: |
| sys.exit(1) |
|
|
|
|
| def process_dataset(args, dataset_name): |
| print("Processing dataset:", dataset_name) |
|
|
| workspace_path = os.path.join( |
| os.path.realpath(args.workspace_path), dataset_name |
| ) |
| os.makedirs(workspace_path, exist_ok=True) |
|
|
| dataset_archive_path = os.path.join(workspace_path, f"{dataset_name}.7z") |
| download_file( |
| f"https://www.eth3d.net/data/{dataset_name}_dslr_undistorted.7z", |
| dataset_archive_path, |
| ) |
|
|
| subprocess.check_call( |
| ["7zz", "x", "-y", f"{dataset_name}.7z"], cwd=workspace_path |
| ) |
|
|
| |
| with open( |
| os.path.join( |
| workspace_path, |
| f"{dataset_name}/dslr_calibration_undistorted/cameras.txt", |
| ), |
| "r", |
| ) as fid: |
| for line in fid: |
| if not line.startswith("#"): |
| first_camera_data = line.split() |
| camera_model = first_camera_data[1] |
| assert camera_model == "PINHOLE" |
| camera_params = first_camera_data[4:] |
| assert len(camera_params) == 4 |
| break |
|
|
| |
| expected_num_images = 0 |
| with open( |
| os.path.join( |
| workspace_path, |
| f"{dataset_name}/dslr_calibration_undistorted/images.txt", |
| ), |
| "r", |
| ) as fid: |
| for line in fid: |
| if not line.startswith("#") and line.strip(): |
| expected_num_images += 1 |
| |
| assert expected_num_images % 2 == 0 |
| expected_num_images /= 2 |
|
|
| |
| subprocess.check_call( |
| [ |
| os.path.realpath(args.colmap_path), |
| "automatic_reconstructor", |
| "--image_path", |
| f"{dataset_name}/images/", |
| "--workspace_path", |
| workspace_path, |
| "--use_gpu", |
| "1" if args.use_gpu else "0", |
| "--num_threads", |
| str(args.num_threads), |
| "--quality", |
| args.quality, |
| "--camera_model", |
| "PINHOLE", |
| "--camera_params", |
| ",".join(camera_params), |
| ], |
| cwd=workspace_path, |
| ) |
|
|
| |
| subprocess.check_call( |
| [ |
| os.path.realpath(args.colmap_path), |
| "model_comparer", |
| "--input_path1", |
| "sparse/0", |
| "--input_path2", |
| f"{dataset_name}/dslr_calibration_undistorted/", |
| "--output_path", |
| ".", |
| "--alignment_error", |
| "proj_center", |
| "--max_proj_center_error", |
| str(args.max_proj_center_error), |
| ], |
| cwd=workspace_path, |
| ) |
|
|
| |
| check_small_errors_or_exit( |
| dataset_name, |
| args.max_rotation_error, |
| args.max_proj_center_error, |
| expected_num_images, |
| os.path.join(workspace_path, "errors.csv"), |
| ) |
|
|
|
|
| def parse_args(): |
| parser = argparse.ArgumentParser() |
| parser.add_argument("--dataset_names", required=True) |
| parser.add_argument("--workspace_path", required=True) |
| parser.add_argument("--colmap_path", required=True) |
| parser.add_argument("--use_gpu", default=True, action="store_true") |
| parser.add_argument("--use_cpu", dest="use_gpu", action="store_false") |
| parser.add_argument("--num_threads", type=int, default=-1) |
| parser.add_argument("--quality", default="medium") |
| parser.add_argument("--max_rotation_error", type=float, default=1.0) |
| parser.add_argument("--max_proj_center_error", type=float, default=0.1) |
| return parser.parse_args() |
|
|
|
|
| def main(): |
| args = parse_args() |
|
|
| for dataset_name in args.dataset_names.split(","): |
| process_dataset(args, dataset_name.strip()) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|