Spaces:
Runtime error
Runtime error
import gradio as gr | |
import os | |
from PIL import Image | |
from PIL.PngImagePlugin import PngImageFile | |
import json | |
from typing import Dict, List, Tuple, Optional | |
from pathlib import Path | |
import sqlite3 | |
import time | |
from modules.path import PathManager # FIXME import from shared? | |
from modules.util import TimeIt | |
from shared import settings | |
import version | |
def format_metadata(metadata: Dict) -> Dict: | |
"""Format metadata into a more readable structure.""" | |
try: | |
# Create formatted output dictionary | |
formatted = {"File Path": metadata.get("file_path", "Unknown")} | |
# Parse the parameters string if it exists | |
if "parameters" in metadata: | |
params = json.loads(metadata["parameters"]) | |
# Add generation parameters in a logical order | |
if "Prompt" in params: | |
formatted["Prompt"] = params["Prompt"].strip() | |
if "Negative" in params: | |
formatted["Negative Prompt"] = params["Negative"].strip() | |
# Model information | |
if "base_model_name" in params: | |
formatted["Model"] = params["base_model_name"] | |
if "base_model_hash" in params: | |
formatted["Model Hash"] = params["base_model_hash"] | |
# Generation settings | |
settings = {} | |
for key in [ | |
"steps", | |
"cfg", | |
"width", | |
"height", | |
"seed", | |
"sampler_name", | |
"scheduler", | |
"clip_skip", | |
"denoise", | |
]: | |
if key in params and params[key] is not None: | |
settings[key.capitalize()] = params[key] | |
formatted["Settings"] = settings | |
# Software info | |
if "software" in params: | |
formatted["Software"] = params["software"] | |
# LoRAs if present | |
if "loras" in params and params["loras"]: | |
formatted["LoRAs"] = params["loras"] | |
return formatted | |
except Exception as e: | |
print(f"Error formatting metadata: {e}") | |
return metadata | |
def format_metadata_string(metadata: Dict) -> str: | |
"""Convert formatted metadata into a readable string.""" | |
try: | |
formatted = format_metadata(metadata) | |
output = [] | |
# File path | |
output.append(f"File: {formatted['File Path']}\n") | |
# Prompt | |
if "Prompt" in formatted: | |
output.append("Prompt:") | |
output.append(formatted["Prompt"]) | |
output.append("") | |
# Negative prompt | |
if "Negative Prompt" in formatted and formatted["Negative Prompt"]: | |
output.append("Negative Prompt:") | |
output.append(formatted["Negative Prompt"]) | |
output.append("") | |
# Model info | |
if "Model" in formatted: | |
output.append(f"Model: {formatted['Model']}") | |
if "Model Hash" in formatted: | |
output.append(f"Hash: {formatted['Model Hash']}") | |
output.append("") | |
# Settings | |
if "Settings" in formatted: | |
output.append("Generation Settings:") | |
for key, value in formatted["Settings"].items(): | |
output.append(f" {key}: {value}") | |
output.append("") | |
# Software | |
if "Software" in formatted: | |
output.append(f"Software: {formatted['Software']}") | |
# LoRAs | |
if "LoRAs" in formatted and formatted["LoRAs"]: | |
output.append("\nLoRAs:") | |
for lora in formatted["LoRAs"]: | |
output.append(f" {lora}") | |
return "\n".join(output) | |
except Exception as e: | |
return f"Error formatting metadata string: {e}\n\nRaw metadata:\n{json.dumps(metadata, indent=2)}" | |
def get_png_metadata(image_path: str) -> Dict: | |
"""Extract metadata from PNG file.""" | |
try: | |
with Image.open(image_path) as img: | |
if isinstance(img, PngImageFile): | |
metadata = {} | |
for key, value in img.info.items(): | |
if isinstance(value, bytes): | |
try: | |
value = value.decode("utf-8") | |
except UnicodeDecodeError: | |
value = str(value) | |
metadata[key] = value | |
return metadata | |
return {} | |
except Exception as e: | |
print(f"Error reading metadata from {image_path}: {e}") | |
return {} | |
def connect_database(path="cache/images.db"): | |
# Connect to an SQLite database (or create it if it doesn't exist) | |
conn = sqlite3.connect(path, check_same_thread=False) | |
conn.cursor() | |
conn.execute('''CREATE TABLE IF NOT EXISTS status (version text, date text)''') | |
res = conn.execute("SELECT count(*) FROM status") | |
cnt = res.fetchone()[0] | |
newdb = False | |
if cnt == 0: | |
conn.execute( | |
"INSERT INTO status(version, date) VALUES (?,?)", | |
(str(version.version), str(time.time())) | |
) | |
newdb = True | |
else: | |
res = conn.execute("SELECT version FROM status") | |
dbver = res.fetchone()[0] | |
if dbver != version.version: | |
newdb = True | |
if newdb: | |
try: | |
conn.execute( | |
"UPDATE status SET version = ?, date = ?", | |
(str(version.version), str(time.time())) | |
) | |
conn.execute("DROP TABLE images") | |
except: | |
pass | |
conn.commit() | |
conn.cursor() | |
conn.execute('''CREATE TABLE IF NOT EXISTS images (fullpath text, path text, json text)''') | |
conn.commit() | |
return conn | |
class ImageBrowser: | |
def __init__(self): | |
self.path_manager = PathManager() | |
self.base_path = Path(self.path_manager.model_paths["temp_outputs_path"]) | |
self.current_display_paths = [] # Track currently displayed images | |
self.sql_conn = connect_database() | |
self.images_per_page = int(settings.default_settings.get("images_per_page", 100)) | |
self.filter = "" | |
def num_images_pages(self): | |
result = self.sql_conn.execute(f"SELECT count(*) FROM images WHERE json LIKE '%{self.filter}%'") # FIXME!!! should only match prompt? | |
image_cnt = result.fetchone()[0] | |
pages = int(image_cnt/self.images_per_page) + 1 | |
return image_cnt, pages | |
def load_images(self, page: int) -> Tuple[List[str], str]: | |
text = "" | |
if page == None: | |
page = 1 | |
result = self.sql_conn.execute( | |
f"SELECT fullpath, path FROM images WHERE json LIKE '%{self.filter}%' ORDER BY path DESC LIMIT ? OFFSET ?", | |
( | |
str(self.images_per_page), | |
str((page-1)*self.images_per_page), | |
) | |
) | |
image_paths = result.fetchall() | |
self.current_display_paths = image_paths # Store current display order | |
if image_paths: | |
path1 = str(Path(image_paths[0][1])) | |
path2 = str(Path(image_paths[-1][1])) | |
text = f"{path1} ... {path2}" | |
else: | |
text = "" | |
if image_paths: | |
return list(Path(x[0]) for x in image_paths), text | |
return [], text | |
def add_image(self, full_path, rel_path, metadata, commit=False): | |
if commit: | |
self.sql_conn.cursor() | |
self.sql_conn.execute( | |
"INSERT INTO images (fullpath, path, json) VALUES (?, ?, ?)", | |
(str(full_path), str(rel_path), json.dumps(metadata)) | |
) | |
if commit: | |
self.sql_conn.commit() | |
def update_images(self) -> Tuple[List[str], str]: | |
"""Check all images and update database""" | |
try: | |
if not self.base_path.exists(): | |
return [], f"Folder not found: {self.base_path}" | |
image_cnt = 0 | |
self.sql_conn.cursor() | |
self.sql_conn.execute("DROP TABLE images") | |
self.sql_conn.commit() | |
self.sql_conn = connect_database() | |
self.sql_conn.cursor() | |
# Walk through directory and all subdirectories | |
print("Scanning folder to update DB:") | |
with TimeIt("Update DB"): | |
for folder in [self.base_path] + settings.default_settings.get("archive_folders", []): | |
print(f" {folder}") | |
for root, _, files in os.walk(folder): | |
for filename in files: | |
if filename.lower().endswith((".png", ".gif")): | |
#if filename.lower().endswith(".png"): | |
full_path = Path(root) / filename | |
rel_path = str(full_path.relative_to(folder)) | |
if filename.lower().endswith(".png"): | |
metadata = get_png_metadata(str(full_path)) | |
else: | |
metadata = {} # FIXME fake data for non-png images | |
metadata["file_path"] = rel_path | |
self.add_image(str(full_path), str(rel_path), metadata) | |
image_cnt += 1 | |
self.sql_conn.commit() | |
folders = ", ".join(map(str, [self.base_path] + settings.default_settings.get("archive_folders", []))) | |
if image_cnt: | |
return ( | |
gr.update(value=self.load_images(1)[0]), | |
gr.update( | |
value=1, | |
maximum=int(image_cnt/self.images_per_page) + 1, | |
), | |
gr.update( | |
value=f"Found {image_cnt} images from {folders} and subdirectories", | |
) | |
) | |
return ( | |
gr.update(value=[]), | |
gr.update(value=1, maximum=1), | |
gr.update(value=f"No images found in {folders} or subdirectories") | |
) | |
except Exception as e: | |
return ( | |
gr.update(value=["html/error.png"]), | |
gr.update(value=1, maximum=1), | |
gr.update(value=f"Error updating folder: {e}") | |
) | |
def get_image_metadata(self, evt: gr.SelectData) -> str: | |
"""Get metadata for selected image.""" | |
try: | |
selected_path = self.current_display_paths[evt.index][0] | |
result = self.sql_conn.execute("SELECT json FROM images WHERE fullpath = ?", (str(selected_path),)) | |
data = json.loads(result.fetchone()[0]) | |
return format_metadata_string(data) | |
except Exception as e: | |
return f"Error getting metadata: {e}" | |
def search_metadata(self, search_term: str) -> Tuple[List[str], str]: | |
self.filter = search_term | |
images = self.load_images(1)[0] | |
num_images, num_pages = self.num_images_pages() | |
text = f"Found {num_images} images" | |
return ( | |
gr.update(value=images), | |
gr.update(value=1, maximum=num_pages), | |
gr.update(value=text) | |
) |