from typing import Any, Dict

import base64
from io import BytesIO

import numpy as np
import torch
from diffusers import (
    AutoPipelineForImage2Image,
    AutoPipelineForInpainting,
    ControlNetModel,
    DPMSolverMultistepScheduler,
    EulerDiscreteScheduler,
    StableDiffusionControlNetInpaintPipeline,
    StableDiffusionXLImg2ImgPipeline,
)
from PIL import Image

# set device; this handler requires a GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
if device.type != 'cuda':
    raise ValueError("This handler needs to run on a GPU.")


class EndpointHandler:
    def __init__(self, path=""):
        # Earlier experiments, kept for reference:
        # self.fast_pipe = AutoPipelineForInpainting.from_pretrained(
        #     "diffusers/stable-diffusion-xl-1.0-inpainting-0.1",
        #     torch_dtype=torch.float16, variant="fp16",
        # ).to("cuda")
        # self.generator = torch.Generator(device="cuda").manual_seed(0)

        # self.smooth_pipe = StableDiffusionXLImg2ImgPipeline.from_pretrained(
        #     "stabilityai/stable-diffusion-xl-refiner-1.0",
        #     torch_dtype=torch.float16, variant="fp16", use_safetensors=True,
        # )
        # self.smooth_pipe.to("cuda")

        # active pipeline: ControlNet-guided Stable Diffusion 1.5 inpainting
        self.controlnet = ControlNetModel.from_pretrained(
            "lllyasviel/control_v11p_sd15_inpaint", torch_dtype=torch.float16
        )
        self.pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            controlnet=self.controlnet,
            torch_dtype=torch.float16,
        )
        self.pipe.scheduler = EulerDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe.enable_model_cpu_offload()
        self.pipe.enable_xformers_memory_efficient_attention()

        """
        # load StableDiffusionInpaintPipeline pipeline
        self.pipe = AutoPipelineForInpainting.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            revision="fp16",
            torch_dtype=torch.float16,
        )
        # use DPMSolverMultistepScheduler
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe.enable_model_cpu_offload()
        self.pipe.enable_xformers_memory_efficient_attention()
        # move to device
        # self.pipe = self.pipe.to(device)

        self.pipe2 = AutoPipelineForInpainting.from_pretrained(
            "stabilityai/stable-diffusion-xl-refiner-1.0",
            torch_dtype=torch.float16,
            variant="fp16",
            use_safetensors=True,
        )
        # self.pipe2.enable_model_cpu_offload()
        self.pipe2.enable_xformers_memory_efficient_attention()
        self.pipe2.to("cuda")

        self.pipe3 = AutoPipelineForImage2Image.from_pipe(self.pipe2)
        # self.pipe3.enable_model_cpu_offload()
        self.pipe3.enable_xformers_memory_efficient_attention()
        """

    def __call__(self, data: Dict[str, Any]) -> Image.Image:
        """
        :param data: A dictionary with a `prompt` field and optional `image`,
            `mask_image`, `negative_prompt`, `method`, `strength`,
            `guidance_scale`, and `num_inference_steps` fields. `image` and
            `mask_image` are base64-encoded strings.
        :return: The generated image as a `PIL.Image.Image`.
        """
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)
        prompt = data.pop("prompt", "")
        negative_prompt = data.pop("negative_prompt", "")
        method = data.pop("method", "slow")
        strength = data.pop("strength", 0.2)
        guidance_scale = data.pop("guidance_scale", 8.0)
        num_inference_steps = data.pop("num_inference_steps", 20)

        """
        if method == "smooth":
            if encoded_image is not None:
                image = self.decode_base64_image(encoded_image)
                out = self.smooth_pipe(prompt, image=image).images[0]
                return out
        """

        # decode the input image and mask
        if encoded_image is not None and encoded_mask_image is not None:
            image = self.decode_base64_image(encoded_image).convert("RGB")
            mask_image = self.decode_base64_image(encoded_mask_image).convert("RGB")
        else:
            image = None
            mask_image = None

        """
        if method == "fast":
            image = self.fast_pipe(
                prompt=prompt,
                negative_prompt=negative_prompt,
                image=image,
                mask_image=mask_image,
                guidance_scale=guidance_scale,
                num_inference_steps=num_inference_steps,  # steps between 15 and 30 work well for us
                strength=strength,  # make sure to use `strength` below 1.0
                generator=self.generator,
            ).images[0]
            return image
        """

        # pipe = AutoPipelineForInpainting.from_pretrained(
        #     "diffusers/stable-diffusion-xl-1.0-inpainting-0.1",
        #     torch_dtype=torch.float16, variant="fp16",
        # ).to("cuda")

        """
        # run inference pipeline
        out = self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            image=image,
            mask_image=mask_image,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
        )
        print("1st pipeline part successful!")
        image = out.images[0].resize((1024, 1024))
        print("image resizing successful!")

        image = self.pipe2(
            prompt=prompt,
            negative_prompt=negative_prompt,
            image=image,
            mask_image=mask_image,
            guidance_scale=guidance_scale,  # 8.0
            num_inference_steps=int(num_inference_steps / 10),  # 100
            strength=strength,  # 0.2
            output_type="latent",  # keep in latent space to save some VRAM
        ).images[0]
        print("2nd pipeline part successful!")

        image2 = self.pipe3(
            prompt=prompt,
            image=image,
            guidance_scale=guidance_scale,  # 8.0
            num_inference_steps=int(num_inference_steps / 10),  # 100
            strength=strength,  # 0.2
        ).images[0]
        print("3rd pipeline part successful!")

        # return first generated PIL image
        return image2
        """

        # build the ControlNet conditioning image from the input and mask
        control_image = self.make_inpaint_condition(image, mask_image)

        # generate image
        image = self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            eta=1.0,
            image=image,
            mask_image=mask_image,
            control_image=control_image,
            guidance_scale=guidance_scale,
            strength=strength,
        ).images[0]

        return image

    # helper to decode a base64-encoded input image
    def decode_base64_image(self, image_string):
        base64_image = base64.b64decode(image_string)
        buffer = BytesIO(base64_image)
        image = Image.open(buffer)
        return image

    # helper to build the ControlNet inpaint conditioning image:
    # masked pixels are set to -1.0 so the model knows which area to fill
    def make_inpaint_condition(self, image, image_mask):
        image = np.array(image.convert("RGB")).astype(np.float32) / 255.0
        image_mask = np.array(image_mask.convert("L")).astype(np.float32) / 255.0
        # compare height and width (the original `shape[0:1]` only checked height)
        assert image.shape[0:2] == image_mask.shape[0:2], "image and image_mask must have the same image size"
        image[image_mask > 0.5] = -1.0  # mark masked pixels
        image = np.expand_dims(image, 0).transpose(0, 3, 1, 2)
        image = torch.from_numpy(image)
        return image