Spaces:
Runtime error
Runtime error
import copy
import json
import os
from pathlib import Path

try:
    # This can fail during the first run
    import requests
    from tqdm import tqdm
except ImportError:
    pass
class PathManager:
    """Resolve, persist and look up the folders and files used by the app.

    The configured locations are read from a ``paths.json`` settings file
    (see ``set_settings_path`` / ``load_paths``), merged over
    ``DEFAULT_PATHS`` and written back. Relative locations are resolved
    against the directory that contains this module (see ``get_abspath``).
    """

    DEFAULT_PATHS = {
        "path_checkpoints": ["../models/checkpoints/"],
        "path_diffusers": "../models/diffusers/",
        "path_diffusers_cache": "../models/diffusers_cache/",
        "path_loras": ["../models/loras/"],
        "path_controlnet": "../models/controlnet/",
        "path_vae_approx": "../models/vae_approx/",
        "path_vae": "../models/vae/",
        "path_preview": "../outputs/preview.jpg",
        "path_faceswap": "../models/faceswap/",
        "path_upscalers": "../models/upscale_models",
        "path_outputs": "../outputs/",
        "path_clip": "../models/clip/",
        "path_clip_vision": "../models/clip_vision/",
        "path_cache": "../cache/",
        "path_llm": "../models/llm",
        "path_inbox": "../models/inbox",
    }

    # Suffixes (compared lower-cased) that get_model_filenames() lists.
    EXTENSIONS = [".pth", ".ckpt", ".bin", ".safetensors", ".gguf", ".merge"]

    # Registry of downloadable files, keyed by file key. Each value is a dict
    # read with the keys "path" (a key of the paths settings), "filename" and
    # "url". NOTE: this dict lives on the class and is updated in place by
    # __init__, so it is shared by every instance.
    DOWNLOADABLE_FILES = {}

    name = None
    settings_path = None
    paths = None

    def __init__(self):
        """Load the settings selected on the command line and the file registry."""
        # Imported here, as in the original, so the class can be defined
        # without the argument parser module being importable.
        from argparser import args

        self.name = args.settings
        self.set_settings_path(args.settings)
        self.paths = self.load_paths()
        self.model_paths = self.get_model_paths()
        self.upscaler_filenames = self.get_model_filenames(
            self.model_paths["upscaler_path"]
        )
        self._load_downloadable_files("modules/pathdb")

    def _load_downloadable_files(self, pathdb_folder):
        """Merge every ``*.json`` file in *pathdb_folder* into DOWNLOADABLE_FILES.

        A file that cannot be read or parsed is reported and skipped. A
        missing *pathdb_folder* still raises, as it did before.
        """
        # os.listdir() returns entries in arbitrary order; sorting makes it
        # deterministic which file wins when two files define the same key.
        for file in sorted(os.listdir(pathdb_folder)):
            if not file.endswith(".json"):
                continue
            file_path = os.path.join(pathdb_folder, file)
            try:
                with open(file_path, "r", encoding="utf-8") as json_file:
                    data = json.load(json_file)
                self.DOWNLOADABLE_FILES.update(data)
            except Exception as e:
                print(f"Error reading {file}: {e}")

    def set_settings_path(self, subfolder=None):
        """Select the ``paths.json`` to use and make sure its folder exists.

        ``None``, ``""`` and ``"default"`` select ``settings/paths.json``;
        any other value selects ``settings/<subfolder>/paths.json``.
        """
        self.subfolder = subfolder
        if self.subfolder in [None, "", "default"]:
            path = Path("settings/paths.json")
        else:
            path = Path(f"settings/{self.subfolder}/paths.json")
        # parents=True: "settings/" itself may not exist yet.
        # exist_ok=True: avoids the check-then-create race.
        path.parent.mkdir(parents=True, exist_ok=True)
        self.settings_path = path

    def load_paths(self):
        """Return the defaults overlaid with the settings file, and persist them.

        The merged result is always written back to ``self.settings_path``,
        so keys added to ``DEFAULT_PATHS`` later show up in existing files.
        """
        # Deep copy: some defaults are lists, and callers must be able to
        # modify the returned lists without altering DEFAULT_PATHS.
        paths = copy.deepcopy(self.DEFAULT_PATHS)
        if self.settings_path.exists():
            with self.settings_path.open(encoding="utf-8") as f:
                paths.update(json.load(f))

        # Some folders should be lists; wrap a single configured value.
        for key in ("path_checkpoints", "path_loras"):
            if not isinstance(paths[key], list):
                paths[key] = [paths[key]]

        with self.settings_path.open("w", encoding="utf-8") as f:
            json.dump(paths, f, indent=2)
        return paths

    def save_paths(self):
        """Write ``self.paths`` to the settings file and return it."""
        paths = self.paths
        with self.settings_path.open("w", encoding="utf-8") as f:
            json.dump(paths, f, indent=2)
        return paths

    def get_model_paths(self):
        """Return the configured locations as absolute ``Path`` objects.

        Every entry goes through ``get_abspath_folder`` except the preview
        image, which is a file and is only resolved, not created.
        """
        return {
            "modelfile_path": self.get_abspath_folder(self.paths["path_checkpoints"]),
            "diffusers_path": self.get_abspath_folder(self.paths["path_diffusers"]),
            "diffusers_cache_path": self.get_abspath_folder(
                self.paths["path_diffusers_cache"]
            ),
            "lorafile_path": self.get_abspath_folder(self.paths["path_loras"]),
            "controlnet_path": self.get_abspath_folder(self.paths["path_controlnet"]),
            "vae_approx_path": self.get_abspath_folder(self.paths["path_vae_approx"]),
            "vae_path": self.get_abspath_folder(self.paths["path_vae"]),
            "temp_outputs_path": self.get_abspath_folder(self.paths["path_outputs"]),
            "temp_preview_path": self.get_abspath(self.paths["path_preview"]),
            "faceswap_path": self.get_abspath_folder(self.paths["path_faceswap"]),
            "upscaler_path": self.get_abspath_folder(self.paths["path_upscalers"]),
            "clip_path": self.get_abspath_folder(self.paths["path_clip"]),
            "clip_vision_path": self.get_abspath_folder(self.paths["path_clip_vision"]),
            "cache_path": self.get_abspath_folder(self.paths["path_cache"]),
            "llm_path": self.get_abspath_folder(self.paths["path_llm"]),
            "inbox_path": self.get_abspath_folder(self.paths["path_inbox"]),
        }

    def get_abspath_folder(self, path):
        """Resolve a folder setting to absolute path(s).

        A single folder is created if it does not exist and returned as a
        ``Path``. A list of folders is returned as a list of ``Path``
        objects; listed folders are NOT created (unchanged behaviour).
        """
        if isinstance(path, list):
            return [self.get_abspath(folder) for folder in path]
        folder = self.get_abspath(path)
        folder.mkdir(parents=True, exist_ok=True)
        return folder

    def get_abspath(self, path):
        """Return *path* as a ``Path``, anchored at this module's folder if relative."""
        candidate = Path(path)
        if candidate.is_absolute():
            return candidate
        return Path(__file__).parent / candidate

    @staticmethod
    def _folders_first_key(name):
        """Sort key that places names inside a sub-folder before plain names.

        Names are prefixed with "0" when they have a parent folder and "1"
        otherwise, then compared case-insensitively.
        """
        prefix = "1" if str(Path(name).parent) == "." else "0"
        return f"{prefix}{name.casefold()}"

    def get_model_filenames(self, folder_path, cache=None, isLora=False):
        """List model files below *folder_path*, relative to it, folders first.

        Args:
            folder_path: Directory to search recursively.
            cache: Unused; kept for call compatibility.
            isLora: Unused; kept for call compatibility. The previous
                implementation only executed a no-op for this flag.

        Returns:
            Sorted list of relative path strings whose suffix is in
            ``EXTENSIONS`` (case-insensitive).

        Raises:
            ValueError: If *folder_path* is not a directory.
        """
        folder_path = Path(folder_path)
        if not folder_path.is_dir():
            raise ValueError(f"{folder_path} is not a valid directory.")

        filenames = [
            str(path.relative_to(folder_path))
            for path in folder_path.rglob("*")
            if path.suffix.lower() in self.EXTENSIONS
        ]
        return sorted(filenames, key=self._folders_first_key)

    def get_diffusers_filenames(self, folder_path, cache=None, isLora=False):
        """List the second-level entries of *folder_path*, prefixed with "🤗:".

        Args:
            folder_path: Directory whose ``*/*`` entries are listed.
            cache: Unused; kept for call compatibility.
            isLora: Unused; kept for call compatibility.

        Raises:
            ValueError: If *folder_path* is not a directory.
        """
        folder_path = Path(folder_path)
        if not folder_path.is_dir():
            raise ValueError(f"{folder_path} is not a valid directory.")

        filenames = [
            f"🤗:{path.relative_to(folder_path)}"
            for path in folder_path.glob("*/*")
        ]
        return sorted(filenames, key=self._folders_first_key)

    def _download_target(self, file_info):
        """Return the absolute destination path for a DOWNLOADABLE_FILES entry."""
        folder = self.paths[file_info["path"]]
        if isinstance(folder, list):  # folder might be a list of folders
            folder = folder[0]  # ...select the first one
        return self.get_abspath(folder) / file_info["filename"]

    def get_file_path(self, file_key, default=None):
        """Get the path for a file, downloading it if it doesn't exist.

        Returns *default* when *file_key* is not a known downloadable file.
        """
        if file_key not in self.DOWNLOADABLE_FILES:
            return default

        file_path = self._download_target(self.DOWNLOADABLE_FILES[file_key])
        if not file_path.exists():
            self.download_file(file_key)
        return file_path

    def get_folder_file_path(self, folder, filename, default=None):
        """Shortcut for ``get_file_path`` with the key ``"<folder>/<filename>"``."""
        return self.get_file_path(f"{folder}/{filename}", default=default)

    def get_folder_list(self, folder):
        """Return the filenames of all registry entries keyed ``"<folder>/..."``."""
        prefix = f"{folder}/"
        # FIXME: also list files already in folder
        return [
            info["filename"]
            for key, info in self.DOWNLOADABLE_FILES.items()
            if key.startswith(prefix)
        ]

    def download_file(self, file_key):
        """Download the registry entry *file_key* to its destination path.

        The data is streamed into a ``.part`` file next to the destination
        and renamed only once complete, so an interrupted or failed download
        never leaves a truncated file that ``get_file_path`` would treat as
        present.

        Raises:
            KeyError: If *file_key* is not in ``DOWNLOADABLE_FILES``.
            requests.HTTPError: If the server answers with an error status.
        """
        # Imported locally: the module-level import is guarded and may have
        # been skipped, which would leave these names undefined.
        import requests
        from tqdm import tqdm

        file_info = self.DOWNLOADABLE_FILES[file_key]
        file_path = self._download_target(file_info)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        partial_path = file_path.with_name(f"{file_path.name}.part")

        print(f"Downloading {file_info['url']}...")
        try:
            with requests.get(file_info["url"], stream=True) as response:
                # Without this an HTML error page would be saved as the file.
                response.raise_for_status()
                total_size = int(response.headers.get("content-length", 0))
                with open(partial_path, "wb") as file, tqdm(
                    desc=file_info["filename"],
                    total=total_size,
                    unit="iB",
                    unit_scale=True,
                    unit_divisor=1024,
                ) as progress_bar:
                    # 1 MiB chunks: far fewer Python-level iterations than
                    # 1 KiB chunks for multi-gigabyte files.
                    for data in response.iter_content(chunk_size=1024 * 1024):
                        size = file.write(data)
                        progress_bar.update(size)
        except BaseException:
            # Also covers Ctrl-C: remove the partial file, then re-raise.
            if partial_path.exists():
                partial_path.unlink()
            raise

        os.replace(partial_path, file_path)
        print(f"Downloaded {file_info['filename']} to {file_path}")

    def find_lcm_lora(self):
        """Return the path of the ``"lcm_lora"`` registry entry (or None if unknown)."""
        return self.get_file_path("lcm_lora")