LPX
Refactored for Gradio / HF Spaces usage.
5e8be73
"""
camera_pipeline.py
Functions for simulating a realistic camera pipeline, including Bayer mosaic/demosaic,
chromatic aberration, vignette, sensor noise, hot pixels, banding, motion blur, and JPEG recompression.
"""
from io import BytesIO
from PIL import Image
import numpy as np
try:
import cv2
_HAS_CV2 = True
except Exception:
cv2 = None
_HAS_CV2 = False
from scipy.ndimage import convolve
def _bayer_mosaic(img: np.ndarray, pattern='RGGB') -> np.ndarray:
"""Create a single-channel Bayer mosaic from an RGB image.
pattern currently supports 'RGGB' (most common). Returns uint8 2D array.
"""
h, w = img.shape[:2]
mosaic = np.zeros((h, w), dtype=np.uint8)
# pattern mapping for RGGB:
# (0,0) R, (0,1) G
# (1,0) G, (1,1) B
R = img[:, :, 0]
G = img[:, :, 1]
B = img[:, :, 2]
# fill mosaic according to RGGB
mosaic[0::2, 0::2] = R[0::2, 0::2]
mosaic[0::2, 1::2] = G[0::2, 1::2]
mosaic[1::2, 0::2] = G[1::2, 0::2]
mosaic[1::2, 1::2] = B[1::2, 1::2]
return mosaic
def _demosaic_bilinear(mosaic: np.ndarray) -> np.ndarray:
"""Simple bilinear demosaic fallback (no cv2). Outputs RGB uint8 image.
Not perfect but good enough to add demosaic artifacts.
"""
h, w = mosaic.shape
# Work in float to avoid overflow
m = mosaic.astype(np.float32)
# We'll compute each channel by averaging available mosaic samples
R = np.zeros_like(m)
G = np.zeros_like(m)
B = np.zeros_like(m)
# RGGB pattern
R[0::2, 0::2] = m[0::2, 0::2]
G[0::2, 1::2] = m[0::2, 1::2]
G[1::2, 0::2] = m[1::2, 0::2]
B[1::2, 1::2] = m[1::2, 1::2]
# Convolution kernels for interpolation (simple)
k_cross = np.array([[0, 1, 0], [1, 4, 1], [0, 1, 0]], dtype=np.float32) / 8.0
k_diag = np.array([[1, 0, 1], [0, 0, 0], [1, 0, 1]], dtype=np.float32) / 4.0
# convolve using scipy.ndimage.convolve
R_interp = convolve(R, k_cross, mode='mirror')
G_interp = convolve(G, k_cross, mode='mirror')
B_interp = convolve(B, k_cross, mode='mirror')
out = np.stack((R_interp, G_interp, B_interp), axis=2)
out = np.clip(out, 0, 255).astype(np.uint8)
return out
def _apply_chromatic_aberration(img: np.ndarray, strength=1.0, seed=None):
"""Shift R and B channels slightly in opposite directions to emulate CA.
strength is in pixels (float). Uses cv2.warpAffine if available; integer
fallback uses np.roll.
"""
if seed is not None:
rng = np.random.default_rng(seed)
else:
rng = np.random.default_rng()
h, w = img.shape[:2]
max_shift = max(1.0, strength)
# small random subpixel shift sampled from normal distribution
shift_r = rng.normal(loc=0.0, scale=max_shift * 0.6)
shift_b = rng.normal(loc=0.0, scale=max_shift * 0.6)
# apply opposite horizontal shifts to R and B for lateral CA
r_x = shift_r
r_y = rng.normal(scale=0.3 * abs(shift_r))
b_x = -shift_b
b_y = rng.normal(scale=0.3 * abs(shift_b))
out = img.copy().astype(np.float32)
if _HAS_CV2:
def warp_channel(ch, tx, ty):
M = np.array([[1, 0, tx], [0, 1, ty]], dtype=np.float32)
return cv2.warpAffine(ch, M, (w, h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT)
out[:, :, 0] = warp_channel(out[:, :, 0], r_x, r_y)
out[:, :, 2] = warp_channel(out[:, :, 2], b_x, b_y)
else:
# integer fallback
ix_r = int(round(r_x))
iy_r = int(round(r_y))
ix_b = int(round(b_x))
iy_b = int(round(b_y))
out[:, :, 0] = np.roll(out[:, :, 0], shift=(iy_r, ix_r), axis=(0, 1))
out[:, :, 2] = np.roll(out[:, :, 2], shift=(iy_b, ix_b), axis=(0, 1))
out = np.clip(out, 0, 255).astype(np.uint8)
return out
def _apply_vignette(img: np.ndarray, strength=0.4):
h, w = img.shape[:2]
y = np.linspace(-1, 1, h)[:, None]
x = np.linspace(-1, 1, w)[None, :]
r = np.sqrt(x * x + y * y)
mask = 1.0 - (r ** 2) * strength
mask = np.clip(mask, 0.0, 1.0)
out = (img.astype(np.float32) * mask[:, :, None])
out = np.clip(out, 0, 255).astype(np.uint8)
return out
def _add_poisson_gaussian_noise(img: np.ndarray, iso_scale=1.0, read_noise_std=2.0, seed=None):
"""Poisson-Gaussian sensor noise model.
iso_scale scales the signal before Poisson sampling (higher -> more Poisson),
read_noise_std is the sigma (in DN) of additive Gaussian read noise.
"""
if seed is not None:
rng = np.random.default_rng(seed)
else:
rng = np.random.default_rng()
img_f = img.astype(np.float32)
# scale to simulate exposure/iso
scaled = img_f * iso_scale
# Poisson: we need integer counts; scale to a reasonable photon budget
# choose scale so that typical pixel values map to ~[0..2000] photons
photon_scale = 4.0
lam = np.clip(scaled * photon_scale, 0, 1e6)
noisy = rng.poisson(lam).astype(np.float32) / photon_scale
# add read noise
noisy += rng.normal(loc=0.0, scale=read_noise_std, size=noisy.shape)
noisy = np.clip(noisy, 0, 255).astype(np.uint8)
return noisy
def _add_hot_pixels_and_banding(img: np.ndarray, hot_pixel_prob=1e-6, banding_strength=0.0, seed=None):
if seed is not None:
rng = np.random.default_rng(seed)
else:
rng = np.random.default_rng()
h, w = img.shape[:2]
out = img.copy().astype(np.float32)
# hot pixels
n_pixels = int(h * w * hot_pixel_prob)
if n_pixels > 0:
ys = rng.integers(0, h, size=n_pixels)
xs = rng.integers(0, w, size=n_pixels)
vals = rng.integers(200, 256, size=n_pixels)
for y, x, v in zip(ys, xs, vals):
out[y, x, :] = v
# banding: add low-amplitude sinusoidal horizontal banding
if banding_strength > 0.0:
rows = np.arange(h)[:, None]
band = (np.sin(rows * 0.5) * 255.0 * banding_strength)
out += band[:, :, None]
out = np.clip(out, 0, 255).astype(np.uint8)
return out
def _motion_blur(img: np.ndarray, kernel_size=5):
if kernel_size <= 1:
return img
# simple linear motion kernel horizontally
kernel = np.zeros((kernel_size, kernel_size), dtype=np.float32)
kernel[kernel_size // 2, :] = 1.0 / kernel_size
out = np.zeros_like(img)
for c in range(3):
out[:, :, c] = convolve(img[:, :, c].astype(np.float32), kernel, mode='mirror')
out = np.clip(out, 0, 255).astype(np.uint8)
return out
def _jpeg_recompress(img: np.ndarray, quality=90) -> np.ndarray:
pil = Image.fromarray(img)
buf = BytesIO()
pil.save(buf, format='JPEG', quality=int(quality), optimize=False)
buf.seek(0)
rec = Image.open(buf).convert('RGB')
return np.array(rec)
def simulate_camera_pipeline(img_arr: np.ndarray,
bayer=True,
jpeg_cycles=1,
jpeg_quality_range=(88, 96),
vignette_strength=0.35,
chroma_aberr_strength=1.2,
iso_scale=1.0,
read_noise_std=2.0,
hot_pixel_prob=1e-6,
banding_strength=0.0,
motion_blur_kernel=1,
seed=None):
"""Apply a set of realistic camera/capture artifacts to img_arr (RGB uint8).
Returns an RGB uint8 image.
"""
if seed is not None:
rng = np.random.default_rng(seed)
else:
rng = np.random.default_rng()
out = img_arr.copy()
# 1) Bayer mosaic + demosaic (if enabled)
if bayer:
try:
# Build mosaic from the RGB image (no channel reversal). Previously the code
# reversed channels here which caused R/B swapping and strong green tint.
mosaic = _bayer_mosaic(out)
if _HAS_CV2:
# cv2 expects a single-channel Bayer and provides demosaicing codes
# We'll use RGGB code (COLOR_BAYER_RG2BGR) so convert back to RGB after
dem = cv2.demosaicing(mosaic, cv2.COLOR_BAYER_RG2BGR)
# cv2 returns BGR
dem = dem[:, :, ::-1]
out = dem
else:
out = _demosaic_bilinear(mosaic)
except Exception:
# if anything fails, keep original
out = img_arr.copy()
# 2) chromatic aberration
out = _apply_chromatic_aberration(out, strength=chroma_aberr_strength, seed=seed)
# 3) vignette
out = _apply_vignette(out, strength=vignette_strength)
# 4) noise (Poisson-Gaussian)
out = _add_poisson_gaussian_noise(out, iso_scale=iso_scale, read_noise_std=read_noise_std, seed=seed)
# 5) hot pixels and banding
out = _add_hot_pixels_and_banding(out, hot_pixel_prob=hot_pixel_prob, banding_strength=banding_strength, seed=seed)
# 6) motion blur
if motion_blur_kernel and motion_blur_kernel > 1:
out = _motion_blur(out, kernel_size=motion_blur_kernel)
# 7) JPEG recompression cycles
for i in range(max(1, int(jpeg_cycles))):
q = int(rng.integers(jpeg_quality_range[0], jpeg_quality_range[1] + 1))
out = _jpeg_recompress(out, quality=q)
return out