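# NOTE: the words "Spaces: Runtime error" that preceded this listing appear to
# be a status banner captured from the hosting page; they are not part of the
# module.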
import hashlib
import json
import os
import shutil
import threading
import time
from pathlib import Path

import cv2
import numpy as np
import requests
class Models:
    """Index local model files and cache CivitAI metadata for them.

    For every model type ("checkpoints", "loras", "inbox") a background
    worker lists the model files found in the configured folders and, unless
    running offline, looks each file up on CivitAI by SHA256, stores the
    answer as JSON in the cache folder, downloads a preview thumbnail and
    (for LoRAs) writes the trained keywords to a text file.  Files found in
    the inbox are moved to the checkpoint/LoRA folder matching their type.
    """

    # Fallback registry of model types that have an update worker in flight.
    # __init__ gives every instance its own list: a list shared between
    # instances would make the second instance skip its workers as
    # "already running" and never mark its name lists as ready.
    civit_workers = []

    # Model types handled by the update workers.
    MODEL_TYPES = ("checkpoints", "loras", "inbox")

    # Seconds to wait on a single CivitAI request before giving up.
    REQUEST_TIMEOUT = 30

    # Longest side, in pixels, of a zoomed thumbnail.
    THUMBNAIL_SIZE = 166

    # Suffixes that count as an existing preview in the cache folder.
    PREVIEW_SUFFIXES = (".jpeg", ".jpg", ".png", ".gif")

    # Every cache file that follows a model when it leaves the inbox.
    CACHE_SUFFIXES = (".json", ".txt") + PREVIEW_SUFFIXES

    def __init__(self, offline=False):
        """Read the folder configuration and start the update workers.

        Args:
            offline: When true the workers only list the model files and
                never contact CivitAI.
        """
        from shared import path_manager

        self.offline = offline
        self.civit_workers = []
        self.ready = {model_type: False for model_type in self.MODEL_TYPES}
        self.names = {model_type: [] for model_type in self.MODEL_TYPES}

        def as_list(value):
            # Each setting may be a single folder or a list of folders.
            return value if isinstance(value, list) else [value]

        paths = path_manager.model_paths
        self.model_dirs = {
            "checkpoints": as_list(paths["modelfile_path"]),
            "loras": as_list(paths["lorafile_path"]),
            "inbox": as_list(paths["inbox_path"]),
        }
        cache_root = Path(paths["cache_path"])
        self.cache_paths = {
            model_type: cache_root / model_type
            for model_type in self.MODEL_TYPES
        }

        self.base_url = "https://civitai.com/api/v1/"
        self.headers = {"Content-Type": "application/json"}
        self.session = requests.Session()
        self.EXTENSIONS = [".pth", ".ckpt", ".bin", ".safetensors", ".gguf"]

        self.update_all_models()

    def update_all_models(self):
        """Start one daemon thread per model type running the update worker."""
        for model_type in self.MODEL_TYPES:
            threading.Thread(
                target=self.civit_update_worker,
                args=(model_type, self.model_dirs[model_type]),
                daemon=True,
            ).start()

    def civit_update_worker(self, model_type, folder_paths):
        """List the models of one type, then refresh their cached CivitAI data.

        The name list is always built first and ``self.ready[model_type]`` is
        always set afterwards, so ``get_names`` cannot block forever when
        imageio or the cache folder is missing.  The worker registration is
        released in a ``finally`` so a failed run does not block later runs.

        Args:
            model_type: "checkpoints", "loras" or "inbox".
            folder_paths: Folders (``Path`` or ``str``) to scan recursively.
        """
        worker_id = str(model_type)
        if worker_id in self.civit_workers:
            # Already working on this folder
            print(f"Skip CivitAI check. Update for {model_type} already running.")
            return

        self.civit_workers.append(worker_id)
        try:
            self._refresh_names(model_type, folder_paths)
            if self.offline:
                return
            if not self._can_update_previews(model_type):
                return
            updated = self._update_cache(model_type, folder_paths)
            if updated > 0:
                print(f"CivitAI update for {model_type} done.")
        finally:
            self.civit_workers.remove(worker_id)

    def _iter_model_files(self, folder_paths):
        """Yield ``(root, path)`` for every model file below the given folders."""
        for folder in folder_paths:
            root = Path(folder)
            for path in root.rglob("*"):
                if path.suffix.lower() in self.EXTENSIONS:
                    yield root, path

    @staticmethod
    def _dropdown_sort_key(name):
        """Sort names inside a sub-folder above plain files, ignoring case."""
        in_folder = str(Path(name).parent) != "."
        return (0 if in_folder else 1, name.casefold())

    def _refresh_names(self, model_type, folder_paths):
        """Rebuild ``self.names[model_type]`` and mark the type as ready."""
        self.ready[model_type] = False
        found = []
        try:
            for root, path in self._iter_model_files(folder_paths):
                found.append(str(path.relative_to(root)))
        finally:
            # Publish whatever was collected, even after an error, so that
            # get_names() never waits on a list that will not arrive.
            found.sort(key=self._dropdown_sort_key)
            self.names[model_type] = found
            self.ready[model_type] = True

    def _can_update_previews(self, model_type):
        """Return True when imageio and the cache folder are both available."""
        try:
            import imageio.v3  # noqa: F401
        except ImportError:
            # Skip updates if we are missing imageio
            print("Can't find imageio.v3 module: Skip CivitAI update")
            return False

        if not Path(self.cache_paths[model_type]).is_dir():
            print(f"WARNING: Can't find {self.cache_paths[model_type]} Will not update thumbnails.")
            return False
        return True

    def _inbox_targets(self):
        """Map a CivitAI model type to its (model folder, cache folder).

        Inbox files go to the first configured folder of the matching type.
        """
        checkpoints = self.model_dirs["checkpoints"][0]
        loras = self.model_dirs["loras"][0]
        return {
            "LORA": (loras, self.cache_paths["loras"]),
            "LoCon": (loras, self.cache_paths["loras"]),
            "Checkpoint": (checkpoints, self.cache_paths["checkpoints"]),
        }

    def _update_cache(self, model_type, folder_paths):
        """Fetch missing previews/keywords and sort inbox files.

        Returns:
            The number of previews and keyword files that were created.
        """
        updated = 0
        cache_dir = Path(self.cache_paths[model_type])
        targets = self._inbox_targets() if model_type == "inbox" else None

        # Snapshot the listing first: sorting the inbox moves files out of
        # the very folders that are being walked.
        for root, path in list(self._iter_model_files(folder_paths)):
            cache_file = cache_dir / path.name
            model = self.get_models_by_path(model_type, str(path))

            has_preview = any(
                cache_file.with_suffix(suffix).is_file()
                for suffix in self.PREVIEW_SUFFIXES
            )
            if not has_preview:
                self.get_image(model, cache_file.with_suffix(".jpeg"))
                updated += 1
                # Pause after each download (presumably to go easy on the
                # CivitAI servers).
                time.sleep(1)

            keywords_file = cache_file.with_suffix(".txt")
            if model_type == "loras" and not keywords_file.exists():
                print(f"Get LoRA keywords for {Path(path).name} ({self.get_model_base(model)} - {self.get_model_type(model)})")
                keywords = self.get_keywords(model)
                with open(keywords_file, "w") as f:
                    f.write(", ".join(keywords))
                updated += 1

            if model_type == "inbox":
                self._sort_inbox_model(root, path, model, targets)

        return updated

    def _sort_inbox_model(self, inbox_root, path, model, targets):
        """Move one inbox file, and its cache files, to the folder of its type."""
        name = str(path.relative_to(inbox_root))
        model_kind = self.get_model_type(model)
        base_model = self.get_model_base(model)
        target = targets.get(model_kind)
        if target is None or base_model is None:
            print(f"Skipping {name} not sure what {model_kind} is.")
            return

        # Move model to correct folder
        model_root, cache_root = target
        dest = Path(model_root) / base_model
        destination = dest / name
        # ``name`` may contain sub-folders, so create the full parent chain.
        destination.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(str(path), str(destination))

        # Move cache-files (they are stored under the file's base name)
        inbox_cache = Path(self.cache_paths["inbox"]) / path.name
        for suffix in self.CACHE_SUFFIXES:
            cached = inbox_cache.with_suffix(suffix)
            if cached.is_file():
                shutil.move(str(cached), str(Path(cache_root) / cached.name))
        print(f"Moved {name} to {dest}")

    def get_names(self, model_type):
        """Return the sorted model names, waiting until the listing is done."""
        while not self.ready[model_type]:
            # Wait until we have read all the filenames
            time.sleep(0.2)
        return self.names[model_type]

    def get_file(self, model_type, name):
        """Return the first existing ``folder / name`` for the type, or None."""
        # Search the folders for the model
        for folder in self.model_dirs[model_type]:
            candidate = Path(folder) / name
            if candidate.is_file():
                return candidate
        return None

    def get_file_from_name(self, model_type, model_name):
        """Same lookup as ``get_file``; kept for existing callers."""
        return self.get_file(model_type, model_name)

    def model_sha256(self, filename):
        """Return the upper-case SHA256 hex digest of a file.

        Returns:
            The digest, or None when the file could not be read.
        """
        print(f"Hashing {filename}")
        blksize = 1024 * 1024
        hash_sha256 = hashlib.sha256()
        try:
            with open(filename, "rb") as f:
                for chunk in iter(lambda: f.read(blksize), b""):
                    hash_sha256.update(chunk)
            return hash_sha256.hexdigest().upper()
        except Exception as e:
            # Best effort: a failed hash must not take the worker down.
            print(f"model_sha256(): Failed reading {filename}")
            print(f"Error: {e}")
            return None

    def _fetch_model_version(self, sha256, display_name):
        """Ask CivitAI for the model version with this hash.

        Returns:
            The decoded JSON answer, or None on any request failure.
        """
        url = f"{self.base_url}model-versions/by-hash/{sha256}"
        try:
            response = requests.get(
                url, headers=self.headers, timeout=self.REQUEST_TIMEOUT
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            status = e.response.status_code if e.response is not None else None
            if status == 404:
                print(f"Warning: Could not find {display_name} on civit.ai")
            elif status == 503:
                print("Error: Civit.ai Service Currently Unavailable")
            else:
                print(f"HTTP Error: {e}")
        except requests.exceptions.RequestException as e:
            print(f"Error: {e}")
        except ValueError as e:
            # The body was not JSON (some requests versions raise a plain
            # ValueError here).
            print(f"Error: {e}")
        return None

    def get_models_by_path(self, model_type, path):
        """Return the CivitAI metadata for a model file, using the JSON cache.

        Lookup order: cached ``<name>.json`` in the cache folder of the type,
        the fixed answer for ``.merge`` files, then a CivitAI lookup by
        SHA256 whose result (or a minimal record holding only the hash) is
        written to the cache.

        Args:
            model_type: Key into ``self.cache_paths``.
            path: Path of the model file; only its base name selects the
                cache entry.

        Returns:
            A dict.  ``{}`` when the cache entry is a directory.  When the
            file cannot be hashed, a minimal record with a ``None`` hash is
            returned and nothing is cached, so a later run can retry.
        """
        cache_path = Path(self.cache_paths[model_type]) / Path(path).name
        if cache_path.is_dir():
            # Give up
            return {}

        json_path = cache_path.with_suffix(".json")
        if json_path.exists():
            try:
                with open(json_path) as f:
                    data = json.load(f)
            except (OSError, ValueError):
                # Unreadable or corrupt cache entry: rebuild it below.
                data = None
            if data is not None:
                return data

        if Path(path).suffix == ".merge":
            return {"baseModel": "Merge"}

        sha256 = self.model_sha256(path)
        # Create our own data, used when CivitAI has no answer.
        fallback = {
            "files": [
                {
                    "hashes": {
                        "SHA256": sha256,
                    }
                }
            ]
        }
        if sha256 is None:
            # No point in querying ".../by-hash/None" or caching the result.
            return fallback

        data = self._fetch_model_version(sha256, Path(path).name)
        if data is None:
            data = fallback

        print(f"Update model data: {json_path}")
        with open(json_path, "w") as f:
            json.dump(data, f, indent=2)
        return data

    def get_keywords(self, model):
        """Return the model's ``trainedWords`` list, or ``[""]`` if absent."""
        return model.get("trainedWords", [""])

    def get_model_base(self, model):
        """Return the model's ``baseModel``, or "Unknown" if absent."""
        return model.get("baseModel", "Unknown")

    def get_model_type(self, model):
        """Return ``model["model"]["type"]``, or "Unknown" if absent."""
        info = model.get("model", None)
        if info is None:
            return "Unknown"
        return info.get("type", "Unknown")

    def _make_thumbnail(self, image, text, zoom=False, caption=False):
        """Optionally shrink an image and draw a caption in its top-left corner.

        Returns:
            The processed image, or None when ``image`` is None.
        """
        if image is None:
            return None
        if zoom:
            height = image.shape[0]
            width = image.shape[1]
            # Scale so that the longest side becomes THUMBNAIL_SIZE.
            scale = self.THUMBNAIL_SIZE / max(height, width)
            image = cv2.resize(
                image,
                dsize=(int(width * scale), int(height * scale)),
                interpolation=cv2.INTER_LANCZOS4,
            )
        if caption:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.35
            thickness = 1
            org = (3, 10)
            # Dark, thicker pass first; it acts as an outline for the text.
            image = cv2.putText(
                image,
                text,
                org,
                font,
                font_scale,
                (25, 15, 11),  # BGR
                thickness * 2,
                cv2.LINE_AA,
            )
            image = cv2.putText(
                image,
                text,
                org,
                font,
                font_scale,
                (255, 215, 185),  # BGR
                thickness,
                cv2.LINE_AA,
            )
        return image

    def _write_gif_preview(self, iio, path, caption_text, caption, zoom):
        """Convert the downloaded video at ``path`` into ``<name>.gif``."""
        tmp_path = f"{path}.tmp"
        shutil.move(str(path), tmp_path)
        try:
            fps = iio.immeta(tmp_path)["fps"]
            frames = []
            for frame in iio.imiter(tmp_path):
                thumb = self._make_thumbnail(
                    frame, caption_text, caption=caption, zoom=zoom
                )
                frames.append(frame if thumb is None else thumb)
            iio.imwrite(
                str(path.with_suffix(".gif")), frames, fps=fps, loop=0
            )
        finally:
            # Never leave the temporary download behind, even on failure.
            os.remove(tmp_path)

    def get_image(self, model, path):
        """Download and store the preview of a model.

        Uses the first entry of ``model["images"]`` that has a URL.  The
        image is written to ``path`` with a ``.jpeg`` suffix; entries of type
        "video" are converted to a ``.gif`` next to it.  When the model has
        no preview URL, ``html/warning.jpeg`` is copied instead.  Nothing is
        written when the download fails.

        The comma separated ``model_preview`` setting selects the options
        "caption", "zoom" and "nogifzoom"; options not listed are off.

        Args:
            model: Metadata dict as returned by ``get_models_by_path``.
            path: Target path in the cache folder (suffix is replaced).
        """
        if model.get("baseModel") == "Merge":
            return

        from shared import settings
        import imageio.v3 as iio

        opts = set()
        if "model_preview" in settings.default_settings:
            raw_opts = settings.default_settings["model_preview"]
            opts = {opt.strip() for opt in raw_opts.split(",")}
        caption = "caption" in opts
        nogifzoom = "nogifzoom" in opts
        zoom = "zoom" in opts

        path = path.with_suffix(".jpeg")
        caption_text = path.stem

        previews = model.get("images", [{}])
        preview = next((p for p in previews if p.get("url")), None)
        if preview is None:
            shutil.copyfile("html/warning.jpeg", path)
            return

        print(f"Updating preview for {caption_text}.")
        try:
            response = self.session.get(
                preview["url"], timeout=self.REQUEST_TIMEOUT
            )
        except requests.exceptions.RequestException as e:
            print(f"WARNING: get_image() for {caption_text} - {e}")
            return
        if response.status_code != 200:
            print(f"WARNING: get_image() for {caption_text} - {response.status_code} : {response.reason}")
            return

        encoded = np.frombuffer(response.content, dtype=np.uint8)
        thumb = self._make_thumbnail(
            cv2.imdecode(encoded, cv2.IMREAD_COLOR),
            caption_text,
            caption=caption,
            zoom=zoom,
        )
        if thumb is not None:
            payload = cv2.imencode(".jpg", thumb)[1]
        else:
            # Not decodable as a still image (e.g. a video): keep raw bytes.
            payload = response.content
        with open(path, "wb") as file:
            file.write(payload)

        if preview.get("type") == "video":
            self._write_gif_preview(
                iio, path, caption_text, caption, zoom=not nogifzoom
            )