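# Helper script for managing ComfyUI custom node repositories: it clones,
# checks, and pulls git-based custom nodes, and restores snapshots of the
# installed node set (see the command-line dispatch at the bottom of the file).
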
import subprocess
import sys
import os
import traceback
import git
import configparser
import re
import json
import yaml
import requests

from tqdm.auto import tqdm
from git.remote import RemoteProgress


def download_url(url, dest_folder, filename=None):
    # Ensure the destination folder exists
    if not os.path.exists(dest_folder):
        os.makedirs(dest_folder)

    # Extract the filename from the URL if not provided
    if filename is None:
        filename = os.path.basename(url)

    # Full path to save the file
    dest_path = os.path.join(dest_folder, filename)

    # Download the file
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(dest_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    file.write(chunk)
    else:
        print(f"Failed to download file from {url}")


config_path = os.path.join(os.path.dirname(__file__), "config.ini")
nodelist_path = os.path.join(os.path.dirname(__file__), "custom-node-list.json")
working_directory = os.getcwd()

if os.path.basename(working_directory) != 'custom_nodes':
    print("WARN: This script should be executed in the custom_nodes dir")
    print(f"DBG: INFO {working_directory}")
    print(f"DBG: INFO {sys.argv}")
    # exit(-1)


class GitProgress(RemoteProgress):
    def __init__(self):
        super().__init__()
        self.pbar = tqdm(ascii=True)

    def update(self, op_code, cur_count, max_count=None, message=''):
        self.pbar.total = max_count
        self.pbar.n = cur_count
        self.pbar.pos = 0
        self.pbar.refresh()


def gitclone(custom_nodes_path, url, target_hash=None):
    repo_name = os.path.splitext(os.path.basename(url))[0]
    repo_path = os.path.join(custom_nodes_path, repo_name)

    # Clone the repository from the remote URL
    repo = git.Repo.clone_from(url, repo_path, recursive=True, progress=GitProgress())

    if target_hash is not None:
        print(f"CHECKOUT: {repo_name} [{target_hash}]")
        repo.git.checkout(target_hash)

    repo.git.clear_cache()
    repo.close()


def gitcheck(path, do_fetch=False):
    try:
        # Open the local repository
        repo = git.Repo(path)

        if repo.head.is_detached:
            print("CUSTOM NODE CHECK: True")
            return

        current_branch = repo.active_branch
        branch_name = current_branch.name

        remote_name = current_branch.tracking_branch().remote_name
        remote = repo.remote(name=remote_name)

        if do_fetch:
            remote.fetch()

        # Get the current commit hash and the commit hash of the remote branch
        commit_hash = repo.head.commit.hexsha
        remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha

        # Compare the commit hashes to determine if the local repository is behind the remote repository
        if commit_hash != remote_commit_hash:
            # Get the commit dates
            commit_date = repo.head.commit.committed_datetime
            remote_commit_date = repo.refs[f'{remote_name}/{branch_name}'].object.committed_datetime

            # Compare the commit dates to determine if the local repository is behind the remote repository
            if commit_date < remote_commit_date:
                print("CUSTOM NODE CHECK: True")
        else:
            print("CUSTOM NODE CHECK: False")
    except Exception as e:
        print(e)
        print("CUSTOM NODE CHECK: Error")


def switch_to_default_branch(repo):
    show_result = repo.git.remote("show", "origin")
    matches = re.search(r"\s*HEAD branch:\s*(.*)", show_result)
    if matches:
        default_branch = matches.group(1)
        repo.git.checkout(default_branch)


def gitpull(path):
    # Check if the path is a git repository
    if not os.path.exists(os.path.join(path, '.git')):
        raise ValueError('Not a git repository')

    # Pull the latest changes from the remote repository
    repo = git.Repo(path)
    if repo.is_dirty():
        repo.git.stash()

    commit_hash = repo.head.commit.hexsha
    try:
        if repo.head.is_detached:
            switch_to_default_branch(repo)

        current_branch = repo.active_branch
        branch_name = current_branch.name

        remote_name = current_branch.tracking_branch().remote_name
        remote = repo.remote(name=remote_name)

        remote.fetch()
        remote_commit_hash = repo.refs[f'{remote_name}/{branch_name}'].object.hexsha

        if commit_hash == remote_commit_hash:
            print("CUSTOM NODE PULL: None")  # there is no update
            repo.close()
            return

        remote.pull()
        repo.git.submodule('update', '--init', '--recursive')
        new_commit_hash = repo.head.commit.hexsha

        if commit_hash != new_commit_hash:
            print("CUSTOM NODE PULL: Success")  # update success
        else:
            print("CUSTOM NODE PULL: Fail")  # update fail
    except Exception as e:
        print(e)
        print("CUSTOM NODE PULL: Fail")  # unknown git error

    repo.close()


def checkout_comfyui_hash(target_hash):
    repo_path = os.path.abspath(os.path.join(working_directory, '..'))  # ComfyUI dir

    repo = git.Repo(repo_path)
    commit_hash = repo.head.commit.hexsha

    if commit_hash != target_hash:
        try:
            print(f"CHECKOUT: ComfyUI [{target_hash}]")
            repo.git.checkout(target_hash)
        except git.GitCommandError as e:
            print(f"Error while checking out ComfyUI: {str(e)}")


def checkout_custom_node_hash(git_custom_node_infos):
    repo_name_to_url = {}

    for url in git_custom_node_infos.keys():
        repo_name = url.split('/')[-1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-4]

        repo_name_to_url[repo_name] = url

    for path in os.listdir(working_directory):
        if path.endswith("ComfyUI-Manager"):
            continue

        fullpath = os.path.join(working_directory, path)

        if os.path.isdir(fullpath):
            is_disabled = path.endswith(".disabled")

            try:
                git_dir = os.path.join(fullpath, '.git')
                if not os.path.exists(git_dir):
                    continue

                need_checkout = False
                repo_name = os.path.basename(fullpath)
                if repo_name.endswith('.disabled'):
                    repo_name = repo_name[:-9]

                if repo_name not in repo_name_to_url:
                    if not is_disabled:
                        # should be disabled
                        print(f"DISABLE: {repo_name}")
                        new_path = fullpath + ".disabled"
                        os.rename(fullpath, new_path)

                    need_checkout = False
                else:
                    item = git_custom_node_infos[repo_name_to_url[repo_name]]

                    if item['disabled'] and is_disabled:
                        pass
                    elif item['disabled'] and not is_disabled:
                        # disable
                        print(f"DISABLE: {repo_name}")
                        new_path = fullpath + ".disabled"
                        os.rename(fullpath, new_path)
                    elif not item['disabled'] and is_disabled:
                        # enable
                        print(f"ENABLE: {repo_name}")
                        new_path = fullpath[:-9]
                        os.rename(fullpath, new_path)
                        fullpath = new_path

                        need_checkout = True
                    else:
                        need_checkout = True

                if need_checkout:
                    repo = git.Repo(fullpath)
                    commit_hash = repo.head.commit.hexsha
                    if commit_hash != item['hash']:
                        print(f"CHECKOUT: {repo_name} [{item['hash']}]")
                        repo.git.checkout(item['hash'])
            except Exception:
                print(f"Failed to restore snapshots for the custom node '{path}'")

    # clone missing
    for k, v in git_custom_node_infos.items():
        if not v['disabled']:
            repo_name = k.split('/')[-1]
            if repo_name.endswith('.git'):
                repo_name = repo_name[:-4]

            path = os.path.join(working_directory, repo_name)
            if not os.path.exists(path):
                print(f"CLONE: {path}")
                gitclone(working_directory, k, v['hash'])


def invalidate_custom_node_file(file_custom_node_infos):
    global nodelist_path

    enabled_set = set()
    for item in file_custom_node_infos:
        if not item['disabled']:
            enabled_set.add(item['filename'])

    for path in os.listdir(working_directory):
        fullpath = os.path.join(working_directory, path)

        if not os.path.isdir(fullpath) and fullpath.endswith('.py'):
            if path not in enabled_set:
                print(f"DISABLE: {path}")
                new_path = fullpath + '.disabled'
                os.rename(fullpath, new_path)
        elif not os.path.isdir(fullpath) and fullpath.endswith('.py.disabled'):
            path = path[:-9]
            if path in enabled_set:
                print(f"ENABLE: {path}")
                new_path = fullpath[:-9]
                os.rename(fullpath, new_path)

    # download missing: only 'copy' style installs are supported
    py_to_url = {}

    with open(nodelist_path, 'r', encoding="UTF-8") as json_file:
        info = json.load(json_file)
        for item in info['custom_nodes']:
            if item['install_type'] == 'copy':
                for url in item['files']:
                    if url.endswith('.py'):
                        py = url.split('/')[-1]
                        py_to_url[py] = url

    for item in file_custom_node_infos:
        filename = item['filename']
        if not item['disabled']:
            target_path = os.path.join(working_directory, filename)

            if not os.path.exists(target_path) and filename in py_to_url:
                url = py_to_url[filename]
                print(f"DOWNLOAD: {filename}")
                download_url(url, working_directory)


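# Snapshot layout expected by apply_snapshot(), inferred from the field accesses
# below (illustrative sketch, not an authoritative schema):
#
#   {
#       "comfyui": "<ComfyUI commit hash>",
#       "git_custom_nodes": {
#           "<repo url>": {"hash": "<commit hash>", "disabled": false},
#           ...
#       },
#       "file_custom_nodes": [
#           {"filename": "<node>.py", "disabled": false},
#           ...
#       ],
#       "pips": {"<package name>": "" or "<install url>", ...}   # optional
#   }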
def apply_snapshot(target):
    try:
        path = os.path.join(os.path.dirname(__file__), 'snapshots', f"{target}")
        if os.path.exists(path):
            if not target.endswith('.json') and not target.endswith('.yaml'):
                print(f"Unsupported snapshot file: `{path}`")
                print("APPLY SNAPSHOT: False")
                return None

            with open(path, 'r', encoding="UTF-8") as snapshot_file:
                if target.endswith('.json'):
                    info = json.load(snapshot_file)
                elif target.endswith('.yaml'):
                    info = yaml.load(snapshot_file, Loader=yaml.SafeLoader)
                    info = info['custom_nodes']
                else:
                    # impossible case
                    print("APPLY SNAPSHOT: False")
                    return None

                comfyui_hash = info['comfyui']
                git_custom_node_infos = info['git_custom_nodes']
                file_custom_node_infos = info['file_custom_nodes']

                checkout_comfyui_hash(comfyui_hash)
                checkout_custom_node_hash(git_custom_node_infos)
                invalidate_custom_node_file(file_custom_node_infos)

                print("APPLY SNAPSHOT: True")

                if 'pips' in info:
                    return info['pips']
                else:
                    return None

        print(f"Snapshot file not found: `{path}`")
        print("APPLY SNAPSHOT: False")
        return None
    except Exception as e:
        print(e)
        traceback.print_exc()
        print("APPLY SNAPSHOT: False")
        return None


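# restore_pip_snapshot() reinstalls packages from the snapshot's 'pips' mapping:
# entries with an empty value are installed by name, 'file:' URLs are treated as
# local installs, and any other value is installed directly from its URL. Which
# groups are installed is controlled by the --pip-* options passed on the CLI.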
def restore_pip_snapshot(pips, options):
    non_url = []
    local_url = []
    non_local_url = []
    for k, v in pips.items():
        if v == "":
            non_url.append(k)
        else:
            if v.startswith('file:'):
                local_url.append(v)
            else:
                non_local_url.append(v)

    failed = []
    if '--pip-non-url' in options:
        # try to install all plain packages in a single pip call
        res = 1
        try:
            res = subprocess.check_call([sys.executable, '-m', 'pip', 'install'] + non_url)
        except Exception:
            pass

        # fallback: retry one package at a time
        if res != 0:
            for x in non_url:
                res = 1
                try:
                    res = subprocess.check_call([sys.executable, '-m', 'pip', 'install', x])
                except Exception:
                    pass

                if res != 0:
                    failed.append(x)

    if '--pip-non-local-url' in options:
        for x in non_local_url:
            res = 1
            try:
                res = subprocess.check_call([sys.executable, '-m', 'pip', 'install', x])
            except Exception:
                pass

            if res != 0:
                failed.append(x)

    if '--pip-local-url' in options:
        for x in local_url:
            res = 1
            try:
                res = subprocess.check_call([sys.executable, '-m', 'pip', 'install', x])
            except Exception:
                pass

            if res != 0:
                failed.append(x)

    if failed:
        print(f"Installation failed for pip packages: {failed}")


def setup_environment():
    config = configparser.ConfigParser()
    config.read(config_path)
    if 'default' in config and 'git_exe' in config['default'] and config['default']['git_exe'] != '':
        git.Git().update_environment(GIT_PYTHON_GIT_EXECUTABLE=config['default']['git_exe'])


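# Example invocations (the script path and repository paths are placeholders):
#   python <this script> --clone <custom_nodes_dir> <repo_url>
#   python <this script> --check <custom_node_path>
#   python <this script> --fetch <custom_node_path>
#   python <this script> --pull <custom_node_path>
#   python <this script> --apply-snapshot <snapshot_filename> [--pip-non-url] [--pip-local-url] [--pip-non-local-url]
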
setup_environment()


try:
    if sys.argv[1] == "--clone":
        gitclone(sys.argv[2], sys.argv[3])
    elif sys.argv[1] == "--check":
        gitcheck(sys.argv[2], False)
    elif sys.argv[1] == "--fetch":
        gitcheck(sys.argv[2], True)
    elif sys.argv[1] == "--pull":
        gitpull(sys.argv[2])
    elif sys.argv[1] == "--apply-snapshot":
        options = set()
        for x in sys.argv:
            if x in ['--pip-non-url', '--pip-local-url', '--pip-non-local-url']:
                options.add(x)

        pips = apply_snapshot(sys.argv[2])

        if pips and len(options) > 0:
            restore_pip_snapshot(pips, options)

    sys.exit(0)
except Exception as e:
    print(e)
    sys.exit(-1)