File size: 11,118 Bytes
2de3774
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
import gradio as gr
import os
from PIL import Image
from PIL.PngImagePlugin import PngImageFile
import json
from typing import Dict, List, Tuple, Optional
from pathlib import Path
import sqlite3
import time
from modules.path import PathManager # FIXME import from shared?
from modules.util import TimeIt
from shared import settings
import version


def format_metadata(metadata: Dict) -> Dict:
    """Format metadata into a more readable structure."""
    try:
        # Create formatted output dictionary
        formatted = {"File Path": metadata.get("file_path", "Unknown")}

        # Parse the parameters string if it exists
        if "parameters" in metadata:
            params = json.loads(metadata["parameters"])

            # Add generation parameters in a logical order
            if "Prompt" in params:
                formatted["Prompt"] = params["Prompt"].strip()
            if "Negative" in params:
                formatted["Negative Prompt"] = params["Negative"].strip()

            # Model information
            if "base_model_name" in params:
                formatted["Model"] = params["base_model_name"]
            if "base_model_hash" in params:
                formatted["Model Hash"] = params["base_model_hash"]

            # Generation settings
            settings = {}
            for key in [
                "steps",
                "cfg",
                "width",
                "height",
                "seed",
                "sampler_name",
                "scheduler",
                "clip_skip",
                "denoise",
            ]:
                if key in params and params[key] is not None:
                    settings[key.capitalize()] = params[key]
            formatted["Settings"] = settings

            # Software info
            if "software" in params:
                formatted["Software"] = params["software"]

            # LoRAs if present
            if "loras" in params and params["loras"]:
                formatted["LoRAs"] = params["loras"]

        return formatted
    except Exception as e:
        print(f"Error formatting metadata: {e}")
        return metadata


def format_metadata_string(metadata: Dict) -> str:
    """Convert formatted metadata into a readable string."""
    try:
        formatted = format_metadata(metadata)
        output = []

        # File path
        output.append(f"File: {formatted['File Path']}\n")

        # Prompt
        if "Prompt" in formatted:
            output.append("Prompt:")
            output.append(formatted["Prompt"])
            output.append("")

        # Negative prompt
        if "Negative Prompt" in formatted and formatted["Negative Prompt"]:
            output.append("Negative Prompt:")
            output.append(formatted["Negative Prompt"])
            output.append("")

        # Model info
        if "Model" in formatted:
            output.append(f"Model: {formatted['Model']}")
            if "Model Hash" in formatted:
                output.append(f"Hash: {formatted['Model Hash']}")
            output.append("")

        # Settings
        if "Settings" in formatted:
            output.append("Generation Settings:")
            for key, value in formatted["Settings"].items():
                output.append(f"  {key}: {value}")
            output.append("")

        # Software
        if "Software" in formatted:
            output.append(f"Software: {formatted['Software']}")

        # LoRAs
        if "LoRAs" in formatted and formatted["LoRAs"]:
            output.append("\nLoRAs:")
            for lora in formatted["LoRAs"]:
                output.append(f"  {lora}")

        return "\n".join(output)
    except Exception as e:
        return f"Error formatting metadata string: {e}\n\nRaw metadata:\n{json.dumps(metadata, indent=2)}"


def get_png_metadata(image_path: str) -> Dict:
    """Extract metadata from PNG file."""
    try:
        with Image.open(image_path) as img:
            if isinstance(img, PngImageFile):
                metadata = {}
                for key, value in img.info.items():
                    if isinstance(value, bytes):
                        try:
                            value = value.decode("utf-8")
                        except UnicodeDecodeError:
                            value = str(value)
                    metadata[key] = value
                return metadata
            return {}
    except Exception as e:
        print(f"Error reading metadata from {image_path}: {e}")
        return {}


def connect_database(path="cache/images.db"):
    # Connect to an SQLite database (or create it if it doesn't exist)
    conn = sqlite3.connect(path, check_same_thread=False)

    conn.cursor()
    conn.execute('''CREATE TABLE IF NOT EXISTS status (version text, date text)''')
    res = conn.execute("SELECT count(*) FROM status")
    cnt = res.fetchone()[0]
    newdb = False
    if cnt == 0:
        conn.execute(
            "INSERT INTO status(version, date) VALUES (?,?)",
            (str(version.version), str(time.time()))
        )
        newdb = True
    else:
        res = conn.execute("SELECT version FROM status")
        dbver = res.fetchone()[0]
        if dbver != version.version:
            newdb = True

    if newdb:
        try:
            conn.execute(
                "UPDATE status SET version = ?, date = ?", 
                (str(version.version), str(time.time()))
            )
            conn.execute("DROP TABLE images")
        except:
            pass
    conn.commit()
    conn.cursor()
    conn.execute('''CREATE TABLE IF NOT EXISTS images (fullpath text, path text, json text)''')
    conn.commit()

    return conn


class ImageBrowser:
    def __init__(self):
        self.path_manager = PathManager()
        self.base_path = Path(self.path_manager.model_paths["temp_outputs_path"])
        self.current_display_paths = []  # Track currently displayed images
        self.sql_conn = connect_database()
        self.images_per_page = int(settings.default_settings.get("images_per_page", 100))
        self.filter = ""

    def num_images_pages(self):
        result = self.sql_conn.execute(f"SELECT count(*) FROM images WHERE json LIKE '%{self.filter}%'") # FIXME!!! should only match prompt?
        image_cnt = result.fetchone()[0]
        pages = int(image_cnt/self.images_per_page) + 1
        return image_cnt, pages

    def load_images(self, page: int) -> Tuple[List[str], str]:
        text = ""
        if page == None:
            page = 1
        result = self.sql_conn.execute(
            f"SELECT fullpath, path FROM images WHERE json LIKE '%{self.filter}%' ORDER BY path DESC LIMIT ? OFFSET ?",
            (
                str(self.images_per_page),
                str((page-1)*self.images_per_page),
            )
        )
        image_paths = result.fetchall()
        self.current_display_paths = image_paths  # Store current display order
        if image_paths:
            path1 = str(Path(image_paths[0][1]))
            path2 = str(Path(image_paths[-1][1]))
            text = f"{path1} ... {path2}"
        else:
            text = ""

        if image_paths:
            return list(Path(x[0]) for x in image_paths), text
        return [], text

    def add_image(self, full_path, rel_path, metadata, commit=False):
        if commit:
            self.sql_conn.cursor()
        self.sql_conn.execute(
            "INSERT INTO images (fullpath, path, json) VALUES (?, ?, ?)",
            (str(full_path), str(rel_path), json.dumps(metadata))
        )
        if commit:
            self.sql_conn.commit()

    def update_images(self) -> Tuple[List[str], str]:
        """Check all images and update database"""
        try:
            if not self.base_path.exists():
                return [], f"Folder not found: {self.base_path}"

            image_cnt = 0
            self.sql_conn.cursor()
            self.sql_conn.execute("DROP TABLE images")
            self.sql_conn.commit()
            self.sql_conn = connect_database()
            self.sql_conn.cursor()

            # Walk through directory and all subdirectories
            print("Scanning folder to update DB:")
            with TimeIt("Update DB"):
                for folder in [self.base_path] + settings.default_settings.get("archive_folders", []):
                    print(f"    {folder}")
                    for root, _, files in os.walk(folder):
                        for filename in files:
                            if filename.lower().endswith((".png", ".gif")):
                            #if filename.lower().endswith(".png"):
                                full_path = Path(root) / filename
                                rel_path = str(full_path.relative_to(folder))
                                if filename.lower().endswith(".png"):
                                    metadata = get_png_metadata(str(full_path))
                                else:
                                    metadata = {} # FIXME fake data for non-png images
                                metadata["file_path"] = rel_path

                                self.add_image(str(full_path), str(rel_path), metadata)
                                image_cnt += 1

            self.sql_conn.commit()

            folders = ", ".join(map(str, [self.base_path] + settings.default_settings.get("archive_folders", [])))
            if image_cnt:
                return (
                    gr.update(value=self.load_images(1)[0]),
                    gr.update(
                        value=1,
                        maximum=int(image_cnt/self.images_per_page) + 1,
                    ),
                    gr.update(
                        value=f"Found {image_cnt} images from {folders} and subdirectories",
                    )
                )
            return (
                gr.update(value=[]),
                gr.update(value=1, maximum=1),
                gr.update(value=f"No images found in {folders} or subdirectories")
            )

        except Exception as e:
            return (
                gr.update(value=["html/error.png"]),
                gr.update(value=1, maximum=1),
                gr.update(value=f"Error updating folder: {e}")
            )

    def get_image_metadata(self, evt: gr.SelectData) -> str:
        """Get metadata for selected image."""
        try:
            selected_path = self.current_display_paths[evt.index][0]
            result = self.sql_conn.execute("SELECT json FROM images WHERE fullpath = ?", (str(selected_path),))
            data = json.loads(result.fetchone()[0])
            return format_metadata_string(data)
        except Exception as e:
            return f"Error getting metadata: {e}"

    def search_metadata(self, search_term: str) -> Tuple[List[str], str]:
        self.filter = search_term
        images = self.load_images(1)[0]
        num_images, num_pages = self.num_images_pages()
        text = f"Found {num_images} images"
        return (
            gr.update(value=images),
            gr.update(value=1, maximum=num_pages),
            gr.update(value=text)
        )