#!/usr/bin/env python3
"""
Preprocess the MVImgNet dataset.

This script processes MVImgNet sequences by:
- Loading a sparse SFM reconstruction (COLMAP text model).
- Building per-frame pinhole intrinsics and rescaling the RGB images.
- Converting intrinsics between COLMAP and OpenCV conventions.
- Saving the processed images and per-frame camera metadata.

Usage:
    python preprocess_mvimgnet.py --data_dir /path/to/MVImgNet_data \
        --output_dir /path/to/processed_mvimgnet
"""
import os
import os.path as osp
import argparse

import numpy as np
import PIL.Image as Image
import cv2
from tqdm import tqdm

# COLMAP model I/O helper (assumed to be available alongside this script);
# `run` is expected to (re)write the text model files read by load_sfm below.
from read_write_model import run
# Use the new Pillow resampling enum when available, fall back to the old constants.
try:
    lanczos = Image.Resampling.LANCZOS
    bicubic = Image.Resampling.BICUBIC
except AttributeError:
    lanczos = Image.LANCZOS
    bicubic = Image.BICUBIC

# Axis flip between the OpenGL camera convention (y up, z backward) and the
# OpenCV/COLMAP convention (y down, z forward). Kept for reference; it is not
# used in the processing below.
OPENGL_TO_OPENCV = np.float32(
    [[1, 0, 0, 0], [0, -1, 0, 0], [0, 0, -1, 0], [0, 0, 0, 1]]
)
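# If it were needed, one typical (hypothetical) use would be converting a
# camera-to-world pose expressed in OpenGL convention into OpenCV convention:
#   cam_to_world_cv = cam_to_world_gl @ OPENGL_TO_OPENCV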
# -----------------------------------------------------------------------------
# Helper Classes and Functions
# -----------------------------------------------------------------------------
class ImageList:
    """Convenience class to apply the same operation to a list of images."""

    def __init__(self, images):
        if not isinstance(images, (list, tuple)):
            images = [images]
        self.images = []
        for image in images:
            if not isinstance(image, Image.Image):
                image = Image.fromarray(image)
            self.images.append(image)

    def __len__(self):
        return len(self.images)

    def to_pil(self):
        return tuple(self.images) if len(self.images) > 1 else self.images[0]

    @property
    def size(self):
        # All images in the list must share the same (width, height).
        sizes = [im.size for im in self.images]
        assert all(s == sizes[0] for s in sizes)
        return sizes[0]

    def resize(self, *args, **kwargs):
        return ImageList([im.resize(*args, **kwargs) for im in self.images])

    def crop(self, *args, **kwargs):
        return ImageList([im.crop(*args, **kwargs) for im in self.images])
def colmap_to_opencv_intrinsics(K):
    """
    Convert COLMAP intrinsics (pixel centers at (0.5, 0.5)) to the OpenCV
    convention (pixel centers at integer coordinates).
    """
    K = K.copy()
    K[0, 2] -= 0.5
    K[1, 2] -= 0.5
    return K


def opencv_to_colmap_intrinsics(K):
    """
    Convert OpenCV intrinsics (pixel centers at integer coordinates) to the
    COLMAP convention (pixel centers at (0.5, 0.5)).
    """
    K = K.copy()
    K[0, 2] += 0.5
    K[1, 2] += 0.5
    return K
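# Example of the half-pixel shift: a principal point stored by COLMAP as
# cx = 320.5, cy = 240.5 corresponds to cx = 320.0, cy = 240.0 in OpenCV
# convention, and vice versa.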
def rescale_image_depthmap(
    image, depthmap, camera_intrinsics, output_resolution, force=True
):
    """
    Jointly rescale an image (and its depthmap) so that the output resolution
    is at least the desired value in both dimensions.

    Args:
        image: Input image (a PIL.Image or compatible array).
        depthmap: A corresponding depth map (or None).
        camera_intrinsics: A 3x3 NumPy array of intrinsics.
        output_resolution: Desired (width, height) resolution.
        force: If True, rescale even when the image is already smaller.

    Returns:
        Tuple of (rescaled image, rescaled depthmap, updated intrinsics).
    """
    image = ImageList(image)
    input_resolution = np.array(image.size)  # (W, H)
    output_resolution = np.array(output_resolution)
    if depthmap is not None:
        assert tuple(depthmap.shape[:2]) == image.size[::-1]
    scale_final = max(output_resolution / image.size) + 1e-8
    if scale_final >= 1 and not force:
        return image.to_pil(), depthmap, camera_intrinsics
    output_resolution = np.floor(input_resolution * scale_final).astype(int)
    image = image.resize(
        tuple(output_resolution), resample=lanczos if scale_final < 1 else bicubic
    )
    if depthmap is not None:
        depthmap = cv2.resize(
            depthmap, tuple(output_resolution), interpolation=cv2.INTER_NEAREST
        )
    camera_intrinsics = camera_matrix_of_crop(
        camera_intrinsics, input_resolution, output_resolution, scaling=scale_final
    )
    return image.to_pil(), depthmap, camera_intrinsics
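# For example (assuming a 1080x1920 portrait input and a requested resolution of
# (640, 480)): the scale factor is max(640 / 1080, 480 / 1920) ~= 0.593, so the
# image is resized to 640x1137, which covers the requested size in both axes.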
def camera_matrix_of_crop(
    input_camera_matrix,
    input_resolution,
    output_resolution,
    scaling=1,
    offset_factor=0.5,
    offset=None,
):
    """
    Update the camera intrinsics to account for a rescaling (or cropping) of the image.
    """
    margins = np.asarray(input_resolution) * scaling - output_resolution
    assert np.all(margins >= 0.0)
    if offset is None:
        offset = offset_factor * margins
    output_camera_matrix_colmap = opencv_to_colmap_intrinsics(input_camera_matrix)
    output_camera_matrix_colmap[:2, :] *= scaling
    output_camera_matrix_colmap[:2, 2] -= offset
    output_camera_matrix = colmap_to_opencv_intrinsics(output_camera_matrix_colmap)
    return output_camera_matrix
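# Note on conventions: the scaling is applied in COLMAP convention because there
# the principal point is measured from the corner of the top-left pixel and so
# scales linearly with the image size; converting back afterwards restores the
# OpenCV convention expected by the rest of the pipeline.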
def pose_from_qwxyz_txyz(elems):
    """
    Convert a quaternion (qw, qx, qy, qz) and translation (tx, ty, tz) into a
    4x4 pose and return its inverse (i.e. the cam2world matrix).
    """
    from scipy.spatial.transform import Rotation

    qw, qx, qy, qz, tx, ty, tz = map(float, elems)
    pose = np.eye(4)
    pose[:3, :3] = Rotation.from_quat((qx, qy, qz, qw)).as_matrix()
    pose[:3, 3] = (tx, ty, tz)
    return np.linalg.inv(pose)
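# COLMAP's images.txt stores the world-to-camera rotation and translation, so
# inverting the assembled matrix yields the camera-to-world pose saved later on.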
def load_sfm(sfm_dir):
    """
    Load sparse SFM data from COLMAP text files.

    Returns a tuple (img_idx, img_infos) where:
    - img_idx: A dict mapping image filename to image index.
    - img_infos: A dict of per-image information (intrinsics, file path,
      camera pose, and sparse 2D keypoints).
    """
    # cameras.txt: one line per camera after a 3-line comment header.
    with open(osp.join(sfm_dir, "cameras.txt"), "r") as f:
        raw = f.read().splitlines()[3:]  # skip header
    intrinsics = {}
    for camera in raw:
        camera = camera.split(" ")
        intrinsics[int(camera[0])] = [camera[1]] + [float(x) for x in camera[2:]]

    # images.txt: two lines per image (pose line, then 2D point observations).
    with open(osp.join(sfm_dir, "images.txt"), "r") as f:
        raw = f.read().splitlines()
        raw = [line for line in raw if not line.startswith("#")]

    img_idx = {}
    img_infos = {}
    for image, points in zip(raw[0::2], raw[1::2]):
        image = image.split(" ")
        points = points.split(" ")
        idx = image[0]
        img_name = image[-1]
        assert img_name not in img_idx, f"Duplicate image: {img_name}"
        img_idx[img_name] = idx
        # Keep only observations associated with a 3D point (id != -1).
        current_points2D = {
            int(i): (float(x), float(y))
            for i, x, y in zip(points[2::3], points[0::3], points[1::3])
            if i != "-1"
        }
        img_infos[idx] = dict(
            intrinsics=intrinsics[int(image[-2])],
            path=img_name,
            frame_id=img_name,
            cam_to_world=pose_from_qwxyz_txyz(image[1:-2]),
            sparse_pts2d=current_points2D,
        )
    return img_idx, img_infos
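# Each img_infos entry then looks roughly like (illustrative values):
#   {"intrinsics": ["PINHOLE", width, height, fx, fy, cx, cy],
#    "path": "000.jpg", "frame_id": "000.jpg",
#    "cam_to_world": <4x4 ndarray>, "sparse_pts2d": {point3d_id: (x, y), ...}}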
def undistort_images(intrinsics, rgb):
    """
    Build a 3x3 pinhole intrinsics matrix (COLMAP convention) from a COLMAP
    camera parameter list, and return it together with the image, which is
    passed through unchanged.
    """
    width = int(intrinsics[1])
    height = int(intrinsics[2])
    fx = intrinsics[3]
    fy = intrinsics[4]
    cx = intrinsics[5]
    cy = intrinsics[6]
    K = np.zeros([3, 3])
    K[0, 0] = fx
    K[0, 2] = cx
    K[1, 1] = fy
    K[1, 2] = cy
    K[2, 2] = 1
    return width, height, K, rgb
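# Note: the parameter layout above (fx, fy, cx, cy at indices 3-6) assumes a
# PINHOLE-style camera model in cameras.txt; distortion coefficients of other
# models, if present, are not applied here.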
# -----------------------------------------------------------------------------
# Processing Functions
# -----------------------------------------------------------------------------
def process_sequence(category, obj, data_dir, output_dir):
    """
    Process a single sequence from MVImgNet.

    Steps:
    1. Load the SFM reconstruction from the sequence's COLMAP files.
    2. For each image in the SFM output:
       a. Load the image.
       b. Build its intrinsics and rescale the image.
       c. Update the camera intrinsics accordingly.
       d. Save the processed image and camera metadata.
    """
    # Define directories.
    seq_dir = osp.join(data_dir, "MVImgNet_by_categories", category, obj[:-4])
    rgb_dir = osp.join(seq_dir, "images")
    sfm_dir = osp.join(seq_dir, "sparse", "0")
    output_scene_dir = osp.join(output_dir, f"{category}_{obj[:-4]}")
    output_rgb_dir = osp.join(output_scene_dir, "rgb")
    output_cam_dir = osp.join(output_scene_dir, "cam")
    os.makedirs(output_rgb_dir, exist_ok=True)
    os.makedirs(output_cam_dir, exist_ok=True)

    # Run the SFM model conversion (assumed to rewrite the COLMAP model as the
    # text files read by load_sfm below).
    run(sfm_dir, sfm_dir)
    img_idx, img_infos = load_sfm(sfm_dir)

    for imgname in img_idx:
        idx = img_idx[imgname]
        info = img_infos[idx]
        rgb_path = osp.join(rgb_dir, info["path"])
        if not osp.exists(rgb_path):
            continue
        rgb = np.array(Image.open(rgb_path))
        _, _, K, rgb = undistort_images(info["intrinsics"], rgb)
        intrinsics = colmap_to_opencv_intrinsics(K)
        # Rescale so the image covers at least 640x480 while preserving the aspect ratio.
        image, _, intrinsics = rescale_image_depthmap(
            rgb, None, intrinsics, (640, int(640 * 3.0 / 4))
        )
        intrinsics = opencv_to_colmap_intrinsics(intrinsics)
        out_img_path = osp.join(output_rgb_dir, info["path"][:-3] + "jpg")
        image.save(out_img_path)
        out_cam_path = osp.join(output_cam_dir, info["path"][:-3] + "npz")
        np.savez(out_cam_path, intrinsics=intrinsics, pose=info["cam_to_world"])
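# A downstream loader can then recover the per-frame camera with something like
# (hypothetical snippet):
#   data = np.load(osp.join(output_cam_dir, "000.npz"))
#   K, cam2world = data["intrinsics"], data["pose"]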
def main():
    parser = argparse.ArgumentParser(
        description="Preprocess MVImgNet: build intrinsics, rescale images, and save camera parameters."
    )
    parser.add_argument(
        "--data_dir",
        type=str,
        default="/path/to/MVImgNet_data",
        help="Directory containing the MVImgNet data (images and COLMAP reconstructions).",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        default="/path/to/processed_mvimgnet",
        help="Directory where the processed data will be saved.",
    )
    args = parser.parse_args()
    data_dir = args.data_dir
    output_dir = args.output_dir

    # Get the list of categories.
    categories = sorted(
        d
        for d in os.listdir(osp.join(data_dir, "MVImgNet_by_categories"))
        if osp.isdir(osp.join(data_dir, "MVImgNet_by_categories", d))
    )
    for cat in categories:
        objects = sorted(os.listdir(osp.join(data_dir, "MVImgNet_by_categories", cat)))
        for obj in tqdm(objects, desc=cat):
            process_sequence(cat, obj, data_dir, output_dir)


if __name__ == "__main__":
    main()