import glob |
import json |
import os |
from internal import image as lib_image |
from internal import math |
from internal import utils |
import numpy as np |
import rawpy |
def postprocess_raw(raw, camtorgb, exposure=None): |
"""Converts demosaicked raw to sRGB with a minimal postprocessing pipeline. |
Args: |
raw: [H, W, 3], demosaicked raw camera image. |
camtorgb: [3, 3], color correction transformation to apply to raw image. |
exposure: color value to be scaled to pure white after color correction. |
If None, "autoexposes" at the 97th percentile. |
Returns: |
srgb: [H, W, 3], color corrected + exposed + gamma mapped image. |
""" |
if raw.shape[-1] != 3: |
raise ValueError(f'raw.shape[-1] is {raw.shape[-1]}, expected 3') |
if camtorgb.shape != (3, 3): |
raise ValueError(f'camtorgb.shape is {camtorgb.shape}, expected (3, 3)') |
rgb_linear = np.matmul(raw, camtorgb.T) |
if exposure is None: |
exposure = np.percentile(rgb_linear, 97) |
rgb_linear_scaled = np.clip(rgb_linear / exposure, 0, 1) |
srgb = lib_image.linear_to_srgb_np(rgb_linear_scaled) |
return srgb |
def pixels_to_bayer_mask(pix_x, pix_y): |
"""Computes binary RGB Bayer mask values from integer pixel coordinates.""" |
r = (pix_x % 2 == 0) * (pix_y % 2 == 0) |
g = (pix_x % 2 == 1) * (pix_y % 2 == 0) + (pix_x % 2 == 0) * (pix_y % 2 == 1) |
b = (pix_x % 2 == 1) * (pix_y % 2 == 1) |
return np.stack([r, g, b], -1).astype(np.float32) |
def bilinear_demosaic(bayer): |
"""Converts Bayer data into a full RGB image using bilinear demosaicking. |
Input data should be ndarray of shape [height, width] with 2x2 mosaic pattern: |
------------- |
|red |green| |
------------- |
|green|blue | |
------------- |
Red and blue channels are bilinearly upsampled 2x, missing green channel |
elements are the average of the neighboring 4 values in a cross pattern. |
Args: |
bayer: [H, W] array, Bayer mosaic pattern input image. |
Returns: |
rgb: [H, W, 3] array, full RGB image. |
""" |
def reshape_quads(*planes): |
"""Reshape pixels from four input images to make tiled 2x2 quads.""" |
planes = np.stack(planes, -1) |
shape = planes.shape[:-1] |
zup = planes.reshape(shape + (2, 2,)) |
zup = np.transpose(zup, (0, 2, 1, 3)) |
zup = zup.reshape((shape[0] * 2, shape[1] * 2)) |
return zup |
def bilinear_upsample(z): |
"""2x bilinear image upsample.""" |
zx = .5 * (z + np.roll(z, -1, axis=-1)) |
zy = .5 * (z + np.roll(z, -1, axis=-2)) |
zxy = .5 * (zx + np.roll(zx, -1, axis=-2)) |
return reshape_quads(z, zx, zy, zxy) |
def upsample_green(g1, g2): |
"""Special 2x upsample from the two green channels.""" |
z = np.zeros_like(g1) |
z = reshape_quads(z, g1, g2, z) |
alt = 0 |
for i in range(4): |
axis = -1 - (i // 2) |
roll = -1 + 2 * (i % 2) |
alt = alt + .25 * np.roll(z, roll, axis=axis) |
return alt + z |
r, g1, g2, b = [bayer[(i // 2)::2, (i % 2)::2] for i in range(4)] |
r = bilinear_upsample(r) |
b = bilinear_upsample(b[::-1, ::-1])[::-1, ::-1] |
g = upsample_green(g1, g2) |
rgb = np.stack([r, g, b], -1) |
return rgb |
def load_raw_images(image_dir, image_names=None): |
"""Loads raw images and their metadata from disk. |
Args: |
image_dir: directory containing raw image and EXIF data. |
image_names: files to load (ignores file extension), loads all DNGs if None. |
Returns: |
A tuple (images, exifs). |
images: [N, height, width, 3] array of raw sensor data. |
exifs: [N] list of dicts, one per image, containing the EXIF data. |
Raises: |
ValueError: The requested `image_dir` does not exist on disk. |
""" |
if not utils.file_exists(image_dir): |
raise ValueError(f'Raw image folder {image_dir} does not exist.') |
def load_raw_exif(image_name): |
base = os.path.join(image_dir, os.path.splitext(image_name)[0]) |
with utils.open_file(base + '.dng', 'rb') as f: |
raw = rawpy.imread(f).raw_image |
with utils.open_file(base + '.json', 'rb') as f: |
exif = json.load(f)[0] |
return raw, exif |
if image_names is None: |
image_names = [ |
os.path.basename(f) |
for f in sorted(glob.glob(os.path.join(image_dir, '*.dng'))) |
] |
data = [load_raw_exif(x) for x in image_names] |
raws, exifs = zip(*data) |
raws = np.stack(raws, axis=0).astype(np.float32) |
return raws, exifs |
_PERCENTILE_LIST = (80, 90, 97, 99, 100) |
_EXIF_KEYS = ( |
'BlackLevel', |
'WhiteLevel', |
'AsShotNeutral', |
'ColorMatrix2', |
'NoiseProfile', |
) |
_RGB2XYZ = np.array([[0.4124564, 0.3575761, 0.1804375], |
[0.2126729, 0.7151522, 0.0721750], |
[0.0193339, 0.1191920, 0.9503041]]) |
def process_exif(exifs): |
"""Processes list of raw image EXIF data into useful metadata dict. |
Input should be a list of dictionaries loaded from JSON files. |
These JSON files are produced by running |
$ exiftool -json IMAGE.dng > IMAGE.json |
for each input raw file. |
We extract only the parameters relevant to |
1. Rescaling the raw data to [0, 1], |
2. White balance and color correction, and |
3. Noise level estimation. |
Args: |
exifs: a list of dicts containing EXIF data as loaded from JSON files. |
Returns: |
meta: a dict of the relevant metadata for running RawNeRF. |
""" |
meta = {} |
exif = exifs[0] |
for key in _EXIF_KEYS: |
exif_value = exif.get(key) |
if exif_value is None: |
continue |
if isinstance(exif_value, int) or isinstance(exif_value, float): |
vals = [x[key] for x in exifs] |
elif isinstance(exif_value, str): |
vals = [[float(z) for z in x[key].split(' ')] for x in exifs] |
meta[key] = np.squeeze(np.array(vals)) |
meta['ShutterSpeed'] = np.fromiter( |
(1. / float(exif['ShutterSpeed'].split('/')[1]) for exif in exifs), float) |
whitebalance = meta['AsShotNeutral'].reshape(-1, 3) |
cam2camwb = np.array([np.diag(1. / x) for x in whitebalance]) |
xyz2camwb = meta['ColorMatrix2'].reshape(-1, 3, 3) |
rgb2camwb = xyz2camwb @ _RGB2XYZ |
rgb2camwb /= rgb2camwb.sum(axis=-1, keepdims=True) |
cam2rgb = np.linalg.inv(rgb2camwb) @ cam2camwb |
meta['cam2rgb'] = cam2rgb |
return meta |
def load_raw_dataset(split, data_dir, image_names, exposure_percentile, n_downsample): |
"""Loads and processes a set of RawNeRF input images. |
Includes logic necessary for special "test" scenes that include a noiseless |
ground truth frame, produced by HDR+ merge. |
Args: |
split: DataSplit.TRAIN or DataSplit.TEST, only used for test scene logic. |
data_dir: base directory for scene data. |
image_names: which images were successfully posed by COLMAP. |
exposure_percentile: what brightness percentile to expose to white. |
n_downsample: returned images are downsampled by a factor of n_downsample. |
Returns: |
A tuple (images, meta, testscene). |
images: [N, height // n_downsample, width // n_downsample, 3] array of |
demosaicked raw image data. |
meta: EXIF metadata and other useful processing parameters. Includes per |
image exposure information that can be passed into the NeRF model with |
each ray: the set of unique exposure times is determined and each image |
assigned a corresponding exposure index (mapping to an exposure value). |
These are keys 'unique_shutters', 'exposure_idx', and 'exposure_value' in |
the `meta` dictionary. |
We rescale so the maximum `exposure_value` is 1 for convenience. |
testscene: True when dataset includes ground truth test image, else False. |
""" |
image_dir = os.path.join(data_dir, 'raw') |
testimg_file = os.path.join(data_dir, 'hdrplus_test/merged.dng') |
testscene = utils.file_exists(testimg_file) |
if testscene: |
image_dir = os.path.join(image_dir, split.value) |
if split == utils.DataSplit.TEST: |
image_names = None |
else: |
image_names = image_names[1:] |
raws, exifs = load_raw_images(image_dir, image_names) |
meta = process_exif(exifs) |
if testscene and split == utils.DataSplit.TEST: |
with utils.open_file(testimg_file, 'rb') as imgin: |
testraw = rawpy.imread(imgin).raw_image |
testraw = testraw.astype(np.float32) / 4. |
fast_shutter = meta['ShutterSpeed'][0] |
slow_shutter = meta['ShutterSpeed'][-1] |
shutter_ratio = fast_shutter / slow_shutter |
raws = testraw[None] |
meta = {k: meta[k][:1] for k in meta} |
else: |
shutter_ratio = 1. |
shutter_speeds = meta['ShutterSpeed'] |
unique_shutters = np.sort(np.unique(shutter_speeds))[::-1] |
exposure_idx = np.zeros_like(shutter_speeds, dtype=np.int32) |
for i, shutter in enumerate(unique_shutters): |
exposure_idx[shutter_speeds == shutter] = i |
meta['exposure_idx'] = exposure_idx |
meta['unique_shutters'] = unique_shutters |
meta['exposure_values'] = shutter_speeds / unique_shutters[0] |
blacklevel = meta['BlackLevel'].reshape(-1, 1, 1) |
whitelevel = meta['WhiteLevel'].reshape(-1, 1, 1) |
images = (raws - blacklevel) / (whitelevel - blacklevel) * shutter_ratio |
image0_raw_demosaic = np.array(bilinear_demosaic(images[0])) |
image0_rgb = image0_raw_demosaic @ meta['cam2rgb'][0].T |
exposure = np.percentile(image0_rgb, exposure_percentile) |
meta['exposure'] = exposure |
exposure_levels = {p: np.percentile(image0_rgb, p) for p in _PERCENTILE_LIST} |
meta['exposure_levels'] = exposure_levels |
cam2rgb0 = meta['cam2rgb'][0] |
meta['postprocess_fn'] = lambda z, x=exposure: postprocess_raw(z, cam2rgb0, x) |
def processing_fn(x): |
x_ = np.array(x) |
x_demosaic = bilinear_demosaic(x_) |
if n_downsample > 1: |
x_demosaic = lib_image.downsample(x_demosaic, n_downsample) |
return np.array(x_demosaic) |
images = np.stack([processing_fn(im) for im in images], axis=0) |
return images, meta, testscene |
def best_fit_affine(x, y, axis): |
"""Computes best fit a, b such that a * x + b = y, in a least square sense.""" |
x_m = x.mean(axis=axis) |
y_m = y.mean(axis=axis) |
xy_m = (x * y).mean(axis=axis) |
xx_m = (x * x).mean(axis=axis) |
a = (xy_m - x_m * y_m) / (xx_m - x_m * x_m) |
b = y_m - a * x_m |
return a, b |
def match_images_affine(est, gt, axis=(0, 1)): |
"""Computes affine best fit of gt->est, then maps est back to match gt.""" |
a, b = best_fit_affine(gt, est, axis=axis) |
est_matched = (est - b) / a |
return est_matched |