| | |
"""
Popescu–Farid CFA Interpolation Forensics (End-to-End)

Implements the algorithm from:

    A. C. Popescu and H. Farid,
    "Exposing Digital Forgeries in Color Filter Array Interpolated Images,"
    IEEE Trans. Signal Processing, 2005.

Features:
  - EM estimation of the linear correlation model for each color channel
    (single-channel Gaussian vs uniform mixture).
  - Posterior probability map per channel.
  - Synthetic CFA maps s_r, s_g, s_b for a Bayer pattern.
  - Fourier-domain similarity M(p_c, s_c) per channel.
  - Sliding-window analysis with 50% overlap.
  - Threshold calibration on a set of negative images to get ~0% FPs.
  - Multi-channel fusion (default): window authentic if ANY channel is CFA;
    optional green-only mode: window authentic if GREEN channel is CFA.

Usage:

    # 1) Calibrate thresholds on negative (non-CFA / tampered) images
    python pf_cfa_detector.py calibrate \
        --neg-dir path/to/negative_images \
        --output thresholds.json

    # 2) Detect on a single image (all channels, PF-style fusion)
    python pf_cfa_detector.py detect \
        --image path/to/test_image.png \
        --thresholds thresholds.json

    # 3) Detect on a directory, green-only mode
    python pf_cfa_detector.py detect \
        --image-dir path/to/test_images \
        --thresholds thresholds.json \
        --green-only
"""
| |
|
| | import argparse |
| | import json |
| | import math |
| | import os |
| | from typing import Dict, List, Sequence, Tuple |
| |
|
| | import numpy as np |
| | from numpy.fft import fft2 |
| | from PIL import Image |
| |
|
| |
|
| | |
| | |
| | |
| |
|
# Lower-case filename suffixes treated as images when a directory is scanned.
IMG_EXTS = (
    ".jpg",
    ".jpeg",
    ".png",
    ".bmp",
    ".tif",
    ".tiff",
)
| |
|
| |
|
def list_image_files(directory: str) -> List[str]:
    """List image paths found directly inside *directory*, sorted by path.

    An entry is kept when its lower-cased name ends with one of the suffixes
    in ``IMG_EXTS``; sub-directories are not descended into.
    """
    matches = [
        os.path.join(directory, entry)
        for entry in os.listdir(directory)
        if entry.lower().endswith(IMG_EXTS)
    ]
    return sorted(matches)
| |
|
| |
|
def load_rgb_image(path: str) -> np.ndarray:
    """
    Load an image from disk and return it as a float64 RGB array in [0, 1].

    Parameters
    ----------
    path : str
        Path to image file.

    Returns
    -------
    img : np.ndarray, shape (H, W, 3), dtype float64
        RGB image, 8-bit intensities divided by 255.

    Raises
    ------
    ValueError
        If the converted pixel array is not of shape (H, W, 3).
    """
    # Open inside a context manager so the underlying file handle is released
    # deterministically; the float64 conversion copies the pixel data, so the
    # returned array does not depend on the image object staying open.
    with Image.open(path) as im:
        arr = np.asarray(im.convert("RGB"), dtype=np.float64)
    if arr.ndim != 3 or arr.shape[2] != 3:
        raise ValueError(f"Expected an RGB image at {path}")
    return arr / 255.0
| |
|
| |
|
| | |
| | |
| | |
| |
|
def _mixture_posterior(residual: np.ndarray, sigma: float, p0: float) -> np.ndarray:
    """Return the posterior of the Gaussian model M1 against a constant outlier likelihood p0."""
    coef = 1.0 / (sigma * math.sqrt(2.0 * math.pi))
    gauss = coef * np.exp(-0.5 * (residual / sigma) ** 2)
    return gauss / (gauss + p0)


def em_probability_map(
    f: np.ndarray,
    N: int = 1,
    sigma0: float = 0.0075,
    p0: float = 1.0 / 256.0,
    max_iter: int = 50,
    tol: float = 1e-5,
    seed: int = 0,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Estimate the posterior probability map w(x,y) that each sample belongs to
    the linearly correlated model M1 via EM (Gaussian vs uniform mixture).

    Model for a single channel f(x,y):
        f(x,y) = sum_{u,v} alpha_{u,v} f(x+u, y+v) + n(x,y)
    where n(x,y) ~ N(0, sigma^2), alpha_{0,0} = 0.

    Parameters
    ----------
    f : np.ndarray, shape (H, W)
        Single colour channel.
    N : int
        Neighbourhood radius; the model uses the (2N+1)^2 - 1 neighbours of
        each pixel. Must be >= 1.
    sigma0 : float
        Initial standard deviation of the Gaussian residual model (clamped to
        at least 1e-6).
    p0 : float
        Constant likelihood assigned to the outlier model.
    max_iter : int
        Maximum number of EM iterations.
    tol : float
        Relative tolerance on the change of alpha used as stopping criterion.
    seed : int
        Seed for the random initialisation of alpha.

    Returns
    -------
    prob_map : np.ndarray, shape (H, W), dtype float64
        Posterior probabilities for the interior pixels; the N-pixel border,
        where the full neighbourhood is unavailable, is left at 0.
    alpha : np.ndarray, shape ((2N+1)^2 - 1,)
        Estimated coefficients, ordered row-major over (dy, dx) with the
        centre (0, 0) skipped.

    Raises
    ------
    ValueError
        If `f` is not 2D, if N < 1, or if the image is not larger than 2N in
        both dimensions.
    """
    if f.ndim != 2:
        raise ValueError("em_probability_map expects a single 2D channel.")
    if N < 1:
        # N = 0 leaves no neighbours to regress on, and a negative radius
        # would index outside the image.
        raise ValueError("Neighborhood radius N must be >= 1.")

    f = f.astype(np.float64, copy=False)
    H, W = f.shape
    if H <= 2 * N or W <= 2 * N:
        raise ValueError("Image too small for N = %d neighborhood." % N)

    # Neighbour offsets in row-major order, excluding the centre pixel.
    offsets: List[Tuple[int, int]] = [
        (dy, dx)
        for dy in range(-N, N + 1)
        for dx in range(-N, N + 1)
        if (dy, dx) != (0, 0)
    ]
    K = len(offsets)

    # Regression targets (interior pixels, row-major) and design matrix: one
    # column per offset, each a shifted view of the image over the interior.
    inner_h, inner_w = H - 2 * N, W - 2 * N
    y = f[N : H - N, N : W - N].reshape(-1, 1)
    X = np.empty((inner_h * inner_w, K), dtype=np.float64)
    for k, (dy, dx) in enumerate(offsets):
        X[:, k] = f[N + dy : H - N + dy, N + dx : W - N + dx].ravel()

    rng = np.random.default_rng(seed)
    alpha = rng.normal(scale=0.01, size=(K, 1))

    # Lower bound on sigma so the Gaussian density never divides by zero.
    EPS_SIGMA = 1e-6
    sigma = float(max(sigma0, EPS_SIGMA))

    for _ in range(max_iter):
        # E-step: posterior of M1 given the current residuals.
        w = _mixture_posterior(y - X @ alpha, sigma, p0)

        # M-step: weighted least squares for alpha.
        A = X.T @ (X * w)
        b = X.T @ (w * y)
        try:
            alpha_new = np.linalg.solve(A, b)
        except np.linalg.LinAlgError:
            # Singular normal equations: fall back to a least-squares solve.
            alpha_new = np.linalg.lstsq(A, b, rcond=None)[0]

        # Weighted residual variance under the updated coefficients.
        resid = y - X @ alpha_new
        weight_sum = float(np.sum(w))
        if weight_sum > 0.0:
            sigma_new = math.sqrt(float(np.sum(w * (resid ** 2))) / weight_sum)
        else:
            sigma_new = sigma
        if sigma_new < EPS_SIGMA:
            sigma_new = EPS_SIGMA

        # Relative change of alpha, measured against the previous estimate.
        step = np.linalg.norm(alpha_new - alpha)
        scale = np.linalg.norm(alpha)
        alpha, sigma = alpha_new, sigma_new
        if scale > 0 and step < tol * scale:
            break

    # Final posterior with the converged (or last) parameters.
    w = _mixture_posterior(y - X @ alpha, sigma, p0)

    prob_map = np.zeros_like(f)
    prob_map[N : H - N, N : W - N] = w.reshape(inner_h, inner_w)

    return prob_map, alpha.ravel()
| |
|
| |
|
| |
|
| | |
| | |
| | |
| |
|
def synthetic_cfa_map(
    shape: Tuple[int, int],
    channel: str,
    pattern: str = "RGGB",
) -> np.ndarray:
    """
    Build the synthetic binary map s_c(x,y) for a Bayer pattern:

        s_c(x,y) = 0 if CFA at (x,y) is color c
                 = 1 otherwise

    Bayer patterns are specified as a 4-character string:
        'RGGB', 'BGGR', 'GRBG', 'GBRG', etc.

    pattern[0] -> (row%2==0, col%2==0)
    pattern[1] -> (row%2==0, col%2==1)
    pattern[2] -> (row%2==1, col%2==0)
    pattern[3] -> (row%2==1, col%2==1)

    Both `channel` and `pattern` are matched case-insensitively.

    Parameters
    ----------
    shape : (int, int)
        Output size as (H, W).
    channel : str
        One of 'R', 'G', 'B'.
    pattern : str
        Four letters drawn from 'R', 'G', 'B' describing the 2x2 Bayer tile.

    Returns
    -------
    s : np.ndarray, shape (H, W), dtype float64

    Raises
    ------
    ValueError
        If the pattern is not four letters from R/G/B or the channel is not
        one of R/G/B.
    """
    H, W = shape
    if len(pattern) != 4:
        raise ValueError("Bayer pattern string must have length 4, e.g. 'RGGB'.")

    channel = channel.upper()
    if channel not in ("R", "G", "B"):
        raise ValueError("channel must be one of 'R', 'G', 'B'.")

    # Normalise case like `channel`; otherwise a lower-case pattern would
    # never match and silently yield an all-ones map.
    pattern = pattern.upper()
    if any(letter not in ("R", "G", "B") for letter in pattern):
        raise ValueError("Bayer pattern may only contain 'R', 'G' and 'B'.")

    # 2x2 tile holding 0 where the CFA samples `channel`, 1 elsewhere.
    tile = np.array(list(pattern), dtype="<U1").reshape(2, 2)
    cell = (tile != channel).astype(np.float64)

    # Replicate the tile over the grid by indexing it with row/column parity.
    rows = np.arange(H) % 2
    cols = np.arange(W) % 2
    return cell[np.ix_(rows, cols)]
| |
|
| |
|
def similarity_measure(prob_map: np.ndarray, synthetic_map: np.ndarray) -> float:
    """
    Score how alike a probability map and a synthetic CFA map are in the
    Fourier domain, ignoring phase:

        M(p,s) = sum |F(p)| * |F(s)|

    Raises ValueError when the two maps differ in shape.
    """
    if prob_map.shape != synthetic_map.shape:
        raise ValueError("prob_map and synthetic_map must have the same shape.")

    # Only the magnitude spectra enter the score.
    magnitude_p = np.abs(fft2(prob_map))
    magnitude_s = np.abs(fft2(synthetic_map))
    return float((magnitude_p * magnitude_s).sum())
| |
|
| |
|
| | |
| | |
| | |
| |
|
def sliding_window_indices(H: int, W: int, window: int) -> List[Tuple[int, int]]:
    """
    Return the top-left (y, x) corners of all square windows that fit inside
    an H x W grid when stepping by half a window (at least 1) on each axis.

    When the window exceeds either dimension a single origin (0, 0) is
    returned.
    """
    if window > H or window > W:
        return [(0, 0)]

    step = max(1, window // 2)
    # Last admissible start on an axis is (size - window), inclusive.
    row_starts = range(0, H - window + 1, step)
    col_starts = range(0, W - window + 1, step)
    return [(top, left) for top in row_starts for left in col_starts]
| |
|
| |
|
def analyze_window(
    window: np.ndarray,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> Dict[str, Dict[str, np.ndarray]]:
    """
    Analyse one RGB window: for each of R, G, B run the EM estimator, build
    the matching synthetic CFA map and compute their spectral similarity.

    Returns a dict keyed by channel name ('R', 'G', 'B'); each value holds
    'prob_map', 'synthetic', 'M' (0-d float64 array) and 'alpha'.

    Raises ValueError if `window` is not of shape (H, W, 3).
    """
    if window.ndim != 3 or window.shape[2] != 3:
        raise ValueError("Expected RGB window of shape (H,W,3).")

    em_options = {} if em_kwargs is None else em_kwargs
    rows, cols = window.shape[:2]

    per_channel: Dict[str, Dict[str, np.ndarray]] = {}
    # Channel planes are taken in R, G, B order along the last axis.
    for plane, label in enumerate(("R", "G", "B")):
        posterior, coeffs = em_probability_map(window[:, :, plane], **em_options)
        cfa_mask = synthetic_cfa_map((rows, cols), label, pattern=pattern)
        score = similarity_measure(posterior, cfa_mask)
        per_channel[label] = {
            "prob_map": posterior,
            "synthetic": cfa_mask,
            "M": np.array(score, dtype=np.float64),
            "alpha": coeffs,
        }

    return per_channel
| |
|
| |
|
def analyze_image_windows(
    img: np.ndarray,
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> List[Dict]:
    """
    Run the per-window CFA analysis over every sliding window of an image.

    An image smaller than `window` in either dimension is analysed whole, as
    a single window anchored at (0, 0).

    Each returned entry carries the window geometry ('y', 'x', 'h', 'w')
    followed by the per-channel results of `analyze_window`.
    """
    H, W, C = img.shape
    if C != 3:
        raise ValueError("Expected RGB image with 3 channels.")

    if em_kwargs is None:
        em_kwargs = {}

    if H >= window and W >= window:
        origins = sliding_window_indices(H, W, window)
        span_h = span_w = window
    else:
        # Too small for one full window: use the whole image instead.
        origins = [(0, 0)]
        span_h, span_w = H, W

    results = []
    for top, left in origins:
        patch = img[top : top + span_h, left : left + span_w, :]
        analysis = analyze_window(patch, pattern=pattern, em_kwargs=em_kwargs)
        record = {"y": top, "x": left, "h": patch.shape[0], "w": patch.shape[1]}
        record.update(analysis)
        results.append(record)

    return results
| |
|
| |
|
| | |
| | |
| | |
| |
|
def calibrate_thresholds(
    negative_image_paths: Sequence[str],
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
) -> Dict[str, float]:
    """
    Derive per-channel thresholds T_R, T_G, T_B from a negative set
    (non-CFA / tampered images) so that none of its windows exceeds them.

    For each channel the threshold is the largest similarity score M seen in
    any window of any negative image.

    Raises RuntimeError if no score was collected for a channel (for example
    when the path list is empty).
    """
    em_options = {} if em_kwargs is None else em_kwargs
    channel_names = ("R", "G", "B")
    scores = {name: [] for name in channel_names}

    for image_path in negative_image_paths:
        per_window = analyze_image_windows(
            load_rgb_image(image_path),
            window=window,
            pattern=pattern,
            em_kwargs=em_options,
        )
        for entry in per_window:
            for name in channel_names:
                scores[name].append(float(entry[name]["M"]))

    thresholds: Dict[str, float] = {}
    for name in channel_names:
        if not scores[name]:
            raise RuntimeError(f"No M values collected for channel {name}.")
        thresholds[name] = max(scores[name])

    return thresholds
| |
|
| |
|
def classify_windows(
    window_results: List[Dict],
    thresholds: Dict[str, float],
    green_only: bool = False,
) -> List[Dict]:
    """
    Label each analysed window as authentic or not using per-channel
    thresholds.

    A channel counts as CFA-interpolated when its score strictly exceeds its
    threshold (M_c > T_c). With green_only=False (default, PF-style) a window
    is authentic when any channel is CFA-interpolated; with green_only=True
    only the GREEN channel decides, although all three flags are still
    reported.

    Returns shallow copies of the input entries extended with 'channel_cfa'
    and 'authentic'; the input dicts are not modified.
    """
    channel_names = ("R", "G", "B")
    labelled = []
    for entry in window_results:
        scores = {name: float(entry[name]["M"]) for name in channel_names}
        flags = {name: scores[name] > thresholds[name] for name in channel_names}

        verdict = flags["G"] if green_only else (flags["R"] or flags["G"] or flags["B"])

        annotated = dict(entry)
        annotated["channel_cfa"] = flags
        annotated["authentic"] = verdict
        labelled.append(annotated)

    return labelled
| |
|
| |
|
def classify_image(
    img_path: str,
    thresholds: Dict[str, float],
    window: int = 256,
    pattern: str = "RGGB",
    em_kwargs: Dict = None,
    green_only: bool = False,
) -> Dict:
    """
    Load one image, analyse all of its sliding windows and classify them
    against the given thresholds.

    The image as a whole is reported authentic when at least one window is
    labelled authentic.

    Returns:
        {
            "image_path": str,
            "windows": [...],
            "image_authentic": bool,
        }
    """
    pixels = load_rgb_image(img_path)
    analysed = analyze_image_windows(
        pixels,
        window=window,
        pattern=pattern,
        em_kwargs=em_kwargs,
    )
    labelled = classify_windows(analysed, thresholds, green_only=green_only)

    return {
        "image_path": img_path,
        "windows": labelled,
        "image_authentic": any(entry["authentic"] for entry in labelled),
    }
| |
|
| |
|
| | |
| | |
| | |
| |
|
def add_em_args(parser: argparse.ArgumentParser) -> None:
    """Register the EM tuning options (--N, --sigma0, --p0, --max-iter, --tol, --seed) on a parser."""
    # (flag, value type, default, help text), in the order they are registered.
    option_specs = (
        ("--N", int, 1, "Neighborhood radius N for EM (default: 1)."),
        ("--sigma0", float, 0.0075, "Initial sigma_0 for EM (default: 0.0075)."),
        (
            "--p0",
            float,
            1.0 / 256.0,
            "Outlier likelihood p0. Default 1/256 (PF-style for 8-bit data). "
            "For a uniform on [0,1], consider --p0 1.0.",
        ),
        ("--max-iter", int, 50, "Maximum EM iterations (default: 50)."),
        (
            "--tol",
            float,
            1e-5,
            "Relative convergence tolerance on alpha (default: 1e-5).",
        ),
        ("--seed", int, 0, "Random seed for EM initialization (default: 0)."),
    )
    for flag, value_type, default, help_text in option_specs:
        parser.add_argument(flag, type=value_type, default=default, help=help_text)
| |
|
| |
|
def em_args_to_kwargs(args: argparse.Namespace) -> Dict:
    """Collect the EM options from parsed arguments into a kwargs dict for em_probability_map."""
    option_names = ("N", "sigma0", "p0", "max_iter", "tol", "seed")
    return {name: getattr(args, name) for name in option_names}
| |
|
| |
|
def _add_window_pattern_args(parser: argparse.ArgumentParser) -> None:
    """Add the --window / --pattern options shared by both sub-commands."""
    parser.add_argument(
        "--window",
        type=int,
        default=256,
        help="Window size (square) for analysis (default: 256).",
    )
    parser.add_argument(
        "--pattern",
        type=str,
        default="RGGB",
        help="Bayer pattern string (default: RGGB).",
    )


def _build_cli_parser() -> argparse.ArgumentParser:
    """Build the top-level parser with the 'calibrate' and 'detect' sub-commands."""
    parser = argparse.ArgumentParser(
        description="Popescu–Farid CFA interpolation detector (EM + spectral similarity)."
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # --- calibrate ---
    cal_parser = subparsers.add_parser(
        "calibrate",
        help="Calibrate per-channel thresholds on negative (non-CFA) images.",
    )
    cal_parser.add_argument(
        "--neg-dir",
        required=True,
        help="Directory with negative (non-CFA / tampered) images for calibration.",
    )
    _add_window_pattern_args(cal_parser)
    cal_parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Path to JSON file to save thresholds (optional).",
    )
    add_em_args(cal_parser)

    # --- detect ---
    det_parser = subparsers.add_parser(
        "detect",
        help="Run detector on images using pre-calibrated thresholds.",
    )
    det_parser.add_argument(
        "--image",
        type=str,
        default=None,
        help="Path to a single image.",
    )
    det_parser.add_argument(
        "--image-dir",
        type=str,
        default=None,
        help="Directory with images to process.",
    )
    det_parser.add_argument(
        "--thresholds",
        type=str,
        required=True,
        help="Path to JSON file with thresholds from 'calibrate'.",
    )
    _add_window_pattern_args(det_parser)
    det_parser.add_argument(
        "--green-only",
        action="store_true",
        help="Use only GREEN channel for window/image decisions.",
    )
    add_em_args(det_parser)

    return parser


def _run_calibrate(args: argparse.Namespace) -> None:
    """Calibrate thresholds on the negative directory, print them and optionally save them as JSON."""
    neg_files = list_image_files(args.neg_dir)
    if not neg_files:
        raise SystemExit(f"No images found in negative directory: {args.neg_dir}")

    em_kwargs = em_args_to_kwargs(args)
    thresholds = calibrate_thresholds(
        negative_image_paths=neg_files,
        window=args.window,
        pattern=args.pattern,
        em_kwargs=em_kwargs,
    )

    # Plain string, not a %-format: a single '%' is what should be printed.
    print("# Calibrated thresholds (0% FPs on provided negatives):")
    for c in ("R", "G", "B"):
        print(f"T_{c} = {thresholds[c]:.6e}")

    if args.output is not None:
        # Store the analysis settings next to the thresholds so a later
        # 'detect' run can be checked against them.
        out_obj = {
            "thresholds": thresholds,
            "pattern": args.pattern,
            "window": args.window,
            "em_params": em_kwargs,
        }
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(out_obj, f, indent=2)
        print(f"# Saved thresholds to {args.output}")


def _run_detect(args: argparse.Namespace) -> None:
    """Load thresholds, classify the requested image(s) and print a per-window report."""
    import sys

    with open(args.thresholds, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Accept both the full object written by 'calibrate' and a bare
    # {"R": ..., "G": ..., "B": ...} mapping.
    thresholds = data["thresholds"] if "thresholds" in data else data

    for c in ("R", "G", "B"):
        if c not in thresholds:
            raise SystemExit(f"Thresholds JSON missing channel '{c}'.")

    images: List[str] = []
    if args.image is not None and args.image_dir is not None:
        raise SystemExit("Specify either --image or --image-dir, not both.")
    elif args.image is not None:
        images = [args.image]
    elif args.image_dir is not None:
        images = list_image_files(args.image_dir)
        if not images:
            raise SystemExit(
                f"No images found in directory: {args.image_dir}"
            )
    else:
        raise SystemExit("You must specify either --image or --image-dir.")

    em_kwargs = em_args_to_kwargs(args)
    green_only = bool(args.green_only)

    # The thresholds were measured under specific analysis settings; warn
    # (without failing) when this run uses different ones.
    if isinstance(data, dict) and "thresholds" in data:
        in_use = {"window": args.window, "pattern": args.pattern, "em_params": em_kwargs}
        for key, value in in_use.items():
            if key in data and data[key] != value:
                print(
                    f"# WARNING: thresholds were calibrated with {key}={data[key]!r} "
                    f"but detection uses {key}={value!r}.",
                    file=sys.stderr,
                )

    for img_path in images:
        result = classify_image(
            img_path,
            thresholds=thresholds,
            window=args.window,
            pattern=args.pattern,
            em_kwargs=em_kwargs,
            green_only=green_only,
        )

        label = "AUTHENTIC" if result["image_authentic"] else "TAMPERED"
        mode = "GREEN_ONLY" if green_only else "ALL_CHANNELS"
        print(f"\n# Image: {result['image_path']}")
        print(f"# Overall label: {label} (mode={mode})")
        print("# y x h w M_R M_G M_B CFA_R CFA_G CFA_B WINDOW_LABEL")

        for w in result["windows"]:
            y = w["y"]
            x = w["x"]
            h = w["h"]
            wd = w["w"]
            M_R = float(w["R"]["M"])
            M_G = float(w["G"]["M"])
            M_B = float(w["B"]["M"])
            cfaR = w["channel_cfa"]["R"]
            cfaG = w["channel_cfa"]["G"]
            cfaB = w["channel_cfa"]["B"]
            wlabel = "AUTH" if w["authentic"] else "TAMP"

            print(
                f"{y:5d} {x:5d} {h:4d} {wd:4d} "
                f"{M_R:.6e} {M_G:.6e} {M_B:.6e} "
                f"{int(cfaR)} {int(cfaG)} {int(cfaB)} {wlabel}"
            )


def main():
    """Command-line entry point: parse arguments and dispatch to 'calibrate' or 'detect'."""
    args = _build_cli_parser().parse_args()

    if args.command == "calibrate":
        _run_calibrate(args)
    elif args.command == "detect":
        _run_detect(args)
| |
|
| |
|
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()
| |
|