import os import folder_paths import json from server import PromptServer import glob from aiohttp import web def get_allowed_dirs(): dir = os.path.abspath(os.path.join(__file__, "../../user")) file = os.path.join(dir, "text_file_dirs.json") with open(file, "r") as f: return json.loads(f.read()) def get_valid_dirs(): return get_allowed_dirs().keys() def get_dir_from_name(name): dirs = get_allowed_dirs() if name not in dirs: raise KeyError(name + " dir not found") path = dirs[name] path = path.replace("$input", folder_paths.get_input_directory()) path = path.replace("$output", folder_paths.get_output_directory()) path = path.replace("$temp", folder_paths.get_temp_directory()) return path def is_child_dir(parent_path, child_path): parent_path = os.path.abspath(parent_path) child_path = os.path.abspath(child_path) return os.path.commonpath([parent_path]) == os.path.commonpath([parent_path, child_path]) def get_real_path(dir): dir = dir.replace("/**/", "/") dir = os.path.abspath(dir) dir = os.path.split(dir)[0] return dir @PromptServer.instance.routes.get("/pysssss/text-file/{name}") async def get_files(request): name = request.match_info["name"] dir = get_dir_from_name(name) recursive = "/**/" in dir # Ugh cant use root_path on glob... lazy hack.. pre = get_real_path(dir) files = list(map(lambda t: os.path.relpath(t, pre), glob.glob(dir, recursive=recursive))) if len(files) == 0: files = ["[none]"] return web.json_response(files) def get_file(root_dir, file): if file == "[none]" or not file or not file.strip(): raise ValueError("No file") root_dir = get_dir_from_name(root_dir) root_dir = get_real_path(root_dir) full_path = os.path.join(root_dir, file) if not is_child_dir(root_dir, full_path): raise ReferenceError() return full_path class TextFileNode: RETURN_TYPES = ("STRING",) CATEGORY = "utils" @classmethod def VALIDATE_INPUTS(self, root_dir, file, **kwargs): self.file = get_file(root_dir, file) return True def load_text(self, **kwargs): with open(self.file, "r") as f: return (f.read(), ) class LoadText(TextFileNode): @classmethod def IS_CHANGED(self, **kwargs): return os.path.getmtime(self.file) @classmethod def INPUT_TYPES(s): return { "required": { "root_dir": (list(get_valid_dirs()), {}), "file": (["[none]"], { "pysssss.binding": [{ "source": "root_dir", "callback": [{ "type": "set", "target": "$this.disabled", "value": True }, { "type": "fetch", "url": "/pysssss/text-file/{$source.value}", "then": [{ "type": "set", "target": "$this.options.values", "value": "$result" }, { "type": "validate-combo" }, { "type": "set", "target": "$this.disabled", "value": False }] }], }] }) }, } FUNCTION = "load_text" class SaveText(TextFileNode): @classmethod def IS_CHANGED(self, **kwargs): return float("nan") @classmethod def INPUT_TYPES(s): return { "required": { "root_dir": (list(get_valid_dirs()), {}), "file": ("STRING", {"default": "file.txt"}), "append": (["append", "overwrite", "new only"], {}), "insert": ("BOOLEAN", { "default": True, "label_on": "new line", "label_off": "none", "pysssss.binding": [{ "source": "append", "callback": [{ "type": "if", "condition": [{ "left": "$source.value", "op": "eq", "right": '"append"' }], "true": [{ "type": "set", "target": "$this.disabled", "value": False }], "false": [{ "type": "set", "target": "$this.disabled", "value": True }], }] }] }), "text": ("STRING", {"forceInput": True, "multiline": True}) }, } FUNCTION = "write_text" def write_text(self, root_dir, file, append, insert, text): if append == "new only" and os.path.exists(self.file): raise FileExistsError( self.file + " already exists and 'new only' is selected.") with open(self.file, "a+" if append == "append" else "w") as f: is_append = f.tell() != 0 if is_append and insert: f.write("\n") f.write(text) return super().load_text() NODE_CLASS_MAPPINGS = { "LoadText|pysssss": LoadText, "SaveText|pysssss": SaveText, } NODE_DISPLAY_NAME_MAPPINGS = { "LoadText|pysssss": "Load Text 🐍", "SaveText|pysssss": "Save Text 🐍", }