| import gradio as gr |
| import numpy as np |
| import cv2 |
| from PIL import Image, ImageChops, ImageEnhance |
| import io |
| import os |
| import random |
| import matplotlib.pyplot as plt |
| from matplotlib.colors import LinearSegmentedColormap |
| import tempfile |
| import json |
| import base64 |
| from sklearn.metrics.pairwise import cosine_similarity |
| import shutil |
| from typing import Dict, Any |
| from scipy.spatial import cKDTree |
| from multiprocessing import Pool, cpu_count |
| import nest_asyncio |
|
|
| |
# Patch the already-running asyncio event loop so that nested loops can be
# started inside it (needed when this script runs under an environment that
# already owns a loop, e.g. Jupyter). NOTE(review): presumably required for
# Gradio in such environments — confirm it is needed in plain-script runs.
nest_asyncio.apply()


# Scratch directory for intermediate artifacts (uploaded image copy, ELA
# recompression output). Created once per process; never explicitly removed,
# so the OS temp-cleanup policy is relied upon.
TEMP_DIR = tempfile.mkdtemp()
print(f"Using temporary directory: {TEMP_DIR}")
|
|
| |
| |
| |
|
|
def save_pil_image(img, path):
    """Write *img* to *path* on disk and echo the destination path back."""
    img.save(path)
    return path
|
|
def pil_to_base64(img):
    """Serialize a PIL image to PNG and return it as a UTF-8 base64 string."""
    png_buffer = io.BytesIO()
    img.save(png_buffer, format="PNG")
    raw_bytes = png_buffer.getvalue()
    return base64.b64encode(raw_bytes).decode('utf-8')
|
|
def base64_to_pil(base64_str):
    """Decode a base64 string back into a PIL Image object."""
    raw = base64.b64decode(base64_str)
    return Image.open(io.BytesIO(raw))
|
|
| |
| |
| |
|
|
| |
def find_matches(args):
    """
    Find matching blocks within the given indices.

    Args:
        args: A tuple of (block_indices, blocks, tree, similarity_threshold)
            where *blocks* is the feature matrix and *tree* a cKDTree built
            over it.

    Returns:
        A set of (low_index, high_index) pairs of blocks whose features lie
        within *similarity_threshold* of each other.
    """
    block_indices, blocks, tree, similarity_threshold = args
    pairs = set()
    n_blocks = len(blocks)
    for idx in block_indices:
        # Up to 10 nearest neighbours; entries past the bound come back as
        # index == n_blocks with an infinite distance.
        dists, neighbors = tree.query(blocks[idx], k=10, distance_upper_bound=similarity_threshold)
        for neighbor, dist in zip(neighbors, dists):
            if neighbor == idx or neighbor >= n_blocks or dist > similarity_threshold:
                continue
            # Store each pair in canonical (smaller, larger) order.
            pairs.add((idx, neighbor) if idx < neighbor else (neighbor, idx))
    return pairs
|
|
|
|
def detect_clones(image_path, max_dimension=2000):
    """
    Detects cloned/copy-pasted regions in the image with optimized performance.

    Args:
        image_path: Path to the image file
        max_dimension: Maximum dimension to resize large images to

    Returns:
        Tuple of (PIL Image with matched regions outlined, number of matched
        block pairs found).

    Raises:
        ValueError: If the image cannot be read from image_path.
    """
    img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise ValueError(f"Could not read image at {image_path}")

    height, width = img.shape

    # Downscale very large images so the block-matching stays tractable.
    scale = 1.0
    if height > max_dimension or width > max_dimension:
        scale = max_dimension / max(height, width)
        new_height, new_width = int(height * scale), int(width * scale)
        img = cv2.resize(img, (new_width, new_height))
        height, width = img.shape

    # BGR canvas the match rectangles are drawn onto.
    clone_img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)

    block_size = 16
    stride = 8
    # Coarser stride on multi-megapixel images keeps the block count bounded.
    if (height * width) > 4000000:
        stride = 16

    blocks = []
    positions = []
    # Slide a window over the image; summarize each block by the
    # low-frequency 4x4 corner of its DCT (compact, compression-tolerant).
    for y in range(0, height - block_size, stride):
        for x in range(0, width - block_size, stride):
            block = img[y:y+block_size, x:x+block_size].astype(np.float32)
            dct = cv2.dct(block)
            feature = dct[:4, :4].flatten()
            blocks.append(feature)
            positions.append((x, y))

    # FIX: images no larger than one block produce no features; building a
    # cKDTree over an empty array raises. Report "no clones" instead.
    if not blocks:
        return Image.fromarray(cv2.cvtColor(clone_img, cv2.COLOR_BGR2RGB)), 0

    blocks = np.array(blocks, dtype=np.float32)

    # L2-normalize features so nearest-neighbour (Euclidean) distance acts
    # like a cosine-style similarity measure; guard against zero vectors.
    norms = np.linalg.norm(blocks, axis=1)
    norms[norms == 0] = 1
    blocks = blocks / norms[:, np.newaxis]

    tree = cKDTree(blocks)

    similarity_threshold = 0.04
    matches = set()

    # Fan the neighbour queries out over worker processes.
    num_processes = min(8, cpu_count())
    chunk_size = len(blocks) // num_processes + 1
    block_chunks = [range(i, min(i + chunk_size, len(blocks))) for i in range(0, len(blocks), chunk_size)]
    args_list = [(chunk, blocks, tree, similarity_threshold) for chunk in block_chunks]

    with Pool(num_processes) as pool:
        results = pool.map(find_matches, args_list)

    for result in results:
        matches.update(result)

    # Outline each matched pair: red box on one block, blue on its twin.
    for i, j in matches:
        x1, y1 = positions[i]
        x2, y2 = positions[j]
        cv2.rectangle(clone_img, (x1, y1), (x1+block_size, y1+block_size), (0, 0, 255), 1)
        cv2.rectangle(clone_img, (x2, y2), (x2+block_size, y2+block_size), (255, 0, 0), 1)

    clone_result = Image.fromarray(cv2.cvtColor(clone_img, cv2.COLOR_BGR2RGB))

    # Restore the original resolution if the image was downscaled above.
    if scale != 1.0:
        orig_size = (int(clone_img.shape[1]/scale), int(clone_img.shape[0]/scale))
        clone_result = clone_result.resize(orig_size, Image.LANCZOS)

    return clone_result, len(matches)
|
|
def error_level_analysis(image_path, quality=90, scale=10):
    """
    Performs Error Level Analysis (ELA) on the image.

    Re-saving at a known JPEG quality and diffing against the original makes
    regions with a different compression history stand out.

    Args:
        image_path: Path to the image file
        quality: JPEG quality level for recompression
        scale: Amplification factor for differences

    Returns:
        PIL Image containing the ELA result (jet-colormapped difference)
    """
    original = Image.open(image_path).convert('RGB')

    # FIX: recompress in memory rather than writing to TEMP_DIR — the old
    # code left a stale temp file and an open file handle behind per call.
    jpeg_buffer = io.BytesIO()
    original.save(jpeg_buffer, 'JPEG', quality=quality)
    jpeg_buffer.seek(0)
    recompressed = Image.open(jpeg_buffer).convert('RGB')

    # Per-pixel absolute difference, amplified so small errors are visible.
    diff = ImageChops.difference(original, recompressed)
    diff = ImageEnhance.Brightness(diff).enhance(scale)

    diff_array = np.array(diff)

    # Collapse to a single channel for the colormap.
    if len(diff_array.shape) == 3:
        diff_gray = np.mean(diff_array, axis=2)
    else:
        diff_gray = diff_array

    # False-color the error levels; drop the alpha channel the cmap adds.
    colormap = plt.get_cmap('jet')
    colored_diff = (colormap(diff_gray / 255.0) * 255).astype(np.uint8)

    colored_result = Image.fromarray(colored_diff[:, :, :3])

    return colored_result
|
|
def extract_exif_metadata(image_path):
    """
    Extracts EXIF metadata from the image and identifies potential manipulation indicators.

    Args:
        image_path: Path to the image file

    Returns:
        Dictionary with keys "metadata" (tag-name -> value strings),
        "indicators" (list of suspicion messages), "summary", and
        "analysis_count". Never raises — unexpected failures return a stub
        result with summary "Analysis failed".
    """
    try:
        img = Image.open(image_path)
        # FIX: _getexif exists only on some formats (JPEG/TIFF); formats
        # without it (e.g. PNG) used to raise AttributeError and were
        # misreported as "Analysis failed". Treat them as having no EXIF.
        exif_getter = getattr(img, '_getexif', None)
        exif_data = (exif_getter() if exif_getter else None) or {}

        # Subset of EXIF tag IDs relevant to manipulation analysis.
        exif_tags = {
            271: 'Make', 272: 'Model', 306: 'DateTime',
            36867: 'DateTimeOriginal', 36868: 'DateTimeDigitized',
            37510: 'UserComment', 40964: 'RelatedSoundFile',
            305: 'Software', 315: 'Artist', 33432: 'Copyright'
        }

        # Map numeric tag IDs to readable names; unknown tags keep their ID.
        metadata = {}
        for tag_id, value in exif_data.items():
            tag = exif_tags.get(tag_id, str(tag_id))
            metadata[tag] = str(value)

        indicators = []

        # Known editing software in the Software tag suggests post-processing.
        editing_software = ['photoshop', 'lightroom', 'gimp', 'paint', 'editor', 'filter']
        if 'Software' in metadata:
            software = metadata['Software'].lower()
            for editor in editing_software:
                if editor in software:
                    indicators.append(f"Image edited with {metadata['Software']}")
                    break

        # Capture vs. modification timestamp mismatch hints at later edits.
        if 'DateTimeOriginal' in metadata and 'DateTime' in metadata:
            if metadata['DateTimeOriginal'] != metadata['DateTime']:
                indicators.append("Capture time differs from modification time")

        # A modification time with no original capture time is suspicious.
        if 'DateTime' in metadata and 'DateTimeOriginal' not in metadata:
            indicators.append("Original capture time missing")

        result = {
            "metadata": metadata,
            "indicators": indicators,
            "summary": "Potential manipulation detected" if indicators else "No obvious manipulation indicators",
            "analysis_count": len(metadata)
        }

        return result

    except Exception as e:
        # Deliberate best-effort: metadata extraction must never break the
        # overall analysis pipeline.
        return {
            "metadata": {"Error": str(e)},
            "indicators": ["Error extracting metadata"],
            "summary": "Analysis failed",
            "analysis_count": 0
        }
|
|
def noise_analysis(image_path, amplification=15):
    """
    Extracts and analyzes noise patterns in the image to detect inconsistencies.

    Args:
        image_path: Path to the image file
        amplification: Factor to amplify noise for visualization

    Returns:
        PIL Image containing the noise analysis result

    Raises:
        ValueError: If the image cannot be read from image_path.
    """
    img = cv2.imread(image_path)
    # FIX: fail fast with a clear message (consistent with detect_clones)
    # instead of letting cvtColor raise a cryptic OpenCV error on None.
    if img is None:
        raise ValueError(f"Could not read image at {image_path}")

    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # High-frequency residual: image minus its Gaussian-smoothed version.
    blur = cv2.GaussianBlur(gray, (5, 5), 0)
    noise = cv2.subtract(gray, blur)

    # Amplify the residual (saturating arithmetic) so it becomes visible.
    noise = cv2.multiply(noise, amplification)

    # False-color for easier visual comparison of noise consistency.
    noise_colored = cv2.applyColorMap(noise, cv2.COLORMAP_JET)

    noise_pil = Image.fromarray(cv2.cvtColor(noise_colored, cv2.COLOR_BGR2RGB))

    return noise_pil
|
|
def manipulation_likelihood(image_path):
    """
    Simulates a pre-trained model that evaluates the likelihood of image manipulation.
    For demo purposes, this generates a random score with some biasing based on image properties.

    Args:
        image_path: Path to the image file

    Returns:
        Dictionary with "probability" (float), "heatmap_image" (PIL overlay),
        "explanation" (text), and a coarse "confidence" label.
    """
    # FIX: open and decode the file once — the original re-opened the same
    # image a second time just to build the overlay base.
    img = np.array(Image.open(image_path).convert('RGB'))
    height, width = img.shape[:2]

    # Synthesize a fake "suspicion" heatmap from 1-4 random circular regions.
    heatmap = np.zeros((height, width), dtype=np.float32)
    num_regions = random.randint(1, 4)
    for _ in range(num_regions):
        x = random.randint(0, width - 1)
        y = random.randint(0, height - 1)
        radius = random.randint(width//10, width//5)

        # Gaussian-like falloff from the region center, clipped to a disc.
        y_indices, x_indices = np.ogrid[:height, :width]
        dist_from_center = ((y_indices - y)**2 + (x_indices - x)**2)
        mask = dist_from_center <= radius**2

        intensity = random.uniform(0.5, 1.0)
        heatmap[mask] = np.maximum(heatmap[mask], intensity * np.exp(-dist_from_center[mask] / (2 * (radius/2)**2)))

    # Normalize to [0, 1] so the colormap uses its full range.
    if np.max(heatmap) > 0:
        heatmap = heatmap / np.max(heatmap)

    # Transparent-to-red colormap; alpha > 0 marks "suspicious" pixels.
    cmap = LinearSegmentedColormap.from_list('custom', [(0, 0, 0, 0), (1, 0, 0, 0.7)])
    heatmap_rgb = (cmap(heatmap) * 255).astype(np.uint8)

    # Blend the heatmap's red channel into the image wherever alpha > 0;
    # green/blue channels are just dimmed there to make the red pop.
    overlay = img.copy()
    for c in range(3):
        if c == 0:
            overlay[:, :, c] = np.where(heatmap_rgb[:, :, 3] > 0,
                                        (overlay[:, :, c] * 0.5 + heatmap_rgb[:, :, 0] * 0.5).astype(np.uint8),
                                        overlay[:, :, c])
        else:
            overlay[:, :, c] = np.where(heatmap_rgb[:, :, 3] > 0,
                                        (overlay[:, :, c] * 0.5).astype(np.uint8),
                                        overlay[:, :, c])

    # Bias the random score: EXIF anomalies and a .jpg source raise it.
    exif_result = extract_exif_metadata(image_path)
    exif_factor = 0.3 if exif_result["indicators"] else 0.0

    img_factor = 0.1 if ".jpg" in image_path.lower() else 0.0

    base_probability = random.uniform(0.2, 0.8)
    manipulation_probability = min(0.95, base_probability + exif_factor + img_factor)

    overlay_image = Image.fromarray(overlay)

    return {
        "probability": manipulation_probability,
        "heatmap_image": overlay_image,
        "explanation": get_probability_explanation(manipulation_probability),
        "confidence": "medium" if 0.3 < manipulation_probability < 0.7 else "high"
    }
|
|
def get_probability_explanation(prob):
    """Map a manipulation probability in [0, 1] to a human-readable verdict."""
    graded_messages = (
        (0.3, "The image appears to be authentic with no significant signs of manipulation."),
        (0.6, "Some inconsistencies detected that might indicate limited manipulation."),
    )
    for upper_bound, message in graded_messages:
        if prob < upper_bound:
            return message
    return "Strong indicators of digital manipulation detected in this image."
|
|
def get_clone_explanation(count):
    """Translate a clone-match count into a short verdict string."""
    no_clones = count == 0
    few_clones = count < 10
    if no_clones:
        return "No copy-paste manipulations detected in the image."
    if few_clones:
        return "Few potential copy-paste regions detected, might be false positives."
    return "Significant number of copy-paste regions detected, suggesting manipulation."
|
|
def save_uploaded_image(image):
    """Save a PIL image to disk as JPEG and return the path.

    Args:
        image: PIL Image uploaded via the UI.

    Returns:
        Path of the saved temporary JPEG file (fixed name inside TEMP_DIR,
        so concurrent analyses overwrite each other — single-user tool).
    """
    temp_path = os.path.join(TEMP_DIR, "temp_analyze.jpg")
    # FIX: JPEG cannot store an alpha channel or palette; uploads such as
    # transparent PNGs (mode RGBA/P) made image.save raise OSError.
    # Normalize to RGB before writing.
    if image.mode != "RGB":
        image = image.convert("RGB")
    image.save(temp_path)
    return temp_path
|
|
def analyze_complete_image(image_path):
    """Comprehensive analysis of an image, running all forensic tests."""
    original = Image.open(image_path)

    # Run each independent forensic pass over the same file.
    exif_result = extract_exif_metadata(image_path)
    manipulation_result = manipulation_likelihood(image_path)
    clone_result, clone_count = detect_clones(image_path)
    ela_result = error_level_analysis(image_path)
    noise_result = noise_analysis(image_path)

    # Assemble the markdown report shown in the UI.
    analysis_text = f"""
## Manipulation Analysis Results

**Overall Assessment: {manipulation_result['probability']*100:.1f}% likelihood of manipulation**

{manipulation_result['explanation']}

### Clone Detection Analysis:
Found {clone_count} potential cloned regions in the image.
{get_clone_explanation(clone_count)}

### EXIF Metadata Analysis:
{exif_result['summary']}

Indicators found: {len(exif_result['indicators'])}
"""

    if exif_result['indicators']:
        analysis_text += "\nDetailed indicators:\n"
        analysis_text += "".join(f"- {indicator}\n" for indicator in exif_result['indicators'])

    return {
        "manipulation_probability": manipulation_result["probability"],
        "analysis_text": analysis_text,
        "exif_data": exif_result["metadata"],
        "clone_count": clone_count,
        "original_image": original,
        "ela_image": ela_result,
        "noise_image": noise_result,
        "heatmap_image": manipulation_result["heatmap_image"],
        "clone_image": clone_result
    }
|
|
| |
| |
| |
|
|
def analyze_image(image):
    """Gradio click handler: run the full forensic pipeline on the upload.

    Returns an 8-tuple matching the UI output components; on any failure the
    error text is surfaced in the EXIF and results panes instead of raising.
    """
    if image is None:
        return None, None, None, None, None, "{}", "Please upload an image first.", 0

    temp_path = save_uploaded_image(image)

    try:
        results = analyze_complete_image(temp_path)
        outputs = (
            image,
            results["ela_image"],
            results["noise_image"],
            results["heatmap_image"],
            results["clone_image"],
            json.dumps(results["exif_data"], indent=2),
            results["analysis_text"],
            results["manipulation_probability"],
        )
        return outputs
    except Exception as e:
        error_message = f"Error occurred during analysis: {str(e)}"
        print(error_message)
        return image, None, None, None, None, f"Error: {str(e)}", error_message, 0
|
|
| |
| |
| |
|
|
# ---------------------------------------------------------------------------
# Gradio UI: left column holds the upload control and summary widgets; the
# right column shows each forensic visualization in its own tab. All output
# components are populated by a single analyze_image click handler.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Image Forensic & Fraud Detection Tool") as demo:
    gr.Markdown("""
    # Image Forensic & Fraud Detection Tool

    Upload an image to analyze it for potential manipulation using various forensic techniques.
    """)

    with gr.Row():
        with gr.Column(scale=1):
            # Input side: uploader, trigger button, probability gauge, EXIF dump.
            input_image = gr.Image(type="pil", label="Upload Image for Analysis")
            analyze_button = gr.Button("Analyze Image", variant="primary")

            gr.Markdown("### Manipulation Probability")
            # Read-only slider used purely as a 0-1 gauge for the score.
            probability_slider = gr.Slider(
                minimum=0, maximum=1, value=0,
                label="Manipulation Probability",
                interactive=False
            )

            gr.Markdown("### EXIF Metadata")
            exif_data = gr.Code(language="json", label="EXIF Data", lines=10)

        with gr.Column(scale=2):
            with gr.Tab("Analysis Results"):
                analysis_results = gr.Markdown()

            with gr.Tab("Original Image"):
                original_image = gr.Image(type="pil", label="Original Image")

            with gr.Tab("Error Level Analysis (ELA)"):
                gr.Markdown("""
                Error Level Analysis reveals differences in compression levels. Areas with different compression levels
                often indicate modifications. Brighter regions in the visualization suggest potential manipulations.
                """)
                ela_image = gr.Image(type="pil", label="ELA Result")

            with gr.Tab("Noise Analysis"):
                gr.Markdown("""
                Noise Analysis examines the noise patterns in the image. Inconsistent noise patterns often indicate
                areas that have been manipulated or added from different sources.
                """)
                noise_image = gr.Image(type="pil", label="Noise Pattern Analysis")

            with gr.Tab("Clone Detection"):
                gr.Markdown("""
                Clone Detection identifies duplicated areas within the image. Red and blue rectangles highlight
                matching regions that may indicate copy-paste manipulation.
                """)
                clone_image = gr.Image(type="pil", label="Clone Detection Result")

            with gr.Tab("AI Detection Heatmap"):
                gr.Markdown("""
                This heatmap highlights regions identified by our AI model as potentially manipulated.
                Red areas indicate suspicious regions with a higher likelihood of manipulation.
                """)
                heatmap_image = gr.Image(type="pil", label="AI-Detected Suspicious Regions")

    # Wire the button to the pipeline; the outputs list order must match the
    # tuple returned by analyze_image exactly.
    analyze_button.click(
        fn=analyze_image,
        inputs=[input_image],
        outputs=[
            original_image,
            ela_image,
            noise_image,
            heatmap_image,
            clone_image,
            exif_data,
            analysis_results,
            probability_slider
        ]
    )


# Launch directly when run as a script; binds on all interfaces (0.0.0.0),
# so the tool is reachable from the local network.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)