"""
Image preprocessing utilities for coin dataset.
Handles deduplication, blur detection, normalization, and quality filtering.
"""
|
|
import json
import logging
import os
from collections import Counter, defaultdict
from pathlib import Path
from typing import Dict, List, Optional, Set, Tuple

import cv2
import imagehash
import numpy as np
from PIL import Image
|
|
|
|
class CoinImagePreprocessor:
    """Preprocessing utilities for coin images.

    The preprocessor is driven by a JSON config file with two sections:

    * ``preprocessing``: ``blur_threshold``, ``detect_blur``,
      ``remove_duplicates``, ``normalize_size``
    * ``scraping``: ``images_dir``, ``metadata_dir``, ``min_image_size``

    Image files are named ``<object_id>[_<suffix>].<ext>`` and the metadata
    for an object lives in ``<metadata_dir>/<object_id>.json``; the object id
    is everything in the file stem before the first underscore.
    """

    # Glob patterns that identify dataset images inside a directory.
    _IMAGE_PATTERNS = ("*.png", "*.jpg", "*.jpeg")

    def __init__(self, config_path: str = "config.json"):
        """Load the JSON configuration and set up logging.

        Args:
            config_path: Path to the JSON config file.

        Raises:
            OSError: If the config file cannot be opened.
            json.JSONDecodeError: If the config file is not valid JSON.
            KeyError: If the ``preprocessing`` or ``scraping`` section is
                missing.
        """
        with open(config_path, 'r', encoding='utf-8') as f:
            self.config = json.load(f)

        self.preprocessing_config = self.config['preprocessing']
        self.scraping_config = self.config['scraping']

        self.logger = logging.getLogger(__name__)
        logging.basicConfig(level=logging.INFO)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _list_image_files(self, image_dir) -> List[Path]:
        """Return every ``.png``/``.jpg``/``.jpeg`` file directly in *image_dir*."""
        directory = Path(image_dir)
        found: List[Path] = []
        for pattern in self._IMAGE_PATTERNS:
            found.extend(directory.glob(pattern))
        return found

    @staticmethod
    def _object_id_for(image_path) -> str:
        """Return the object id encoded in an image file name."""
        return Path(image_path).stem.split('_')[0]

    def _prune_orphaned_metadata(self, object_ids: Set[str]) -> None:
        """Delete metadata of the given objects that have no image left.

        An object can own several images, so its metadata file is removed
        only when none of its images survive in ``images_dir``.
        """
        if not object_ids:
            return

        surviving_ids = {
            self._object_id_for(path)
            for path in self._list_image_files(self.scraping_config['images_dir'])
        }
        metadata_dir = Path(self.scraping_config['metadata_dir'])

        for object_id in sorted(object_ids - surviving_ids):
            metadata_path = metadata_dir / f"{object_id}.json"
            try:
                metadata_path.unlink()
            except FileNotFoundError:
                # Nothing to clean up for this object.
                continue
            except OSError as e:
                self.logger.error(f"Error removing metadata {metadata_path}: {e}")

    # ------------------------------------------------------------------
    # Per-image operations
    # ------------------------------------------------------------------

    def calculate_image_hash(self, image_path: str) -> str:
        """Calculate the perceptual (average) hash of an image.

        Returns:
            The hash as a string, or ``""`` if the image could not be hashed
            (the error is logged).
        """
        try:
            # Context manager closes the underlying file handle promptly.
            with Image.open(image_path) as img:
                return str(imagehash.average_hash(img))
        except Exception as e:
            self.logger.error(f"Error hashing {image_path}: {e}")
            return ""

    def detect_blur(self, image_path: str) -> Tuple[bool, float]:
        """Detect whether an image is blurry using Laplacian variance.

        Returns:
            ``(is_blurry, variance)``. The image is blurry when the variance
            is below ``preprocessing.blur_threshold``. Unreadable images are
            reported as ``(True, 0.0)``.
        """
        try:
            img = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
            if img is None:
                return True, 0.0

            # Cast to a builtin float so callers get plain bool/float values
            # rather than numpy scalars.
            variance = float(cv2.Laplacian(img, cv2.CV_64F).var())
            threshold = self.preprocessing_config['blur_threshold']
            return variance < threshold, variance

        except Exception as e:
            self.logger.error(f"Error detecting blur in {image_path}: {e}")
            return True, 0.0

    def get_image_dimensions(self, image_path: str) -> Tuple[int, int]:
        """Return ``(width, height)`` of an image, or ``(0, 0)`` on error."""
        try:
            with Image.open(image_path) as img:
                return img.size
        except Exception as e:
            self.logger.error(f"Error getting dimensions of {image_path}: {e}")
            return (0, 0)

    def is_valid_size(self, image_path: str) -> bool:
        """Check that the image's shorter side is at least ``min_image_size``."""
        width, height = self.get_image_dimensions(image_path)
        min_size = self.scraping_config['min_image_size']
        return min(width, height) >= min_size

    def normalize_image(self, image_path: str, output_path: Optional[str] = None) -> str:
        """Fit an image into a white square canvas, keeping its aspect ratio.

        The longer side is scaled to ``preprocessing.normalize_size`` and the
        result is centred on a white RGB canvas of that size.

        Args:
            image_path: Image to normalize.
            output_path: Where to save the result; defaults to overwriting
                *image_path*.

        Returns:
            The path the normalized image was written to, or *image_path*
            unchanged if normalization failed (the error is logged).
        """
        try:
            target_size = self.preprocessing_config['normalize_size']

            with Image.open(image_path) as img:
                width, height = img.size
                if width > height:
                    new_width = target_size
                    # Clamp to 1px so extreme aspect ratios stay resizable.
                    new_height = max(1, int(height * (target_size / width)))
                else:
                    new_height = target_size
                    new_width = max(1, int(width * (target_size / height)))

                # Convert before resizing: palette images would otherwise be
                # resampled with NEAREST, and alpha must be kept so that
                # transparent areas can be composited onto white below.
                has_alpha = img.mode in ('RGBA', 'LA') or 'transparency' in img.info
                working = img.convert('RGBA' if has_alpha else 'RGB')
                img_resized = working.resize(
                    (new_width, new_height), Image.Resampling.LANCZOS
                )

            canvas = Image.new('RGB', (target_size, target_size), (255, 255, 255))
            offset_x = (target_size - new_width) // 2
            offset_y = (target_size - new_height) // 2
            if has_alpha:
                # The RGBA image doubles as its own mask (alpha band is used).
                canvas.paste(img_resized, (offset_x, offset_y), img_resized)
            else:
                canvas.paste(img_resized, (offset_x, offset_y))

            if output_path is None:
                output_path = image_path

            # The source file is closed by now, so overwriting it is safe.
            canvas.save(output_path)
            return output_path

        except Exception as e:
            self.logger.error(f"Error normalizing {image_path}: {e}")
            return image_path

    # ------------------------------------------------------------------
    # Dataset-wide operations
    # ------------------------------------------------------------------

    def find_duplicates(self, image_dir: Optional[str] = None) -> Dict[str, List[str]]:
        """Find duplicate images using perceptual hashing.

        Args:
            image_dir: Directory to scan; defaults to ``scraping.images_dir``.

        Returns:
            Mapping of hash -> list of file paths, containing only hashes
            shared by more than one file. Images that fail to hash are
            skipped.
        """
        if image_dir is None:
            image_dir = self.scraping_config['images_dir']

        self.logger.info("Finding duplicate images...")

        hash_to_files: Dict[str, List[str]] = defaultdict(list)
        for img_path in self._list_image_files(image_dir):
            img_hash = self.calculate_image_hash(str(img_path))
            if img_hash:
                hash_to_files[img_hash].append(str(img_path))

        duplicates = {h: files for h, files in hash_to_files.items() if len(files) > 1}

        self.logger.info(f"Found {len(duplicates)} groups of duplicate images")
        return duplicates

    def remove_duplicates(self, keep_first: bool = True) -> int:
        """Remove duplicate images, keeping only one from each group.

        Args:
            keep_first: Keep the first path of each group in sorted order
                when true, otherwise keep the last.

        Returns:
            Number of image files actually deleted.
        """
        duplicates = self.find_duplicates()
        removed_count = 0
        removed_object_ids: Set[str] = set()

        for files in duplicates.values():
            files_sorted = sorted(files)
            files_to_remove = files_sorted[1:] if keep_first else files_sorted[:-1]

            for file_path in files_to_remove:
                try:
                    os.remove(file_path)
                except OSError as e:
                    self.logger.error(f"Error removing {file_path}: {e}")
                    continue

                removed_count += 1
                removed_object_ids.add(self._object_id_for(file_path))
                self.logger.debug(f"Removed duplicate: {file_path}")

        # Metadata is dropped only for objects left without any image; the
        # kept duplicate (or a sibling image) may still need it.
        self._prune_orphaned_metadata(removed_object_ids)

        self.logger.info(f"Removed {removed_count} duplicate images")
        return removed_count

    def filter_poor_quality(self) -> Tuple[int, int]:
        """Filter out blurry and undersized images.

        Undersized images are removed first; blur detection runs only on
        images of valid size and only when ``preprocessing.detect_blur`` is
        enabled.

        Returns:
            ``(removed_blur, removed_size)`` counting files actually deleted.
        """
        images_dir = self.scraping_config['images_dir']

        removed_blur = 0
        removed_size = 0
        removed_object_ids: Set[str] = set()

        for img_path in self._list_image_files(images_dir):
            img_path_str = str(img_path)

            if not self.is_valid_size(img_path_str):
                undersized = True
                reason = "undersized"
            elif self.preprocessing_config['detect_blur']:
                is_blurry, variance = self.detect_blur(img_path_str)
                if not is_blurry:
                    continue
                undersized = False
                reason = f"blurry (variance: {variance:.2f})"
            else:
                continue

            try:
                os.remove(img_path_str)
            except OSError as e:
                self.logger.error(f"Error removing {img_path}: {e}")
                continue

            # Count only after the file is really gone.
            if undersized:
                removed_size += 1
            else:
                removed_blur += 1
            removed_object_ids.add(self._object_id_for(img_path))
            self.logger.debug(f"Removed {img_path.name}: {reason}")

        self._prune_orphaned_metadata(removed_object_ids)

        self.logger.info(f"Removed {removed_blur} blurry images")
        self.logger.info(f"Removed {removed_size} undersized images")

        return removed_blur, removed_size

    def process_all(self):
        """Run all preprocessing steps enabled in the configuration.

        Order: quality filtering, duplicate removal, then in-place size
        normalization of every remaining image.
        """
        self.logger.info("Starting preprocessing pipeline...")

        if self.preprocessing_config['detect_blur'] or \
                self.scraping_config['min_image_size'] > 0:
            self.logger.info("Step 1: Filtering poor quality images...")
            self.filter_poor_quality()

        if self.preprocessing_config['remove_duplicates']:
            self.logger.info("Step 2: Removing duplicates...")
            self.remove_duplicates()

        if self.preprocessing_config['normalize_size'] > 0:
            self.logger.info("Step 3: Normalizing image sizes...")
            images_dir = self.scraping_config['images_dir']
            for img_path in self._list_image_files(images_dir):
                self.normalize_image(str(img_path))

        self.logger.info("Preprocessing complete!")

    def generate_dataset_stats(self) -> Dict:
        """Generate statistics about the dataset.

        Returns:
            A JSON-serializable dict with ``total_images``,
            ``total_metadata``, per-value counts for ``cultures``,
            ``periods`` and ``mediums`` (missing keys counted as
            ``'Unknown'``), and ``dimensions``, which this method leaves as
            an empty list. Unreadable metadata files are logged and skipped.
        """
        images_dir = self.scraping_config['images_dir']
        metadata_dir = self.scraping_config['metadata_dir']

        image_files = self._list_image_files(images_dir)
        metadata_files = list(Path(metadata_dir).glob("*.json"))

        cultures: Counter = Counter()
        periods: Counter = Counter()
        mediums: Counter = Counter()

        for meta_file in metadata_files:
            try:
                with open(meta_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                cultures[data.get('culture', 'Unknown')] += 1
                periods[data.get('period', 'Unknown')] += 1
                mediums[data.get('medium', 'Unknown')] += 1
            except Exception as e:
                self.logger.error(f"Error reading {meta_file}: {e}")

        # Plain dicts keep the result directly JSON-serializable.
        return {
            'total_images': len(image_files),
            'total_metadata': len(metadata_files),
            'cultures': dict(cultures),
            'periods': dict(periods),
            'mediums': dict(mediums),
            'dimensions': [],
        }
|
|
|
|
def main():
    """Command-line entry point.

    Parses ``--config`` and ``--stats-only``, runs the preprocessing
    pipeline unless only statistics were requested, then prints the dataset
    statistics as indented JSON.
    """
    import argparse

    cli = argparse.ArgumentParser(description='Preprocess coin images')
    cli.add_argument('--config', default='config.json', help='Path to config file')
    cli.add_argument('--stats-only', action='store_true', help='Only generate statistics')
    options = cli.parse_args()

    processor = CoinImagePreprocessor(options.config)

    # Statistics are always printed; only the heading and whether the
    # pipeline runs first depend on --stats-only.
    if options.stats_only:
        heading = "\n=== Dataset Statistics ==="
    else:
        processor.process_all()
        heading = "\n=== Final Dataset Statistics ==="

    report = processor.generate_dataset_stats()
    print(heading)
    print(json.dumps(report, indent=2))


if __name__ == "__main__":
    main()
|
|