Freak-ppa commited on
Commit
dee04f4
·
verified ·
1 Parent(s): 50961fd

Update ComfyUI/folder_paths.py

Browse files
Files changed (1) hide show
  1. ComfyUI/folder_paths.py +270 -270
ComfyUI/folder_paths.py CHANGED
@@ -1,270 +1,270 @@
1
- import os
2
- import time
3
- import logging
4
- from typing import Set, List, Dict, Tuple
5
-
6
- supported_pt_extensions: Set[str] = set(['.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl'])
7
-
8
- SupportedFileExtensionsType = Set[str]
9
- ScanPathType = List[str]
10
- folder_names_and_paths: Dict[str, Tuple[ScanPathType, SupportedFileExtensionsType]] = {}
11
-
12
- base_path = os.path.dirname(os.path.realpath(__file__))
13
- models_dir = os.path.join(base_path, "models")
14
- folder_names_and_paths["checkpoints"] = ([os.path.join(models_dir, "checkpoints")], supported_pt_extensions)
15
- folder_names_and_paths["configs"] = ([os.path.join(models_dir, "configs")], [".yaml"])
16
-
17
- folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions)
18
- folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions)
19
- folder_names_and_paths["clip"] = ([os.path.join(models_dir, "clip")], supported_pt_extensions)
20
- folder_names_and_paths["unet"] = ([os.path.join(models_dir, "unet")], supported_pt_extensions)
21
- folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions)
22
- folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions)
23
- folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions)
24
- folder_names_and_paths["diffusers"] = ([os.path.join(models_dir, "diffusers")], ["folder"])
25
- folder_names_and_paths["vae_approx"] = ([os.path.join(models_dir, "vae_approx")], supported_pt_extensions)
26
-
27
- folder_names_and_paths["controlnet"] = ([os.path.join(models_dir, "controlnet"), os.path.join(models_dir, "t2i_adapter")], supported_pt_extensions)
28
- folder_names_and_paths["gligen"] = ([os.path.join(models_dir, "gligen")], supported_pt_extensions)
29
-
30
- folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions)
31
-
32
- folder_names_and_paths["custom_nodes"] = ([os.path.join(base_path, "custom_nodes")], set())
33
-
34
- folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions)
35
-
36
- folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")], supported_pt_extensions)
37
-
38
- folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})
39
-
40
- output_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output")
41
- temp_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp")
42
- input_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "input")
43
- user_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "user")
44
-
45
- filename_list_cache = {}
46
-
47
- if not os.path.exists(input_directory):
48
- try:
49
- os.makedirs(input_directory)
50
- except:
51
- logging.error("Failed to create input directory")
52
-
53
- def set_output_directory(output_dir):
54
- global output_directory
55
- output_directory = output_dir
56
-
57
- def set_temp_directory(temp_dir):
58
- global temp_directory
59
- temp_directory = temp_dir
60
-
61
- def set_input_directory(input_dir):
62
- global input_directory
63
- input_directory = input_dir
64
-
65
- def get_output_directory():
66
- global output_directory
67
- return output_directory
68
-
69
- def get_temp_directory():
70
- global temp_directory
71
- return temp_directory
72
-
73
- def get_input_directory():
74
- global input_directory
75
- return input_directory
76
-
77
-
78
- #NOTE: used in http server so don't put folders that should not be accessed remotely
79
- def get_directory_by_type(type_name):
80
- if type_name == "output":
81
- return get_output_directory()
82
- if type_name == "temp":
83
- return get_temp_directory()
84
- if type_name == "input":
85
- return get_input_directory()
86
- return None
87
-
88
-
89
- # determine base_dir rely on annotation if name is 'filename.ext [annotation]' format
90
- # otherwise use default_path as base_dir
91
- def annotated_filepath(name):
92
- if name.endswith("[output]"):
93
- base_dir = get_output_directory()
94
- name = name[:-9]
95
- elif name.endswith("[input]"):
96
- base_dir = get_input_directory()
97
- name = name[:-8]
98
- elif name.endswith("[temp]"):
99
- base_dir = get_temp_directory()
100
- name = name[:-7]
101
- else:
102
- return name, None
103
-
104
- return name, base_dir
105
-
106
-
107
- def get_annotated_filepath(name, default_dir=None):
108
- name, base_dir = annotated_filepath(name)
109
-
110
- if base_dir is None:
111
- if default_dir is not None:
112
- base_dir = default_dir
113
- else:
114
- base_dir = get_input_directory() # fallback path
115
-
116
- return os.path.join(base_dir, name)
117
-
118
-
119
- def exists_annotated_filepath(name):
120
- name, base_dir = annotated_filepath(name)
121
-
122
- if base_dir is None:
123
- base_dir = get_input_directory() # fallback path
124
-
125
- filepath = os.path.join(base_dir, name)
126
- return os.path.exists(filepath)
127
-
128
-
129
- def add_model_folder_path(folder_name, full_folder_path):
130
- global folder_names_and_paths
131
- if folder_name in folder_names_and_paths:
132
- folder_names_and_paths[folder_name][0].append(full_folder_path)
133
- else:
134
- folder_names_and_paths[folder_name] = ([full_folder_path], set())
135
-
136
- def get_folder_paths(folder_name):
137
- return folder_names_and_paths[folder_name][0][:]
138
-
139
- def recursive_search(directory, excluded_dir_names=None):
140
- if not os.path.isdir(directory):
141
- return [], {}
142
-
143
- if excluded_dir_names is None:
144
- excluded_dir_names = []
145
-
146
- result = []
147
- dirs = {}
148
-
149
- # Attempt to add the initial directory to dirs with error handling
150
- try:
151
- dirs[directory] = os.path.getmtime(directory)
152
- except FileNotFoundError:
153
- logging.warning(f"Warning: Unable to access {directory}. Skipping this path.")
154
-
155
- logging.debug("recursive file list on directory {}".format(directory))
156
- for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True):
157
- subdirs[:] = [d for d in subdirs if d not in excluded_dir_names]
158
- for file_name in filenames:
159
- relative_path = os.path.relpath(os.path.join(dirpath, file_name), directory)
160
- result.append(relative_path)
161
-
162
- for d in subdirs:
163
- path = os.path.join(dirpath, d)
164
- try:
165
- dirs[path] = os.path.getmtime(path)
166
- except FileNotFoundError:
167
- logging.warning(f"Warning: Unable to access {path}. Skipping this path.")
168
- continue
169
- logging.debug("found {} files".format(len(result)))
170
- return result, dirs
171
-
172
- def filter_files_extensions(files, extensions):
173
- return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files)))
174
-
175
-
176
-
177
- def get_full_path(folder_name, filename):
178
- global folder_names_and_paths
179
- if folder_name not in folder_names_and_paths:
180
- return None
181
- folders = folder_names_and_paths[folder_name]
182
- filename = os.path.relpath(os.path.join("/", filename), "/")
183
- for x in folders[0]:
184
- full_path = os.path.join(x, filename)
185
- if os.path.isfile(full_path):
186
- return full_path
187
- elif os.path.islink(full_path):
188
- logging.warning("WARNING path {} exists but doesn't link anywhere, skipping.".format(full_path))
189
-
190
- return None
191
-
192
- def get_filename_list_(folder_name):
193
- global folder_names_and_paths
194
- output_list = set()
195
- folders = folder_names_and_paths[folder_name]
196
- output_folders = {}
197
- for x in folders[0]:
198
- files, folders_all = recursive_search(x, excluded_dir_names=[".git"])
199
- output_list.update(filter_files_extensions(files, folders[1]))
200
- output_folders = {**output_folders, **folders_all}
201
-
202
- return (sorted(list(output_list)), output_folders, time.perf_counter())
203
-
204
- def cached_filename_list_(folder_name):
205
- global filename_list_cache
206
- global folder_names_and_paths
207
- if folder_name not in filename_list_cache:
208
- return None
209
- out = filename_list_cache[folder_name]
210
-
211
- for x in out[1]:
212
- time_modified = out[1][x]
213
- folder = x
214
- if os.path.getmtime(folder) != time_modified:
215
- return None
216
-
217
- folders = folder_names_and_paths[folder_name]
218
- for x in folders[0]:
219
- if os.path.isdir(x):
220
- if x not in out[1]:
221
- return None
222
-
223
- return out
224
-
225
- def get_filename_list(folder_name):
226
- out = cached_filename_list_(folder_name)
227
- if out is None:
228
- out = get_filename_list_(folder_name)
229
- global filename_list_cache
230
- filename_list_cache[folder_name] = out
231
- return list(out[0])
232
-
233
- def get_save_image_path(filename_prefix, output_dir, image_width=0, image_height=0):
234
- def map_filename(filename):
235
- prefix_len = len(os.path.basename(filename_prefix))
236
- prefix = filename[:prefix_len + 1]
237
- try:
238
- digits = int(filename[prefix_len + 1:].split('_')[0])
239
- except:
240
- digits = 0
241
- return (digits, prefix)
242
-
243
- def compute_vars(input, image_width, image_height):
244
- input = input.replace("%width%", str(image_width))
245
- input = input.replace("%height%", str(image_height))
246
- return input
247
-
248
- filename_prefix = compute_vars(filename_prefix, image_width, image_height)
249
-
250
- subfolder = os.path.dirname(os.path.normpath(filename_prefix))
251
- filename = os.path.basename(os.path.normpath(filename_prefix))
252
-
253
- full_output_folder = os.path.join(output_dir, subfolder)
254
-
255
- if os.path.commonpath((output_dir, os.path.abspath(full_output_folder))) != output_dir:
256
- err = "**** ERROR: Saving image outside the output folder is not allowed." + \
257
- "\n full_output_folder: " + os.path.abspath(full_output_folder) + \
258
- "\n output_dir: " + output_dir + \
259
- "\n commonpath: " + os.path.commonpath((output_dir, os.path.abspath(full_output_folder)))
260
- logging.error(err)
261
- raise Exception(err)
262
-
263
- try:
264
- counter = max(filter(lambda a: os.path.normcase(a[1][:-1]) == os.path.normcase(filename) and a[1][-1] == "_", map(map_filename, os.listdir(full_output_folder))))[0] + 1
265
- except ValueError:
266
- counter = 1
267
- except FileNotFoundError:
268
- os.makedirs(full_output_folder, exist_ok=True)
269
- counter = 1
270
- return full_output_folder, filename, counter, subfolder, filename_prefix
 
1
+ import os
2
+ import time
3
+ import logging
4
+ from typing import Set, List, Dict, Tuple
5
+
6
+ supported_pt_extensions: Set[str] = set(['.ckpt', '.pt', '.bin', '.pth', '.safetensors', '.pkl'])
7
+
8
+ SupportedFileExtensionsType = Set[str]
9
+ ScanPathType = List[str]
10
+ folder_names_and_paths: Dict[str, Tuple[ScanPathType, SupportedFileExtensionsType]] = {}
11
+
12
+ base_path = os.path.dirname(os.path.realpath(__file__))
13
+ models_dir = os.path.join(base_path, "models")
14
+ folder_names_and_paths["checkpoints"] = ([os.path.join(models_dir, "checkpoints")], supported_pt_extensions)
15
+ folder_names_and_paths["configs"] = ([os.path.join(models_dir, "configs")], [".yaml"])
16
+
17
+ folder_names_and_paths["loras"] = ([os.path.join(models_dir, "loras")], supported_pt_extensions)
18
+ folder_names_and_paths["vae"] = ([os.path.join(models_dir, "vae")], supported_pt_extensions)
19
+ folder_names_and_paths["clip"] = ([os.path.join(models_dir, "clip")], supported_pt_extensions)
20
+ folder_names_and_paths["unet"] = ([os.path.join(models_dir, "unet")], supported_pt_extensions)
21
+ folder_names_and_paths["clip_vision"] = ([os.path.join(models_dir, "clip_vision")], supported_pt_extensions)
22
+ folder_names_and_paths["style_models"] = ([os.path.join(models_dir, "style_models")], supported_pt_extensions)
23
+ folder_names_and_paths["embeddings"] = ([os.path.join(models_dir, "embeddings")], supported_pt_extensions)
24
+ folder_names_and_paths["diffusers"] = ([os.path.join(models_dir, "diffusers")], ["folder"])
25
+ folder_names_and_paths["vae_approx"] = ([os.path.join(models_dir, "vae_approx")], supported_pt_extensions)
26
+
27
+ folder_names_and_paths["controlnet"] = ([os.path.join(models_dir, "controlnet"), os.path.join(models_dir, "t2i_adapter")], supported_pt_extensions)
28
+ folder_names_and_paths["gligen"] = ([os.path.join(models_dir, "gligen")], supported_pt_extensions)
29
+
30
+ folder_names_and_paths["upscale_models"] = ([os.path.join(models_dir, "upscale_models")], supported_pt_extensions)
31
+
32
+ folder_names_and_paths["custom_nodes"] = ([os.path.join(base_path, "custom_nodes")], set())
33
+
34
+ folder_names_and_paths["hypernetworks"] = ([os.path.join(models_dir, "hypernetworks")], supported_pt_extensions)
35
+
36
+ folder_names_and_paths["photomaker"] = ([os.path.join(models_dir, "photomaker")], supported_pt_extensions)
37
+
38
+ folder_names_and_paths["classifiers"] = ([os.path.join(models_dir, "classifiers")], {""})
39
+ folder_names_and_paths["blip"] = ([os.path.join(models_dir, "blip")], supported_pt_extensions)
40
+ output_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "output")
41
+ temp_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "temp")
42
+ input_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "input")
43
+ user_directory = os.path.join(os.path.dirname(os.path.realpath(__file__)), "user")
44
+
45
+ filename_list_cache = {}
46
+
47
+ if not os.path.exists(input_directory):
48
+ try:
49
+ os.makedirs(input_directory)
50
+ except:
51
+ logging.error("Failed to create input directory")
52
+
53
+ def set_output_directory(output_dir):
54
+ global output_directory
55
+ output_directory = output_dir
56
+
57
+ def set_temp_directory(temp_dir):
58
+ global temp_directory
59
+ temp_directory = temp_dir
60
+
61
+ def set_input_directory(input_dir):
62
+ global input_directory
63
+ input_directory = input_dir
64
+
65
+ def get_output_directory():
66
+ global output_directory
67
+ return output_directory
68
+
69
+ def get_temp_directory():
70
+ global temp_directory
71
+ return temp_directory
72
+
73
+ def get_input_directory():
74
+ global input_directory
75
+ return input_directory
76
+
77
+
78
+ #NOTE: used in http server so don't put folders that should not be accessed remotely
79
+ def get_directory_by_type(type_name):
80
+ if type_name == "output":
81
+ return get_output_directory()
82
+ if type_name == "temp":
83
+ return get_temp_directory()
84
+ if type_name == "input":
85
+ return get_input_directory()
86
+ return None
87
+
88
+
89
+ # determine base_dir rely on annotation if name is 'filename.ext [annotation]' format
90
+ # otherwise use default_path as base_dir
91
+ def annotated_filepath(name):
92
+ if name.endswith("[output]"):
93
+ base_dir = get_output_directory()
94
+ name = name[:-9]
95
+ elif name.endswith("[input]"):
96
+ base_dir = get_input_directory()
97
+ name = name[:-8]
98
+ elif name.endswith("[temp]"):
99
+ base_dir = get_temp_directory()
100
+ name = name[:-7]
101
+ else:
102
+ return name, None
103
+
104
+ return name, base_dir
105
+
106
+
107
+ def get_annotated_filepath(name, default_dir=None):
108
+ name, base_dir = annotated_filepath(name)
109
+
110
+ if base_dir is None:
111
+ if default_dir is not None:
112
+ base_dir = default_dir
113
+ else:
114
+ base_dir = get_input_directory() # fallback path
115
+
116
+ return os.path.join(base_dir, name)
117
+
118
+
119
+ def exists_annotated_filepath(name):
120
+ name, base_dir = annotated_filepath(name)
121
+
122
+ if base_dir is None:
123
+ base_dir = get_input_directory() # fallback path
124
+
125
+ filepath = os.path.join(base_dir, name)
126
+ return os.path.exists(filepath)
127
+
128
+
129
+ def add_model_folder_path(folder_name, full_folder_path):
130
+ global folder_names_and_paths
131
+ if folder_name in folder_names_and_paths:
132
+ folder_names_and_paths[folder_name][0].append(full_folder_path)
133
+ else:
134
+ folder_names_and_paths[folder_name] = ([full_folder_path], set())
135
+
136
+ def get_folder_paths(folder_name):
137
+ return folder_names_and_paths[folder_name][0][:]
138
+
139
+ def recursive_search(directory, excluded_dir_names=None):
140
+ if not os.path.isdir(directory):
141
+ return [], {}
142
+
143
+ if excluded_dir_names is None:
144
+ excluded_dir_names = []
145
+
146
+ result = []
147
+ dirs = {}
148
+
149
+ # Attempt to add the initial directory to dirs with error handling
150
+ try:
151
+ dirs[directory] = os.path.getmtime(directory)
152
+ except FileNotFoundError:
153
+ logging.warning(f"Warning: Unable to access {directory}. Skipping this path.")
154
+
155
+ logging.debug("recursive file list on directory {}".format(directory))
156
+ for dirpath, subdirs, filenames in os.walk(directory, followlinks=True, topdown=True):
157
+ subdirs[:] = [d for d in subdirs if d not in excluded_dir_names]
158
+ for file_name in filenames:
159
+ relative_path = os.path.relpath(os.path.join(dirpath, file_name), directory)
160
+ result.append(relative_path)
161
+
162
+ for d in subdirs:
163
+ path = os.path.join(dirpath, d)
164
+ try:
165
+ dirs[path] = os.path.getmtime(path)
166
+ except FileNotFoundError:
167
+ logging.warning(f"Warning: Unable to access {path}. Skipping this path.")
168
+ continue
169
+ logging.debug("found {} files".format(len(result)))
170
+ return result, dirs
171
+
172
+ def filter_files_extensions(files, extensions):
173
+ return sorted(list(filter(lambda a: os.path.splitext(a)[-1].lower() in extensions or len(extensions) == 0, files)))
174
+
175
+
176
+
177
+ def get_full_path(folder_name, filename):
178
+ global folder_names_and_paths
179
+ if folder_name not in folder_names_and_paths:
180
+ return None
181
+ folders = folder_names_and_paths[folder_name]
182
+ filename = os.path.relpath(os.path.join("/", filename), "/")
183
+ for x in folders[0]:
184
+ full_path = os.path.join(x, filename)
185
+ if os.path.isfile(full_path):
186
+ return full_path
187
+ elif os.path.islink(full_path):
188
+ logging.warning("WARNING path {} exists but doesn't link anywhere, skipping.".format(full_path))
189
+
190
+ return None
191
+
192
+ def get_filename_list_(folder_name):
193
+ global folder_names_and_paths
194
+ output_list = set()
195
+ folders = folder_names_and_paths[folder_name]
196
+ output_folders = {}
197
+ for x in folders[0]:
198
+ files, folders_all = recursive_search(x, excluded_dir_names=[".git"])
199
+ output_list.update(filter_files_extensions(files, folders[1]))
200
+ output_folders = {**output_folders, **folders_all}
201
+
202
+ return (sorted(list(output_list)), output_folders, time.perf_counter())
203
+
204
+ def cached_filename_list_(folder_name):
205
+ global filename_list_cache
206
+ global folder_names_and_paths
207
+ if folder_name not in filename_list_cache:
208
+ return None
209
+ out = filename_list_cache[folder_name]
210
+
211
+ for x in out[1]:
212
+ time_modified = out[1][x]
213
+ folder = x
214
+ if os.path.getmtime(folder) != time_modified:
215
+ return None
216
+
217
+ folders = folder_names_and_paths[folder_name]
218
+ for x in folders[0]:
219
+ if os.path.isdir(x):
220
+ if x not in out[1]:
221
+ return None
222
+
223
+ return out
224
+
225
+ def get_filename_list(folder_name):
226
+ out = cached_filename_list_(folder_name)
227
+ if out is None:
228
+ out = get_filename_list_(folder_name)
229
+ global filename_list_cache
230
+ filename_list_cache[folder_name] = out
231
+ return list(out[0])
232
+
233
+ def get_save_image_path(filename_prefix, output_dir, image_width=0, image_height=0):
234
+ def map_filename(filename):
235
+ prefix_len = len(os.path.basename(filename_prefix))
236
+ prefix = filename[:prefix_len + 1]
237
+ try:
238
+ digits = int(filename[prefix_len + 1:].split('_')[0])
239
+ except:
240
+ digits = 0
241
+ return (digits, prefix)
242
+
243
+ def compute_vars(input, image_width, image_height):
244
+ input = input.replace("%width%", str(image_width))
245
+ input = input.replace("%height%", str(image_height))
246
+ return input
247
+
248
+ filename_prefix = compute_vars(filename_prefix, image_width, image_height)
249
+
250
+ subfolder = os.path.dirname(os.path.normpath(filename_prefix))
251
+ filename = os.path.basename(os.path.normpath(filename_prefix))
252
+
253
+ full_output_folder = os.path.join(output_dir, subfolder)
254
+
255
+ if os.path.commonpath((output_dir, os.path.abspath(full_output_folder))) != output_dir:
256
+ err = "**** ERROR: Saving image outside the output folder is not allowed." + \
257
+ "\n full_output_folder: " + os.path.abspath(full_output_folder) + \
258
+ "\n output_dir: " + output_dir + \
259
+ "\n commonpath: " + os.path.commonpath((output_dir, os.path.abspath(full_output_folder)))
260
+ logging.error(err)
261
+ raise Exception(err)
262
+
263
+ try:
264
+ counter = max(filter(lambda a: os.path.normcase(a[1][:-1]) == os.path.normcase(filename) and a[1][-1] == "_", map(map_filename, os.listdir(full_output_folder))))[0] + 1
265
+ except ValueError:
266
+ counter = 1
267
+ except FileNotFoundError:
268
+ os.makedirs(full_output_folder, exist_ok=True)
269
+ counter = 1
270
+ return full_output_folder, filename, counter, subfolder, filename_prefix