|
|
"""
|
|
|
ImagePreprocessor.py - FULLY OPTIMIZED
|
|
|
Minimal overhead, fast operations, no unnecessary prints.
|
|
|
"""
|
|
|
|
|
|
import cv2
|
|
|
import numpy as np
|
|
|
from PIL import Image
|
|
|
from typing import Tuple, Optional
|
|
|
from sklearn.cluster import MiniBatchKMeans
|
|
|
import warnings
|
|
|
|
|
|
|
|
|
class ImagePreprocessor:
|
|
|
"""OPTIMIZED image preprocessing for mosaic generation."""
|
|
|
|
|
|
def __init__(self, target_resolution: Tuple[int, int] = (800, 600),
|
|
|
grid_size: Tuple[int, int] = (20, 15),
|
|
|
verbose: bool = False):
|
|
|
"""Initialize preprocessor - MINIMAL overhead."""
|
|
|
self.target_resolution = target_resolution
|
|
|
self.grid_size = grid_size
|
|
|
self.verbose = verbose
|
|
|
|
|
|
|
|
|
self.tile_width = target_resolution[0] // grid_size[0]
|
|
|
self.tile_height = target_resolution[1] // grid_size[1]
|
|
|
self.adjusted_width = self.tile_width * grid_size[0]
|
|
|
self.adjusted_height = self.tile_height * grid_size[1]
|
|
|
|
|
|
def load_and_preprocess_image(self, image_path: str,
|
|
|
apply_quantization: bool = False,
|
|
|
n_colors: int = 16) -> Optional[np.ndarray]:
|
|
|
"""Load and preprocess - OPTIMIZED for speed."""
|
|
|
try:
|
|
|
|
|
|
image = cv2.imread(image_path)
|
|
|
if image is None:
|
|
|
raise ValueError(f"Could not load image: {image_path}")
|
|
|
|
|
|
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
|
processed_image = self._resize_and_crop(image)
|
|
|
|
|
|
|
|
|
if apply_quantization and n_colors < 256:
|
|
|
processed_image = self._apply_color_quantization(processed_image, n_colors)
|
|
|
|
|
|
return processed_image
|
|
|
|
|
|
except Exception as e:
|
|
|
if self.verbose:
|
|
|
print(f"Error processing {image_path}: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
def preprocess_numpy_image(self, image: np.ndarray,
|
|
|
apply_quantization: bool = False,
|
|
|
n_colors: int = 16) -> Optional[np.ndarray]:
|
|
|
"""Preprocess numpy image (for Gradio)."""
|
|
|
try:
|
|
|
if len(image.shape) != 3 or image.shape[2] != 3:
|
|
|
raise ValueError("Image must be RGB (H, W, 3)")
|
|
|
|
|
|
processed_image = self._resize_and_crop(image)
|
|
|
|
|
|
if apply_quantization and n_colors < 256:
|
|
|
processed_image = self._apply_color_quantization(processed_image, n_colors)
|
|
|
|
|
|
return processed_image
|
|
|
|
|
|
except Exception as e:
|
|
|
if self.verbose:
|
|
|
print(f"Error processing image: {str(e)}")
|
|
|
return None
|
|
|
|
|
|
def _resize_and_crop(self, image: np.ndarray) -> np.ndarray:
|
|
|
"""Resize and crop - OPTIMIZED."""
|
|
|
h, w = image.shape[:2]
|
|
|
target_w, target_h = self.adjusted_width, self.adjusted_height
|
|
|
|
|
|
|
|
|
scale = max(target_w / w, target_h / h)
|
|
|
new_w = int(w * scale)
|
|
|
new_h = int(h * scale)
|
|
|
|
|
|
|
|
|
resized = cv2.resize(image, (new_w, new_h), interpolation=cv2.INTER_LINEAR)
|
|
|
|
|
|
|
|
|
start_x = (new_w - target_w) // 2
|
|
|
start_y = (new_h - target_h) // 2
|
|
|
cropped = resized[start_y:start_y + target_h, start_x:start_x + target_w]
|
|
|
|
|
|
return cropped
|
|
|
|
|
|
def _apply_color_quantization(self, image: np.ndarray, n_colors: int) -> np.ndarray:
|
|
|
"""OPTIMIZED color quantization with aggressive sampling."""
|
|
|
h, w, c = image.shape
|
|
|
pixels = image.reshape(-1, c)
|
|
|
total_pixels = len(pixels)
|
|
|
|
|
|
|
|
|
max_sample_size = 10000
|
|
|
|
|
|
if total_pixels > max_sample_size:
|
|
|
|
|
|
sample_indices = np.random.randint(0, total_pixels, size=max_sample_size)
|
|
|
sampled_pixels = pixels[sample_indices].astype(np.float32)
|
|
|
|
|
|
batch_size = min(max(len(sampled_pixels) // 10, 500), 2000)
|
|
|
|
|
|
with warnings.catch_warnings():
|
|
|
warnings.filterwarnings("ignore")
|
|
|
|
|
|
kmeans = MiniBatchKMeans(
|
|
|
n_clusters=n_colors, batch_size=batch_size,
|
|
|
random_state=42, n_init=1, max_iter=50
|
|
|
)
|
|
|
|
|
|
|
|
|
kmeans.fit(sampled_pixels)
|
|
|
labels = kmeans.predict(pixels.astype(np.float32))
|
|
|
else:
|
|
|
|
|
|
batch_size = min(max(total_pixels // 100, 500), 5000)
|
|
|
|
|
|
with warnings.catch_warnings():
|
|
|
warnings.filterwarnings("ignore")
|
|
|
|
|
|
kmeans = MiniBatchKMeans(
|
|
|
n_clusters=n_colors, batch_size=batch_size,
|
|
|
random_state=42, n_init=1, max_iter=50
|
|
|
)
|
|
|
labels = kmeans.fit_predict(pixels.astype(np.float32))
|
|
|
|
|
|
|
|
|
quantized_pixels = kmeans.cluster_centers_[labels]
|
|
|
quantized_image = quantized_pixels.reshape(h, w, c).astype(np.uint8)
|
|
|
|
|
|
return quantized_image
|
|
|
|
|
|
def save_preprocessed_image(self, image: np.ndarray, output_path: str):
|
|
|
"""Save preprocessed image."""
|
|
|
try:
|
|
|
pil_image = Image.fromarray(image)
|
|
|
pil_image.save(output_path, quality=95)
|
|
|
except Exception as e:
|
|
|
if self.verbose:
|
|
|
print(f"Error saving image: {str(e)}") |