Spaces:
Runtime error
Runtime error
File size: 3,942 Bytes
2de3774 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
import os
import traceback
import numpy as np
import torch
import modules.async_worker as worker
import modules.controlnet
from shared import path_manager
import comfy.utils
from comfy_extras.chainner_models import model_loading
from comfy_extras.nodes_upscale_model import ImageUpscaleWithModel
from PIL import Image
Image.MAX_IMAGE_PIXELS = None
class pipeline:
    """Image-upscaling pipeline that runs an input image through an upscaler model.

    Several methods (``load_loras``, ``refresh_controlnet``,
    ``clean_prompt_cond_caches``) are deliberate no-ops; they are presumably
    kept so this class offers the same method set as the other pipelines
    in the project -- confirm against the caller.
    """

    # Tags identifying which pipeline kind(s) this class serves.
    pipeline_type = ["template"]
    # Name of the most recently "loaded" base model; see load_base_model().
    model_hash = ""

    def parse_gen_data(self, gen_data):
        """Adjust generation settings in place for a single upscale run.

        The requested image count is preserved under
        ``"original_image_number"``, then the run is forced to one image
        with previews disabled.

        Args:
            gen_data: Mutable mapping of generation settings. Must contain
                the key ``"image_number"``.

        Returns:
            The same ``gen_data`` object, modified in place.
        """
        gen_data["original_image_number"] = gen_data["image_number"]
        gen_data["image_number"] = 1
        gen_data["show_preview"] = False
        return gen_data

    def load_upscaler_model(self, model_name):
        """Load an upscaler checkpoint and return it in eval mode.

        Args:
            model_name: Model file name or path, resolved through
                ``path_manager.get_file_path`` with a default located in
                the configured ``"upscaler_path"`` directory.

        Returns:
            The model built by ``model_loading.load_state_dict``, after
            ``.eval()`` has been called on it.
        """
        fallback_path = os.path.join(
            path_manager.model_paths["upscaler_path"], model_name
        )
        model_path = path_manager.get_file_path(model_name, default=fallback_path)
        sd = comfy.utils.load_torch_file(str(model_path), safe_load=True)
        # Some checkpoints store every key under a "module." prefix; strip
        # it so the architecture loader sees the key names it expects.
        if "module.layers.0.residual_group.blocks.0.norm1.weight" in sd:
            sd = comfy.utils.state_dict_prefix_replace(sd, {"module.": ""})
        return model_loading.load_state_dict(sd).eval()

    def load_base_model(self, name):
        """Record ``name`` as the current base model.

        No model weights are loaded here; the method only remembers the
        name on the instance and prints a message when it changes.

        Args:
            name: Identifier of the requested base model.
        """
        # Nothing to do when the same model is already recorded.
        if self.model_hash == name:
            return
        print(f"Loading model: {name}")
        self.model_hash = name

    def load_keywords(self, lora):
        """Return the text stored in the ``.txt`` file paired with a LoRA file.

        Args:
            lora: Path string of the LoRA file; ``".safetensors"`` is
                replaced with ``".txt"`` to find the keyword file.

        Returns:
            The keyword file's contents, or ``" "`` (a single space) when
            the file does not exist.
        """
        filename = lora.replace(".safetensors", ".txt")
        try:
            with open(filename, "r") as file:
                return file.read()
        except FileNotFoundError:
            return " "

    def load_loras(self, loras):
        """Do nothing; this pipeline does not use LoRAs."""
        return

    def refresh_controlnet(self, name=None):
        """Do nothing; this pipeline has no ControlNet state to refresh."""
        return

    def clean_prompt_cond_caches(self):
        """Do nothing; this pipeline keeps no prompt-conditioning caches."""
        return

    def _report_progress(self, gen_data, message, image=None):
        """Send a "preview" status update for the task described by ``gen_data``."""
        worker.add_result(gen_data["task_id"], "preview", (-1, message, image))

    def process(
        self,
        gen_data=None,
        callback=None,
    ):
        """Upscale ``gen_data["input_image"]`` with the configured upscaler model.

        Args:
            gen_data: Mapping that must contain ``"input_image"`` (an object
                with a PIL-style ``convert("RGB")`` method) and ``"task_id"``.
                It is also passed to ``modules.controlnet.get_settings`` to
                obtain the ``"upscaler"`` model name.
            callback: Unused; accepted for interface compatibility.

        Returns:
            A list of ``uint8`` NumPy arrays, one per upscaled image, or an
            empty list if loading, upscaling or conversion raised.
        """
        # Convert the image to a float32 tensor scaled to [0, 1] with a
        # leading batch axis of size one.
        rgb_image = gen_data["input_image"].convert("RGB")
        pixels = np.array(rgb_image).astype(np.float32) / 255.0
        input_image = torch.from_numpy(pixels)[None,]

        self._report_progress(gen_data, "Load upscaling model ...")

        cn_settings = modules.controlnet.get_settings(gen_data)
        upscale_path = path_manager.get_file_path(cn_settings["upscaler"])
        if upscale_path is None:
            # Fall back to the default upscaler when the configured one
            # cannot be located.
            upscale_path = path_manager.get_file_path("4x-UltraSharp.pth")

        # The model is loaded and applied exactly once, inside the try, so
        # any failure is reported to the UI instead of propagating.
        try:
            upscaler_model = self.load_upscaler_model(upscale_path)

            self._report_progress(gen_data, "Upscaling image ...")
            decoded_latent = ImageUpscaleWithModel().upscale(
                upscaler_model, input_image
            )[0]

            self._report_progress(gen_data, "Converting ...")
            images = [
                np.clip(255.0 * y.cpu().numpy(), 0, 255).astype(np.uint8)
                for y in decoded_latent
            ]

            self._report_progress(gen_data, "Done ...")
        except Exception:
            # Best effort: log the traceback, show the error image and
            # return no images rather than crashing the worker.
            traceback.print_exc()
            self._report_progress(gen_data, "Oops ...", "html/error.png")
            images = []

        return images
|