| |
| """ |
| GPU-accelerated image processing using OpenCV CUDA (cv2.cuda.GpuMat). |
| |
| Provides drop-in replacements for common cv2 functions. When OpenCV is built |
| with CUDA support the functions transparently upload β process β download via |
| GpuMat; otherwise they fall back to the regular CPU path so the rest of the |
| codebase never has to care whether CUDA is available. |
| |
| Usage |
| ----- |
| from modules.gpu_processing import ( |
| gpu_gaussian_blur, gpu_sharpen, gpu_add_weighted, |
| gpu_resize, gpu_cvt_color, gpu_flip, |
| is_gpu_accelerated, |
| ) |
| """ |
|
|
| from __future__ import annotations |
|
|
| import cv2 |
| import numpy as np |
| from typing import Tuple, Optional |
|
|
| |
| |
| |
| CUDA_AVAILABLE: bool = False |
|
|
| try: |
| |
| _test_mat = cv2.cuda.GpuMat() |
| |
| _has_gauss = hasattr(cv2.cuda, "createGaussianFilter") |
| _has_resize = hasattr(cv2.cuda, "resize") |
| _has_cvt = hasattr(cv2.cuda, "cvtColor") |
| if _has_gauss and _has_resize and _has_cvt: |
| CUDA_AVAILABLE = True |
| print("[gpu_processing] OpenCV CUDA support detected β GPU-accelerated processing enabled.") |
| else: |
| missing = [] |
| if not _has_gauss: |
| missing.append("createGaussianFilter") |
| if not _has_resize: |
| missing.append("resize") |
| if not _has_cvt: |
| missing.append("cvtColor") |
| print(f"[gpu_processing] cv2.cuda.GpuMat exists but missing: {', '.join(missing)} β falling back to CPU.") |
| except Exception: |
| print("[gpu_processing] OpenCV CUDA not available β using CPU fallback for all operations.") |
|
|
|
|
| |
| |
| |
|
|
| def _ensure_uint8(img: np.ndarray) -> np.ndarray: |
| """Clip and convert to uint8 if necessary.""" |
| if img.dtype != np.uint8: |
| return np.clip(img, 0, 255).astype(np.uint8) |
| return img |
|
|
|
|
| def _ksize_odd(ksize: Tuple[int, int]) -> Tuple[int, int]: |
| """Ensure kernel dimensions are positive and odd (required by GaussianBlur).""" |
| kw = max(1, ksize[0] // 2 * 2 + 1) if ksize[0] > 0 else 0 |
| kh = max(1, ksize[1] // 2 * 2 + 1) if ksize[1] > 0 else 0 |
| return (kw, kh) |
|
|
|
|
| def _cv_type_for(img: np.ndarray) -> int: |
| """Return the OpenCV type constant matching *img* (uint8 only).""" |
| channels = 1 if img.ndim == 2 else img.shape[2] |
| if channels == 1: |
| return cv2.CV_8UC1 |
| elif channels == 3: |
| return cv2.CV_8UC3 |
| elif channels == 4: |
| return cv2.CV_8UC4 |
| return cv2.CV_8UC3 |
|
|
|
|
| |
| |
| |
|
|
| def gpu_gaussian_blur( |
| src: np.ndarray, |
| ksize: Tuple[int, int], |
| sigma_x: float, |
| sigma_y: float = 0, |
| ) -> np.ndarray: |
| """Drop-in replacement for ``cv2.GaussianBlur`` with CUDA acceleration. |
| |
| Parameters match ``cv2.GaussianBlur(src, ksize, sigmaX, sigmaY)``. |
| When *ksize* is ``(0, 0)`` OpenCV computes the kernel size from *sigma_x*. |
| """ |
| if CUDA_AVAILABLE: |
| try: |
| src_u8 = _ensure_uint8(src) |
| cv_type = _cv_type_for(src_u8) |
| ks = _ksize_odd(ksize) if ksize != (0, 0) else ksize |
|
|
| gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, ks, sigma_x, sigma_y) |
| gpu_src = cv2.cuda.GpuMat() |
| gpu_src.upload(src_u8) |
| gpu_dst = gauss.apply(gpu_src) |
| return gpu_dst.download() |
| except cv2.error: |
| pass |
|
|
| return cv2.GaussianBlur(src, ksize, sigma_x, sigmaY=sigma_y) |
|
|
|
|
| |
| |
| |
|
|
| def gpu_add_weighted( |
| src1: np.ndarray, |
| alpha: float, |
| src2: np.ndarray, |
| beta: float, |
| gamma: float, |
| ) -> np.ndarray: |
| """Drop-in replacement for ``cv2.addWeighted`` with CUDA acceleration.""" |
| if CUDA_AVAILABLE: |
| try: |
| s1 = _ensure_uint8(src1) |
| s2 = _ensure_uint8(src2) |
| g1 = cv2.cuda.GpuMat() |
| g2 = cv2.cuda.GpuMat() |
| g1.upload(s1) |
| g2.upload(s2) |
| gpu_dst = cv2.cuda.addWeighted(g1, alpha, g2, beta, gamma) |
| return gpu_dst.download() |
| except cv2.error: |
| pass |
|
|
| return cv2.addWeighted(src1, alpha, src2, beta, gamma) |
|
|
|
|
| |
| |
| |
|
|
| def gpu_sharpen( |
| src: np.ndarray, |
| strength: float, |
| sigma: float = 3, |
| ) -> np.ndarray: |
| """Unsharp-mask sharpening, optionally GPU-accelerated. |
| |
| Equivalent to:: |
| |
| blurred = GaussianBlur(src, (0,0), sigma) |
| result = addWeighted(src, 1+strength, blurred, -strength, 0) |
| """ |
| if strength <= 0: |
| return src |
|
|
| if CUDA_AVAILABLE: |
| try: |
| src_u8 = _ensure_uint8(src) |
| cv_type = _cv_type_for(src_u8) |
|
|
| gauss = cv2.cuda.createGaussianFilter(cv_type, cv_type, (0, 0), sigma) |
| gpu_src = cv2.cuda.GpuMat() |
| gpu_src.upload(src_u8) |
| gpu_blurred = gauss.apply(gpu_src) |
| gpu_sharp = cv2.cuda.addWeighted(gpu_src, 1.0 + strength, gpu_blurred, -strength, 0) |
| result = gpu_sharp.download() |
| return np.clip(result, 0, 255).astype(np.uint8) |
| except cv2.error: |
| pass |
|
|
| blurred = cv2.GaussianBlur(src, (0, 0), sigma) |
| sharpened = cv2.addWeighted(src, 1.0 + strength, blurred, -strength, 0) |
| return np.clip(sharpened, 0, 255).astype(np.uint8) |
|
|
|
|
| |
| |
| |
|
|
| |
| _INTERP_MAP = { |
| cv2.INTER_NEAREST: cv2.INTER_NEAREST, |
| cv2.INTER_LINEAR: cv2.INTER_LINEAR, |
| cv2.INTER_CUBIC: cv2.INTER_CUBIC, |
| cv2.INTER_AREA: cv2.INTER_AREA, |
| cv2.INTER_LANCZOS4: cv2.INTER_LANCZOS4, |
| } |
|
|
|
|
| def gpu_resize( |
| src: np.ndarray, |
| dsize: Tuple[int, int], |
| fx: float = 0, |
| fy: float = 0, |
| interpolation: int = cv2.INTER_LINEAR, |
| ) -> np.ndarray: |
| """Drop-in replacement for ``cv2.resize`` with CUDA acceleration. |
| |
| Parameters match ``cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=...)``. |
| """ |
| if CUDA_AVAILABLE: |
| try: |
| src_u8 = _ensure_uint8(src) |
| gpu_src = cv2.cuda.GpuMat() |
| gpu_src.upload(src_u8) |
|
|
| interp = _INTERP_MAP.get(interpolation, cv2.INTER_LINEAR) |
|
|
| if dsize and dsize[0] > 0 and dsize[1] > 0: |
| gpu_dst = cv2.cuda.resize(gpu_src, dsize, interpolation=interp) |
| else: |
| gpu_dst = cv2.cuda.resize(gpu_src, (0, 0), fx=fx, fy=fy, interpolation=interp) |
|
|
| return gpu_dst.download() |
| except cv2.error: |
| pass |
|
|
| return cv2.resize(src, dsize, fx=fx, fy=fy, interpolation=interpolation) |
|
|
|
|
| |
| |
| |
|
|
| def gpu_cvt_color( |
| src: np.ndarray, |
| code: int, |
| ) -> np.ndarray: |
| """Drop-in replacement for ``cv2.cvtColor`` with CUDA acceleration. |
| |
| Parameters match ``cv2.cvtColor(src, code)``. |
| """ |
| if CUDA_AVAILABLE: |
| try: |
| src_u8 = _ensure_uint8(src) |
| gpu_src = cv2.cuda.GpuMat() |
| gpu_src.upload(src_u8) |
| gpu_dst = cv2.cuda.cvtColor(gpu_src, code) |
| return gpu_dst.download() |
| except cv2.error: |
| pass |
|
|
| return cv2.cvtColor(src, code) |
|
|
|
|
| |
| |
| |
|
|
| def gpu_flip( |
| src: np.ndarray, |
| flip_code: int, |
| ) -> np.ndarray: |
| """Drop-in replacement for ``cv2.flip`` with CUDA acceleration. |
| |
| Parameters match ``cv2.flip(src, flipCode)``. |
| *flip_code*: 0 = vertical, 1 = horizontal, -1 = both. |
| """ |
| if CUDA_AVAILABLE: |
| try: |
| src_u8 = _ensure_uint8(src) |
| gpu_src = cv2.cuda.GpuMat() |
| gpu_src.upload(src_u8) |
| gpu_dst = cv2.cuda.flip(gpu_src, flip_code) |
| return gpu_dst.download() |
| except cv2.error: |
| pass |
|
|
| return cv2.flip(src, flip_code) |
|
|
|
|
| |
| |
| |
|
|
| def is_gpu_accelerated() -> bool: |
| """Return ``True`` when the CUDA path will be used.""" |
| return CUDA_AVAILABLE |
|
|
| |
|
|