|
|
|
|
|
from directory_setup import * |
|
from models_data import model_list, vae_list, controlnet_list |
|
|
|
import os |
|
import re |
|
import time |
|
import json |
|
import shutil |
|
import zipfile |
|
import requests |
|
import subprocess |
|
from datetime import timedelta |
|
from subprocess import getoutput |
|
from IPython.utils import capture |
|
from IPython.display import clear_output |
|
from urllib.parse import urlparse, parse_qs |
|
|
|
|
|
|
|
# Values read from environment variables; each is None when the variable is unset.
env = os.environ.get('ENV_NAME')
root_path = os.environ.get('ROOT_PATH')
webui_path = os.environ.get('WEBUI_PATH')
free_plan = os.environ.get('FREE_PLAN')

# Selected WebUI flavour and the value it is compared against further down.
UI = os.environ.get('SDW_UI')
OLD_UI = os.environ.get('SDW_OLD_UI')
|
|
|
|
|
|
|
def load_settings(path):
    """Load the JSON settings file at *path*.

    Returns:
        The decoded JSON content, or an empty dict when the file does not
        exist.
    """
    try:
        # JSON text is UTF-8; do not depend on the platform's locale encoding.
        with open(path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except FileNotFoundError:
        # Opening directly (instead of checking first) also covers a file
        # that disappears between the check and the open.
        return {}
|
|
|
# Settings read from settings.json under root_path (empty dict if the file is absent).
settings = load_settings(f'{root_path}/settings.json')

# Setting keys that are published as module-level variables below.
VARIABLES = [
    'model',
    'model_num',
    'inpainting_model',
    'vae',
    'vae_num',
    'latest_webui',
    'latest_exstensions',
    'change_webui',
    'detailed_download',
    'controlnet',
    'controlnet_num',
    'commit_hash',
    'huggingface_token',
    'ngrok_token',
    'zrok_token',
    'commandline_arguments',
    'Model_url',
    'Vae_url',
    'LoRA_url',
    'Embedding_url',
    'Extensions_url',
    'custom_file_urls',
]

# At module level locals() and globals() are the same namespace, so this binds
# exactly the same names; keys missing from the settings become None.
globals().update({key: settings.get(key) for key in VARIABLES})
|
|
|
|
|
|
|
# Marker file: when it exists the one-time library setup below is skipped.
flag_file = f"{root_path}/libraries_installed.txt"

if not os.path.exists(flag_file):
    print("πΏ Installing the libraries, it's going to take a while:\n")

    # Package label -> shell command, installed in every environment.
    install_lib = {
        "aria2": "pip install aria2",
        "localtunnel": "npm install -g localtunnel",
    }
    # insightface is added unless the controlnet setting is exactly 'none'
    # (so it is also added when the setting is missing, i.e. None).
    if controlnet != 'none':
        install_lib["insightface"] = "pip install insightface"

    # Extra packages keyed by the ENV_NAME value.
    additional_libs = {
        "Google Colab": {
            "xformers": "pip install xformers==0.0.27 --no-deps"
        },
        "Kaggle": {
            "xformers": "pip install xformers==0.0.26.post1",
        }
    }
    if env in additional_libs:
        install_lib.update(additional_libs[env])

    # Run each command through the shell. Output is captured and discarded and
    # the exit status is not inspected, so a failed install goes unnoticed here.
    for index, (package, install_cmd) in enumerate(install_lib.items(), start=1):
        print(f"\r[{index}/{len(install_lib)}] \033[32m>>\033[0m Installing \033[33m{package}\033[0m..." + " "*35, end='')
        subprocess.run(install_cmd, shell=True, capture_output=True)

    # Fetch the tunnelling binaries with curl; cell output is suppressed.
    with capture.capture_output() as cap:
        get_ipython().system('curl -s -OL https://github.com/DEX-1101/sd-webui-notebook/raw/main/res/new_tunnel --output-dir {root_path}')
        get_ipython().system('curl -s -Lo /usr/bin/cl https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 && chmod +x /usr/bin/cl')
        get_ipython().system('curl -sLO https://github.com/openziti/zrok/releases/download/v0.4.32/zrok_0.4.32_linux_amd64.tar.gz && tar -xzf zrok_0.4.32_linux_amd64.tar.gz -C /usr/bin && rm -f zrok_0.4.32_linux_amd64.tar.gz')
    del cap

    clear_output()

    # Create the marker so this whole block is skipped on the next run.
    with open(flag_file, "w") as f:
        f.write(">W<'")

    print("πͺ Libraries are installed!" + " "*35)
    time.sleep(2)
    clear_output()
|
|
|
|
|
|
|
|
|
# Session start timestamp in epoch seconds. When START_COLAB is not set yet it
# is initialised to "now minus 5 seconds"; otherwise the stored value is reused.
start_colab = int(os.environ.setdefault("START_COLAB", str(int(time.time()) - 5)))
|
|
|
|
|
def _remove_dir(directory_path): |
|
if directory_path and os.path.exists(directory_path): |
|
try: |
|
shutil.rmtree(directory_path) |
|
except Exception: |
|
get_ipython().system('rm -rf {directory_path}') |
|
|
|
|
|
# Staging folder under root_path: user files are moved here before the WebUI
# folder is removed and moved back after it has been unpacked again.
temporarily_dir = f'{root_path}/temp_dir'
|
|
|
def copy_items_with_replace(src_base, dst_base):
    """Move the listed content folders from *src_base* to *dst_base*.

    Every listed sub-folder that exists under *src_base* is moved to the same
    relative location under *dst_base*; a destination that already exists is
    removed first.
    """
    relative_paths = (
        'embeddings',
        'models/Stable-diffusion',
        'models/VAE',
        'models/Lora',
        'models/ControlNet',
    )

    print("Moving files...", end='')
    time.sleep(1)

    for relative in relative_paths:
        source = os.path.join(src_base, relative)
        if not os.path.exists(source):
            continue

        target = os.path.join(dst_base, relative)
        if os.path.exists(target):
            _remove_dir(target)
        # The parent folder must exist before the move.
        os.makedirs(os.path.dirname(target), exist_ok=True)
        shutil.move(source, target)

    print("\rπ₯ Files moved!" + " "*15)
|
|
|
def download_and_unpack(url, dest_path):
    """Download the archive at *url* with aria2c and unzip it into *dest_path*.

    The archive is written as ``repo.zip`` relative to the current working
    directory and deleted after extraction. The ``{name}`` placeholders in the
    command strings refer to this function's local variables and are expanded
    by IPython's shell escape, so the local names must stay as they are.
    """
    aria2_args = "--console-log-level=error -c -x 16 -s 16 -k 1M"
    get_ipython().system("aria2c {aria2_args} '{url}' -o repo.zip")
    # -o: overwrite existing files without prompting, -q: quiet.
    get_ipython().system('unzip -q -o repo.zip -d {dest_path}')
    get_ipython().system('rm -rf repo.zip')
|
|
|
def handle_colab_timer(webui_path, timer_colab):
    """Persist or restore the session start time kept inside the WebUI folder.

    Args:
        webui_path: WebUI root; the value lives in ``static/colabTimer.txt``.
        timer_colab: Start timestamp to store when no usable value is saved.

    Returns:
        The previously stored timestamp as a ``float`` when the file holds a
        valid number; otherwise *timer_colab* unchanged, after writing it to
        the file.
    """
    timer_file_path = os.path.join(webui_path, 'static', 'colabTimer.txt')

    try:
        with open(timer_file_path, 'r') as timer_file:
            return float(timer_file.read())
    except (FileNotFoundError, ValueError):
        # Missing file, or empty/corrupt content: (re)create it below.
        pass

    # The "static" folder may not exist yet; open(..., 'w') would fail then.
    os.makedirs(os.path.dirname(timer_file_path), exist_ok=True)
    with open(timer_file_path, 'w') as timer_file:
        timer_file.write(str(timer_colab))
    return timer_colab
|
|
|
def unpack_webui():
    """Download and unpack the WebUI archive, then restore staged user files.

    The archive matching the selected UI is unpacked into ``webui_path`` and
    the session start time is written to ``static/colabTimer.txt``. If the
    staging folder exists, its content is moved into the new install and the
    staging folder is removed.
    """
    started_at = time.time()
    print(f"β Unpacking Stable Diffusion{' (Forge)' if UI == 'Forge' else ''}...", end='')

    with capture.capture_output() as cap:
        download_url = (
            "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO_forge.zip"
            if UI == 'Forge'
            else "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO.zip"
        )
        download_and_unpack(download_url, webui_path)

        # Seed the timer file of the fresh install with the session start.
        get_ipython().system(f'echo -n {start_colab} > {webui_path}/static/colabTimer.txt')
    del cap

    elapsed_seconds = int(time.time() - started_at)
    minutes, seconds = divmod(elapsed_seconds, 60)
    print(f"\rπ Unpacking complete! For {minutes:02}:{seconds:02} β‘" + " "*15)

    if os.path.exists(temporarily_dir):
        copy_items_with_replace(temporarily_dir, webui_path)
        _remove_dir(temporarily_dir)
|
|
|
|
|
# UI changed while an install exists: stage the user files, remove the old
# install (so it is unpacked again below) and remember the new UI.
if os.path.exists(webui_path) and UI != OLD_UI:
    print(f'Switching the WebUI from \033[33m{OLD_UI}\033[0m to \033[33m{UI}\033[0m:\n')
    copy_items_with_replace(webui_path, temporarily_dir)
    _remove_dir(webui_path)
    os.environ['SDW_OLD_UI'] = UI
    time.sleep(2)
    clear_output()

if os.path.exists(webui_path):
    # Install already present: only report how long the session has been running.
    print("π All unpacked... Skip. β‘")
    timer_colab = handle_colab_timer(webui_path, start_colab)
    elapsed_time = str(timedelta(seconds=time.time() - timer_colab)).split('.')[0]
    print(f"βοΈ You have been conducting this session for - \033[33m{elapsed_time}\033[0m")
else:
    unpack_webui()
|
|
|
|
|
|
|
# Optional update of the WebUI checkout and/or of every installed extension.
if latest_webui or latest_exstensions:
    action = "WebUI and Extensions" if latest_webui and latest_exstensions else ("WebUI" if latest_webui else "Extensions")
    print(f"βοΈ Updating {action}...", end='')
    with capture.capture_output() as cap:
        # A git identity is configured before the pull/rebase commands below.
        get_ipython().system('git config --global user.email "you@example.com"')
        get_ipython().system('git config --global user.name "Your Name"')

        if latest_webui:
            get_ipython().run_line_magic('cd', '{webui_path}')
            # Discard local modifications, then rebase onto upstream preferring "theirs".
            get_ipython().system('git restore .')
            get_ipython().system('git pull -X theirs --rebase --autostash')

        if latest_exstensions:
            # Shell loop over every extension folder: hard reset, then pull.
            get_ipython().system('{\'for dir in \' + webui_path + \'/extensions/*/; do cd \\"$dir\\" && git reset --hard && git pull; done\'}')
    del cap
    print(f"\r⨠Updating {action} Completed!")

# Base URL that hosts the replacement file used below.
anxety_repos = "https://huggingface.co/NagisaNao/fast_repo/resolve/main"

# In-place patches for bundled extensions (output suppressed). The trailing
# "# ..." inside each command string is a shell comment.
with capture.capture_output() as cap:
    get_ipython().system("sed -i '521s/open=\\(False\\|True\\)/open=False/' {webui_path}/extensions/Umi-AI-Wildcards/scripts/wildcard_recursive.py # Closed accordion by default")

    get_ipython().system("sed -i '9,37d' {webui_path}/extensions/Encrypt-Image/javascript/encrypt_images_info.js # Removes the weird text in webui")

    get_ipython().system('wget -O {webui_path}/extensions/additional-networks/scripts/metadata_editor.py {anxety_repos}/extensions/Additional-Networks/fix/metadata_editor.py # Fixing an error due to old style')
del cap

# Optional: hard-reset the WebUI checkout to the commit given in the settings.
if commit_hash:
    print('β³ Time machine activation...', end="")
    with capture.capture_output() as cap:
        get_ipython().run_line_magic('cd', '{webui_path}')
        get_ipython().system('git config --global user.email "you@example.com"')
        get_ipython().system('git config --global user.name "Your Name"')
        get_ipython().system('git reset --hard {commit_hash}')
    del cap
    print(f"\rβοΈ The time machine has been activated! Current commit: \033[34m{commit_hash}\033[0m")
|
|
|
|
|
|
|
print("π¦ Downloading models and stuff...", end='')

# Accumulates every download request as one comma-separated string.
url = ""

# Maps the "<prefix>:" tag of a download entry to its target directory.
PREFIXES = {
    "model": models_dir,
    "vae": vaes_dir,
    "lora": loras_dir,
    "embed": embeddings_dir,
    "extension": extensions_dir,
    "control": control_dir,
    "adetailer": adetailer_dir,
    "config": webui_path
}

# (repository url, folder name) pairs queued by handle_manual() for cloning.
extension_repo = []
directories = list(PREFIXES.values())

# Create the target folders from Python: joining the paths into one
# "mkdir -p" shell command breaks as soon as a path contains a space.
for _target_dir in directories:
    os.makedirs(_target_dir, exist_ok=True)

# NOTE(review): the fallback below looks like a real access token committed to
# source; consider revoking it and requiring huggingface_token from the settings.
hf_token = huggingface_token if huggingface_token else "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"
user_header = f"\"Authorization: Bearer {hf_token}\""
|
|
|
''' Formatted Info Output ''' |
|
|
|
from math import floor |
|
|
|
def center_text(text, terminal_width=45):
    """Return *text* padded on both sides and wrapped in ANSI colour codes.

    The same amount of padding, ``(terminal_width - len(text)) // 2`` spaces,
    is put on each side; text longer than the width gets no padding.
    """
    side = ' ' * ((terminal_width - len(text)) // 2)
    return "\033[1m\033[36m" + side + text + side + "\033[0m\033[32m"
|
|
|
def format_output(url, dst_dir, file_name, image_name=None, image_url=None):
    """Print a coloured summary of one download.

    Shows a banner with the file name up to its first dot, followed by the
    URL, the target directory and the file name. For CivitAI URLs that come
    with a preview image, the preview name and URL are printed as well.
    """
    rule = '\033[32m' + '---' * 20
    title = center_text(f"[{file_name.split('.')[0]}]")

    print(f"\n{rule}{title}{rule}")
    print(
        f"\033[33mURL: \033[34m{url}",
        f"\033[33mSAVE DIR: \033[34m{dst_dir}",
        f"\033[33mFILE NAME: \033[34m{file_name}\033[0m",
        sep="\n",
    )

    if image_url and 'civitai' in url:
        print(f"\033[32m[Preview DL]:\033[0m {image_name} - {image_url}\n")
|
|
|
''' GET CivitAi API - DATA ''' |
|
|
|
def CivitAi_API(url, file_name=None):
    """Resolve a CivitAI link into download information via the CivitAI API.

    Args:
        url: CivitAI model page URL or API download URL.
        file_name: Optional name that overrides the file name reported by
            the API.

    Returns:
        A 7-tuple ``(download_url_with_token, download_url, model_type,
        model_name, image_url, image_name, data)``. When the API data cannot
        be fetched or lacks the expected fields, a message is printed and
        ``('None', None, None, None, None, None, None)`` is returned.
    """
    SUPPORT_TYPES = ('Checkpoint', 'TextualInversion', 'LORA')
    # NOTE(review): API token committed to source; consider loading it from
    # the settings or the environment instead.
    CIVITAI_TOKEN = "62c0c5956b2f9defbd844d754000180b"
    # Seconds; without a timeout a stalled connection blocks the cell forever.
    REQUEST_TIMEOUT = 30
    FAILURE = ('None', None, None, None, None, None, None)

    # Drop a token already present in the link and append ours, keeping a
    # "?type=" argument as part of the query string.
    url = url.split('?token=')[0] if '?token=' in url else url
    url = url.replace('?type=', f'?token={CIVITAI_TOKEN}&type=') if '?type=' in url else f"{url}?token={CIVITAI_TOKEN}"

    def get_model_data(url):
        """Fetch the model-version JSON for *url*; None on any failure."""
        base_url = "https://civitai.com/api/v1"
        try:
            if "civitai.com/models/" in url:
                if '?modelVersionId=' in url:
                    version_id = url.split('?modelVersionId=')[1]
                else:
                    model_id = url.split('/models/')[1].split('/')[0]
                    model_data = requests.get(f"{base_url}/models/{model_id}", timeout=REQUEST_TIMEOUT).json()
                    version_id = model_data['modelVersions'][0].get('id')
            else:
                version_id = url.split('/models/')[1].split('/')[0]

            # version_id may still carry the "?token=..." suffix appended
            # above; it then simply becomes the query string of this request.
            return requests.get(f"{base_url}/model-versions/{version_id}", timeout=REQUEST_TIMEOUT).json()
        except (KeyError, IndexError, ValueError, requests.RequestException):
            return None

    data = get_model_data(url)

    # An API error payload is valid JSON but has no "model"/"files" entries.
    if not isinstance(data, dict) or 'model' not in data or not data.get('files'):
        print("\033[31m[Data Info]:\033[0m Failed to retrieve data from the API.\n")
        return FAILURE

    files = data['files']
    has_second_file = len(files) > 1

    model_type = data['model']['type']
    model_name = files[0]['name']

    # "?type=VAE" asks for the second file of the version; this is only
    # honoured when such a file exists.
    if 'type=' in url and has_second_file:
        url_model_type = parse_qs(urlparse(url).query).get('type', [''])[0].lower()
        if 'vae' in url_model_type:
            model_type = files[1]['type']
            model_name = files[1]['name']

    model_name = file_name or model_name

    # Supported model types always use the first file; other types use the
    # second one when the link contains "type" and a second file exists.
    is_supported = any(t.lower() in model_type.lower() for t in SUPPORT_TYPES)
    if not is_supported and 'type' in url and has_second_file:
        download_url = files[1]['downloadUrl']
    else:
        download_url = files[0]['downloadUrl']

    def get_image_info(data, model_type, model_name):
        """Return (url, file name) of the first usable preview image."""
        if not any(t in model_type for t in SUPPORT_TYPES):
            return None, None

        for image in data.get('images', []):
            # Images with nsfwLevel >= 4 are skipped on Kaggle.
            if image.get('nsfwLevel', 0) >= 4 and env == 'Kaggle':
                continue
            image_url = image['url']
            image_extension = image_url.split('.')[-1]
            image_name = f"{model_name.split('.')[0]}.preview.{image_extension}" if image_url else None
            return image_url, image_name

        return None, None

    image_url, image_name = get_image_info(data, model_type, model_name)

    return f"{download_url}{'&' if '?' in download_url else '?'}token={CIVITAI_TOKEN}", download_url, model_type, model_name, image_url, image_name, data
|
|
|
''' Main Download Code ''' |
|
|
|
def strip_(url):
    """Turn GitHub / Hugging Face "blob" page links into direct file links.

    GitHub: ``/blob/`` becomes ``/raw/``. Hugging Face: ``/blob/`` becomes
    ``/resolve/`` and the query string is dropped. Other URLs are returned
    unchanged.
    """
    if 'github.com' in url:
        return url.replace('/blob/', '/raw/')
    if "huggingface.co" not in url:
        return url

    resolved = url.replace('/blob/', '/resolve/')
    return resolved.split('?')[0]
|
|
|
def download(url):
    """Process a comma-separated download list, then unpack downloaded zips.

    Each entry is either ``<prefix>:<url>`` (passed to ``handle_manual``) or
    ``<url> <dst_dir> <file_name>`` (passed to ``manual_download``).
    Malformed entries and failed downloads are reported and skipped so that
    one bad entry does not abort the remaining downloads.

    Afterwards every ``*.zip`` found below the known target directories is
    extracted into a folder named after the archive and then deleted.
    """
    entries = [entry.strip() for entry in url.split(',') if entry.strip()]
    known_prefixes = tuple(PREFIXES)

    for entry in entries:
        if entry.lower().startswith(known_prefixes):
            handle_manual(entry)
            continue

        # maxsplit keeps a file name that itself contains spaces in one piece.
        parts = entry.split(maxsplit=2)
        if len(parts) != 3:
            print(f"\033[31m[Skip]:\033[0m malformed download entry: {entry!r}")
            continue

        link, dst_dir, file_name = parts
        try:
            manual_download(link, dst_dir, file_name)
        except Exception as e:
            # Same best-effort policy as for prefixed entries in handle_manual().
            print(f"Error downloading file: {e}")

    for directory in directories:
        for root, _, files in os.walk(directory):
            for file in files:
                if not file.endswith(".zip"):
                    continue

                zip_path = os.path.join(root, file)
                extract_path = os.path.splitext(zip_path)[0]
                try:
                    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                        zip_ref.extractall(extract_path)
                except zipfile.BadZipFile as e:
                    # Truncated or corrupt archive: keep the file for inspection.
                    print(f"\033[31m[Skip]:\033[0m cannot unpack {zip_path}: {e}")
                    continue
                os.remove(zip_path)
|
|
|
def handle_manual(url):
    """Handle one ``<prefix>:<url>[<file name>]`` download entry.

    The prefix selects the target directory from ``PREFIXES``; an optional
    ``[name]`` part gives the file name to save under. Entries with the
    ``extension`` prefix are queued in ``extension_repo`` instead of being
    downloaded. Download errors are printed and not re-raised.
    """
    prefix, separator, path = url.partition(':')
    # download() matches prefixes case-insensitively, so normalise here too.
    prefix = prefix.strip().lower()
    if not separator or prefix not in PREFIXES:
        print(f"\033[31m[Skip]:\033[0m unknown download prefix in entry: {url!r}")
        return

    file_name_match = re.search(r'\[(.*?)\]', path)
    file_name = None
    if file_name_match:
        # An empty "[]" counts as "no file name given".
        file_name = file_name_match.group(1) or None
        path = re.sub(r'\[.*?\]', '', path)
    path = path.strip()

    if prefix == "extension":
        extension_repo.append((path, file_name))
        return

    target_dir = PREFIXES[prefix]
    try:
        manual_download(path, target_dir, file_name=file_name, prefix=prefix)
    except Exception as e:
        print(f"Error downloading file: {e}")
|
|
|
def manual_download(url, dst_dir, file_name, prefix=None):
    """Download one file into *dst_dir*, choosing the tool by URL.

    Args:
        url: Source link (CivitAI, Google Drive, GitHub, Hugging Face or any
            other HTTP URL).
        dst_dir: Directory the file is saved to.
        file_name: Name to save under, or None to derive it from the link.
        prefix: Entry prefix; with ``'config'`` an existing file of the same
            name is deleted before downloading.

    CivitAI links are resolved through ``CivitAi_API`` first (a preview image
    is fetched as well when one is available); if that lookup fails nothing
    is downloaded.
    """
    header_option = f"--header={user_header}"
    aria2c_header = "--header='User-Agent: Mozilla/5.0' --allow-overwrite=true"
    aria2_args = "--optimize-concurrent-downloads --console-log-level=error --summary-interval=10 --stderr=true -c -x16 -s16 -k1M -j5"

    clean_url = strip_(url)
    image_url = image_name = None

    if 'civitai' in url:
        url, clean_url, _, file_name, image_url, image_name, _ = CivitAi_API(url, file_name)
        if url == 'None':
            # CivitAi_API has already printed the failure; nothing to fetch.
            return
        if image_url and image_name:
            command = ["aria2c"] + aria2_args.split() + ["-d", dst_dir, "-o", image_name, image_url]
            subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    # Name for the info banner and for GitHub / Hugging Face downloads. It is
    # always defined, so links without an explicit name no longer fail with
    # an unbound variable; the query string is not part of the name.
    basename = file_name or (clean_url or url).split("/")[-1].split("?")[0]

    format_output(clean_url or url, dst_dir, basename, image_name, image_url)

    def run_aria2c(url, dst_dir, file_name=None, args="", header=""):
        """Run aria2c; the placeholders below are this function's locals."""
        if file_name and prefix == 'config':
            # An existing config file is deleted first so the download replaces it.
            file_path = os.path.join(dst_dir, file_name)
            if os.path.exists(file_path):
                os.remove(file_path)

        out = f"-o '{file_name}'" if file_name else ""
        get_ipython().system("aria2c {header} {args} -d {dst_dir} {out} '{url}'")

    if 'drive.google' in url:
        # gdown is installed once per session, on the first Drive link.
        if not globals().get('have_drive_link', False):
            subprocess.run(["pip", "install", "-U", "gdown"], stdout=subprocess.DEVNULL)
            globals()['have_drive_link'] = True

        # Argument lists keep paths and URLs with spaces or shell
        # metacharacters intact.
        if 'folders' in url:
            subprocess.run(["gdown", "--folder", url, "-O", dst_dir, "--fuzzy", "-c"])
        else:
            out_path = f"{dst_dir}/{file_name}" if file_name else dst_dir
            subprocess.run(["gdown", url, "-O", out_path, "--fuzzy", "-c"])

    elif 'github' in url or 'huggingface' in url:
        run_aria2c(clean_url, dst_dir, basename, aria2_args, header_option if 'huggingface' in url else '')

    elif 'http' in url:
        run_aria2c(url, dst_dir, file_name, aria2_args, aria2c_header)
|
|
|
''' SubModels - Added URLs ''' |
|
|
|
def add_submodels(selection, num_selection, model_dict, dst_dir):
    """Collect the model entries chosen by name and/or by group number.

    Args:
        selection: A key of *model_dict*, ``"ALL"`` or ``"none"``. A falsy
            value is treated like ``"none"``.
        num_selection: Additional 1-based group numbers, separated by commas
            and/or spaces (ignored for ``"ALL"``). Tokens that are not plain
            numbers or are out of range are skipped.
        model_dict: Mapping of group name to a list of model dicts; each
            model dict must have a ``name`` key.
        dst_dir: Value stored under ``dst_dir`` in every returned entry.

    Returns:
        A list of copies of the selected model dicts, unique by ``name``,
        each extended with ``dst_dir``. *model_dict* itself is not modified.
    """
    if not selection or selection == "none":
        return []

    if selection == "ALL":
        selected_models = [model for models in model_dict.values() for model in models]
    else:
        # Work on a copy: extending the list stored in model_dict would grow
        # the shared catalogue on every call.
        selected_models = list(model_dict.get(selection, []))
        group_names = list(model_dict)

        # Commas become spaces so that "1,2" reads as two numbers, not as 12.
        for token in str(num_selection or '').replace(',', ' ').split():
            if not token.isdigit():
                continue
            num = int(token)
            if 1 <= num <= len(group_names):
                selected_models.extend(model_dict[group_names[num - 1]])

    unique_models = {model['name']: model for model in selected_models}
    # Copy each entry so the caller's catalogue dicts stay untouched.
    return [{**model, 'dst_dir': dst_dir} for model in unique_models.values()]
|
|
|
def handle_submodels(selection, num_selection, model_dict, dst_dir, url):
    """Append the selected sub-models to the download string *url*.

    Every selected entry is added as ``"<url> <dst_dir> <name>, "``. Entries
    whose name contains "inpainting" are left out unless the
    ``inpainting_model`` setting is truthy. Returns the extended string.
    """
    entries = [
        f"{submodel['url']} {submodel['dst_dir']} {submodel['name']}, "
        for submodel in add_submodels(selection, num_selection, model_dict, dst_dir)
        if inpainting_model or "inpainting" not in submodel['name']
    ]
    return url + "".join(entries)
|
|
|
# Append the widget selections for models, VAEs and ControlNet models (each
# with its own catalogue and target directory) to the download string.
url = handle_submodels(model, model_num, model_list, models_dir, url)
url = handle_submodels(vae, vae_num, vae_list, vaes_dir, url)
url = handle_submodels(controlnet, controlnet_num, controlnet_list, control_dir, url)
|
|
|
''' file.txt - added urls ''' |
|
|
|
def process_file_download(file_url, PREFIXES, unique_urls):
    """Read a tagged URL list and return its entries as ``"<tag>:<url>, "`` text.

    The list is fetched over HTTP when *file_url* starts with "http",
    otherwise read from the local file system. A line containing
    ``# <tag>`` (a key of *PREFIXES*, matched case-insensitively) starts a
    section; every following URL is emitted with that tag. URLs that appear
    before the first section header are skipped.

    Args:
        file_url: HTTP(S) link or local path of the list.
        PREFIXES: Mapping whose keys are the recognised tags.
        unique_urls: Set of URLs (without the ``[name]`` part) already
            emitted; it is updated in place and used to skip duplicates.

    Returns:
        The concatenated entries, or an empty string when nothing was found
        or the remote list could not be fetched.

    Raises:
        FileNotFoundError: If a local list file does not exist.
    """
    if file_url.startswith("http"):
        # Only the "/blob/" path segment is rewritten; a bare "blob" can also
        # be part of a user, repository or file name.
        file_url = file_url.replace("/blob/", "/raw/")
        try:
            response = requests.get(file_url, timeout=30)
        except requests.RequestException as e:
            print(f"\n\033[31m[Skip]:\033[0m cannot fetch {file_url}: {e}")
            return ""
        lines = response.text.split('\n')
    else:
        with open(file_url, 'r', encoding='utf-8') as file:
            lines = file.readlines()

    entries = []
    current_tag = None
    for line in lines:
        line = line.strip()
        lowered = line.lower()

        # One test both detects a header and picks its tag, so a line such as
        # "# VAE for model" selects "vae" rather than "model".
        header_tag = next((tag for tag in PREFIXES if f'# {tag}' in lowered), None)
        if header_tag is not None:
            current_tag = header_tag

        for url in (part.split('#')[0].strip() for part in line.split(',')):
            filter_url = url.split('[')[0]
            if not url.startswith("http") or filter_url in unique_urls:
                continue
            if current_tag is None:
                # No section header seen yet, so the target is unknown.
                continue
            entries.append(f"{current_tag}:{url}, ")
            unique_urls.add(filter_url)

    return "".join(entries)
|
|
|
file_urls = ""
unique_urls = set()

# URL lists given in the settings: names or links of .txt files.
if custom_file_urls:
    # Commas become spaces so that "a.txt,b.txt" is read as two names instead
    # of being glued into one.
    for custom_file_url in custom_file_urls.replace(',', ' ').split():
        if not custom_file_url.endswith('.txt'):
            custom_file_url += '.txt'
        # Local names are resolved relative to root_path.
        if not custom_file_url.startswith('http'):
            if not custom_file_url.startswith(root_path):
                custom_file_url = f'{root_path}/{custom_file_url}'

        try:
            file_urls += process_file_download(custom_file_url, PREFIXES, unique_urls)
        except FileNotFoundError:
            # Still best-effort, but tell the user which list was skipped.
            print(f"\n\033[31m[Skip]:\033[0m URL list not found: {custom_file_url}")

# Widget URL fields, in the same order as the first five PREFIXES keys
# (model, vae, lora, embed, extension) they are paired with below.
urls = (Model_url, Vae_url, LoRA_url, Embedding_url, Extensions_url)
prefixed_urls = [
    f"{prefix}:{single_url}"
    for prefix, field_value in zip(PREFIXES, urls)
    if field_value
    # Commas become spaces so that "url1,url2" yields two URLs.
    for single_url in field_value.replace(',', ' ').split()
]
url += ", ".join(prefixed_urls) + ", " + file_urls

if detailed_download == "on":
    print("\n\n\033[33m# ====== Detailed Download ====== #\n\033[0m")
    download(url)
    print("\n\033[33m# =============================== #\n\033[0m")
else:
    # Quiet mode: run the same downloads with the cell output suppressed.
    with capture.capture_output() as cap:
        download(url)
    del cap

print("\rπ Download Complete!" + " "*15)
|
|
|
|
|
|
|
# Delete ".ipynb_checkpoints" and ".aria2" folders and "*.aria2" files below
# the WebUI folder; all output of the command is discarded.
get_ipython().system('find {webui_path} \\( -type d \\( -name ".ipynb_checkpoints" -o -name ".aria2" \\) -o -type f -name "*.aria2" \\) -exec rm -r {{}} \\; >/dev/null 2>&1')

# Clone the repositories that handle_manual() queued for the "extension" prefix.
if len(extension_repo) > 0:
    print("β¨ Installing custom extensions...", end='')
    with capture.capture_output() as cap:
        for repo, repo_name in extension_repo:
            # Without an explicit name the last path segment of the URL is used.
            if not repo_name:
                repo_name = repo.split('/')[-1]
            get_ipython().system('cd {extensions_dir} && git clone {repo} {repo_name} && cd {repo_name} && git fetch')
    del cap
    print(f"\rπ¦ Installed '{len(extension_repo)}', Custom extensions!")

# In quiet mode nothing was shown during the downloads, so print a hint.
if detailed_download == "off":
    print("\n\n\033[33mIf you don't see any downloaded files, enable the 'Detailed Downloads' feature in the widget.")

# Run the companion script named in the command; the trailing "# ..." is part
# of the magic's argument string.
get_ipython().run_line_magic('run', '{root_path}/file_cell/special/dl_display_results.py # display widgets result')
|
|
|
|