RuinedFooocus / modules /model_handler.py
malizec's picture
Upload folder using huggingface_hub
2de3774 verified
import requests
import hashlib
import shutil
import os
import cv2
import json
import threading
import time
from pathlib import Path
import numpy as np
class Models:
civit_workers = []
def civit_update_worker(self, model_type, folder_paths):
    """List the models of one type, then refresh their cached CivitAI data.

    Phase 1 walks ``folder_paths`` and stores the sorted relative file names
    of every file whose suffix is in ``self.EXTENSIONS`` in
    ``self.names[model_type]``. ``self.ready[model_type]`` is ``False`` while
    the scan runs and is always set back to ``True`` afterwards, so
    ``get_names()`` can never wait forever.

    Phase 2 is skipped when ``self.offline`` is set, when ``imageio.v3``
    cannot be imported, or when the cache directory for ``model_type`` does
    not exist. Otherwise every model gets its metadata looked up via
    ``get_models_by_path()``, a preview image via ``get_image()`` when no
    ``.jpeg/.jpg/.png/.gif`` is cached yet and, for ``"loras"``, a ``.txt``
    file with its keywords. Models of type ``"inbox"`` are then moved to
    ``<lora or checkpoint folder>/<baseModel>/`` together with their cache
    files.

    Args:
        model_type: Key into ``self.names``, ``self.ready`` and
            ``self.cache_paths`` (``"checkpoints"``, ``"loras"`` or
            ``"inbox"``).
        folder_paths: Sequence of directories to scan.
    """
    worker_id = str(model_type)
    if worker_id in self.civit_workers:
        # Already working on this folder
        print(f"Skip CivitAI check. Update for {model_type} already running.")
        return

    self.civit_workers.append(worker_id)
    try:
        scan_dirs = [Path(folder) for folder in folder_paths]

        # Quick list. Done before any other check so the name list is
        # available even when thumbnails cannot be updated.
        self.ready[model_type] = False
        try:
            found = []
            for folder in scan_dirs:
                for path in folder.rglob("*"):
                    if path.suffix.lower() in self.EXTENSIONS:
                        found.append(str(path.relative_to(folder)))
            # Names inside a sub-folder sort before plain files so folders
            # end up above files in the dropdown.
            self.names[model_type] = sorted(
                found,
                key=lambda n: (str(Path(n).parent) == ".", n.casefold()),
            )
        finally:
            self.ready[model_type] = True

        if self.offline:
            return

        try:
            import imageio.v3  # noqa: F401 - availability probe only
        except ImportError:
            # Skip updates if we are missing imageio
            print("Can't find imageio.v3 module: Skip CivitAI update")
            return

        cache_dir = Path(self.cache_paths[model_type])
        if not cache_dir.is_dir():
            print(f"WARNING: Can't find {self.cache_paths[model_type]} Will not update thumbnails.")
            return

        # CivitAI model type -> (model folder, cache folder); inbox only.
        sort_targets = {}
        if model_type == "inbox":
            from shared import path_manager

            checkpoints = path_manager.model_paths["modelfile_path"]
            checkpoints = checkpoints[0] if isinstance(checkpoints, list) else checkpoints
            loras = path_manager.model_paths["lorafile_path"]
            loras = loras[0] if isinstance(loras, list) else loras
            sort_targets = {
                "LORA": (loras, self.cache_paths["loras"]),
                "LoCon": (loras, self.cache_paths["loras"]),
                "Checkpoint": (checkpoints, self.cache_paths["checkpoints"]),
            }

        preview_suffixes = (".jpeg", ".jpg", ".png", ".gif")
        sidecar_suffixes = (".json", ".txt") + preview_suffixes
        updated = 0

        # Go through and check previews
        for folder in scan_dirs:
            # Snapshot the listing: inbox files are moved away while we walk.
            for path in list(folder.rglob("*")):
                if path.suffix.lower() not in self.EXTENSIONS:
                    continue

                # Cache files live flat in the cache folder, keyed by file name.
                cache_file = cache_dir / path.name
                models = self.get_models_by_path(model_type, str(path))

                if not any(cache_file.with_suffix(s).is_file() for s in preview_suffixes):
                    self.get_image(models, cache_file.with_suffix(".jpeg"))
                    updated += 1
                    # Pause between downloads.
                    time.sleep(1)

                txtcheck = cache_file.with_suffix(".txt")
                if model_type == "loras" and not txtcheck.exists():
                    print(f"Get LoRA keywords for {Path(path).name} ({self.get_model_base(models)} - {self.get_model_type(models)})")
                    keywords = self.get_keywords(models)
                    with open(txtcheck, "w") as f:
                        f.write(", ".join(keywords))
                    updated += 1

                if model_type != "inbox":
                    continue

                # Relative to the folder being scanned, so several inbox
                # folders work as well as one.
                name = str(path.relative_to(folder))
                base_model = self.get_model_base(models)
                civit_type = self.get_model_type(models)
                target_root, target_cache = sort_targets.get(civit_type, (None, None))
                if target_root is None or base_model is None:
                    print(f"Skipping {name} not sure what {civit_type} is.")
                    continue

                # Move model to correct folder
                dest = Path(target_root) / base_model
                target = dest / name
                target.parent.mkdir(parents=True, exist_ok=True)
                shutil.move(str(path), str(target))

                # Move cache-files
                for suffix in sidecar_suffixes:
                    sidecar = cache_file.with_suffix(suffix)
                    if sidecar.is_file():
                        shutil.move(str(sidecar), str(Path(target_cache) / sidecar.name))
                print(f"Moved {name} to {dest}")

        if updated > 0:
            print(f"CivitAI update for {model_type} done.")
    finally:
        # Always unregister, otherwise a crash would block later updates.
        self.civit_workers.remove(worker_id)
def get_names(self, model_type):
    """Return the list of model names for ``model_type``.

    Blocks, polling every 0.2 seconds, until ``self.ready[model_type]`` is
    true, i.e. until all file names have been read.
    """
    while True:
        if self.ready[model_type]:
            return self.names[model_type]
        # Wait until we have read all the filenames
        time.sleep(0.2)
def get_file(self, model_type, name):
    """Return the path of ``name`` in the first model folder that holds it.

    The folders in ``self.model_dirs[model_type]`` are searched in order;
    ``None`` is returned when none of them contains ``name`` as a file.
    """
    candidates = (Path(folder) / name for folder in self.model_dirs[model_type])
    return next((hit for hit in candidates if hit.is_file()), None)
def update_all_models(self):
    """Start one daemon thread per model type running ``civit_update_worker``.

    Each thread is handed the model type and that type's folders from
    ``self.model_dirs``. The threads are started but not joined.
    """
    for kind in ("checkpoints", "loras", "inbox"):
        worker = threading.Thread(
            target=self.civit_update_worker,
            args=(kind, self.model_dirs[kind]),
            daemon=True,
        )
        worker.start()
def __init__(self, offline=False):
    """Set up model folders and cache paths, then start the update workers.

    Folder locations are read from ``shared.path_manager.model_paths``
    (``modelfile_path``, ``lorafile_path``, ``inbox_path`` and
    ``cache_path``). The last statement calls ``update_all_models()``, so
    background threads are running by the time the constructor returns.

    Args:
        offline: Stored as ``self.offline``; ``civit_update_worker`` reads
            it to decide whether to stop after listing the files.
    """
    from shared import path_manager

    def as_list(value):
        # A setting may hold a single folder or a list of folders.
        return value if isinstance(value, list) else [value]

    model_types = ("checkpoints", "loras", "inbox")

    self.offline = offline
    self.ready = {kind: False for kind in model_types}
    self.names = {kind: [] for kind in model_types}

    self.model_dirs = {
        "checkpoints": as_list(path_manager.model_paths["modelfile_path"]),
        "loras": as_list(path_manager.model_paths["lorafile_path"]),
        "inbox": as_list(path_manager.model_paths["inbox_path"]),
    }

    # Wrapping in Path() first also accepts a plain string setting.
    cache_root = Path(path_manager.model_paths["cache_path"])
    self.cache_paths = {kind: cache_root / kind for kind in model_types}

    self.base_url = "https://civitai.com/api/v1/"
    self.headers = {"Content-Type": "application/json"}
    self.session = requests.Session()
    self.EXTENSIONS = [".pth", ".ckpt", ".bin", ".safetensors", ".gguf"]

    # Must stay last: the workers read the attributes assigned above.
    self.update_all_models()
def get_file_from_name(self, model_type, model_name):
    """Locate ``model_name`` in the folders configured for ``model_type``.

    Returns the first ``<folder>/<model_name>`` that is an existing file, or
    ``None`` when no folder in ``self.model_dirs[model_type]`` has it.
    """
    for base_dir in map(Path, self.model_dirs[model_type]):
        candidate = base_dir.joinpath(model_name)
        if candidate.is_file():
            return candidate
    return None
def model_sha256(self, filename):
    """Return the upper-case hex SHA-256 digest of a file.

    The file is read in 1 MiB blocks so large model files are never held in
    memory as a whole.

    Args:
        filename: Path of the file to hash.

    Returns:
        The digest as an upper-case hex string, or ``None`` when the file
        could not be opened or read (the error is printed).
    """
    print(f"Hashing {filename}")
    blksize = 1024 * 1024
    digest = hashlib.sha256()
    try:
        # The context manager closes the file; no explicit close() needed.
        with open(filename, "rb") as f:
            for chunk in iter(lambda: f.read(blksize), b""):
                digest.update(chunk)
    except (OSError, TypeError, ValueError) as e:
        # OSError: missing/unreadable file; TypeError/ValueError: bad path.
        print(f"model_sha256(): Failed reading {filename}")
        print(f"Error: {e}")
        return None
    return digest.hexdigest().upper()
def get_models_by_path(self, model_type, path):
    """Return the CivitAI metadata for a model file, using the JSON cache.

    Lookup order:

    1. ``<cache dir>/<file name>.json`` when it exists and parses to a
       non-null value.
    2. ``{"baseModel": "Merge"}`` for paths ending in ``.merge``.
    3. A ``model-versions/by-hash/<SHA256>`` request against
       ``self.base_url``. The answer, or a minimal record holding only the
       hash when the request fails, is written to the JSON cache.

    Args:
        model_type: Key into ``self.cache_paths``.
        path: Path of the model file; only its file name is used to locate
            the cache entry.

    Returns:
        The metadata dict. ``{}`` when the cache entry is a directory or
        when the file could not be hashed (nothing is cached in that case,
        so a later run can try again).
    """
    cache_path = Path(self.cache_paths[model_type]) / Path(path).name
    if cache_path.is_dir():
        # Give up
        return {}

    json_path = cache_path.with_suffix(".json")
    if json_path.exists():
        try:
            with open(json_path) as f:
                data = json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt cache entry: fall through and rebuild it.
            data = None
        if data is not None:
            return data

    if Path(path).suffix == ".merge":
        return {"baseModel": "Merge"}

    sha256 = self.model_sha256(path)
    if sha256 is None:
        # Without a hash there is nothing to look up, and caching a record
        # with a null hash would hide the model from every later update.
        return {}

    url = f"{self.base_url}model-versions/by-hash/{sha256}"
    data = None
    try:
        # The timeout keeps a stalled connection from blocking the caller.
        response = requests.get(url, headers=self.headers, timeout=30)
        response.raise_for_status()
        data = response.json()
    except requests.exceptions.HTTPError as e:
        if response.status_code == 404:
            print(f"Warning: Could not find {Path(path).name} on civit.ai")
        elif response.status_code == 503:
            print("Error: Civit.ai Service Currently Unavailable")
        else:
            print(f"HTTP Error: {e}")
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON.
        print(f"Error: {e}")

    if data is None:
        # Create our own data
        data = {
            "files": [
                {
                    "hashes": {
                        "SHA256": sha256,
                    }
                }
            ]
        }

    print(f"Update model data: {json_path}")
    with open(json_path, "w") as f:
        json.dump(data, f, indent=2)
    return data
def get_keywords(self, model):
    """Return the model's ``trainedWords`` entry, or ``[""]`` when absent."""
    if "trainedWords" in model:
        return model["trainedWords"]
    return [""]
def get_model_base(self, model):
    """Return the model's ``baseModel`` entry, or ``"Unknown"`` when absent."""
    return model["baseModel"] if "baseModel" in model else "Unknown"
def get_model_type(self, model):
    """Return ``model["model"]["type"]``.

    ``"Unknown"`` is returned when the ``model`` entry is missing or
    ``None``, or when it has no ``type`` key.
    """
    details = model.get("model")
    return "Unknown" if details is None else details.get("type", "Unknown")
def get_image(self, model, path):
    """Download the preview for a model and store it as a thumbnail.

    The first entry of ``model["images"]`` that has a ``url`` is fetched
    through ``self.session`` and written to ``path`` with its suffix
    replaced by ``.jpeg``. Entries whose ``type`` is ``"video"`` are then
    converted to a ``.gif`` next to it. When no entry has a URL,
    ``html/warning.jpeg`` is copied to the target instead. Nothing is done
    for models whose ``baseModel`` is ``"Merge"``.

    The comma-separated ``model_preview`` setting selects the options
    ``caption`` (draw the file name on the image), ``zoom`` (scale still
    images so the longer side is 166 pixels) and ``nogifzoom`` (do not
    scale video frames). Options that are not listed are off.

    Args:
        model: Metadata dict as returned by ``get_models_by_path()``.
        path: ``pathlib.Path`` of the cache file; its suffix is replaced.
    """
    # Cheap guard first: merges have no preview to fetch.
    if model.get("baseModel") == "Merge":
        return

    from shared import settings
    import imageio.v3 as iio

    opts = []
    if "model_preview" in settings.default_settings:
        raw_opts = settings.default_settings["model_preview"].split(",")
        opts = [opt.strip() for opt in raw_opts]
    # Assign all three flags unconditionally so a partial option list can
    # never leave one of them undefined.
    caption = "caption" in opts
    nogifzoom = "nogifzoom" in opts
    zoom = "zoom" in opts

    def make_thumbnail(image, text, zoom=False, caption=False):
        """Return ``image`` optionally scaled and captioned; None stays None."""
        max_side = 166  # Max width or height
        if image is None:
            return None

        if zoom:
            oh, ow = image.shape[0], image.shape[1]
            scale = max_side / oh if oh > ow else max_side / ow
            image = cv2.resize(
                image,
                dsize=(int(ow * scale), int(oh * scale)),
                interpolation=cv2.INTER_LANCZOS4,
            )

        if caption:
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.35
            thickness = 1
            org = (3, 10)
            # Dark, thicker pass first so the light text gets an outline.
            image = cv2.putText(
                image, text, org, font, font_scale,
                (25, 15, 11),  # BGR
                thickness * 2, cv2.LINE_AA,
            )
            image = cv2.putText(
                image, text, org, font, font_scale,
                (255, 215, 185),  # BGR
                thickness, cv2.LINE_AA,
            )
        return image

    path = path.with_suffix(".jpeg")
    caption_text = f"{path.with_suffix('').name}"
    image_url = None

    for preview in model.get("images", [{}]):
        url = preview.get("url")
        if not url:
            continue
        media_type = preview.get("type")

        print(f"Updating preview for {caption_text}.")
        image_url = url
        try:
            response = self.session.get(image_url, timeout=30)
        except requests.exceptions.RequestException as e:
            print(f"WARNING: get_image() for {caption_text} - {e}")
            break
        if response.status_code != 200:
            print(f"WARNING: get_image() for {caption_text} - {response.status_code} : {response.reason}")
            break

        raw = np.asarray(bytearray(response.content), dtype="uint8")
        thumb = make_thumbnail(
            cv2.imdecode(raw, cv2.IMREAD_COLOR),
            caption_text,
            caption=caption,
            zoom=zoom,
        )
        # Data OpenCV cannot decode (e.g. a video) is stored untouched.
        out = cv2.imencode(".jpg", thumb)[1] if thumb is not None else response.content
        with open(path, "wb") as file:
            file.write(out)

        if media_type == "video":
            tmp_path = f"{path}.tmp"
            shutil.move(str(path), tmp_path)
            try:
                fps = iio.immeta(tmp_path)["fps"]
                frames = []
                for frame in iio.imiter(tmp_path):
                    scaled = make_thumbnail(
                        frame, caption_text, caption=caption, zoom=not nogifzoom
                    )
                    frames.append(frame if scaled is None else scaled)
                iio.imwrite(
                    str(path.with_suffix(".gif")), frames, fps=fps, loop=0
                )
            finally:
                # Remove the temporary download even if the conversion fails.
                os.remove(tmp_path)

        # Only the first preview that has a URL is used.
        break

    if image_url is None:
        shutil.copyfile("html/warning.jpeg", path)