# test/files_cells/python/en/downloading_en.py
# Hugging Face file-viewer header (not part of the program):
# uploaded by NagisaNao, commit 1692eff "less waiting :3" (verified), 22.4 kB
##~ DOWNLOADING CODE | BY: ANXETY ~##
from directory_setup import *
from models_data import model_list, vae_list, controlnet_list
import os
import re
import time
import json
import shutil
import zipfile
import requests
import subprocess
from datetime import timedelta
from subprocess import getoutput
from IPython.utils import capture
from IPython.display import clear_output
from urllib.parse import urlparse, parse_qs
# Setup Env
# Runtime configuration is read from environment variables; every value is a
# string, or None when the variable is not set.
env = os.environ.get('ENV_NAME')
root_path = os.environ.get('ROOT_PATH')
webui_path = os.environ.get('WEBUI_PATH')
free_plan = os.environ.get('FREE_PLAN')
UI = os.environ.get('SDW_UI')
OLD_UI = os.environ.get('SDW_OLD_UI')
# ============ loading settings V4 =============
def load_settings(path):
    """Load the saved settings from a JSON file.

    Args:
        path: Location of the settings file.

    Returns:
        The decoded JSON content, or an empty dict when ``path`` does not
        exist.

    Raises:
        json.JSONDecodeError: If the file exists but does not contain valid JSON.
    """
    if not os.path.exists(path):
        return {}
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)
# Saved settings; an empty dict when settings.json does not exist.
settings = load_settings(f'{root_path}/settings.json')

# Keys of `settings` that are published as variables of the same name below.
VARIABLES = [
    'model', 'model_num', 'inpainting_model',
    'vae', 'vae_num', 'latest_webui', 'latest_exstensions',
    'change_webui', 'detailed_download', 'controlnet',
    'controlnet_num', 'commit_hash', 'huggingface_token',
    'ngrok_token', 'zrok_token', 'commandline_arguments',
    'Model_url', 'Vae_url', 'LoRA_url', 'Embedding_url',
    'Extensions_url', 'custom_file_urls'
]

# Each listed key becomes a variable; keys absent from the settings file are
# set to None because dict.get() is used.
# NOTE(review): this only creates real variables while the statement runs at
# module level, where locals() is the module namespace — confirm before moving
# it into a function.
locals().update({key: settings.get(key) for key in VARIABLES})
# ================ LIBRARIES V2 ================
# One-time dependency installation. The flag file written at the end marks the
# step as done, so later runs skip this whole section.
flag_file = f"{root_path}/libraries_installed.txt"

if not os.path.exists(flag_file):
    print("πŸ’Ώ Installing the libraries, it's going to take a while:\n")

    # Package name -> shell command that installs it.
    install_lib = {
        # "aria2": "apt -y install aria2",
        "aria2": "pip install aria2",
        "localtunnel": "npm install -g localtunnel",
    }
    # insightface is added unless the controlnet setting is the string 'none'.
    if controlnet != 'none':
        install_lib["insightface"] = "pip install insightface"

    # Extra packages, keyed by the value of the ENV_NAME environment variable.
    additional_libs = {
        "Google Colab": {
            "xformers": "pip install xformers==0.0.27 --no-deps"
        },
        "Kaggle": {
            "xformers": "pip install xformers==0.0.26.post1",
            # "torch": "pip install torch==2.1.2+cu121 torchvision==0.16.2+cu121 torchaudio==2.1.2 --extra-index-url https://download.pytorch.org/whl/cu121",
            # "aiohttp": "pip install trash-cli && trash-put /opt/conda/lib/python3.10/site-packages/aiohttp*" # fix install req
        }
    }

    if env in additional_libs:
        install_lib.update(additional_libs[env])

    # Loop through libraries
    for index, (package, install_cmd) in enumerate(install_lib.items(), start=1):
        # '\r' plus trailing padding redraws the progress text on one line.
        print(f"\r[{index}/{len(install_lib)}] \033[32m>>\033[0m Installing \033[33m{package}\033[0m..." + " "*35, end='')
        # Output is captured and the exit status is not checked, so a failed
        # installation is not reported here.
        subprocess.run(install_cmd, shell=True, capture_output=True)

    # Additional specific packages
    # Tunnel binaries are fetched with curl; their console output is captured
    # and discarded.
    with capture.capture_output() as cap:
        get_ipython().system('curl -s -OL https://github.com/DEX-1101/sd-webui-notebook/raw/main/res/new_tunnel --output-dir {root_path}')
        get_ipython().system('curl -s -Lo /usr/bin/cl https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 && chmod +x /usr/bin/cl')
        get_ipython().system('curl -sLO https://github.com/openziti/zrok/releases/download/v0.4.32/zrok_0.4.32_linux_amd64.tar.gz && tar -xzf zrok_0.4.32_linux_amd64.tar.gz -C /usr/bin && rm -f zrok_0.4.32_linux_amd64.tar.gz')
    del cap

    clear_output()

    # Save file install lib
    with open(flag_file, "w") as f:
        f.write(">W<'")

    print("πŸͺ Libraries are installed!" + " "*35)
    time.sleep(2)
    clear_output()
# =================== OTHER ====================
# Setup Timer
# The session start time is kept in the START_COLAB environment variable so it
# survives re-running this script within the same process.
try:
    start_colab = int(os.environ["START_COLAB"])
except KeyError:
    # First run: record "now" minus five seconds and publish it.
    start_colab = int(time.time()) - 5
    os.environ["START_COLAB"] = str(start_colab)
# remove directory func
def _remove_dir(directory_path):
if directory_path and os.path.exists(directory_path):
try:
shutil.rmtree(directory_path)
except Exception:
get_ipython().system('rm -rf {directory_path}')
# Save files temporarily
# Folder under root_path that holds the user's model folders while the WebUI
# directory is removed and unpacked again.
temporarily_dir = f'{root_path}/temp_dir'
def copy_items_with_replace(src_base, dst_base):
    """Move the known user-content folders from ``src_base`` into ``dst_base``.

    Only folders that exist under ``src_base`` are moved. A folder already
    present at the destination is removed first, so the moved copy replaces it.
    """
    print("Moving files...", end='')
    time.sleep(1)

    for relative_path in (
        'embeddings',
        'models/Stable-diffusion',
        'models/VAE',
        'models/Lora',
        'models/ControlNet',
    ):
        source = os.path.join(src_base, relative_path)
        if not os.path.exists(source):
            continue

        target = os.path.join(dst_base, relative_path)
        if os.path.exists(target):
            _remove_dir(target)
        # Make sure the parent of the target exists before moving into it.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.move(source, target)

    print("\rπŸ”₯ Files moved!" + " "*15)
def download_and_unpack(url, dest_path):
    """Download a zip archive with aria2c and extract it into ``dest_path``.

    The archive is saved as ``repo.zip`` in the current working directory,
    extracted with ``unzip`` (overwriting existing files) and then deleted.

    Args:
        url: Address of the zip archive.
        dest_path: Directory the archive is extracted into.
    """
    aria2_args = "--console-log-level=error -c -x 16 -s 16 -k 1M"
    # The {aria2_args}, {url} and {dest_path} placeholders are filled in by
    # IPython from this function's local variables, so those names must stay.
    get_ipython().system("aria2c {aria2_args} '{url}' -o repo.zip")
    get_ipython().system('unzip -q -o repo.zip -d {dest_path}')
    get_ipython().system('rm -rf repo.zip')
def handle_colab_timer(webui_path, timer_colab):
    """Persist or restore the session start timestamp.

    The timestamp lives in ``<webui_path>/static/colabTimer.txt``. If the file
    holds a readable number, that number is returned. Otherwise ``timer_colab``
    is written to the file and returned unchanged.

    Args:
        webui_path: Root directory of the WebUI installation.
        timer_colab: Timestamp to store when no usable value is saved yet.

    Returns:
        The stored timestamp as a float, or ``timer_colab`` itself when the
        file had to be (re)written.
    """
    timer_file_path = os.path.join(webui_path, 'static', 'colabTimer.txt')

    if os.path.exists(timer_file_path):
        try:
            with open(timer_file_path, 'r') as timer_file:
                return float(timer_file.read())
        except ValueError:
            # Empty or corrupted file: fall through and rewrite it.
            pass

    # The 'static' folder is not guaranteed to exist yet.
    os.makedirs(os.path.dirname(timer_file_path), exist_ok=True)
    with open(timer_file_path, 'w') as timer_file:
        timer_file.write(str(timer_colab))
    return timer_colab
def unpack_webui():
    """Download and unpack the WebUI archive, then restore saved user files.

    The archive matching the selected UI is extracted into ``webui_path`` and
    the session start time is written to ``static/colabTimer.txt``. If the
    temporary folder exists, its content is moved into the new installation
    and the folder is removed.
    """
    started_at = time.time()
    print(f"⌚ Unpacking Stable Diffusion{' (Forge)' if UI == 'Forge' else ''}...", end='')

    with capture.capture_output() as cap:
        download_url = (
            "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO_forge.zip"
            if UI == 'Forge'
            else "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO.zip"
        )
        download_and_unpack(download_url, webui_path)
        get_ipython().system(f'echo -n {start_colab} > {webui_path}/static/colabTimer.txt')
    del cap

    minutes, seconds = divmod(int(time.time() - started_at), 60)
    print(f"\rπŸš€ Unpacking complete! For {minutes:02}:{seconds:02} ⚑" + " "*15)

    if not os.path.exists(temporarily_dir):
        return
    copy_items_with_replace(temporarily_dir, webui_path)
    _remove_dir(temporarily_dir)
# ================= MAIN CODE ==================
# Switching UI: when an installation exists but the selected UI differs from
# the previous one, the user's model folders are moved aside and the old
# installation is removed so it is unpacked again below.
if os.path.exists(webui_path):
    if UI != OLD_UI:
        print(f'Switching the WebUI from \033[33m{OLD_UI}\033[0m to \033[33m{UI}\033[0m:\n')
        copy_items_with_replace(webui_path, temporarily_dir)
        _remove_dir(webui_path)
        # Remember the UI that is installed from now on.
        os.environ['SDW_OLD_UI'] = UI
        time.sleep(2)
        clear_output()

# Unpack when there is no installation; otherwise report the session time.
if not os.path.exists(webui_path):
    unpack_webui()
else:
    print("πŸš€ All unpacked... Skip. ⚑")
    timer_colab = handle_colab_timer(webui_path, start_colab)
    # str(timedelta) may carry fractional seconds; keep only the part before '.'.
    elapsed_time = str(timedelta(seconds=time.time() - timer_colab)).split('.')[0]
    print(f"⌚️ You have been conducting this session for - \033[33m{elapsed_time}\033[0m")
## Changes extensions and WebUi
# Optional update step, driven by the 'latest_webui' and 'latest_exstensions'
# settings. All git output is captured and discarded.
if latest_webui or latest_exstensions:
    action = "WebUI and Extensions" if latest_webui and latest_exstensions else ("WebUI" if latest_webui else "Extensions")
    print(f"⌚️ Updating {action}...", end='')
    with capture.capture_output() as cap:
        # Placeholder git identity, set globally before the pull/rebase below.
        get_ipython().system('git config --global user.email "you@example.com"')
        get_ipython().system('git config --global user.name "Your Name"')

        ## Update Webui
        if latest_webui:
            get_ipython().run_line_magic('cd', '{webui_path}')
            # Discard local modifications, then pull preferring upstream changes.
            get_ipython().system('git restore .')
            get_ipython().system('git pull -X theirs --rebase --autostash')

        ## Update extensions
        if latest_exstensions:
            # The braces hold a Python expression that IPython evaluates to
            # build a shell loop: hard-reset and pull every extension folder.
            get_ipython().system('{\'for dir in \' + webui_path + \'/extensions/*/; do cd \\"$dir\\" && git reset --hard && git pull; done\'}')
    del cap
    print(f"\r✨ Updating {action} Completed!")
# === FIXING EXTENSIONS ===
# Patches applied to bundled extensions on every run; command output is
# captured and discarded, so a missing extension is not reported.
anxety_repos = "https://huggingface.co/NagisaNao/fast_repo/resolve/main"

with capture.capture_output() as cap:
    # --- Umi-Wildcard ---
    # NOTE(review): the edit targets a fixed line number (521) and depends on
    # the extension's current file layout.
    get_ipython().system("sed -i '521s/open=\\(False\\|True\\)/open=False/' {webui_path}/extensions/Umi-AI-Wildcards/scripts/wildcard_recursive.py # Closed accordion by default")
    # --- Encrypt-Image ---
    # NOTE(review): deletes a fixed line range (9-37); running it again on an
    # already patched file presumably removes further lines — confirm.
    get_ipython().system("sed -i '9,37d' {webui_path}/extensions/Encrypt-Image/javascript/encrypt_images_info.js # Removes the weird text in webui")
    # --- Additional-Networks ---
    get_ipython().system('wget -O {webui_path}/extensions/additional-networks/scripts/metadata_editor.py {anxety_repos}/extensions/Additional-Networks/fix/metadata_editor.py # Fixing an error due to old style')
del cap
## Version switching
# When a commit hash is configured, hard-reset the WebUI checkout to it.
if commit_hash:
    print('⏳ Time machine activation...', end="")
    with capture.capture_output() as cap:
        get_ipython().run_line_magic('cd', '{webui_path}')
        # Placeholder git identity, set globally.
        get_ipython().system('git config --global user.email "you@example.com"')
        get_ipython().system('git config --global user.name "Your Name"')
        # Discards local changes in the working tree.
        get_ipython().system('git reset --hard {commit_hash}')
    del cap
    print(f"\rβŒ›οΈ The time machine has been activated! Current commit: \033[34m{commit_hash}\033[0m")
## Downloading model and stuff | oh~ Hey! If you're freaked out by that code too, don't worry, me too!
print("πŸ“¦ Downloading models and stuff...", end='')

# Accumulates every download request as one comma-separated string.
url = ""

# Download prefix -> destination directory. The order matters: the first five
# keys are paired positionally with the *_url settings further down.
PREFIXES = dict(
    model=models_dir,
    vae=vaes_dir,
    lora=loras_dir,
    embed=embeddings_dir,
    extension=extensions_dir,
    control=control_dir,
    adetailer=adetailer_dir,
    config=webui_path,
)

# (repository, folder name) pairs collected for cloning after the downloads.
extension_repo = []

# Destination folders; also scanned for zip files to unpack after downloading.
directories = list(PREFIXES.values())
get_ipython().system('mkdir -p {" ".join(directories)}')

# NOTE(review): the fallback access token is hard-coded in the source.
hf_token = huggingface_token or "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"
user_header = f'"Authorization: Bearer {hf_token}"'
''' Formatted Info Output '''
from math import floor
def center_text(text, terminal_width=45):
    """Return ``text`` padded on both sides and wrapped in ANSI colour codes.

    The same amount of padding is used on each side (floor of half the free
    width); text longer than ``terminal_width`` gets no padding.
    """
    side = ' ' * ((terminal_width - len(text)) // 2)
    return f"\033[1m\033[36m{side}{text}{side}\033[0m\033[32m"
def format_output(url, dst_dir, file_name, image_name=None, image_url=None):
    """Print a coloured summary of one download.

    Shows a header built from the part of ``file_name`` before its first dot,
    followed by the URL, the target directory and the file name. For CivitAI
    links with a preview image, the preview name and URL are printed as well.
    """
    title, _, _ = file_name.partition('.')
    rule = '\033[32m' + '---' * 20

    print(f"\n{rule}{center_text(f'[{title}]')}{rule}")
    for label, value in (("URL", url), ("SAVE DIR", dst_dir)):
        print(f"\033[33m{label}: \033[34m{value}")
    print(f"\033[33mFILE NAME: \033[34m{file_name}\033[0m")

    if image_url and 'civitai' in url:
        print(f"\033[32m[Preview DL]:\033[0m {image_name} - {image_url}\n")
''' GET CivitAi API - DATA '''
def CivitAi_API(url, file_name=None):
    """Resolve a CivitAI link into download information via the CivitAI REST API.

    Args:
        url: CivitAI link; any existing ``?token=`` part is dropped and the
            built-in token is appended.
        file_name: Optional name that overrides the file name reported by the API.

    Returns:
        A 7-tuple ``(tokenised download URL, download URL, model type,
        file name, preview image URL, preview image name, API data)``.
        When the lookup fails the tuple is ``('None', None, None, None, None,
        None, None)`` — note the first element is the string ``'None'``.
    """
    # Model types for which the first listed file is downloaded and a preview
    # image is looked up.
    SUPPORT_TYPES = ('Checkpoint', 'TextualInversion', 'LORA')
    # NOTE(review): API token is hard-coded in the source.
    CIVITAI_TOKEN = "62c0c5956b2f9defbd844d754000180b"

    # Replace whatever token the link carries with the built-in one.
    url = url.split('?token=')[0] if '?token=' in url else url
    url = url.replace('?type=', f'?token={CIVITAI_TOKEN}&type=') if '?type=' in url else f"{url}?token={CIVITAI_TOKEN}"

    def get_model_data(url):
        # Returns the decoded "model-versions" API response, or None on a
        # request error or when the link cannot be parsed.
        base_url = "https://civitai.com/api/v1"
        try:
            if "civitai.com/models/" in url:
                if '?modelVersionId=' in url:
                    # NOTE(review): the token was already appended above, so
                    # this value still carries the trailing "?token=..." text.
                    version_id = url.split('?modelVersionId=')[1]
                else:
                    # No explicit version: use the first version the API lists.
                    model_id = url.split('/models/')[1].split('/')[0]
                    model_data = requests.get(f"{base_url}/models/{model_id}").json()
                    version_id = model_data['modelVersions'][0].get('id')
            else:
                version_id = url.split('/models/')[1].split('/')[0]

            return requests.get(f"{base_url}/model-versions/{version_id}").json()
        except (KeyError, IndexError, requests.RequestException) as e:
            return None

    data = get_model_data(url)

    if not data:
        print("\033[31m[Data Info]:\033[0m Failed to retrieve data from the API.\n")
        return 'None', None, None, None, None, None, None

    def extract_model_info(url, data):
        # Type and name come from the first file, or from the second file when
        # the link's "type" query parameter mentions "vae".
        # NOTE(review): assumes 'model' and 'files' keys are present and that a
        # second file exists in the VAE case — no guard for missing entries.
        model_type = data['model']['type']
        model_name = data['files'][0]['name']

        if 'type=' in url:
            url_model_type = parse_qs(urlparse(url).query).get('type', [''])[0].lower()
            if 'vae' in url_model_type:
                model_type = data['files'][1]['type']
                model_name = data['files'][1]['name']

        return model_type, model_name

    model_type, model_name = extract_model_info(url, data)
    # A caller-supplied name wins over the one reported by the API.
    model_name = file_name or model_name

    def get_download_url(data, model_type):
        # `url` below is the enclosing function's (tokenised) link.
        if any(t.lower() in model_type.lower() for t in SUPPORT_TYPES):
            return data['files'][0]['downloadUrl']
        return data['files'][1]['downloadUrl'] if 'type' in url else data['files'][0]['downloadUrl']

    download_url = get_download_url(data, model_type)

    def get_image_info(data, model_type, model_name):
        # Returns (url, name) of the first acceptable preview image, or
        # (None, None) for unsupported model types or when no image qualifies.
        if not any(t in model_type for t in SUPPORT_TYPES):
            return None, None

        for image in data.get('images', []):
            if image['nsfwLevel'] >= 4 and env == 'Kaggle':  # Filter NSFW images for Kaggle
                continue
            image_url = image['url']
            image_extension = image_url.split('.')[-1]
            image_name = f"{model_name.split('.')[0]}.preview.{image_extension}" if image_url else None
            return image_url, image_name
        return None, None

    image_url, image_name = get_image_info(data, model_type, model_name)

    return f"{download_url}{'&' if '?' in download_url else '?'}token={CIVITAI_TOKEN}", download_url, model_type, model_name, image_url, image_name, data
''' Main Download Code '''
def strip_(url):
    """Turn a GitHub / Hugging Face page link into a direct-download link.

    GitHub: ``/blob/`` becomes ``/raw/``. Hugging Face: ``/blob/`` becomes
    ``/resolve/`` and the query string is dropped. Any other URL is returned
    unchanged.
    """
    if 'github.com' in url:
        return url.replace('/blob/', '/raw/')
    if "huggingface.co" not in url:
        return url

    resolved = url.replace('/blob/', '/resolve/')
    without_query, _, _ = resolved.partition('?')
    return without_query
def download(url):
    """Process a comma-separated list of download entries, then unpack zips.

    Each entry is either ``<prefix>:<url>[optional name]`` (handled by
    ``handle_manual``) or three whitespace-separated fields
    ``<url> <destination dir> <file name>`` (handled by ``manual_download``).
    Entries in neither form are reported and skipped instead of aborting the
    remaining downloads.

    Afterwards every ``.zip`` file found below the download directories is
    extracted into a folder named after the archive and then deleted. An
    archive that cannot be read is reported and left in place.

    Args:
        url: Comma-separated download entries; empty entries are ignored.
    """
    entries = [entry.strip() for entry in url.split(',') if entry.strip()]
    known_prefixes = tuple(PREFIXES)

    for entry in entries:
        if entry.lower().startswith(known_prefixes):
            handle_manual(entry)
            continue

        fields = entry.split()
        if len(fields) != 3:
            # e.g. a bare URL without prefix: there is no destination to use.
            print(f"\033[31m[Skipped]:\033[0m Malformed download entry: {entry!r}")
            continue
        link, dst_dir, file_name = fields
        manual_download(link, dst_dir, file_name)

    # Unpack zip files
    for directory in directories:
        for root, _, files in os.walk(directory):
            for file in files:
                if not file.endswith(".zip"):
                    continue
                zip_path = os.path.join(root, file)
                extract_path = os.path.splitext(zip_path)[0]
                try:
                    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                        zip_ref.extractall(extract_path)
                except zipfile.BadZipFile as e:
                    # Keep the broken archive so the user can inspect it.
                    print(f"\033[31m[Skipped]:\033[0m Cannot unpack {zip_path}: {e}")
                    continue
                os.remove(zip_path)
def handle_manual(url):
    """Dispatch one ``<prefix>:<url>[optional file name]`` entry.

    The prefix (matched case-insensitively) selects the destination directory
    from ``PREFIXES``. Entries with the ``extension`` prefix are queued in
    ``extension_repo`` for cloning later; all others are downloaded
    immediately. A download error is printed and does not propagate.

    Args:
        url: The entry text, e.g. ``model:https://host/file.safetensors[name.safetensors]``.
    """
    prefix, separator, path = url.partition(':')
    if not separator:
        print(f"\033[31m[Skipped]:\033[0m Entry has no '<prefix>:' part: {url!r}")
        return

    # The caller matches prefixes case-insensitively, so the lookup must too.
    prefix = prefix.strip().lower()
    path = path.strip()

    # An optional "[file name]" may be embedded in the entry.
    file_name_match = re.search(r'\[(.*?)\]', path)
    file_name = None
    if file_name_match:
        file_name = file_name_match.group(1) or None
        path = re.sub(r'\[.*?\]', '', path)

    if prefix not in PREFIXES:
        print(f"\033[31m[Skipped]:\033[0m Unknown prefix {prefix!r} in entry: {url!r}")
        return

    if prefix == "extension":
        extension_repo.append((path, file_name))
        return

    dst_dir = PREFIXES[prefix]
    try:
        manual_download(path, dst_dir, file_name=file_name, prefix=prefix)
    except Exception as e:
        print(f"Error downloading file: {e}")
def manual_download(url, dst_dir, file_name, prefix=None):
    """Download one resource into ``dst_dir`` with the tool that fits its host.

    CivitAI links are resolved through ``CivitAi_API`` (a preview image is
    fetched too when one is offered) and nothing is downloaded when that
    lookup fails. Google Drive links go through ``gdown``; GitHub,
    Hugging Face and other HTTP links go through ``aria2c``.

    Args:
        url: Source link.
        dst_dir: Directory the file is saved to.
        file_name: Name to save the file under, or ``None`` to derive one.
        prefix: Download category; with ``'config'`` an existing target file
            is deleted first so the download replaces it.
    """
    header_option = f"--header={user_header}"
    aria2c_header = "--header='User-Agent: Mozilla/5.0' --allow-overwrite=true"
    aria2_args = "--optimize-concurrent-downloads --console-log-level=error --summary-interval=10 --stderr=true -c -x16 -s16 -k1M -j5"

    clean_url = strip_(url)
    # Defined up front so non-CivitAI downloads never hit an unbound name.
    image_url = image_name = None

    if 'civitai' in url:
        url, clean_url, model_type, file_name, image_url, image_name, data = CivitAi_API(url, file_name)
        if not data:
            # Lookup failed; CivitAi_API has already printed the reason.
            return
        if image_url and image_name:
            command = ["aria2c"] + aria2_args.split() + ["-d", dst_dir, "-o", image_name, image_url]
            subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # Name used for display and for GitHub / Hugging Face downloads. It is
    # derived from the URL path so a "?download=true" style query string does
    # not end up in the saved file name.
    basename = file_name or os.path.basename(urlparse(clean_url or url).path)

    # Formatted info output
    format_output(clean_url or url, dst_dir, basename, image_name, image_url)

    # =====================
    def run_aria2c(url, dst_dir, file_name=None, args="", header=""):
        # Replace config files: remove an existing copy before downloading.
        if file_name and prefix == 'config':
            file_path = os.path.join(dst_dir, file_name)
            if os.path.exists(file_path):
                os.remove(file_path)

        out = f"-o '{file_name}'" if file_name else ""
        # Placeholders are filled in by IPython from this function's locals.
        get_ipython().system("aria2c {header} {args} -d {dst_dir} {out} '{url}'")

    # -- Google Drive --
    if 'drive.google' in url:
        # Install gdown once per session, the first time a Drive link is seen.
        if not globals().get('have_drive_link', False):
            os.system("pip install -U gdown > /dev/null")
            globals()['have_drive_link'] = True

        if 'folders' in url:
            os.system(f"gdown --folder \"{url}\" -O {dst_dir} --fuzzy -c")
        else:
            out_path = f"{dst_dir}/{file_name}" if file_name else dst_dir
            os.system(f"gdown \"{url}\" -O {out_path} --fuzzy -c")

    # -- GitHub or Hugging Face --
    elif 'github' in url or 'huggingface' in url:
        # Only Hugging Face requests carry the authorization header.
        run_aria2c(clean_url, dst_dir, basename, aria2_args, header_option if 'huggingface' in url else '')

    # -- Other HTTP/Sources --
    elif 'http' in url:
        # Without an explicit name aria2c chooses the file name itself.
        run_aria2c(url, dst_dir, file_name, aria2_args, aria2c_header)
''' SubModels - Added URLs '''
def add_submodels(selection, num_selection, model_dict, dst_dir):
    """Collect the model entries chosen by name and/or by number.

    Args:
        selection: Key of ``model_dict`` to take, ``"ALL"`` for every group,
            or ``"none"`` (or an empty value) for nothing.
        num_selection: Extra groups chosen by 1-based position in
            ``model_dict``, separated by commas and/or whitespace. Tokens that
            are not numbers, and numbers out of range, are ignored. May be
            ``None``.
        model_dict: Mapping of group name -> list of model dicts, each with at
            least a ``'name'`` key.
        dst_dir: Directory stored under ``'dst_dir'`` in every returned entry.

    Returns:
        The selected model dicts without duplicate names, in first-seen order.
        The lists inside ``model_dict`` are not modified; the model dicts
        themselves receive the ``'dst_dir'`` key.
    """
    if not selection or selection == "none":
        return []

    if selection == "ALL":
        selected_models = [model for models in model_dict.values() for model in models]
    else:
        # Copy the list: extending it in place would grow the caller's data.
        selected_models = list(model_dict.get(selection, []))
        group_names = list(model_dict)
        # Commas are separators, so "1,2" means groups 1 and 2 (not 12).
        for token in str(num_selection or "").replace(',', ' ').split():
            if not token.isdecimal():
                continue
            num = int(token)
            if 1 <= num <= len(group_names):
                selected_models.extend(model_dict[group_names[num - 1]])

    # A dict keyed by name drops duplicates while keeping first-seen order.
    unique_models = list({model['name']: model for model in selected_models}.values())
    for model in unique_models:
        model['dst_dir'] = dst_dir
    return unique_models
def handle_submodels(selection, num_selection, model_dict, dst_dir, url):
    """Append the selected sub-models to the download string ``url``.

    Every selected model adds one ``"<url> <dst_dir> <name>, "`` entry. Models
    whose name contains "inpainting" are left out unless the
    ``inpainting_model`` setting is enabled.

    Returns:
        ``url`` with the new entries appended.
    """
    chosen = add_submodels(selection, num_selection, model_dict, dst_dir)
    entries = [
        f"{item['url']} {item['dst_dir']} {item['name']}, "
        for item in chosen
        if inpainting_model or "inpainting" not in item['name']
    ]
    return url + "".join(entries)
# Queue the models, VAEs and ControlNet models picked in the settings; each
# call appends its entries to the shared download string.
url = handle_submodels(model, model_num, model_list, models_dir, url)
url = handle_submodels(vae, vae_num, vae_list, vaes_dir, url)
url = handle_submodels(controlnet, controlnet_num, controlnet_list, control_dir, url)
''' file.txt - added urls '''
def process_file_download(file_url, PREFIXES, unique_urls):
    """Read a URL list file and return its entries as ``"<tag>:<url>, "`` text.

    The file is a sequence of lines. A line containing ``# <tag>`` (case
    insensitive, ``<tag>`` being a key of ``PREFIXES``) selects the tag for
    the URLs that follow. URLs on one line are separated by commas and
    anything after ``#`` in a URL item is ignored. URLs that appear before any
    tag line are skipped because they have no destination.

    Args:
        file_url: Local path, or an http(s) link to the list file.
        PREFIXES: Iterable of known tags.
        unique_urls: Set of URLs (without the ``[name]`` suffix) already
            queued; updated in place so duplicates are emitted once.

    Returns:
        The concatenated ``"<tag>:<url>, "`` entries (an empty string when
        nothing new was found).

    Raises:
        FileNotFoundError: If a local ``file_url`` does not exist.
    """
    if file_url.startswith("http"):
        # Turn a repository page link into its raw-content link. Only the
        # "/blob/" path segment is rewritten, so a "blob" inside a user, repo
        # or file name is left alone.
        file_url = file_url.replace("/blob/", "/raw/")
        response = requests.get(file_url)
        lines = response.text.split('\n')
    else:
        with open(file_url, 'r') as file:
            lines = file.readlines()

    collected = []
    current_tag = None
    for line in lines:
        line = line.strip()
        lowered = line.lower()

        # Use the tag that actually follows "# ", not merely any tag name that
        # happens to appear elsewhere in the line.
        matched_tag = next((tag for tag in PREFIXES if f'# {tag}' in lowered), None)
        if matched_tag is not None:
            current_tag = matched_tag
        if current_tag is None:
            continue

        for url in (item.split('#')[0].strip() for item in line.split(',')):
            filter_url = url.split('[')[0]  # same url filter
            if url.startswith("http") and filter_url not in unique_urls:
                collected.append(f"{current_tag}:{url}, ")
                unique_urls.add(filter_url)

    return "".join(collected)
# Entries read from the user's URL list files, as "<tag>:<url>, " text.
file_urls = ""
# URLs already queued from list files, shared across files to avoid duplicates.
unique_urls = set()

if custom_file_urls:
    # Commas and whitespace both separate file names, so a comma is turned
    # into a space rather than removed (removing it would fuse "a.txt,b.txt").
    for custom_file_url in custom_file_urls.replace(',', ' ').split():
        if not custom_file_url.endswith('.txt'):
            custom_file_url += '.txt'
        # Local files are looked up relative to root_path.
        if not custom_file_url.startswith('http'):
            if not custom_file_url.startswith(root_path):
                custom_file_url = f'{root_path}/{custom_file_url}'

        try:
            file_urls += process_file_download(custom_file_url, PREFIXES, unique_urls)
        except FileNotFoundError:
            # A missing list file must not stop the other downloads, but the
            # user should know it was ignored.
            print(f"\033[31m[Skipped]:\033[0m URL list file not found: {custom_file_url}")

# url prefixing
# The first five PREFIXES keys (model, vae, lora, embed, extension) are paired
# positionally with the matching *_url settings.
urls = (Model_url, Vae_url, LoRA_url, Embedding_url, Extensions_url)
prefixed_urls = [
    f"{prefix}:{single_url}"
    for prefix, raw_urls in zip(PREFIXES, urls) if raw_urls
    for single_url in raw_urls.replace(',', ' ').split()
]
url += ", ".join(prefixed_urls) + ", " + file_urls
# Run the downloads: with detailed output when requested, otherwise with all
# console output captured and discarded.
if detailed_download == "on":
    print("\n\n\033[33m# ====== Detailed Download ====== #\n\033[0m")
    download(url)
    print("\n\033[33m# =============================== #\n\033[0m")
else:
    with capture.capture_output() as cap:
        download(url)
    del cap

print("\r🏁 Download Complete!" + " "*15)

# Cleaning shit after downloading...
# Removes ".ipynb_checkpoints" and ".aria2" directories and "*.aria2" files
# below the WebUI folder; errors from find/rm are discarded.
get_ipython().system('find {webui_path} \\( -type d \\( -name ".ipynb_checkpoints" -o -name ".aria2" \\) -o -type f -name "*.aria2" \\) -exec rm -r {{}} \\; >/dev/null 2>&1')
## Install of Custom extensions
# Clone every repository queued in extension_repo into the extensions folder;
# git output is captured and discarded.
if extension_repo:
    print("✨ Installing custom extensions...", end='')
    with capture.capture_output() as cap:
        for repo, repo_name in extension_repo:
            # Without an explicit folder name, use the last URL path segment.
            repo_name = repo_name or repo.split('/')[-1]
            get_ipython().system('cd {extensions_dir} && git clone {repo} {repo_name} && cd {repo_name} && git fetch')
    del cap
    print(f"\rπŸ“¦ Installed '{len(extension_repo)}', Custom extensions!")
## List Models and stuff V2
# Hint for users who ran with captured output and therefore saw no file list.
if detailed_download == "off":
    print("\n\n\033[33mIf you don't see any downloaded files, enable the 'Detailed Downloads' feature in the widget.")

# Hand over to the results-display script located under root_path.
get_ipython().run_line_magic('run', '{root_path}/file_cell/special/dl_display_results.py # display widgets result')