File size: 14,717 Bytes
2de3774
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
import requests
import hashlib
import shutil
import os
import cv2
import json
import threading
import time
from pathlib import Path
import numpy as np

class Models:
    """Index local model files and cache their CivitAI metadata and previews.

    Model files are grouped by type ("checkpoints", "loras" and "inbox").
    Background worker threads list the files of each type and fill the cache
    folders with metadata, thumbnails and LoRA keywords.
    """

    # Model types that currently have an update worker running. This is a
    # class attribute, so the list is shared by every instance.
    civit_workers = []

    def civit_update_worker(self, model_type, folder_paths):
        from shared import path_manager

        try:
            import imageio.v3
        except:
            # Skip updates if we are missing imageio
            print(f"Can't find imageio.v3 module: Skip CivitAI update")
            return
        if str(model_type) in self.civit_workers:
            # Already working on this folder
            print(f"Skip CivitAI check. Update for {model_type} already running.")
            return
        if not Path(self.cache_paths[model_type]).is_dir():
            print(f"WARNING: Can't find {self.cache_paths[model_type]}  Will not update thumbnails.")
            return

        self.civit_workers.append(str(model_type))
        self.ready[model_type] = False
        updated = 0

        # Quick list
        self.names[model_type] = []
        for folder in folder_paths:
            for path in folder.rglob("*"):
                if path.suffix.lower() in self.EXTENSIONS:
                    # Add to model names
                    self.names[model_type].append(str(path.relative_to(folder)))

        # Return a sorted list, prepend names with 0 if they are in a folder or 1
        # if it is a plain file. This will sort folders above files in the dropdown
        self.names[model_type] = sorted(
            self.names[model_type],
            key=lambda x: (
                f"0{x.casefold()}"
                if not str(Path(x).parent) == "."
                else f"1{x.casefold()}"
            ),
        )
        self.ready[model_type] = True

        if self.offline:
            self.civit_workers.remove(str(model_type))
            return

        if model_type == "inbox" and self.names["inbox"]:
            checkpoints = path_manager.model_paths["modelfile_path"]
            checkpoints = checkpoints[0] if isinstance(checkpoints, list) else checkpoints
            loras = path_manager.model_paths["lorafile_path"]
            loras = loras[0] if isinstance(loras, list) else loras
            folders = {
                "LORA": (loras, self.cache_paths["loras"]),
                "LoCon": (loras, self.cache_paths["loras"]),
                "Checkpoint": (checkpoints, self.cache_paths["checkpoints"]),
            }

        # Go though and check previews
        for folder in folder_paths:
            for path in folder.rglob("*"):
                if path.suffix.lower() in self.EXTENSIONS:
                    # get file name, add cache path change suffix
                    cache_file = Path(self.cache_paths[model_type] / path.name)
                    models = self.get_models_by_path(model_type, str(path))

                    suffixes = [".jpeg", ".jpg", ".png", ".gif"]
                    has_preview = False
                    for suffix in suffixes:
                        thumbcheck = cache_file.with_suffix(suffix)
                        if Path(thumbcheck).is_file():
                            has_preview = True
                            break

                    if not has_preview:
                        #print(f"Downloading model thumbnail for {Path(path).name} ({self.get_model_base(models)} - {self.get_model_type(models)})")
                        self.get_image(models, thumbcheck)
                        updated += 1
                        time.sleep(1)

                    txtcheck = cache_file.with_suffix(".txt")
                    if model_type == "loras" and not txtcheck.exists():
                        print(f"Get LoRA keywords for {Path(path).name} ({self.get_model_base(models)} - {self.get_model_type(models)})")
                        keywords = self.get_keywords(models)
                        with open(txtcheck, "w") as f:
                            f.write(", ".join(keywords))
                        updated += 1

                    if model_type == "inbox":
                        name = str(path.relative_to(folder_paths[0])) # FIXME handle if inbox is a list
                        model = self.get_models_by_path("inbox", name)
                        filename =  self.get_file_from_name("inbox", name)
                        if model is None:
                            continue
                        baseModel = self.get_model_base(model)
                        folder, cache = folders.get(self.get_model_type(model), [None, None])
                        if folder is None or baseModel is None:
                            print(f"Skipping {name} not sure what {self.get_model_type(model)} is.")
                            continue
                        # Move model to correct folder
                        dest = Path(folder) / baseModel
                        if not dest.exists():
                            dest.mkdir(parents=True, exist_ok=True)
                        shutil.move(Path(filename), Path(dest) / name)
                        # Move cache-files
                        cache_file = Path(self.cache_paths[model_type] / name)
                        suffixes = [".json", ".txt", ".jpeg", ".jpg", ".png", ".gif"]
                        for suffix in suffixes:
                            cachefile = cache_file.with_suffix(suffix)
                            if cachefile.is_file():
                                shutil.move(cachefile, Path(cache) / cachefile.name)
                        print(f"Moved {name} to {dest}")

        if updated > 0:
            print(f"CivitAI update for {model_type} done.")
        self.civit_workers.remove(str(model_type))

    def get_names(self, model_type):
        while not self.ready[model_type]:
            # Wait until we have read all the filenames
            time.sleep(0.2)
        return self.names[model_type]

    def get_file(self, model_type, name):
        # Search the folders for the model
        for folder in self.model_dirs[model_type]:
            file = Path(folder) / name
            if file.is_file():
                return file
        return None

    def update_all_models(self):
        for model_type in ["checkpoints", "loras", "inbox"]:
            threading.Thread(
                target=self.civit_update_worker,
                args=(
                    model_type,
                    self.model_dirs[model_type],
                ),
                daemon=True,
            ).start()

    def __init__(self, offline=False):
        """Set up the per-type bookkeeping and start the update workers.

        Args:
            offline: Stored as ``self.offline``; the update worker stops
                after listing the model names when it is true.
        """
        from shared import path_manager, settings

        model_types = ("checkpoints", "loras", "inbox")

        def as_list(value):
            # A configured path may be a single entry or a list of entries
            return value if isinstance(value, list) else [value]

        self.offline = offline

        # ready[type] turns true once names[type] holds the full listing
        self.ready = {kind: False for kind in model_types}
        self.names = {kind: [] for kind in model_types}

        configured = path_manager.model_paths
        self.model_dirs = {
            "checkpoints": as_list(configured["modelfile_path"]),
            "loras": as_list(configured["lorafile_path"]),
            "inbox": as_list(configured["inbox_path"]),
        }
        cache_root = configured["cache_path"]
        self.cache_paths = {kind: Path(cache_root / kind) for kind in model_types}

        self.base_url = "https://civitai.com/api/v1/"
        self.headers = {"Content-Type": "application/json"}
        self.session = requests.Session()
        self.EXTENSIONS = [".pth", ".ckpt", ".bin", ".safetensors", ".gguf"]

        self.update_all_models()


    def get_file_from_name(self, model_type, model_name):
        for folder in self.model_dirs[model_type]:
            path = Path(folder) / model_name
            if path.is_file():
                return path
        return None

    def model_sha256(self, filename):
        print(f"Hashing {filename}")
        blksize = 1024 * 1024
        hash_sha256 = hashlib.sha256()
        try:
            with open(filename, 'rb') as f:
                for chunk in iter(lambda: f.read(blksize), b""):
                    hash_sha256.update(chunk)
            f.close()
            return hash_sha256.hexdigest().upper()
        except Exception as e:
            print(f"model_sha256(): Failed reading {filename}")
            print(f"Error: {e}")
            return None

    def get_models_by_path(self, model_type, path):
        data = None

        cache_path = Path(self.cache_paths[model_type]) / Path(Path(path).name)
        if cache_path.is_dir():
            # Give up
            return {}
        json_path = Path(cache_path).with_suffix(".json")

        if json_path.exists():
            try:
                with open(json_path) as f:
                    data = json.load(f)
            except:
                data = None
        if data is not None:
            return data

        if Path(path).suffix == ".merge":
            return {"baseModel": "Merge"}

        hash = self.model_sha256(path)
        url = f"{self.base_url}model-versions/by-hash/{hash}"
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.HTTPError as e:
            if response.status_code == 404:
                print(f"Warning: Could not find {Path(path).name} on civit.ai")
            elif response.status_code == 503:
                print("Error: Civit.ai Service Currently Unavailable")
            else:
                print(f"HTTP Error: {e}")
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")

        if data is None:
            # Create our own data
            data = {
                "files": [
                    {
                        "hashes": {
                            "SHA256": hash,
                        }
                    }
                ]
            }

        print(f"Update model data: {json_path}")
        with open(json_path, "w") as f:
            json.dump(data, f, indent=2)

        return data

    def get_keywords(self, model):
        keywords = model.get("trainedWords", [""])
        return keywords

    def get_model_base(self, model):
        return model.get("baseModel", "Unknown")

    def get_model_type(self, model):
        res = model.get("model", None)
        if res is not None:
            res = res.get("type", "Unknown")
        else:
            res = "Unknown"
        return res

    def get_image(self, model, path):
        from shared import settings

        if "baseModel" in model and model["baseModel"] == "Merge":
            return

        import imageio.v3 as iio
        if "model_preview" in settings.default_settings:
            opts = settings.default_settings["model_preview"].split(",")
            if "caption" in opts:
                caption=True
            if "nogifzoom" in opts:
                nogifzoom=True
            if "zoom" in opts:
                zoom=True
        else:
            caption=False
            nogifzoom=False
            zoom=False

        def make_thumbnail(image, text, zoom=False, caption=False):
            max = 166  # Max width or height

            if image is None:
                return None

            if zoom:
                oh = image.shape[0]
                ow = image.shape[1]
                scale = max / oh if oh > ow else max / ow
                image = cv2.resize(
                    image,
                    dsize=(int(ow * scale), int(oh * scale)),
                    interpolation=cv2.INTER_LANCZOS4,
                )

            if caption:
                font = cv2.FONT_HERSHEY_SIMPLEX
                fontScale = 0.35
                thickness = 1

                org = (3, 10)
                color = (25, 15, 11) # BGR
                image = cv2.putText(
                    image,
                    text,
                    org,
                    font,
                    fontScale,
                    color,
                    thickness*2,
                    cv2.LINE_AA
                )
                org = (3, 10)
                color = (255, 215, 185) # BGR
                image = cv2.putText(
                    image,
                    text,
                    org,
                    font,
                    fontScale,
                    color,
                    thickness,
                    cv2.LINE_AA
                )

            return image

        path = path.with_suffix(".jpeg")
        caption_text = f"{path.with_suffix('').name}"

        image_url = None
        for preview in model.get("images", [{}]):
            url = preview.get("url")
            format = preview.get("type")
            if url:
                print(f"Updating preview for {caption_text}.")
                image_url = url
                response = self.session.get(image_url)
                if response.status_code != 200:
                    print(f"WARNING: get_image() for {caption_text} - {response.status_code} : {response.reason}")
                    break
                image = np.asarray(bytearray(response.content), dtype="uint8") 
                out = make_thumbnail(cv2.imdecode(image, cv2.IMREAD_COLOR), caption_text, caption=caption, zoom=zoom)
                if out is not None:
                    out = cv2.imencode('.jpg', out)[1] 
                else:
                    out = response.content
                with open(path, "wb") as file:
                    file.write(out)

                if format == "video":
                    tmp_path = f"{path}.tmp"
                    shutil.move(path, tmp_path)
                    video = iio.imiter(tmp_path)
                    fps = iio.immeta(tmp_path)["fps"]
                    video_out = []
                    for i in video:
                        out = make_thumbnail(i, caption_text, caption=caption, zoom=not nogifzoom)
                        if out is None:
                            out = i
                        video_out.append(out)
                    iio.imwrite(
                        str(path.with_suffix(".gif")), video_out, fps=fps, loop=0
                    )
                    os.remove(tmp_path)
                break
        if image_url is None:
            shutil.copyfile("html/warning.jpeg", path)