| |
|
|
| from directory_setup import * |
| from models_data import model_list, vae_list, controlnet_list |
|
|
| import os |
| import re |
| import time |
| import json |
| import shutil |
| import zipfile |
| import requests |
| import subprocess |
| from datetime import timedelta |
| from subprocess import getoutput |
| from IPython.utils import capture |
| from IPython.display import clear_output |
| from urllib.parse import urlparse, parse_qs |
|
|
|
|
| |
| env = os.getenv('ENV_NAME') |
| root_path = os.getenv('ROOT_PATH') |
| webui_path = os.getenv('WEBUI_PATH') |
| free_plan = os.getenv('FREE_PLAN') |
|
|
| UI = os.getenv('SDW_UI') |
| OLD_UI = os.getenv('SDW_OLD_UI') |
|
|
| os.chdir(root_path) |
|
|
|
|
| |
| def load_settings(path): |
| if os.path.exists(path): |
| with open(path, 'r') as file: |
| return json.load(file) |
| return {} |
|
|
| settings = load_settings(f'{root_path}/settings.json') |
|
|
| VARIABLES = [ |
| 'model', 'model_num', 'inpainting_model', |
| 'vae', 'vae_num', 'latest_webui', 'latest_exstensions', |
| 'change_webui', 'detailed_download', 'controlnet', |
| 'controlnet_num', 'commit_hash', 'huggingface_token', |
| 'ngrok_token', 'zrok_token', 'commandline_arguments', |
| 'Model_url', 'Vae_url', 'LoRA_url', 'Embedding_url', |
| 'Extensions_url', 'custom_file_urls' |
| ] |
|
|
| locals().update({key: settings.get(key) for key in VARIABLES}) |
|
|
|
|
| |
| flag_file = f"{root_path}/libraries_installed.txt" |
|
|
| if not os.path.exists(flag_file): |
| print("💿 Установка библиотек, это займет какое-то время:\n") |
|
|
| install_lib = { |
| |
| "aria2": "pip install aria2", |
| "localtunnel": "npm install -g localtunnel", |
| } |
| if controlnet != 'none': |
| install_lib["insightface"] = "pip install insightface" |
|
|
| additional_libs = { |
| "Google Colab": { |
| "xformers": "pip install xformers==0.0.27 --no-deps" |
| }, |
| "Kaggle": { |
| "xformers": "pip install xformers==0.0.26.post1", |
| |
| |
| } |
| } |
| if env in additional_libs: |
| install_lib.update(additional_libs[env]) |
|
|
| |
| for index, (package, install_cmd) in enumerate(install_lib.items(), start=1): |
| print(f"\r[{index}/{len(install_lib)}] \033[32m>>\033[0m Installing \033[33m{package}\033[0m..." + " "*35, end='') |
| subprocess.run(install_cmd, shell=True, capture_output=True) |
|
|
| |
| with capture.capture_output(): |
| get_ipython().system('curl -s -OL https://github.com/DEX-1101/sd-webui-notebook/raw/main/res/new_tunnel --output-dir {root_path}') |
| get_ipython().system('curl -s -Lo /usr/bin/cl https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64 && chmod +x /usr/bin/cl') |
| get_ipython().system('curl -sLO https://github.com/openziti/zrok/releases/download/v0.4.32/zrok_0.4.32_linux_amd64.tar.gz && tar -xzf zrok_0.4.32_linux_amd64.tar.gz -C /usr/bin && rm -f zrok_0.4.32_linux_amd64.tar.gz') |
|
|
| clear_output() |
|
|
| |
| with open(flag_file, "w") as f: |
| f.write(">W<'") |
|
|
| print("🍪 Библиотеки установлены!" + " "*35) |
| time.sleep(2) |
| clear_output() |
|
|
|
|
| |
| |
| start_colab = int(os.environ.get("START_COLAB", time.time() - 5)) |
| os.environ["START_COLAB"] = str(start_colab) |
|
|
| def download_cfg_files(file_paths, destination_path): |
| base_url = "https://huggingface.co/NagisaNao/SD-CONFIGS/resolve/main" |
| for filename in file_paths: |
| file_name = filename.split('/')[-1] |
| get_ipython().system('wget -O {destination_path}/{file_name} {base_url}/{filename}') |
|
|
| def cfg_download(): |
| common_files = ["styles.csv"] |
| a1111_files = ["A1111/config.json", "A1111/ui-config.json"] |
| forge_files = ["reForge/config.json", "reForge/ui-config.json"] |
|
|
| with capture.capture_output(): |
| download_cfg_files(common_files, webui_path) |
| ui_files = a1111_files if UI == 'A1111' else forge_files |
| download_cfg_files(ui_files, webui_path) |
|
|
| def remove_dir(directory_path): |
| if directory_path and os.path.exists(directory_path): |
| try: |
| shutil.rmtree(directory_path) |
| except Exception: |
| get_ipython().system('rm -rf {directory_path}') |
|
|
| TEMPORARY_DIR = f'{root_path}/temp_dir' |
| def copy_items_with_replace(src_base, dst_base): |
| items_to_copy = [ |
| 'embeddings', |
| 'models/Stable-diffusion', |
| 'models/VAE', |
| 'models/Lora', |
| 'models/ControlNet' |
| ] |
|
|
| print("⌚ Перемещение файлов...", end='') |
| for item in items_to_copy: |
| src = os.path.join(src_base, item) |
| dst = os.path.join(dst_base, item) |
|
|
| if os.path.exists(src): |
| if os.path.exists(dst): |
| remove_dir(dst) |
| os.makedirs(os.path.dirname(dst), exist_ok=True) |
| shutil.move(src, dst) |
| print("\r🔥 Файлы перемещены!" + " "*15) |
|
|
| def handle_colab_timer(webui_path, timer_colab): |
| timer_file_path = os.path.join(webui_path, 'static', 'colabTimer.txt') |
| if not os.path.exists(timer_file_path): |
| with open(timer_file_path, 'w') as timer_file: |
| timer_file.write(str(timer_colab)) |
| else: |
| with open(timer_file_path, 'r') as timer_file: |
| timer_colab = float(timer_file.read()) |
| return timer_colab |
|
|
| def unpack_webui(): |
| start_install = time.time() |
| print(f"⌚ Распаковка Stable Diffusion{' (Forge)' if UI == 'Forge' else ''}...", end='') |
|
|
| with capture.capture_output(): |
| download_url = "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO.zip" |
| if UI == 'Forge': |
| download_url = "https://huggingface.co/NagisaNao/fast_repo/resolve/main/FULL_REPO_forge.zip" |
|
|
| zip_path = f"{root_path}/repo.zip" |
| get_ipython().system('aria2c --console-log-level=error -c -x 16 -s 16 -k 1M {download_url} -d {root_path} -o repo.zip') |
| get_ipython().system('unzip -q -o {zip_path} -d {webui_path}') |
| get_ipython().system('rm -rf {zip_path}') |
|
|
| handle_colab_timer(webui_path, start_colab) |
|
|
| install_time = time.time() - start_install |
| minutes, seconds = divmod(int(install_time), 60) |
| print(f"\r🚀 Распаковка Завершена! За {minutes:02}:{seconds:02} ⚡" + " "*15) |
|
|
| if os.path.exists(TEMPORARY_DIR): |
| copy_items_with_replace(TEMPORARY_DIR, webui_path) |
| remove_dir(TEMPORARY_DIR) |
|
|
| |
| if os.path.exists(webui_path): |
| if UI != OLD_UI: |
| print(f'Переключение веб-интерфейса с \033[33m{OLD_UI}\033[0m на \033[33m{UI}\033[0m:') |
| copy_items_with_replace(webui_path, TEMPORARY_DIR) |
| remove_dir(webui_path) |
| os.environ['SDW_OLD_UI'] = UI |
| time.sleep(2) |
| clear_output() |
|
|
| if not os.path.exists(webui_path): |
| unpack_webui() |
| cfg_download() |
| else: |
| print("🚀 Все распакованно... Пропуск. ⚡") |
| timer_colab = handle_colab_timer(webui_path, start_colab) |
| elapsed_time = str(timedelta(seconds=time.time() - timer_colab)).split('.')[0] |
| print(f"⌚️ Вы проводите эту сессию в течение - \033[33m{elapsed_time}\033[0m") |
|
|
|
|
| |
| if latest_webui or latest_exstensions: |
| action = "WebUI и Расширений" if latest_webui and latest_exstensions else ("WebUI" if latest_webui else "Расширений") |
| print(f"⌚️ Обновление {action}...", end='') |
| with capture.capture_output(): |
| get_ipython().system('git config --global user.email "you@example.com"') |
| get_ipython().system('git config --global user.name "Your Name"') |
|
|
| |
| if latest_webui: |
| get_ipython().run_line_magic('cd', '{webui_path}') |
| get_ipython().system('git restore .') |
| get_ipython().system('git pull -X theirs --rebase --autostash') |
|
|
| |
| if latest_exstensions: |
| get_ipython().system('{\'for dir in \' + webui_path + \'/extensions/*/; do cd \\"$dir\\" && git reset --hard && git pull; done\'}') |
| print(f"\r✨ Обновление {action} Завершено!") |
|
|
|
|
| |
| anxety_repos = "https://huggingface.co/NagisaNao/fast_repo/resolve/main" |
| with capture.capture_output(): |
| |
| get_ipython().system("sed -i '521s/open=\\(False\\|True\\)/open=False/' {webui_path}/extensions/Umi-AI-Wildcards/scripts/wildcard_recursive.py # Closed accordion by default") |
| |
| get_ipython().system("sed -i '9,37d' {webui_path}/extensions/Encrypt-Image/javascript/encrypt_images_info.js # Removes the weird text in webui") |
|
|
|
|
| |
| if commit_hash: |
| print('⏳ Активация машины времени...', end="") |
| with capture.capture_output(): |
| get_ipython().run_line_magic('cd', '{webui_path}') |
| get_ipython().system('git config --global user.email "you@example.com"') |
| get_ipython().system('git config --global user.name "Your Name"') |
| get_ipython().system('git reset --hard {commit_hash}') |
| print(f"\r⌛️ Машина времени активированна! Текущий коммит: \033[34m{commit_hash}\033[0m") |
|
|
|
|
| |
| print("📦 Скачивание моделей и прочего...", end='') |
|
|
| extension_repo = [] |
| PREFIXES = { |
| "model": models_dir, |
| "vae": vaes_dir, |
| "lora": loras_dir, |
| "embed": embeddings_dir, |
| "extension": extensions_dir, |
| "control": control_dir, |
| "adetailer": adetailer_dir, |
| "config": webui_path |
| } |
| get_ipython().system('mkdir -p {" ".join(PREFIXES.values())}') |
|
|
| ''' Formatted Info Output ''' |
|
|
| def center_text(text, terminal_width=45): |
| padding = (terminal_width - len(text)) // 2 |
| return f"{' ' * padding}{text}{' ' * padding}" |
|
|
| def format_output(url, dst_dir, file_name, image_name=None, image_url=None): |
| info = center_text(f"[{file_name.split('.')[0]}]") |
| sep_line = '---' * 20 |
|
|
| print(f"\n\033[32m{sep_line}\033[36;1m{info}\033[32m{sep_line}\033[0m") |
| print(f"\033[33mURL: {url}") |
| print(f"\033[33mSAVE DIR: \033[34m{dst_dir}") |
| print(f"\033[33mFILE NAME: \033[34m{file_name}\033[0m") |
| if 'civitai' in url and image_url: |
| print(f"\033[32m[Preview DL]:\033[0m {image_name} - {image_url}\n") |
|
|
| ''' GET CivitAi API - DATA ''' |
|
|
| def CivitAi_API(url, file_name=None): |
| SUPPORT_TYPES = ('Checkpoint', 'TextualInversion', 'LORA') |
| CIVITAI_TOKEN = "62c0c5956b2f9defbd844d754000180b" |
|
|
| url = url.split('?token=')[0] if '?token=' in url else url |
| url = url.replace('?type=', f'?token={CIVITAI_TOKEN}&type=') if '?type=' in url else f"{url}?token={CIVITAI_TOKEN}" |
|
|
| def get_model_data(url): |
| base_url = "https://civitai.com/api/v1" |
| try: |
| if "civitai.com/models/" in url: |
| if '?modelVersionId=' in url: |
| version_id = url.split('?modelVersionId=')[1] |
| else: |
| model_id = url.split('/models/')[1].split('/')[0] |
| model_data = requests.get(f"{base_url}/models/{model_id}").json() |
| version_id = model_data['modelVersions'][0].get('id') |
| else: |
| version_id = url.split('/models/')[1].split('/')[0] |
|
|
| return requests.get(f"{base_url}/model-versions/{version_id}").json() |
| except (KeyError, IndexError, requests.RequestException) as e: |
| return None |
|
|
| data = get_model_data(url) |
|
|
| if not data: |
| print("\033[31m[Data Info]:\033[0m Failed to retrieve data from the API.\n") |
| return 'None', None, None, None, None, None, None |
|
|
| def get_model_info(url, data): |
| model_type = data['model']['type'] |
| model_name = data['files'][0]['name'] |
|
|
| if 'type=' in url: |
| url_model_type = parse_qs(urlparse(url).query).get('type', [''])[0].lower() |
| if 'vae' in url_model_type: |
| model_type = data['files'][1]['type'] |
| model_name = data['files'][1]['name'] |
|
|
| if file_name and '.' not in file_name: |
| file_extension = model_name.split('.')[-1] |
| model_name = f"{file_name}.{file_extension}" |
| elif file_name: |
| model_name = file_name |
|
|
| return model_type, model_name |
|
|
| def get_download_url(data, model_type): |
| if any(t.lower() in model_type.lower() for t in SUPPORT_TYPES): |
| return data['files'][0]['downloadUrl'] |
|
|
| return data['files'][1]['downloadUrl'] if 'type' in url else data['files'][0]['downloadUrl'] |
|
|
| def get_image_info(data, model_type, model_name): |
| if not any(t in model_type for t in SUPPORT_TYPES): |
| return None, None |
|
|
| for image in data.get('images', []): |
| if image['nsfwLevel'] >= 4 and env == 'Kaggle': |
| continue |
| image_url = image['url'] |
| image_extension = image_url.split('.')[-1] |
| image_name = f"{model_name.split('.')[0]}.preview.{image_extension}" if image_url else None |
| return image_url, image_name |
| return None, None |
|
|
| model_type, model_name = get_model_info(url, data) |
| download_url = get_download_url(data, model_type) |
| image_url, image_name = get_image_info(data, model_type, model_name) |
|
|
| return f"{download_url}{'&' if '?' in download_url else '?'}token={CIVITAI_TOKEN}", download_url, model_type, model_name, image_url, image_name, data |
|
|
| ''' Main Download Code ''' |
|
|
| def strip_(url): |
| if 'github.com' in url: |
| return url.replace('/blob/', '/raw/') |
| elif "huggingface.co" in url: |
| url = url.replace('/blob/', '/resolve/') |
| return url.split('?')[0] if '?' in url else url |
| return url |
|
|
| def download(url): |
| links_and_paths = [link_or_path.strip() for link_or_path in url.split(',') if link_or_path.strip()] |
|
|
| for link_or_path in links_and_paths: |
| if any(link_or_path.lower().startswith(prefix) for prefix in PREFIXES): |
| handle_manual(link_or_path) |
| else: |
| url, dst_dir, file_name = link_or_path.split() |
| manual_download(url, dst_dir, file_name) |
|
|
| |
| for directory in PREFIXES.values(): |
| for root, _, files in os.walk(directory): |
| for file in files: |
| if file.endswith(".zip"): |
| zip_path = os.path.join(root, file) |
| extract_path = os.path.splitext(zip_path)[0] |
| with zipfile.ZipFile(zip_path, 'r') as zip_ref: |
| zip_ref.extractall(extract_path) |
| os.remove(zip_path) |
|
|
| def handle_manual(url): |
| url_parts = url.split(':', 1) |
| prefix, path = url_parts[0], url_parts[1] |
|
|
| file_name_match = re.search(r'\[(.*?)\]', path) |
| file_name = file_name_match.group(1) if file_name_match else None |
| if file_name: |
| path = re.sub(r'\[.*?\]', '', path) |
|
|
| if prefix in PREFIXES: |
| dir = PREFIXES[prefix] |
| if prefix != "extension": |
| try: |
| manual_download(path, dir, file_name=file_name, prefix=prefix) |
| except Exception as e: |
| print(f"Error downloading file: {e}") |
| else: |
| extension_repo.append((path, file_name)) |
|
|
| def manual_download(url, dst_dir, file_name, prefix=None): |
| hf_header = f"--header='Authorization: Bearer {huggingface_token}'" if huggingface_token else "" |
| aria2c_header = "--header='User-Agent: Mozilla/5.0' --allow-overwrite=true" |
| aria2_args = "--optimize-concurrent-downloads --console-log-level=error --summary-interval=10 --stderr=true -c -x16 -s16 -k1M -j5" |
|
|
| clean_url = strip_(url) |
|
|
| if 'civitai' in url: |
| url, clean_url, model_type, file_name, image_url, image_name, data = CivitAi_API(url, file_name) |
| if image_url and image_name: |
| command = ["aria2c"] + aria2_args.split() + ["-d", dst_dir, "-o", image_name, image_url] |
| subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
|
|
| elif 'github' in url or 'huggingface.co' in url: |
| if file_name and '.' not in file_name: |
| file_extension = f"{clean_url.split('/')[-1].split('.', 1)[1]}" |
| file_name = f"{file_name}.{file_extension}" |
| if not file_name: |
| file_name = clean_url.split("/")[-1] |
|
|
| """ Formatted info output """ |
| try: |
| format_output(clean_url, dst_dir, file_name, image_name, image_url) |
| except UnboundLocalError: |
| format_output(clean_url, dst_dir, file_name, None, None) |
|
|
| |
| def run_aria2c(url, dst_dir, file_name=None, args="", header=""): |
| file_path = os.path.join(dst_dir, file_name) |
| if os.path.exists(file_path) and prefix == 'config': |
| os.remove(file_path) |
|
|
| out = f"-o '{file_name}'" if file_name else "" |
| get_ipython().system("aria2c {header} {args} -d {dst_dir} {out} '{url}'") |
|
|
| |
| if 'drive.google' in url: |
| if not globals().get('have_drive_link', False): |
| os.system("pip install -U gdown > /dev/null") |
| globals()['have_drive_link'] = True |
|
|
| if 'folders' in url: |
| os.system(f"gdown --folder \"{url}\" -O {dst_dir} --fuzzy -c") |
| else: |
| out_path = f"{dst_dir}/{file_name}" if file_name else dst_dir |
| os.system(f"gdown \"{url}\" -O {out_path} --fuzzy -c") |
|
|
| |
| elif 'github' in url or 'huggingface' in url: |
| run_aria2c(clean_url, dst_dir, file_name, aria2_args, hf_header if 'huggingface' in url else '') |
|
|
| |
| elif 'http' in url: |
| run_aria2c(url, dst_dir, file_name, aria2_args, aria2c_header) |
|
|
| ''' SubModels - Added URLs ''' |
|
|
| |
| def split_numbers(num_str, max_num): |
| result = [] |
| i = 0 |
| while i < len(num_str): |
| found = False |
| for length in range(2, 0, -1): |
| if i + length <= len(num_str): |
| part = int(num_str[i:i + length]) |
| if part <= max_num: |
| result.append(part) |
| i += length |
| found = True |
| break |
| if not found: |
| break |
| return result |
|
|
| def add_submodels(selection, num_selection, model_dict, dst_dir): |
| if selection == "none": |
| return [] |
| selected_models = [] |
|
|
| if selection == "ALL": |
| selected_models = sum(model_dict.values(), []) |
| else: |
| if selection in model_dict: |
| selected_models.extend(model_dict[selection]) |
|
|
| nums = num_selection.replace(',', ' ').split() |
| max_num = len(model_dict) |
| unique_nums = set() |
|
|
| for num_part in nums: |
| split_nums = split_numbers(num_part, max_num) |
| unique_nums.update(split_nums) |
|
|
| for num in unique_nums: |
| if 1 <= num <= max_num: |
| name = list(model_dict.keys())[num - 1] |
| selected_models.extend(model_dict[name]) |
|
|
| unique_models = {model['name']: model for model in selected_models}.values() |
|
|
| for model in unique_models: |
| model['dst_dir'] = dst_dir |
|
|
| return list(unique_models) |
|
|
| def handle_submodels(selection, num_selection, model_dict, dst_dir, url): |
| submodels = add_submodels(selection, num_selection, model_dict, dst_dir) |
| for submodel in submodels: |
| if not inpainting_model and "inpainting" in submodel['name']: |
| continue |
| url += f"{submodel['url']} {submodel['dst_dir']} {submodel['name']}, " |
| return url |
|
|
| url = "" |
| url = handle_submodels(model, model_num, model_list, models_dir, url) |
| url = handle_submodels(vae, vae_num, vae_list, vaes_dir, url) |
| url = handle_submodels(controlnet, controlnet_num, controlnet_list, control_dir, url) |
|
|
| ''' file.txt - added urls ''' |
|
|
| def process_file_download(file_url, PREFIXES, unique_urls): |
| files_urls = "" |
|
|
| if file_url.startswith("http"): |
| if "blob" in file_url: |
| file_url = file_url.replace("blob", "raw") |
| response = requests.get(file_url) |
| lines = response.text.split('\n') |
| else: |
| with open(file_url, 'r') as file: |
| lines = file.readlines() |
|
|
| current_tag = None |
| for line in lines: |
| line = line.strip() |
| if any(f'# {tag}' in line.lower() for tag in PREFIXES): |
| current_tag = next((tag for tag in PREFIXES if tag in line.lower())) |
|
|
| urls = [url.split('#')[0].strip() for url in line.split(',')] |
| for url in urls: |
| filter_url = url.split('[')[0] |
|
|
| if url.startswith("http") and filter_url not in unique_urls: |
| files_urls += f"{current_tag}:{url}, " |
| unique_urls.add(filter_url) |
|
|
| return files_urls |
|
|
| file_urls = "" |
| unique_urls = set() |
|
|
| if custom_file_urls: |
| for custom_file_url in custom_file_urls.replace(',', '').split(): |
| if not custom_file_url.endswith('.txt'): |
| custom_file_url += '.txt' |
| if not custom_file_url.startswith('http'): |
| if not custom_file_url.startswith(root_path): |
| custom_file_url = f'{root_path}/{custom_file_url}' |
|
|
| try: |
| file_urls += process_file_download(custom_file_url, PREFIXES, unique_urls) |
| except FileNotFoundError: |
| pass |
|
|
| |
| urls = (Model_url, Vae_url, LoRA_url, Embedding_url, Extensions_url) |
| prefixed_urls = (f"{prefix}:{url}" for prefix, url in zip(PREFIXES.keys(), urls) if url for url in url.replace(',', '').split()) |
| url += ", ".join(prefixed_urls) + ", " + file_urls |
|
|
| if detailed_download == "on": |
| print("\n\n\033[33m# ====== Подробная Загрузка ====== #\n\033[0m") |
| download(url) |
| print("\n\033[33m# =============================== #\n\033[0m") |
| else: |
| with capture.capture_output(): |
| download(url) |
|
|
| print("\r🏁 Скачивание Завершено!" + " "*15) |
|
|
|
|
| |
| get_ipython().system('find {webui_path} \\( -type d \\( -name ".ipynb_checkpoints" -o -name ".aria2" \\) -o -type f -name "*.aria2" \\) -exec rm -r {{}} \\; >/dev/null 2>&1') |
|
|
|
|
| |
| if len(extension_repo) > 0: |
| print("✨ Установка кастомных расширений...", end='') |
| with capture.capture_output(): |
| for repo, repo_name in extension_repo: |
| if not repo_name: |
| repo_name = repo.split('/')[-1] |
| get_ipython().system('cd {extensions_dir} && git clone {repo} {repo_name} && cd {repo_name} && git fetch') |
| print(f"\r📦 Установлено '{len(extension_repo)}', Кастомных расширений!") |
|
|
|
|
| |
| if detailed_download == "off": |
| print("\n\n\033[33mЕсли вы не видете каких-то скаченных файлов, включите в виджетах функцию 'Подробная Загрузка'.") |
|
|
| get_ipython().run_line_magic('run', '{root_path}/file_cell/special/dl_display_results.py # display widgets result') |
|
|
|
|