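"""Functions for processing and loading raw image data."""
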
import glob
import json
import os

from internal import image as lib_image
from internal import math
from internal import utils
import numpy as np
import rawpy


def postprocess_raw(raw, camtorgb, exposure=None):
  """Converts demosaicked raw to sRGB with a minimal postprocessing pipeline.

  Args:
    raw: [H, W, 3], demosaicked raw camera image.
    camtorgb: [3, 3], color correction transformation to apply to raw image.
    exposure: color value to be scaled to pure white after color correction.
      If None, "autoexposes" at the 97th percentile.

  Returns:
    srgb: [H, W, 3], color corrected + exposed + gamma mapped image.
  """
  if raw.shape[-1] != 3:
    raise ValueError(f'raw.shape[-1] is {raw.shape[-1]}, expected 3')
  if camtorgb.shape != (3, 3):
    raise ValueError(f'camtorgb.shape is {camtorgb.shape}, expected (3, 3)')
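  # Convert from camera color space to standard linear RGB color space.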
  rgb_linear = np.matmul(raw, camtorgb.T)
  if exposure is None:
    exposure = np.percentile(rgb_linear, 97)
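  # "Expose" the image by mapping the chosen exposure value to white, then clip.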
  rgb_linear_scaled = np.clip(rgb_linear / exposure, 0, 1)
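  # Apply the sRGB gamma curve as a simple tonemap.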
  srgb = lib_image.linear_to_srgb_np(rgb_linear_scaled)
  return srgb


def pixels_to_bayer_mask(pix_x, pix_y):
  """Computes binary RGB Bayer mask values from integer pixel coordinates."""
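  # Matching the RGGB pattern assumed by `bilinear_demosaic` below: red at
  # (even x, even y), green at (odd x, even y) and (even x, odd y), and blue
  # at (odd x, odd y), where x indexes columns and y indexes rows.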
  r = (pix_x % 2 == 0) * (pix_y % 2 == 0)
  g = (pix_x % 2 == 1) * (pix_y % 2 == 0) + (pix_x % 2 == 0) * (pix_y % 2 == 1)
  b = (pix_x % 2 == 1) * (pix_y % 2 == 1)
  return np.stack([r, g, b], -1).astype(np.float32)


def bilinear_demosaic(bayer):
  """Converts Bayer data into a full RGB image using bilinear demosaicking.

  Input data should be an ndarray of shape [height, width] with the 2x2 mosaic
  pattern:
    -------------
    |red  |green|
    -------------
    |green|blue |
    -------------
  Red and blue channels are bilinearly upsampled 2x, and missing green channel
  elements are the average of the 4 neighboring values in a cross pattern.

  Args:
    bayer: [H, W] array, Bayer mosaic pattern input image.

  Returns:
    rgb: [H, W, 3] array, full RGB image.
  """

  def reshape_quads(*planes):
    """Reshape pixels from four input images to make tiled 2x2 quads."""
    planes = np.stack(planes, -1)
    shape = planes.shape[:-1]
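    # Create [2, 2] blocks out of the 4 input channels.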
    zup = planes.reshape(shape + (2, 2,))
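    # Move each quad's row/column axes next to the image row/column axes.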
    zup = np.transpose(zup, (0, 2, 1, 3))
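    # Collapse to a single 2D image with twice the height and width.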
    zup = zup.reshape((shape[0] * 2, shape[1] * 2))
    return zup

  def bilinear_upsample(z):
    """2x bilinear image upsample."""
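    # np.roll wraps the right and bottom edges around to the opposite side;
    # this only affects a one-pixel border of the upsampled output.
    # Horizontally interpolated values.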
    zx = .5 * (z + np.roll(z, -1, axis=-1))
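    # Vertically interpolated values.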
    zy = .5 * (z + np.roll(z, -1, axis=-2))
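    # Diagonally interpolated values.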
    zxy = .5 * (zx + np.roll(zx, -1, axis=-2))
    return reshape_quads(z, zx, zy, zxy)

  def upsample_green(g1, g2):
    """Special 2x upsample from the two green channels."""
    z = np.zeros_like(g1)
    z = reshape_quads(z, g1, g2, z)
    alt = 0
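    # Grab the 4 directly adjacent neighbors in a "cross" pattern.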
    for i in range(4):
      axis = -1 - (i // 2)
      roll = -1 + 2 * (i % 2)
      alt = alt + .25 * np.roll(z, roll, axis=axis)
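    # At observed green pixels alt = 0, and at unobserved pixels z = 0, so
    # alt + z fills in every output pixel.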
    return alt + z

  r, g1, g2, b = [bayer[(i // 2)::2, (i % 2)::2] for i in range(4)]
  r = bilinear_upsample(r)
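  # Flip in x and y before and after upsampling, since bilinear_upsample
  # assumes the known samples sit at the top left corner of each 2x2 quad,
  # whereas the blue samples sit at the bottom right.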
  b = bilinear_upsample(b[::-1, ::-1])[::-1, ::-1]
  g = upsample_green(g1, g2)
  rgb = np.stack([r, g, b], -1)
  return rgb


def load_raw_images(image_dir, image_names=None):
  """Loads raw images and their metadata from disk.

  Args:
    image_dir: directory containing raw image and EXIF data.
    image_names: files to load (ignores file extension), loads all DNGs if
      None.

  Returns:
    A tuple (images, exifs).
    images: [N, height, width] array of raw sensor data.
    exifs: [N] list of dicts, one per image, containing the EXIF data.

  Raises:
    ValueError: The requested `image_dir` does not exist on disk.
  """

  if not utils.file_exists(image_dir):
    raise ValueError(f'Raw image folder {image_dir} does not exist.')
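
  # Load one raw image (.dng file) and its EXIF metadata (.json file).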
  def load_raw_exif(image_name):
    base = os.path.join(image_dir, os.path.splitext(image_name)[0])
    with utils.open_file(base + '.dng', 'rb') as f:
      raw = rawpy.imread(f).raw_image
    with utils.open_file(base + '.json', 'rb') as f:
      exif = json.load(f)[0]
    return raw, exif
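
  # If no image names were given, load every DNG in the directory, sorted.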
  if image_names is None:
    image_names = [
        os.path.basename(f)
        for f in sorted(glob.glob(os.path.join(image_dir, '*.dng')))
    ]

  data = [load_raw_exif(x) for x in image_names]
  raws, exifs = zip(*data)
  raws = np.stack(raws, axis=0).astype(np.float32)

  return raws, exifs
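

# Brightness percentiles used when re-exposing and tonemapping raw images.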
_PERCENTILE_LIST = (80, 90, 97, 99, 100)
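

# EXIF tags needed by the RawNeRF pipeline. Per the DNG/EXIF conventions:
#   BlackLevel: black level offset added to sensor measurements.
#   WhiteLevel: maximum possible sensor measurement.
#   AsShotNeutral: RGB white balance coefficients.
#   ColorMatrix2: XYZ-to-camera color space conversion matrix.
#   NoiseProfile: shot and read noise parameters.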
_EXIF_KEYS = (
    'BlackLevel',
    'WhiteLevel',
    'AsShotNeutral',
    'ColorMatrix2',
    'NoiseProfile',
)
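

# Conversion matrix from linear sRGB color space to XYZ (D65 white point).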
_RGB2XYZ = np.array([[0.4124564, 0.3575761, 0.1804375],
                     [0.2126729, 0.7151522, 0.0721750],
                     [0.0193339, 0.1191920, 0.9503041]])


def process_exif(exifs):
  """Processes list of raw image EXIF data into useful metadata dict.

  Input should be a list of dictionaries loaded from JSON files.
  These JSON files are produced by running
    $ exiftool -json IMAGE.dng > IMAGE.json
  for each input raw file.

  We extract only the parameters relevant to
  1. Rescaling the raw data to [0, 1],
  2. White balance and color correction, and
  3. Noise level estimation.

  Args:
    exifs: a list of dicts containing EXIF data as loaded from JSON files.

  Returns:
    meta: a dict of the relevant metadata for running RawNeRF.
  """
  meta = {}
  exif = exifs[0]
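  # Convert the list of per-image EXIF dicts into a dict of arrays.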
  for key in _EXIF_KEYS:
    exif_value = exif.get(key)
    if exif_value is None:
      continue
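    # Values can be a single int or float...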
    if isinstance(exif_value, int) or isinstance(exif_value, float):
      vals = [x[key] for x in exifs]
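    # ...or a string of space-separated numbers.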
    elif isinstance(exif_value, str):
      vals = [[float(z) for z in x[key].split(' ')] for x in exifs]
    meta[key] = np.squeeze(np.array(vals))
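  # Shutter speed is a special case: a string formatted like '1/N'.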
  meta['ShutterSpeed'] = np.fromiter(
      (1. / float(exif['ShutterSpeed'].split('/')[1]) for exif in exifs), float)
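
  # Create raw-to-sRGB color transform matrices. The pipeline is:
  #   cam space -> white balanced cam space ("camwb") -> XYZ space -> RGB space.
  # 'AsShotNeutral' is an RGB triplet giving how pure white measures on the
  # sensor, so dividing by it corrects the white balance.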
  whitebalance = meta['AsShotNeutral'].reshape(-1, 3)
  cam2camwb = np.array([np.diag(1. / x) for x in whitebalance])
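  # 'ColorMatrix2' converts from XYZ to the white balanced camera space.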
  xyz2camwb = meta['ColorMatrix2'].reshape(-1, 3, 3)
  rgb2camwb = xyz2camwb @ _RGB2XYZ
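  # Normalize each row so that white ([1, 1, 1] in linear RGB) maps to
  # [1, 1, 1] in the white balanced camera space.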
  rgb2camwb /= rgb2camwb.sum(axis=-1, keepdims=True)
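  # Compose white balance and color correction into the full cam -> RGB map.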
  cam2rgb = np.linalg.inv(rgb2camwb) @ cam2camwb
  meta['cam2rgb'] = cam2rgb

  return meta
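

# Illustrative sketch (hypothetical path) of how the helpers above combine;
# `load_raw_dataset` below wraps these same steps with extra logic:
#   raws, exifs = load_raw_images('/path/to/scene/raw')
#   meta = process_exif(exifs)
#   image0 = (raws[0] - meta['BlackLevel'][0]) / (
#       meta['WhiteLevel'][0] - meta['BlackLevel'][0])
#   srgb0 = postprocess_raw(bilinear_demosaic(image0), meta['cam2rgb'][0])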
def load_raw_dataset(split,
                     data_dir,
                     image_names,
                     exposure_percentile,
                     n_downsample):
  """Loads and processes a set of RawNeRF input images.

  Includes logic necessary for special "test" scenes that include a noiseless
  ground truth frame, produced by HDR+ merge.

  Args:
    split: DataSplit.TRAIN or DataSplit.TEST, only used for test scene logic.
    data_dir: base directory for scene data.
    image_names: which images were successfully posed by COLMAP.
    exposure_percentile: what brightness percentile to expose to white.
    n_downsample: returned images are downsampled by a factor of n_downsample.

  Returns:
    A tuple (images, meta, testscene).
    images: [N, height // n_downsample, width // n_downsample, 3] array of
      demosaicked raw image data.
    meta: EXIF metadata and other useful processing parameters. Includes per
      image exposure information that can be passed into the NeRF model with
      each ray: the set of unique exposure times is determined and each image
      is assigned a corresponding exposure index (mapping to an exposure
      value). These are stored as keys 'unique_shutters', 'exposure_idx', and
      'exposure_values' in the `meta` dictionary. We rescale so that the
      maximum `exposure_values` entry is 1 for convenience.
    testscene: True when dataset includes ground truth test image, else False.
  """
  image_dir = os.path.join(data_dir, 'raw')

  testimg_file = os.path.join(data_dir, 'hdrplus_test/merged.dng')
  testscene = utils.file_exists(testimg_file)
  if testscene:
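    # Test scenes keep their train/test splits in subdirectories of raw/.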
    image_dir = os.path.join(image_dir, split.value)
    if split == utils.DataSplit.TEST:
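      # COLMAP image names do not apply to the test split; load all DNGs.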
      image_names = None
    else:
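      # Drop the first COLMAP image name; it corresponds to the held-out
      # test frame.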
      image_names = image_names[1:]

  raws, exifs = load_raw_images(image_dir, image_names)
  meta = process_exif(exifs)

  if testscene and split == utils.DataSplit.TEST:
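    # The test split of a test scene loads the HDR+ merged "ground truth" image.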
    with utils.open_file(testimg_file, 'rb') as imgin:
      testraw = rawpy.imread(imgin).raw_image
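    # HDR+ merged output has 2 extra bits of fixed precision, hence the /4.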
    testraw = testraw.astype(np.float32) / 4.
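    # The long exposure test image is rescaled below by the fast:slow shutter
    # speed ratio.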
    fast_shutter = meta['ShutterSpeed'][0]
    slow_shutter = meta['ShutterSpeed'][-1]
    shutter_ratio = fast_shutter / slow_shutter
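    # Replace the loaded raws with the single ground truth test image.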
    raws = testraw[None]
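    # The test image reuses the metadata of the first loaded (fast shutter) image.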
    meta = {k: meta[k][:1] for k in meta}
  else:
    shutter_ratio = 1.
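
  # Determine an exposure index for each unique shutter speed in the data.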
  shutter_speeds = meta['ShutterSpeed']
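  # Sort shutter speeds from slowest (largest exposure time) to fastest, so
  # that index 0 always corresponds to the brightest image.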
  unique_shutters = np.sort(np.unique(shutter_speeds))[::-1]
  exposure_idx = np.zeros_like(shutter_speeds, dtype=np.int32)
  for i, shutter in enumerate(unique_shutters):
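    # Assign index i to all images whose shutter speed equals `shutter`.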
    exposure_idx[shutter_speeds == shutter] = i
  meta['exposure_idx'] = exposure_idx
  meta['unique_shutters'] = unique_shutters
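  # Rescale to relative shutter speeds so the brightest image has value 1;
  # a NeRF rendered with exposure = 1 is then always reasonably exposed.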
  meta['exposure_values'] = shutter_speeds / unique_shutters[0]
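
  # Rescale raw sensor measurements to [0, 1] (plus noise).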
  blacklevel = meta['BlackLevel'].reshape(-1, 1, 1)
  whitelevel = meta['WhiteLevel'].reshape(-1, 1, 1)
  images = (raws - blacklevel) / (whitelevel - blacklevel) * shutter_ratio
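
  # Calculate the exposure level used for gamma mapping, always from the full
  # resolution image 0 so that it is consistent across downsampling factors.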
  image0_raw_demosaic = np.array(bilinear_demosaic(images[0]))
  image0_rgb = image0_raw_demosaic @ meta['cam2rgb'][0].T
  exposure = np.percentile(image0_rgb, exposure_percentile)
  meta['exposure'] = exposure
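  # Sweep over several exposure percentiles to visualize in training logs.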
  exposure_levels = {p: np.percentile(image0_rgb, p) for p in _PERCENTILE_LIST}
  meta['exposure_levels'] = exposure_levels
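
  # Create a postprocessing function mapping raw images to tonemapped sRGB.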
  cam2rgb0 = meta['cam2rgb'][0]
  meta['postprocess_fn'] = lambda z, x=exposure: postprocess_raw(z, cam2rgb0, x)
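
  # Demosaic and (optionally) downsample each raw image.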
  def processing_fn(x):
    x_ = np.array(x)
    x_demosaic = bilinear_demosaic(x_)
    if n_downsample > 1:
      x_demosaic = lib_image.downsample(x_demosaic, n_downsample)
    return np.array(x_demosaic)

  images = np.stack([processing_fn(im) for im in images], axis=0)

  return images, meta, testscene


def best_fit_affine(x, y, axis):
  """Computes best fit a, b such that a * x + b = y, in a least-squares sense."""
  x_m = x.mean(axis=axis)
  y_m = y.mean(axis=axis)
  xy_m = (x * y).mean(axis=axis)
  xx_m = (x * x).mean(axis=axis)
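  # Closed-form least-squares fit: a = Cov(x, y) / Var(x), b = E[y] - a * E[x].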
  a = (xy_m - x_m * y_m) / (xx_m - x_m * x_m)
  b = y_m - a * x_m
  return a, b


def match_images_affine(est, gt, axis=(0, 1)):
  """Computes affine best fit of gt->est, then maps est back to match gt."""
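  # Fit the map in the gt -> est direction; regressing the noisy estimate on
  # the clean ground truth is more stable.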
  a, b = best_fit_affine(gt, est, axis=axis)
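  # Invert the fitted affine map to bring `est` into the same frame as `gt`.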
  est_matched = (est - b) / a
  return est_matched
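

if __name__ == '__main__':
  # Minimal self-check sketch (illustrative only, not part of the RawNeRF
  # pipeline): demosaic a synthetic Bayer mosaic, verify the output shape, and
  # check that the Bayer mask selects exactly one channel per pixel.
  _rng = np.random.default_rng(0)
  _bayer = _rng.uniform(size=(8, 8)).astype(np.float32)
  _rgb = bilinear_demosaic(_bayer)
  assert _rgb.shape == (8, 8, 3)
  _pix_x, _pix_y = np.meshgrid(np.arange(8), np.arange(8), indexing='xy')
  _mask = pixels_to_bayer_mask(_pix_x, _pix_y)
  assert np.all(_mask.sum(axis=-1) == 1)
  print('Self-check passed.')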