Spaces:
Runtime error
Runtime error
| import sqlite3 | |
| import base64 | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional, List, Dict, Tuple | |
| from PIL import Image | |
| import io | |
| import hashlib | |
| class ImageDatabase: | |
| """SQLite database for storing images and metadata""" | |
| def __init__(self, db_path: str = "images.db"): | |
| self.db_path = db_path | |
| self.init_database() | |
| def init_database(self): | |
| """Initialize the database with required tables""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Create images table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS images ( | |
| id TEXT PRIMARY KEY, | |
| file_hash TEXT UNIQUE NOT NULL, | |
| original_path TEXT NOT NULL, | |
| filename TEXT NOT NULL, | |
| file_extension TEXT NOT NULL, | |
| file_size INTEGER NOT NULL, | |
| width INTEGER NOT NULL, | |
| height INTEGER NOT NULL, | |
| image_data BLOB NOT NULL, | |
| thumbnail_data BLOB, | |
| root_folder TEXT NOT NULL, | |
| relative_path TEXT NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Create indexes for better performance | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_file_hash ON images(file_hash)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_root_folder ON images(root_folder)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_relative_path ON images(relative_path)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_filename ON images(filename)') | |
| conn.commit() | |
| conn.close() | |
| def _calculate_file_hash(self, image_data: bytes) -> str: | |
| """Calculate SHA-256 hash of image data""" | |
| return hashlib.sha256(image_data).hexdigest() | |
| def _create_thumbnail(self, image: Image.Image, size: Tuple[int, int] = (200, 200)) -> bytes: | |
| """Create a thumbnail of the image""" | |
| # Create a copy to avoid modifying original | |
| thumbnail = image.copy() | |
| thumbnail.thumbnail(size, Image.Resampling.LANCZOS) | |
| # Convert to bytes | |
| img_byte_arr = io.BytesIO() | |
| # Save as JPEG for thumbnails to reduce size | |
| if thumbnail.mode in ('RGBA', 'LA', 'P'): | |
| thumbnail = thumbnail.convert('RGB') | |
| thumbnail.save(img_byte_arr, format='JPEG', quality=85, optimize=True) | |
| return img_byte_arr.getvalue() | |
| def store_image(self, image_path: Path, root_folder: Path) -> Optional[str]: | |
| """ | |
| Store an image in the database | |
| Returns the image ID if successful, None if failed | |
| """ | |
| try: | |
| # Load the image | |
| with Image.open(image_path) as image: | |
| # Convert to RGB if needed | |
| if image.mode in ('RGBA', 'LA', 'P'): | |
| image = image.convert('RGB') | |
| # Get image data as bytes | |
| img_byte_arr = io.BytesIO() | |
| image.save(img_byte_arr, format='JPEG', quality=95, optimize=True) | |
| image_data = img_byte_arr.getvalue() | |
| # Calculate file hash | |
| file_hash = self._calculate_file_hash(image_data) | |
| # Create thumbnail | |
| thumbnail_data = self._create_thumbnail(image) | |
| # Calculate relative path | |
| relative_path = str(image_path.relative_to(root_folder)) | |
| # Prepare metadata | |
| image_id = str(uuid.uuid4()) | |
| filename = image_path.name | |
| file_extension = image_path.suffix.lower() | |
| file_size = len(image_data) | |
| width, height = image.size | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Check if image already exists (by hash) | |
| cursor.execute('SELECT id FROM images WHERE file_hash = ?', (file_hash,)) | |
| existing = cursor.fetchone() | |
| if existing: | |
| print(f"Image already exists in database: {filename}") | |
| conn.close() | |
| return existing[0] | |
| # Insert new image | |
| cursor.execute(''' | |
| INSERT INTO images ( | |
| id, file_hash, original_path, filename, file_extension, | |
| file_size, width, height, image_data, thumbnail_data, | |
| root_folder, relative_path | |
| ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| image_id, file_hash, str(image_path.absolute()), filename, | |
| file_extension, file_size, width, height, image_data, | |
| thumbnail_data, str(root_folder.absolute()), relative_path | |
| )) | |
| conn.commit() | |
| conn.close() | |
| print(f"Stored image in database: {filename} (ID: {image_id})") | |
| return image_id | |
| except Exception as e: | |
| print(f"Error storing image {image_path}: {e}") | |
| return None | |
| def get_image(self, image_id: str) -> Optional[Dict]: | |
| """Get an image by ID""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| SELECT id, filename, file_extension, file_size, width, height, | |
| image_data, root_folder, relative_path, created_at | |
| FROM images WHERE id = ? | |
| ''', (image_id,)) | |
| result = cursor.fetchone() | |
| conn.close() | |
| if result: | |
| return { | |
| 'id': result[0], | |
| 'filename': result[1], | |
| 'file_extension': result[2], | |
| 'file_size': result[3], | |
| 'width': result[4], | |
| 'height': result[5], | |
| 'image_data': result[6], | |
| 'root_folder': result[7], | |
| 'relative_path': result[8], | |
| 'created_at': result[9] | |
| } | |
| return None | |
| def get_thumbnail(self, image_id: str) -> Optional[bytes]: | |
| """Get thumbnail data for an image""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute('SELECT thumbnail_data FROM images WHERE id = ?', (image_id,)) | |
| result = cursor.fetchone() | |
| conn.close() | |
| return result[0] if result else None | |
| def get_images_by_folder(self, root_folder: str) -> List[Dict]: | |
| """Get all images from a specific folder""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| SELECT id, filename, file_extension, file_size, width, height, | |
| root_folder, relative_path, created_at | |
| FROM images WHERE root_folder = ? | |
| ORDER BY created_at DESC | |
| ''', (root_folder,)) | |
| results = cursor.fetchall() | |
| conn.close() | |
| return [ | |
| { | |
| 'id': row[0], | |
| 'filename': row[1], | |
| 'file_extension': row[2], | |
| 'file_size': row[3], | |
| 'width': row[4], | |
| 'height': row[5], | |
| 'root_folder': row[6], | |
| 'relative_path': row[7], | |
| 'created_at': row[8] | |
| } | |
| for row in results | |
| ] | |
| def get_all_images(self) -> List[Dict]: | |
| """Get all images from the database""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| SELECT id, filename, file_extension, file_size, width, height, | |
| root_folder, relative_path, created_at | |
| FROM images | |
| ORDER BY created_at DESC | |
| ''') | |
| results = cursor.fetchall() | |
| conn.close() | |
| return [ | |
| { | |
| 'id': row[0], | |
| 'filename': row[1], | |
| 'file_extension': row[2], | |
| 'file_size': row[3], | |
| 'width': row[4], | |
| 'height': row[5], | |
| 'root_folder': row[6], | |
| 'relative_path': row[7], | |
| 'created_at': row[8] | |
| } | |
| for row in results | |
| ] | |
| def delete_image(self, image_id: str) -> bool: | |
| """Delete an image from the database""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute('DELETE FROM images WHERE id = ?', (image_id,)) | |
| deleted = cursor.rowcount > 0 | |
| conn.commit() | |
| conn.close() | |
| return deleted | |
| def delete_images_by_folder(self, root_folder: str) -> int: | |
| """Delete all images from a specific folder""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute('DELETE FROM images WHERE root_folder = ?', (root_folder,)) | |
| deleted_count = cursor.rowcount | |
| conn.commit() | |
| conn.close() | |
| return deleted_count | |
| def image_exists_by_path(self, relative_path: str, root_folder: str) -> Optional[str]: | |
| """Check if an image exists by its path, return image ID if exists""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| SELECT id FROM images | |
| WHERE relative_path = ? AND root_folder = ? | |
| ''', (relative_path, root_folder)) | |
| result = cursor.fetchone() | |
| conn.close() | |
| return result[0] if result else None | |
| def get_database_stats(self) -> Dict: | |
| """Get database statistics""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Total images | |
| cursor.execute('SELECT COUNT(*) FROM images') | |
| total_images = cursor.fetchone()[0] | |
| # Total size | |
| cursor.execute('SELECT SUM(file_size) FROM images') | |
| total_size = cursor.fetchone()[0] or 0 | |
| # Images by folder | |
| cursor.execute('SELECT root_folder, COUNT(*) FROM images GROUP BY root_folder') | |
| folders = cursor.fetchall() | |
| conn.close() | |
| return { | |
| 'total_images': total_images, | |
| 'total_size_bytes': total_size, | |
| 'total_size_mb': round(total_size / (1024 * 1024), 2), | |
| 'folders': {folder: count for folder, count in folders} | |
| } | |