Spaces: Running on Zero
	| import spaces | |
| from typing import Tuple, Union, List | |
| import os | |
| import time | |
| import numpy as np | |
| from PIL import Image | |
| import requests | |
| import torch | |
| from diffusers import StableDiffusionControlNetImg2ImgPipeline, ControlNetModel, DDIMScheduler | |
| from diffusers.pipelines.stable_diffusion import StableDiffusionSafetyChecker | |
| from diffusers.models import AutoencoderKL | |
| from diffusers.models.attention_processor import AttnProcessor2_0 | |
| from diffusers.pipelines.controlnet import StableDiffusionControlNetInpaintPipeline | |
| from diffusers import ControlNetModel, UniPCMultistepScheduler, AutoPipelineForText2Image | |
| from transformers import AutoImageProcessor, UperNetForSemanticSegmentation, AutoModelForDepthEstimation | |
| from colors import ade_palette | |
| from utils import map_colors_rgb | |
| from diffusers import StableDiffusionXLPipeline | |
| import gradio as gr | |
| import gc | |
# Accelerator that the pipelines and the depth estimator are moved to.
device = "cuda"
# Half precision, passed as ``torch_dtype`` to the model loaders in this file.
dtype = torch.float16

# Stylesheet limiting the height of the image widgets, keyed by elem_id.
# NOTE(review): nothing visible in this file passes ``css`` to gr.Blocks.
css = """
#img-display-container {
max-height: 50vh;
}
#img-display-input {
max-height: 40vh;
}
#img-display-output {
max-height: 40vh;
}
"""
def download_file(url, folder_path, filename):
    """Download ``url`` to ``folder_path/filename`` unless the file is already there.

    Args:
        url: Address of the file to fetch.
        folder_path: Destination directory; created (with parents) when missing.
        filename: Name of the file inside ``folder_path``.

    Returns:
        None. Progress and non-200 responses are reported with ``print``;
        connection errors and timeouts raised by ``requests`` propagate.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(folder_path, exist_ok=True)
    file_path = os.path.join(folder_path, filename)
    if os.path.isfile(file_path):
        print(f"File already exists: {file_path}")
        return

    # The timeout keeps a stalled server from hanging start-up forever, and
    # the context manager releases the streamed connection when done.
    with requests.get(url, stream=True, timeout=(10, 60)) as response:
        if response.status_code != 200:
            print(f"Error downloading the file. Status code: {response.status_code}")
            return
        # Stream into a temporary name first: an interrupted transfer must not
        # leave a truncated file that the "already exists" check would accept.
        partial_path = file_path + ".part"
        with open(partial_path, 'wb') as file:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                file.write(chunk)
        os.replace(partial_path, file_path)
    print(f"File successfully downloaded and saved: {file_path}")
def download_models():
    """Fetch every checkpoint listed below through ``download_file``.

    Each entry maps a label to ``(url, destination folder, file name)``; the
    label itself is only descriptive and is not used for the download.
    """
    sources = {
        "MODEL": ("https://huggingface.co/dantea1118/juggernaut_reborn/resolve/main/juggernaut_reborn.safetensors?download=true", "models/models/Stable-diffusion", "juggernaut_reborn.safetensors"),
        "UPSCALER_X2": ("https://huggingface.co/ai-forever/Real-ESRGAN/resolve/main/RealESRGAN_x2.pth?download=true", "models/upscalers/", "RealESRGAN_x2.pth"),
        "UPSCALER_X4": ("https://huggingface.co/ai-forever/Real-ESRGAN/resolve/main/RealESRGAN_x4.pth?download=true", "models/upscalers/", "RealESRGAN_x4.pth"),
        "NEGATIVE_1": ("https://huggingface.co/philz1337x/embeddings/resolve/main/verybadimagenegative_v1.3.pt?download=true", "models/embeddings", "verybadimagenegative_v1.3.pt"),
        "NEGATIVE_2": ("https://huggingface.co/datasets/AddictiveFuture/sd-negative-embeddings/resolve/main/JuggernautNegative-neg.pt?download=true", "models/embeddings", "JuggernautNegative-neg.pt"),
        "LORA_1": ("https://huggingface.co/philz1337x/loras/resolve/main/SDXLrender_v2.0.safetensors?download=true", "models/Lora", "SDXLrender_v2.0.safetensors"),
        "LORA_2": ("https://huggingface.co/philz1337x/loras/resolve/main/more_details.safetensors?download=true", "models/Lora", "more_details.safetensors"),
        "CONTROLNET": ("https://huggingface.co/lllyasviel/ControlNet-v1-1/resolve/main/control_v11f1e_sd15_tile.pth?download=true", "models/ControlNet", "control_v11f1e_sd15_tile.pth"),
        "VAE": ("https://huggingface.co/stabilityai/sd-vae-ft-mse-original/resolve/main/vae-ft-mse-840000-ema-pruned.safetensors?download=true", "models/VAE", "vae-ft-mse-840000-ema-pruned.safetensors"),
    }
    # Dict order is insertion order, so downloads happen in the listed order.
    for url, folder, filename in sources.values():
        download_file(url, folder, filename)
def timer_func(func):
    """Decorator that prints how long each call to ``func`` took.

    Args:
        func: The callable to time.

    Returns:
        A wrapper that forwards all arguments to ``func``, prints
        ``"<name> took <seconds> seconds"`` and returns ``func``'s result.
        Nothing is printed when ``func`` raises.
    """
    # Imported locally because functools is not imported at file level.
    import functools

    # wraps() keeps the decorated function's __name__ / __doc__ intact.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so clock adjustments cannot skew the result.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        elapsed = time.perf_counter() - start_time
        print(f"{func.__name__} took {elapsed:.2f} seconds")
        return result

    return wrapper
class LazyLoadPipeline:
    """ControlNet img2img pipeline that is only built on first use.

    ``self.pipe`` stays ``None`` until ``load()`` (or the first call of the
    instance) builds the pipeline from the checkpoint files under ``models/``.
    """

    def __init__(self):
        self.pipe = None

    def load(self):
        """Build the pipeline and move it to ``device``; no-op once loaded."""
        if self.pipe is not None:
            return
        print("Starting to load the pipeline...")
        pipe = self.setup_pipeline()
        print(f"Moving pipeline to device: {device}")
        pipe.to(device)
        # USE_TORCH_COMPILE is not defined anywhere in this file, so a missing
        # flag is treated as "compilation disabled" instead of a NameError.
        if globals().get("USE_TORCH_COMPILE", False):
            print("Compiling the model...")
            pipe.unet = torch.compile(pipe.unet, mode="reduce-overhead", fullgraph=True)
        # Publish only a fully prepared pipeline, so a failed device move is
        # retried by the next load() rather than silently skipped.
        self.pipe = pipe

    def setup_pipeline(self):
        """Assemble the pipeline from local single-file checkpoints.

        Returns:
            The configured ``StableDiffusionControlNetImg2ImgPipeline``
            (not yet moved to ``device``).
        """
        print("Setting up the pipeline...")
        controlnet = ControlNetModel.from_single_file(
            "models/ControlNet/control_v11f1e_sd15_tile.pth", torch_dtype=torch.float16
        )
        safety_checker = StableDiffusionSafetyChecker.from_pretrained("CompVis/stable-diffusion-safety-checker")
        model_path = "models/models/Stable-diffusion/juggernaut_reborn.safetensors"
        pipe = StableDiffusionControlNetImg2ImgPipeline.from_single_file(
            model_path,
            controlnet=controlnet,
            torch_dtype=torch.float16,
            use_safetensors=True,
            safety_checker=safety_checker
        )
        # Replace the checkpoint's VAE with the separately downloaded one.
        vae = AutoencoderKL.from_single_file(
            "models/VAE/vae-ft-mse-840000-ema-pruned.safetensors",
            torch_dtype=torch.float16
        )
        pipe.vae = vae
        # Embeddings referenced by name in the negative prompt of
        # gradio_process_image.
        pipe.load_textual_inversion("models/embeddings/verybadimagenegative_v1.3.pt")
        pipe.load_textual_inversion("models/embeddings/JuggernautNegative-neg.pt")
        # Each LoRA is fused right after loading, at its own scale.
        pipe.load_lora_weights("models/Lora/SDXLrender_v2.0.safetensors")
        pipe.fuse_lora(lora_scale=0.5)
        pipe.load_lora_weights("models/Lora/more_details.safetensors")
        pipe.fuse_lora(lora_scale=1.)
        pipe.scheduler = DDIMScheduler.from_config(pipe.scheduler.config)
        pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.3, b2=1.4)
        return pipe

    def __call__(self, *args, **kwargs):
        """Run the pipeline, loading it first if that has not happened yet."""
        # Mirrors LazyRealESRGAN.predict, which also loads on demand; without
        # this a call before load() fails with "'NoneType' is not callable".
        self.load()
        return self.pipe(*args, **kwargs)
class LazyRealESRGAN:
    """Real-ESRGAN upscaler whose weights are loaded on the first prediction.

    NOTE(review): ``RealESRGAN`` is not imported in the visible part of this
    file; ``load_model`` needs that name to be in scope.
    """

    def __init__(self, device, scale):
        self.model = None
        self.scale = scale
        self.device = device

    def load_model(self):
        """Create the model and read its weights from ``models/upscalers``; runs once."""
        if self.model is not None:
            return
        self.model = RealESRGAN(self.device, scale=self.scale)
        self.model.load_weights(f'models/upscalers/RealESRGAN_x{self.scale}.pth', download=False)

    def predict(self, img):
        """Upscale ``img``, loading the model first when necessary."""
        self.load_model()
        upscaled = self.model.predict(img)
        return upscaled
def resize_and_upscale(input_image, resolution):
    """Resize an image so its short side is about ``resolution``, then upscale it.

    The image is converted to RGB, both sides are rounded to a multiple of 64,
    and the result goes through the x2 upscaler when ``resolution <= 2048``
    and through the x4 upscaler otherwise.

    NOTE(review): relies on the module-level ``lazy_realesrgan_x2`` /
    ``lazy_realesrgan_x4`` objects, whose creation is commented out near the
    end of this file.
    """
    rgb_image = input_image.convert("RGB")
    width, height = rgb_image.size
    ratio = float(resolution) / min(height, width)
    target_h = int(round(height * ratio / 64.0)) * 64
    target_w = int(round(width * ratio / 64.0)) * 64
    resized = rgb_image.resize((target_w, target_h), resample=Image.LANCZOS)
    upscaler = lazy_realesrgan_x2 if resolution <= 2048 else lazy_realesrgan_x4
    return upscaler.predict(resized)
def create_hdr_effect(original_image, hdr):
    """Blend nine brightness-scaled copies of the image with Mertens fusion.

    Args:
        original_image: RGB image convertible with ``np.array``.
        hdr: Effect strength; ``0`` returns ``original_image`` untouched.

    Returns:
        The input itself when ``hdr == 0``, otherwise a new PIL image.

    NOTE(review): ``cv2`` is not imported in the visible part of this file.
    """
    if hdr == 0:
        return original_image

    bgr = cv2.cvtColor(np.array(original_image), cv2.COLOR_RGB2BGR)
    # Brightness multipliers from darkest to brightest, spread around 1.0.
    gains = [
        1.0 - 0.9 * hdr, 1.0 - 0.7 * hdr, 1.0 - 0.45 * hdr,
        1.0 - 0.25 * hdr, 1.0, 1.0 + 0.2 * hdr,
        1.0 + 0.4 * hdr, 1.0 + 0.6 * hdr, 1.0 + 0.8 * hdr,
    ]
    exposures = []
    for gain in gains:
        exposures.append(cv2.convertScaleAbs(bgr, alpha=gain))

    fused = cv2.createMergeMertens().process(exposures)
    # The fused result is scaled by 255 and clipped back into the 8-bit range.
    fused_8bit = np.clip(fused * 255, 0, 255).astype('uint8')
    rgb = cv2.cvtColor(fused_8bit, cv2.COLOR_BGR2RGB)
    return Image.fromarray(rgb)
def prepare_image(input_image, resolution, hdr):
    """Upscale ``input_image`` to ``resolution`` and apply the HDR effect to it."""
    upscaled = resize_and_upscale(input_image, resolution)
    return create_hdr_effect(upscaled, hdr)
def gradio_process_image(input_image, resolution, num_inference_steps, strength, hdr, guidance_scale):
    """Enhance an image with the lazily loaded ControlNet img2img pipeline.

    The prepared (upscaled, HDR-adjusted) image is used both as the img2img
    source and as the ControlNet conditioning image, with a fixed seed of 0.

    Returns:
        ``[input_array, result_array]``: the untouched input and the
        generated image, both as numpy arrays.

    NOTE(review): relies on the module-level ``lazy_pipe``, whose creation is
    commented out near the end of this file.
    """
    print("Starting image processing...")
    torch.cuda.empty_cache()

    condition_image = prepare_image(input_image, resolution, hdr)
    out_width, out_height = condition_image.size
    generator = torch.Generator(device=device).manual_seed(0)

    print("Running inference...")
    output = lazy_pipe(
        prompt="masterpiece, best quality, highres",
        negative_prompt="low quality, normal quality, ugly, blurry, blur, lowres, bad anatomy, bad hands, cropped, worst quality, verybadimagenegative_v1.3, JuggernautNegative-neg",
        image=condition_image,
        control_image=condition_image,
        width=out_width,
        height=out_height,
        strength=strength,
        num_inference_steps=num_inference_steps,
        guidance_scale=guidance_scale,
        generator=generator,
    )
    result = output.images[0]
    print("Image processing completed successfully")

    # Both images are handed back as numpy arrays.
    return [np.array(input_image), np.array(result)]
def filter_items(
    colors_list: Union[List, np.ndarray],
    items_list: Union[List, np.ndarray],
    items_to_remove: Union[List, np.ndarray]
) -> Tuple[Union[List, np.ndarray], Union[List, np.ndarray]]:
    """
    Drop unwanted items, together with their colors, from two parallel sequences.

    Args:
        colors_list: Colors, aligned index by index with ``items_list``.
        items_list: Items to filter.
        items_to_remove: Items that must not appear in the result.

    Returns:
        Two new lists, ``(colors, items)``, holding only the pairs whose item
        is not in ``items_to_remove``, in their original order.
    """
    survivors = [
        (color, item)
        for color, item in zip(colors_list, items_list)
        if item not in items_to_remove
    ]
    filtered_colors = [color for color, _ in survivors]
    filtered_items = [item for _, item in survivors]
    return filtered_colors, filtered_items
def get_segmentation_pipeline(
) -> Tuple[AutoImageProcessor, UperNetForSemanticSegmentation]:
    """Load the UperNet semantic-segmentation processor and model.

    Returns:
        Tuple[AutoImageProcessor, UperNetForSemanticSegmentation]: the image
        processor and the segmentation model, both loaded from the same
        checkpoint.
    """
    checkpoint = "openmmlab/upernet-convnext-small"
    processor = AutoImageProcessor.from_pretrained(checkpoint)
    segmentor = UperNetForSemanticSegmentation.from_pretrained(checkpoint)
    return processor, segmentor
def segment_image(
    image: Image,
    image_processor: AutoImageProcessor,
    image_segmentor: UperNetForSemanticSegmentation
) -> Image:
    """
    Produce a color-coded semantic segmentation map of an image.

    Args:
        image (Image): The image to segment.
        image_processor (AutoImageProcessor): Prepares the image for the model
            and post-processes the model output.
        image_segmentor (UperNetForSemanticSegmentation): The segmentation
            model.

    Returns:
        Image: An RGB image of the same size as ``image`` in which every
            pixel carries the ``ade_palette`` color of its predicted class.
    """
    model_inputs = image_processor(image, return_tensors="pt")
    with torch.no_grad():
        outputs = image_segmentor(model_inputs.pixel_values)

    # PIL reports (width, height); the post-processor is given (height, width).
    height_width = image.size[::-1]
    seg = image_processor.post_process_semantic_segmentation(
        outputs, target_sizes=[height_width])[0]

    # Paint each class label with its palette color; unmatched pixels stay black.
    color_seg = np.zeros((seg.shape[0], seg.shape[1], 3), dtype=np.uint8)
    for label, color in enumerate(np.array(ade_palette())):
        color_seg[seg == label, :] = color

    return Image.fromarray(color_seg).convert('RGB')
def get_depth_pipeline():
    """Load the Depth Anything image processor and depth-estimation model.

    Returns:
        ``(feature_extractor, depth_estimator)``, both loaded from the same
        checkpoint with ``torch_dtype=dtype``. Neither is moved to a device
        here.
    """
    checkpoint = "LiheYoung/depth-anything-large-hf"
    feature_extractor = AutoImageProcessor.from_pretrained(checkpoint, torch_dtype=dtype)
    depth_estimator = AutoModelForDepthEstimation.from_pretrained(checkpoint, torch_dtype=dtype)
    return feature_extractor, depth_estimator
def get_depth_image(
    image: Image,
    feature_extractor: AutoImageProcessor,
    depth_estimator: AutoModelForDepthEstimation
) -> Image:
    """Estimate a depth map for ``image`` and return it as a 3-channel picture.

    Args:
        image: The input PIL image.
        feature_extractor: Processor that turns the image into model inputs.
        depth_estimator: Depth model; its inputs are moved to ``device``, so
            the model is expected to live there as well.

    Returns:
        A PIL image of the same size as ``image`` whose three identical
        channels hold the depth map, min-max normalised to 0..255.
    """
    image_to_depth = feature_extractor(images=image, return_tensors="pt").to(device)
    with torch.no_grad():
        depth_map = depth_estimator(**image_to_depth).predicted_depth

    # Add a channel axis and resample the prediction to the input resolution.
    # presumably predicted_depth is (batch, H, W) — TODO confirm
    width, height = image.size
    depth_map = torch.nn.functional.interpolate(
        depth_map.unsqueeze(1).float(),
        size=(height, width),
        mode="bicubic",
        align_corners=False,
    )

    depth_min = torch.amin(depth_map, dim=[1, 2, 3], keepdim=True)
    depth_max = torch.amax(depth_map, dim=[1, 2, 3], keepdim=True)
    # A perfectly flat depth map would otherwise divide 0 by 0 and fill the
    # output with NaN; clamping the range leaves normal inputs unchanged.
    depth_range = torch.clamp(depth_max - depth_min, min=torch.finfo(depth_map.dtype).eps)
    depth_map = (depth_map - depth_min) / depth_range

    # Repeat the single channel three times and move channels last for PIL.
    stacked = torch.cat([depth_map] * 3, dim=1)
    pixels = stacked.permute(0, 2, 3, 1).cpu().numpy()[0]
    return Image.fromarray((pixels * 255.0).clip(0, 255).astype(np.uint8))
def resize_dimensions(dimensions, target_size):
    """
    Scale ``(width, height)`` so the longer side equals ``target_size``.

    The aspect ratio is kept (the shorter side is truncated to an integer).
    When both sides are already smaller than ``target_size`` the input is
    returned unchanged.

    Args:
        dimensions: ``(width, height)`` pair.
        target_size: Desired length of the longer side.

    Returns:
        The original ``dimensions`` object, or a new ``(width, height)`` tuple.
    """
    width, height = dimensions
    if width < target_size and height < target_size:
        return dimensions

    # max(1, ...) keeps very elongated images from collapsing to a 0-pixel
    # side, which an image resize cannot handle.
    if width > height:
        scaled_height = max(1, int(target_size * (height / width)))
        return (target_size, scaled_height)
    scaled_width = max(1, int(target_size * (width / height)))
    return (scaled_width, target_size)
def flush():
    """Run the garbage collector, then release PyTorch's cached CUDA memory."""
    gc.collect()
    torch.cuda.empty_cache()
class ControlNetDepthDesignModelMulti:
    """Room-design generator built on a depth + segmentation ControlNet inpainting pipeline.

    ``generate_design`` uses objects created at module level in this file:
    ``pipe``, ``guide_pipe``, ``seg_image_processor``, ``image_segmentor``,
    ``depth_feature_extractor`` and ``depth_estimator``.
    """

    def __init__(self):
        """Set the default seed, prompts and the segment classes that are kept."""
        #os.environ['HF_HUB_OFFLINE'] = "True"
        self.seed = 323*111
        self.neg_prompt = "window, door, low resolution, banner, logo, watermark, text, deformed, blurry, out of focus, surreal, ugly, beginner"
        # Segment classes excluded from the inpainting mask, i.e. left as-is.
        self.control_items = ["windowpane;window", "door;double;door"]
        self.additional_quality_suffix = "interior design, 4K, high resolution, photorealistic"

    # The Space runs on ZeroGPU and imports ``spaces``; the GPU-bound entry
    # point has to be decorated for a GPU to be attached during the call.
    @spaces.GPU
    def generate_design(self, empty_room_image: Image, prompt: str, guidance_scale: float = 10, num_steps: int = 50, strength: float =0.9, img_size: int = 640) -> Image:
        """
        Given an image of an empty room and a prompt
        generate the designed room according to the prompt
        Inputs -
            empty_room_image - An RGB PIL Image of the empty room
            prompt - Text describing the target design elements of the room
            guidance_scale - Guidance scale passed to the inpainting pipeline
            num_steps - Inference steps used by both pipelines
            strength - Inpainting strength
            img_size - Longer side of the working resolution
        Returns -
            design_image - PIL Image of the same size as the empty room image
                           If the size is not the same the submission will fail.
        """
        print(prompt)
        flush()
        self.generator = torch.Generator(device=device).manual_seed(self.seed)
        pos_prompt = prompt + f', {self.additional_quality_suffix}'

        # Work at a reduced resolution; the result is scaled back at the end.
        orig_w, orig_h = empty_room_image.size
        new_width, new_height = resize_dimensions(empty_room_image.size, img_size)
        input_image = empty_room_image.resize((new_width, new_height))

        real_seg = np.array(segment_image(input_image,
                                          seg_image_processor,
                                          image_segmentor))
        unique_colors = np.unique(real_seg.reshape(-1, real_seg.shape[2]), axis=0)
        unique_colors = [tuple(color) for color in unique_colors]
        segment_items = [map_colors_rgb(i) for i in unique_colors]
        chosen_colors, segment_items = filter_items(
            colors_list=unique_colors,
            items_list=segment_items,
            items_to_remove=self.control_items
        )

        # Mask every pixel whose segment class was not filtered out above.
        mask = np.zeros_like(real_seg)
        for color in chosen_colors:
            color_matches = (real_seg == color).all(axis=2)
            mask[color_matches] = 1

        image_np = np.array(input_image)
        image = Image.fromarray(image_np).convert("RGB")
        mask_image = Image.fromarray((mask * 255).astype(np.uint8)).convert("RGB")
        segmentation_cond_image = Image.fromarray(real_seg).convert("RGB")
        image_depth = get_depth_image(image, depth_feature_extractor, depth_estimator)

        # generate image that would be used as IP-adapter
        flush()
        # The guide image's sides are rounded down to a multiple of 8.
        new_width_ip = int(new_width / 8) * 8
        new_height_ip = int(new_height / 8) * 8
        ip_image = guide_pipe(pos_prompt,
                              num_inference_steps=num_steps,
                              negative_prompt=self.neg_prompt,
                              height=new_height_ip,
                              width=new_width_ip,
                              generator=[self.generator]).images[0]

        flush()
        generated_image = pipe(
            prompt=pos_prompt,
            negative_prompt=self.neg_prompt,
            num_inference_steps=num_steps,
            strength=strength,
            guidance_scale=guidance_scale,
            generator=[self.generator],
            image=image,
            mask_image=mask_image,
            ip_adapter_image=ip_image,
            control_image=[image_depth, segmentation_cond_image],
            controlnet_conditioning_scale=[0.5, 0.5]
        ).images[0]

        flush()
        # Restore the size of the uploaded image.
        design_image = generated_image.resize(
            (orig_w, orig_h), Image.Resampling.LANCZOS
        )
        return design_image
def create_demo(model):
    """Build the Gradio widgets of the demo and connect them to ``model``.

    ``main()`` calls this inside a ``with gr.Blocks()`` block, so the
    components created here are attached to that layout.

    Args:
        model: Object exposing ``generate_design`` plus the ``seed``,
            ``neg_prompt`` and ``additional_quality_suffix`` attributes that
            are overwritten from the UI on every submit.
    """
    gr.Markdown("### Just try space ...")
    with gr.Row():
        with gr.Column():
            input_image = gr.Image(label="Input Image", type='pil', elem_id='img-display-input')
            input_text = gr.Textbox(label='Prompt', placeholder='Please upload your image first', lines=2)
            with gr.Accordion('Advanced options', open=False):
                num_steps = gr.Slider(label='Steps',
                                      minimum=1,
                                      maximum=50,
                                      value=50,
                                      step=1)
                img_size = gr.Slider(label='Image size',
                                     minimum=256,
                                     maximum=768,
                                     value=768,
                                     step=64)
                guidance_scale = gr.Slider(label='Guidance Scale',
                                           minimum=0.1,
                                           maximum=30.0,
                                           value=10.0,
                                           step=0.1)
                seed = gr.Slider(label='Seed',
                                 minimum=-1,
                                 maximum=2147483647,
                                 value=323*111,
                                 step=1,
                                 randomize=True)
                strength = gr.Slider(label='Strength',
                                     minimum=0.1,
                                     maximum=1.0,
                                     value=0.9,
                                     step=0.1)
                a_prompt = gr.Textbox(
                    label='Added Prompt',
                    value="interior design, 4K, high resolution, photorealistic")
                n_prompt = gr.Textbox(
                    label='Negative Prompt',
                    value="window, door, low resolution, banner, logo, watermark, text, deformed, blurry, out of focus, surreal, ugly, beginner")
            submit = gr.Button("Submit")
        with gr.Column():
            design_image = gr.Image(label="Output Mask", elem_id='img-display-output')

    def on_submit(image, text, num_steps, guidance_scale, seed, strength, a_prompt, n_prompt, img_size):
        """Copy the UI settings onto ``model`` and return the generated design."""
        # Without an upload Gradio passes None, which would otherwise fail
        # deep inside generate_design with an unhelpful AttributeError.
        if image is None:
            raise gr.Error("Please upload an image first.")
        model.seed = seed
        model.neg_prompt = n_prompt
        model.additional_quality_suffix = a_prompt
        with torch.no_grad():
            out_img = model.generate_design(image, text, guidance_scale=guidance_scale, num_steps=num_steps, strength=strength, img_size=img_size)
        return out_img

    submit.click(on_submit, inputs=[input_image, input_text, num_steps, guidance_scale, seed, strength, a_prompt, n_prompt, img_size], outputs=design_image)
# --- Module-level model loading: everything below runs at import time. ---

# The two ControlNets handed to the inpainting pipeline (depth first, then
# segmentation).
controlnet_depth = ControlNetModel.from_pretrained(
    "controlnet_depth", torch_dtype=dtype, use_safetensors=True
)
controlnet_seg = ControlNetModel.from_pretrained(
    "own_controlnet", torch_dtype=dtype, use_safetensors=True
)

# Inpainting pipeline conditioned on both ControlNets, with an IP-Adapter.
pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
    "SG161222/Realistic_Vision_V6.0_B1_noVAE",
    #"models/runwayml--stable-diffusion-inpainting",
    controlnet=[controlnet_depth, controlnet_seg],
    safety_checker=None,
    torch_dtype=dtype
)
pipe.load_ip_adapter("h94/IP-Adapter", subfolder="models", weight_name="ip-adapter_sd15.bin")
pipe.set_ip_adapter_scale(0.4)
pipe.scheduler = UniPCMultistepScheduler.from_config(pipe.scheduler.config)
pipe = pipe.to(device)

# Text-to-image pipeline that produces the IP-Adapter guide image.
guide_pipe = StableDiffusionXLPipeline.from_pretrained(
    "segmind/SSD-1B", torch_dtype=dtype, use_safetensors=True, variant="fp16"
).to(device)

# Segmentation and depth models; only the depth estimator is moved to `device`.
seg_image_processor, image_segmentor = get_segmentation_pipeline()
depth_feature_extractor, depth_estimator = get_depth_pipeline()
depth_estimator = depth_estimator.to(device)

# The upscaling path (download_models, the Real-ESRGAN wrappers and the lazy
# img2img pipeline) is currently disabled:
#download_models()
#lazy_realesrgan_x2 = LazyRealESRGAN(device, scale=2)
#lazy_realesrgan_x4 = LazyRealESRGAN(device, scale=4)
#lazy_pipe = LazyLoadPipeline()
#lazy_pipe.load()
def main():
    """Create the design model, build the Gradio page and launch it."""
    design_model = ControlNetDepthDesignModelMulti()
    print('Models uploaded successfully')

    with gr.Blocks() as demo:
        # Page heading followed by a short description.
        gr.Markdown("# Just try zeroGPU")
        gr.Markdown("\nFor test only\n")
        create_demo(design_model)

    demo.queue().launch(share=False)


if __name__ == '__main__':
    main()
