| | from __future__ import annotations |
| |
|
| | import os |
| | import base64 |
| | import json |
| | import time |
| | import logging |
| | import folder_paths |
| | import glob |
| | import comfy.utils |
| | from aiohttp import web |
| | from PIL import Image |
| | from io import BytesIO |
| | from folder_paths import map_legacy, filter_files_extensions, filter_files_content_types |
| |
|
| |
|
class ModelFileManager:
    """Expose model folders, model file listings and preview images over HTTP.

    Directory scans are cached per model directory. A cached scan is reused
    only while the modification time of every directory recorded during the
    scan (the scanned root included) is unchanged.
    """

    def __init__(self) -> None:
        # Maps a model directory to (file infos, {directory: mtime}, scan timestamp).
        self.cache: dict[str, tuple[list[dict], dict[str, float], float]] = {}

    def __enter__(self) -> "ModelFileManager":
        """Return the manager itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Drop every cached scan when the ``with`` block ends."""
        self.clear_cache()

    def get_cache(self, key: str, default=None) -> tuple[list[dict], dict[str, float], float] | None:
        """Return the cached scan stored under ``key``, or ``default`` if absent."""
        return self.cache.get(key, default)

    def set_cache(self, key: str, value: tuple[list[dict], dict[str, float], float]):
        """Store ``value`` as the cached scan for ``key``."""
        self.cache[key] = value

    def clear_cache(self):
        """Remove all cached scans."""
        self.cache.clear()

    def add_routes(self, routes):
        """Register the ``/experiment/models`` handlers on ``routes``."""

        @routes.get("/experiment/models")
        async def get_model_folders(request):
            # "configs" and "custom_nodes" are registered folder names that
            # this endpoint deliberately does not list.
            folder_black_list = ["configs", "custom_nodes"]
            output_folders: list[dict] = [
                {"name": folder, "folders": folder_paths.get_folder_paths(folder)}
                for folder in folder_paths.folder_names_and_paths
                if folder not in folder_black_list
            ]
            return web.json_response(output_folders)

        @routes.get("/experiment/models/{folder}")
        async def get_all_models(request):
            folder = request.match_info.get("folder", None)
            if folder not in folder_paths.folder_names_and_paths:
                return web.Response(status=404)
            files = self.get_model_file_list(folder)
            return web.json_response(files)

        @routes.get("/experiment/models/preview/{folder}/{path_index}/{filename:.*}")
        async def get_model_preview(request):
            folder_name = request.match_info.get("folder", None)
            filename = request.match_info.get("filename", None)

            if folder_name not in folder_paths.folder_names_and_paths:
                return web.Response(status=404)

            # The index comes straight from the URL: a non-numeric or
            # out-of-range value is "not found", not a server error.
            try:
                path_index = int(request.match_info.get("path_index", None))
            except (TypeError, ValueError):
                return web.Response(status=404)

            folders = folder_paths.folder_names_and_paths[folder_name]
            if not 0 <= path_index < len(folders[0]):
                return web.Response(status=404)

            # The filename is untrusted as well; refuse anything that would
            # resolve outside the selected model directory.
            full_filename = self._resolve_inside(folders[0][path_index], filename)
            if full_filename is None:
                return web.Response(status=404)

            previews = self.get_model_previews(full_filename)
            default_preview = previews[0] if previews else None
            if default_preview is None or (isinstance(default_preview, str) and not os.path.isfile(default_preview)):
                return web.Response(status=404)

            try:
                with Image.open(default_preview) as img:
                    img_bytes = BytesIO()
                    img.save(img_bytes, format="WEBP")
                    return web.Response(body=img_bytes.getvalue(), content_type="image/webp")
            except Exception as e:
                # An unreadable or unconvertible preview is reported as missing.
                logging.warning(f"Warning: Unable to render preview for {full_filename}. Error: {e}.")
                return web.Response(status=404)

    @staticmethod
    def _resolve_inside(folder: str, filename: str) -> str | None:
        """Join ``filename`` onto ``folder`` and reject paths that escape it.

        Returns the normalised absolute path, or ``None`` when the result is
        the folder itself or lies outside it (``..`` segments, absolute
        paths, another drive). Symbolic links are not resolved, so linked
        model directories keep working.
        """
        root = os.path.abspath(folder)
        candidate = os.path.abspath(os.path.join(root, filename))
        if candidate == root:
            return None
        try:
            if os.path.commonpath((root, candidate)) != root:
                return None
        except ValueError:
            # commonpath raises when the two paths cannot be compared,
            # e.g. different drives on Windows.
            return None
        return candidate

    def get_model_file_list(self, folder_name: str) -> list[dict]:
        """Return file info dicts for every model found under ``folder_name``.

        Each existing directory registered for the folder is scanned (or its
        cached scan reused) and the results are concatenated in directory
        order.
        """
        folder_name = map_legacy(folder_name)
        folders = folder_paths.folder_names_and_paths[folder_name]
        output_list: list[dict] = []

        for index, folder in enumerate(folders[0]):
            if not os.path.isdir(folder):
                continue
            out = self.cache_model_file_list_(folder)
            if out is None:
                out = self.recursive_search_models_(folder, index)
                self.set_cache(folder, out)
            output_list.extend(out[0])

        return output_list

    def cache_model_file_list_(self, folder: str):
        """Return the cached scan for ``folder`` if it is still valid, else ``None``.

        The entry is valid when ``folder`` is an existing directory, its own
        modification time was recorded by the scan, and every recorded
        directory still has the modification time that was recorded.
        """
        cached = self.get_cache(folder)
        if cached is None:
            return None
        if not os.path.isdir(folder):
            return None

        recorded_mtimes = cached[1]
        # Without the root's own mtime, files added or removed directly in
        # ``folder`` could not be detected, so such an entry is never reused.
        if folder not in recorded_mtimes:
            return None

        try:
            for directory, recorded in recorded_mtimes.items():
                if os.path.getmtime(directory) != recorded:
                    return None
        except OSError:
            # A recorded directory vanished or became unreadable.
            return None

        return cached

    def recursive_search_models_(self, directory: str, pathIndex: int) -> tuple[list[dict], dict[str, float], float]:
        """Walk ``directory`` and collect info about the model files in it.

        Returns ``(files, dirs, timestamp)`` where ``files`` is a list of
        dicts with the keys ``name`` (path relative to ``directory``),
        ``pathIndex``, ``modified``, ``created`` and ``size``; ``dirs`` maps
        ``directory`` and every visited sub-directory to its modification
        time; and ``timestamp`` is ``time.perf_counter()`` at the end of the
        scan. A missing directory yields empty collections.
        """
        if not os.path.isdir(directory):
            return [], {}, time.perf_counter()

        excluded_dir_names = [".git"]
        include_hidden_files = False

        result: list[dict] = []
        dirs: dict[str, float] = {}

        # Record the root too, so cache validation notices changes made
        # directly inside ``directory``.
        try:
            dirs[directory] = os.path.getmtime(directory)
        except OSError:
            logging.warning(f"Warning: Unable to access {directory}. Skipping this path.")

        for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True):
            # Pruning ``subdirs`` in place stops os.walk from descending into them.
            subdirs[:] = [d for d in subdirs if d not in excluded_dir_names]
            if not include_hidden_files:
                subdirs[:] = [d for d in subdirs if not d.startswith(".")]
                filenames = [f for f in filenames if not f.startswith(".")]

            filenames = filter_files_extensions(filenames, folder_paths.supported_pt_extensions)

            for file_name in filenames:
                try:
                    full_path = os.path.join(dirpath, file_name)
                    relative_path = os.path.relpath(full_path, directory)
                    result.append({
                        "name": relative_path,
                        "pathIndex": pathIndex,
                        "modified": os.path.getmtime(full_path),
                        "created": os.path.getctime(full_path),
                        "size": os.path.getsize(full_path),
                    })
                except Exception as e:
                    logging.warning(f"Warning: Unable to access {file_name}. Error: {e}. Skipping this file.")
                    continue

            for d in subdirs:
                path: str = os.path.join(dirpath, d)
                try:
                    dirs[path] = os.path.getmtime(path)
                except OSError:
                    logging.warning(f"Warning: Unable to access {path}. Skipping this path.")
                    continue

        return result, dirs, time.perf_counter()

    def get_model_previews(self, filepath: str) -> list[str | BytesIO]:
        """Return the preview images available for the model at ``filepath``.

        Image files named ``<model>.<ext>`` or ``<model>.preview.<ext>`` next
        to the model come first, as paths. They are followed by images
        embedded in a sibling ``.safetensors`` file under the
        ``ssmd_cover_images`` metadata key (a JSON list of base64 strings),
        as ``BytesIO`` objects. Returns an empty list when the containing
        directory does not exist.
        """
        dirname = os.path.dirname(filepath)
        if not os.path.exists(dirname):
            return []

        basename = os.path.splitext(filepath)[0]
        # Escape the path so names containing glob characters such as
        # "[", "]" or "*" are matched literally.
        match_files = glob.glob(f"{glob.escape(basename)}.*", recursive=False)
        image_files = filter_files_content_types(match_files, "image")
        safetensors_file = next((f for f in match_files if f.endswith(".safetensors")), None)

        result: list[str | BytesIO] = []

        accepted_stems = (basename, f"{basename}.preview")
        for filename in image_files:
            if os.path.splitext(filename)[0] in accepted_stems:
                result.append(filename)

        if safetensors_file:
            # glob already returns the path including its directory, so it is
            # used as-is rather than joined onto ``dirname`` again.
            header = comfy.utils.safetensors_header(safetensors_file, max_size=8*1024*1024)
            if header:
                try:
                    safetensors_metadata = json.loads(header)
                    safetensors_images = safetensors_metadata.get("__metadata__", {}).get("ssmd_cover_images", None)
                    if safetensors_images:
                        embedded = [BytesIO(base64.b64decode(image)) for image in json.loads(safetensors_images)]
                        result.extend(embedded)
                except (ValueError, TypeError, AttributeError) as e:
                    # Malformed metadata must not hide the file-based previews.
                    logging.warning(f"Warning: Unable to read embedded previews from {safetensors_file}. Error: {e}.")

        return result
| |
|