Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import os | |
| from PIL import Image | |
| from PIL.PngImagePlugin import PngImageFile | |
| import json | |
| from typing import Dict, List, Tuple, Optional | |
| from pathlib import Path | |
| import sqlite3 | |
| import time | |
| from modules.path import PathManager # FIXME import from shared? | |
| from modules.util import TimeIt | |
| from shared import settings | |
| import version | |
def format_metadata(metadata: Dict) -> Dict:
    """Build a display-oriented dictionary from raw image metadata.

    The result always starts with a "File Path" entry (taken from
    ``metadata["file_path"]``, or "Unknown").  When ``metadata`` carries a
    "parameters" entry it is parsed as JSON and selected fields are copied
    over under friendlier labels, in a fixed order: prompts, model
    information, a nested "Settings" dict, software and LoRAs.

    If anything goes wrong (for example "parameters" is not valid JSON) the
    error is printed and the original ``metadata`` object is returned
    unchanged.
    """
    # (key in the parsed parameters, label used in the output)
    stripped_text_fields = (("Prompt", "Prompt"), ("Negative", "Negative Prompt"))
    model_fields = (("base_model_name", "Model"), ("base_model_hash", "Model Hash"))
    setting_names = (
        "steps",
        "cfg",
        "width",
        "height",
        "seed",
        "sampler_name",
        "scheduler",
        "clip_skip",
        "denoise",
    )

    try:
        result = {"File Path": metadata.get("file_path", "Unknown")}
        if "parameters" not in metadata:
            return result

        params = json.loads(metadata["parameters"])

        # Prompts are stored with surrounding whitespace removed.
        for source_key, label in stripped_text_fields:
            if source_key in params:
                result[label] = params[source_key].strip()

        # Model name and hash are copied verbatim.
        for source_key, label in model_fields:
            if source_key in params:
                result[label] = params[source_key]

        # Generation settings: only keys that are present and not None,
        # relabelled with str.capitalize() (e.g. "sampler_name" -> "Sampler_name").
        result["Settings"] = {
            name.capitalize(): params[name]
            for name in setting_names
            if name in params and params[name] is not None
        }

        if "software" in params:
            result["Software"] = params["software"]

        # LoRAs are only listed when the entry is non-empty.
        if "loras" in params and params["loras"]:
            result["LoRAs"] = params["loras"]

        return result
    except Exception as err:
        print(f"Error formatting metadata: {err}")
        return metadata
def format_metadata_string(metadata: Dict) -> str:
    """Render image metadata as a multi-line, human-readable string.

    The metadata is first passed through ``format_metadata``; each section
    present in its result (prompt, negative prompt, model, generation
    settings, software, LoRAs) is then written out as text lines joined by
    newlines.

    On any failure a message containing the error and a JSON dump of the raw
    ``metadata`` is returned instead.
    """
    try:
        info = format_metadata(metadata)

        # The file path line is always emitted first.
        lines = [f"File: {info['File Path']}\n"]

        if "Prompt" in info:
            lines += ["Prompt:", info["Prompt"], ""]

        # The negative prompt is skipped when it is missing or empty.
        if info.get("Negative Prompt"):
            lines += ["Negative Prompt:", info["Negative Prompt"], ""]

        # The hash line is only considered when a model name is present.
        if "Model" in info:
            lines.append(f"Model: {info['Model']}")
            if "Model Hash" in info:
                lines.append(f"Hash: {info['Model Hash']}")
            lines.append("")

        if "Settings" in info:
            lines.append("Generation Settings:")
            lines.extend(f" {name}: {value}" for name, value in info["Settings"].items())
            lines.append("")

        if "Software" in info:
            lines.append(f"Software: {info['Software']}")

        if info.get("LoRAs"):
            lines.append("\nLoRAs:")
            lines.extend(f" {lora}" for lora in info["LoRAs"])

        return "\n".join(lines)
    except Exception as e:
        return f"Error formatting metadata string: {e}\n\nRaw metadata:\n{json.dumps(metadata, indent=2)}"
def get_png_metadata(image_path: str) -> Dict:
    """Read the ``info`` entries of a PNG file into a plain dictionary.

    Byte values are decoded as UTF-8; bytes that cannot be decoded are
    replaced by their ``str()`` representation.  All other values are kept
    as they are.

    Returns an empty dict when the opened image is not a ``PngImageFile`` or
    when reading fails for any reason (the error is printed).
    """

    def _as_text(raw):
        # Only bytes need conversion; everything else passes through.
        if not isinstance(raw, bytes):
            return raw
        try:
            return raw.decode("utf-8")
        except UnicodeDecodeError:
            return str(raw)

    try:
        with Image.open(image_path) as img:
            if not isinstance(img, PngImageFile):
                return {}
            return {name: _as_text(raw) for name, raw in img.info.items()}
    except Exception as err:
        print(f"Error reading metadata from {image_path}: {err}")
        return {}
def connect_database(path="cache/images.db"):
    """Open (or create) the SQLite image cache and make sure its schema exists.

    A one-row ``status`` table records the application version that built the
    cache.  When the database is new, or the recorded version differs from
    the running ``version.version``, the ``images`` table is dropped so that
    it is rebuilt from scratch.  An (empty, if necessary) ``images`` table is
    always present when this function returns.

    Args:
        path: Location of the SQLite database file.

    Returns:
        An open ``sqlite3.Connection`` created with
        ``check_same_thread=False``.
    """
    # The version is stored as text, so compare text with text.  Comparing
    # the stored string against a non-string version.version would never
    # match and would wipe the cache on every connect.
    current_version = str(version.version)

    conn = sqlite3.connect(path, check_same_thread=False)
    conn.execute("CREATE TABLE IF NOT EXISTS status (version text, date text)")

    row = conn.execute("SELECT version FROM status").fetchone()
    if row is None:
        # Brand-new database: record which version created it.
        conn.execute(
            "INSERT INTO status(version, date) VALUES (?,?)",
            (current_version, str(time.time())),
        )
        stale = True
    elif row[0] != current_version:
        # Cache was built by another version: stamp it and rebuild.
        conn.execute(
            "UPDATE status SET version = ?, date = ?",
            (current_version, str(time.time())),
        )
        stale = True
    else:
        stale = False

    if stale:
        # IF EXISTS makes this safe on a fresh database without having to
        # swallow every possible exception.
        conn.execute("DROP TABLE IF EXISTS images")

    conn.execute(
        "CREATE TABLE IF NOT EXISTS images (fullpath text, path text, json text)"
    )
    conn.commit()
    return conn
class ImageBrowser:
    """Paged, searchable view over generated images, backed by an SQLite cache.

    The cache table ``images`` holds one row per image: its full path, its
    path relative to the scanned folder, and its metadata serialised as JSON.
    ``self.filter`` is a free-text search term matched against that JSON.
    """

    def __init__(self):
        self.path_manager = PathManager()
        self.base_path = Path(self.path_manager.model_paths["temp_outputs_path"])
        self.current_display_paths = []  # Rows (fullpath, path) currently shown, in display order
        self.sql_conn = connect_database()
        # Never allow a page size below 1: it is used as a divisor and as LIMIT.
        self.images_per_page = max(
            1, int(settings.default_settings.get("images_per_page", 100))
        )
        self.filter = ""

    def _like_pattern(self) -> str:
        """Return the LIKE pattern for the current filter (substring match)."""
        return f"%{self.filter or ''}%"

    def _page_count(self, image_cnt: int) -> int:
        """Return the number of pages needed for ``image_cnt`` images (at least 1)."""
        # Ceiling division: an exact multiple of the page size must not
        # produce an extra, empty page.
        return max(1, -(-image_cnt // self.images_per_page))

    def num_images_pages(self):
        """Return ``(matching image count, page count)`` for the current filter."""
        # FIXME!!! should only match prompt?
        # The search term is bound as a parameter, never spliced into the SQL,
        # so quotes in user input cannot break or alter the statement.
        result = self.sql_conn.execute(
            "SELECT count(*) FROM images WHERE json LIKE ?",
            (self._like_pattern(),),
        )
        image_cnt = result.fetchone()[0]
        return image_cnt, self._page_count(image_cnt)

    def load_images(self, page: int) -> Tuple[List[str], str]:
        """Load one page of images matching the current filter.

        Args:
            page: 1-based page number; ``None`` or anything below 1 is
                treated as page 1.

        Returns:
            A list of image paths for the page (newest path first, as
            ordered by ``path DESC``) and a short "first ... last" summary
            of the relative paths shown, or ``([], "")`` when nothing
            matches.  The fetched rows are also stored in
            ``self.current_display_paths``.
        """
        page = 1 if page is None else max(1, int(page))
        result = self.sql_conn.execute(
            "SELECT fullpath, path FROM images WHERE json LIKE ? "
            "ORDER BY path DESC LIMIT ? OFFSET ?",
            (
                self._like_pattern(),
                self.images_per_page,
                (page - 1) * self.images_per_page,
            ),
        )
        image_paths = result.fetchall()
        self.current_display_paths = image_paths  # Store current display order

        if not image_paths:
            return [], ""

        first_shown = str(Path(image_paths[0][1]))
        last_shown = str(Path(image_paths[-1][1]))
        return [Path(row[0]) for row in image_paths], f"{first_shown} ... {last_shown}"

    def add_image(self, full_path, rel_path, metadata, commit=False):
        """Insert one image row into the cache.

        Args:
            full_path: Full path of the image file.
            rel_path: Path relative to the folder that was scanned.
            metadata: JSON-serialisable metadata dictionary.
            commit: Commit immediately when True; otherwise the caller is
                expected to commit (useful for bulk inserts).
        """
        self.sql_conn.execute(
            "INSERT INTO images (fullpath, path, json) VALUES (?, ?, ?)",
            # ensure_ascii=False keeps non-ASCII text literal in the stored
            # JSON; with \uXXXX escapes a LIKE search for such text could
            # never match.
            (str(full_path), str(rel_path), json.dumps(metadata, ensure_ascii=False)),
        )
        if commit:
            self.sql_conn.commit()

    def update_images(self) -> Tuple:
        """Rebuild the cache by scanning the output and archive folders.

        Every ``.png`` and ``.gif`` below ``self.base_path`` and the
        configured "archive_folders" is (re)inserted.

        Returns:
            Three ``gr.update`` objects on every path: gallery contents,
            page selector (value and maximum) and a status message.
        """
        try:
            if not self.base_path.exists():
                # Same 3-tuple shape as every other return of this method.
                return (
                    gr.update(value=[]),
                    gr.update(value=1, maximum=1),
                    gr.update(value=f"Folder not found: {self.base_path}"),
                )

            # Drop the old contents, then swap in a fresh connection and
            # close the previous one so it is not leaked.
            stale_conn = self.sql_conn
            stale_conn.execute("DROP TABLE IF EXISTS images")
            stale_conn.commit()
            self.sql_conn = connect_database()
            stale_conn.close()

            scan_folders = [self.base_path] + list(
                settings.default_settings.get("archive_folders", []) or []
            )
            image_cnt = 0

            # Walk through each folder and all of its subdirectories
            print("Scanning folder to update DB:")
            with TimeIt("Update DB"):
                for folder in scan_folders:
                    print(f" {folder}")
                    for root, _, files in os.walk(folder):
                        for filename in files:
                            lowered = filename.lower()
                            if not lowered.endswith((".png", ".gif")):
                                continue
                            full_path = Path(root) / filename
                            rel_path = str(full_path.relative_to(folder))
                            if lowered.endswith(".png"):
                                metadata = get_png_metadata(str(full_path))
                            else:
                                metadata = {}  # FIXME fake data for non-png images
                            metadata["file_path"] = rel_path
                            self.add_image(str(full_path), rel_path, metadata)
                            image_cnt += 1
                # One commit for the whole scan instead of one per image.
                self.sql_conn.commit()

            folders = ", ".join(map(str, scan_folders))
            if image_cnt:
                return (
                    gr.update(value=self.load_images(1)[0]),
                    gr.update(value=1, maximum=self._page_count(image_cnt)),
                    gr.update(
                        value=f"Found {image_cnt} images from {folders} and subdirectories",
                    ),
                )
            return (
                gr.update(value=[]),
                gr.update(value=1, maximum=1),
                gr.update(value=f"No images found in {folders} or subdirectories"),
            )
        except Exception as e:
            return (
                gr.update(value=["html/error.png"]),
                gr.update(value=1, maximum=1),
                gr.update(value=f"Error updating folder: {e}"),
            )

    def get_image_metadata(self, evt: gr.SelectData) -> str:
        """Return the formatted metadata text for the selected image.

        ``evt.index`` is used as an index into the rows of the page that was
        loaded last.  Any failure is reported as an error string instead of
        being raised.
        """
        try:
            selected_path = self.current_display_paths[evt.index][0]
            row = self.sql_conn.execute(
                "SELECT json FROM images WHERE fullpath = ?", (str(selected_path),)
            ).fetchone()
            if row is None:
                return f"Error getting metadata: no database entry for {selected_path}"
            return format_metadata_string(json.loads(row[0]))
        except Exception as e:
            return f"Error getting metadata: {e}"

    def search_metadata(self, search_term: str) -> Tuple:
        """Set the search filter and return updates showing the first result page.

        Returns:
            Three ``gr.update`` objects: gallery contents, page selector
            (value and maximum) and a "Found N images" message.
        """
        # Treat a missing search term as "no filter".
        self.filter = search_term or ""
        images = self.load_images(1)[0]
        num_images, num_pages = self.num_images_pages()
        text = f"Found {num_images} images"
        return (
            gr.update(value=images),
            gr.update(value=1, maximum=num_pages),
            gr.update(value=text),
        )