Spaces:
Running
Running
| import os | |
| from pathlib import Path | |
| from PIL import Image, ImageOps | |
| import cv2 | |
| import numpy as np | |
| import tensorflow as tf | |
| from tensorflow.keras.applications.resnet50 import preprocess_input | |
| import torch | |
| import clip | |
| BASE_DIR = "MAS-AI-0000/Authentica" | |
| MODELS_DIR = os.path.join(BASE_DIR, "Lib/Models/Image") | |
| # Load models and preprocessing once at module level | |
| clip_mod, clip_pre = clip.load("ViT-B/32", jit=False) | |
| clip_mod.eval() | |
| for p in clip_mod.parameters(): | |
| p.requires_grad = False | |
| mlp_model= tf.keras.models.load_model(os.path.join(MODELS_DIR, "clip_model.keras")) | |
| cnn_model = tf.keras.models.load_model(os.path.join(MODELS_DIR, "cnn_model.keras")) | |
| resnet_model = tf.keras.models.load_model(os.path.join(MODELS_DIR, "resnet_model.keras")) | |
| def center_crop(image: Image.Image, crop_size=512) -> Image.Image | str: | |
| try: | |
| image = ImageOps.exif_transpose(image) | |
| w, h = image.size | |
| if w < crop_size or h < crop_size: | |
| # skip small images | |
| return f"skipped image (too small) ({w}x{h})" | |
| left = (w - crop_size) // 2 | |
| top = (h - crop_size) // 2 | |
| right = left + crop_size | |
| bottom = top + crop_size | |
| cropped = image.crop((left, top, right, bottom)) | |
| return cropped | |
| except Exception as e: | |
| return f"Error when cropping center: {e}" | |
| def denoise(src_image: Image) -> np.ndarray | str: | |
| """Read image, denoise (GPU if available) and return denoised image.""" | |
| img = np.array(src_image) # BGR uint8 numpy array | |
| img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) | |
| if src_image is None: | |
| print(f"WARNING: No source image, skipping.") | |
| return False | |
| # Denoising parameters | |
| H = 10 # filter strength for luminance component (recommended 3-15) | |
| H_COLOR = 10 # same for color components | |
| TEMPLATE_WINDOW_SIZE = 7 | |
| SEARCH_WINDOW_SIZE = 21 | |
| # Use CUDA if available, otherwise CPU fallback | |
| use_cuda = False | |
| try: | |
| use_cuda = hasattr(cv2, 'cuda') and cv2.cuda.getCudaEnabledDeviceCount() > 0 | |
| except Exception: | |
| use_cuda = False | |
| if use_cuda: | |
| # Create a GpuMat and upload the numpy image to GPU | |
| gpu_img = cv2.cuda_GpuMat() | |
| gpu_img.upload(img) # <-- this converts numpy -> GpuMat on device | |
| den_gpu = cv2.cuda.fastNlMeansDenoisingColored( | |
| gpu_img,H,H_COLOR,None,SEARCH_WINDOW_SIZE,TEMPLATE_WINDOW_SIZE | |
| ) | |
| # Download result back to CPU | |
| den = den_gpu.download() | |
| else: | |
| # Fallback to CPU implementation | |
| print("NOTICE: CUDA not available — using CPU denoiser.") | |
| den = cv2.fastNlMeansDenoisingColored( | |
| img, None, | |
| H, H_COLOR, | |
| TEMPLATE_WINDOW_SIZE, | |
| SEARCH_WINDOW_SIZE | |
| ) | |
| #cv2.imwrite("denoised.png", den) # for debugging | |
| den = cv2.cvtColor(den, cv2.COLOR_BGR2RGB) | |
| den = Image.fromarray(den) | |
| return den | |
| def compute_profile(raw_image: Image, den_image: Image, normalize=False ,verbose= True) -> np.ndarray | str: | |
| # read images | |
| if raw_image is None: | |
| return print(f"WARNING: couldn't read raw image") | |
| if den_image is None: | |
| return print(f"WARNING: couldn't read denoised image") | |
| raw = np.array(raw_image) # RGB uint8 numpy array | |
| raw = cv2.cvtColor(raw, cv2.COLOR_RGB2BGR) | |
| den = np.array(den_image) # RGB uint8 numpy array | |
| den = cv2.cvtColor(den, cv2.COLOR_RGB2BGR) | |
| # if shapes differ, resize den to raw's size (keeps alignment); warn | |
| if den.shape != raw.shape: | |
| if verbose: | |
| print(f"NOTE: shape mismatch, resizing denoised from {den.shape[:2]} to {raw.shape[:2]}") | |
| den = cv2.resize(den, (raw.shape[1], raw.shape[0]), interpolation=cv2.INTER_LINEAR) | |
| # absolute difference per-channel | |
| diff = cv2.absdiff(raw, den) # BGR, uint8 | |
| gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY) # single-channel uint8 | |
| # optionally normalize to full 0-255 (per-image) | |
| if normalize: | |
| # cv2.normalize will map min->0 and max->255 | |
| # but if the image is flat (min==max) normalize will set to 0; handle that | |
| minv = int(gray.min()) | |
| maxv = int(gray.max()) | |
| if maxv > minv: | |
| norm = cv2.normalize(gray, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX) | |
| out = norm | |
| else: | |
| # nothing to normalize (flat), keep as-is (all zeros) | |
| out = gray | |
| else: | |
| # keep raw diff values but ensure dtype uint8 (already uint8) and values are 0..255 | |
| out = gray | |
| #cv2.imwrite("profile.png", out) # for debugging | |
| return out | |
| def profile_image_for_cnn_predict(pil_img: Image, crop_size=512): | |
| """Preprocess the input image and return a numpy array ready for model prediction.""" | |
| # Step 1: Center crop the image | |
| cropped_img = center_crop(pil_img, crop_size=crop_size) | |
| if isinstance(cropped_img, str): | |
| return cropped_img # return error message if cropping failed | |
| # Step 2: Denoise the cropped image | |
| denoised_img = denoise(cropped_img) | |
| if isinstance(denoised_img, str): | |
| return denoised_img # return error message if denoising failed | |
| # Step 3: Compute the profile image | |
| profile_img = compute_profile(cropped_img, denoised_img, normalize=False) | |
| if isinstance(profile_img, str): | |
| return profile_img # return error message if profile computation failed | |
| return profile_img | |
| def prepare_cv2_image_for_resnet(cv2_gray_img, target_size=(512,512)): | |
| img_rgb = cv2.cvtColor(cv2_gray_img, cv2.COLOR_GRAY2RGB) | |
| img_rgb = cv2.resize(img_rgb, (target_size[1], target_size[0]), interpolation=cv2.INTER_AREA) | |
| img_rgb = img_rgb.astype('float32') | |
| # 5) add batch dim | |
| x = np.expand_dims(img_rgb, axis=0) # shape (1, H, W, 3) | |
| x = preprocess_input(x) | |
| return x | |
| def predict_image_prob_clip(image: Image.Image, threshold=0.5, | |
| clip_model=None, clip_preprocess=None, | |
| keras_mlp=None): | |
| """ | |
| Predicts probability that image is AI-generated (AI=1) using CLIP + Keras MLP. | |
| Args: | |
| path_or_image: str (file path) or PIL.Image.Image or numpy array (H,W,3) | |
| threshold: float threshold for binary label | |
| clip_model, clip_preprocess: optionally pass existing CLIP objects | |
| keras_mlp: optionally pass existing loaded Keras model | |
| Returns: | |
| dict: {'prob': float_prob_AI, 'label': 'AI' or 'Real'} | |
| """ | |
| # --- try to reuse provided CLIP objects, otherwise load --- | |
| if clip_model is None or clip_preprocess is None: | |
| print("Loading Default CLIP model...") | |
| # pick a model name: prefer provided arg, else try global, else ViT-B/32 | |
| cmn = "ViT-B/32" | |
| clip_model, clip_preprocess = clip.load(cmn, device="cpu", jit=False) | |
| clip_model.eval() | |
| for p in clip_model.parameters(): | |
| p.requires_grad = False | |
| # --- try to reuse provided keras model, otherwise load from disk --- | |
| if keras_mlp is None: | |
| print("No keras model provided...") | |
| return None | |
| # --- load/normalize image --- | |
| # assume PIL image | |
| img = image.convert('RGB') | |
| # --- preprocess for CLIP and get embedding --- | |
| input_tensor = clip_preprocess(img).unsqueeze(0).to("cpu") # shape (1,C,H,W) | |
| with torch.no_grad(): | |
| emb = clip_model.encode_image(input_tensor) # (1, D) | |
| emb = emb / emb.norm(dim=-1, keepdim=True) # L2 normalize | |
| emb_np = emb.cpu().numpy().astype('float32') # shape (1, D) | |
| # --- predict with Keras MLP --- | |
| probs = keras_mlp.predict(emb_np, verbose=0).reshape(-1,) | |
| prob = float(probs[0]) | |
| return prob | |
| def clip_predict(pil_img: Image, crop_size=512): | |
| # pass model objects explicitly (faster if you call this repeatedly) | |
| pil_img = center_crop(pil_img, crop_size=crop_size) | |
| if isinstance(pil_img, str): | |
| return pil_img # return error message | |
| return predict_image_prob_clip(pil_img, | |
| clip_model=clip_mod, | |
| clip_preprocess=clip_pre, | |
| keras_mlp=mlp_model) | |
| def CNNPredict(predict_img: np.ndarray): | |
| #1 Real 0 AI | |
| #normalize image | |
| # expand dims to add channel axis | |
| predict_img = predict_img.astype('float32') / 255.0 # shape (H, W) | |
| predict_img = np.expand_dims(predict_img, axis=-1) # shape (H, W, 1) | |
| # expand dims to add batch axis | |
| predict_img = np.expand_dims(predict_img, axis=0) # shape (1, H, W, 1) | |
| prediction = cnn_model.predict(predict_img) | |
| return prediction[0][0] | |
| def ResnetPredict(predict_img): | |
| #1 Real 0 AI | |
| predict_img = prepare_cv2_image_for_resnet(predict_img) | |
| prediction = resnet_model.predict(predict_img) | |
| return prediction[0][0] | |