# handler.py -- commit 5a8046e ("Update handler.py") by karimbenharrak (verified)
from typing import Dict, List, Any
import torch
from diffusers import DPMSolverMultistepScheduler, EulerDiscreteScheduler, EulerDiscreteScheduler, DDIMScheduler, StableDiffusionInpaintPipeline, AutoPipelineForInpainting, AutoPipelineForImage2Image, DiffusionPipeline, StableDiffusionXLImg2ImgPipeline, StableDiffusionControlNetInpaintPipeline, ControlNetModel, StableDiffusionPipeline
from PIL import Image
import base64
from io import BytesIO
import numpy as np
import cv2
# Pick the compute device. This handler refuses to start without CUDA,
# so fail at import time instead of on the first request.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    raise ValueError("need to run on GPU")
class EndpointHandler:
    """Request handler for prompt-guided image inpainting.

    Wraps a Stable Diffusion v1.5 ControlNet inpainting pipeline. A request
    supplies a base64-encoded image and mask plus generation parameters, and
    the handler returns the first image produced by the pipeline.
    """

    # NOTE: earlier experimental variants (SDXL inpainting / refiner /
    # img2img chains selected via a "method" field) were kept here as
    # commented-out code; they have been removed and remain available in the
    # version control history.

    def __init__(self, path: str = "") -> None:
        """Load the ControlNet inpaint model and the SD 1.5 pipeline in fp16.

        :param path: Accepted for interface compatibility; not used. The
            models are loaded from fixed hub identifiers.
        """
        self.controlnet = ControlNetModel.from_pretrained(
            "lllyasviel/control_v11p_sd15_inpaint",
            torch_dtype=torch.float16,
        )
        self.pipe = StableDiffusionControlNetInpaintPipeline.from_pretrained(
            "runwayml/stable-diffusion-v1-5",
            controlnet=self.controlnet,
            torch_dtype=torch.float16,
        )
        # Swap the pipeline's default scheduler for Euler, reusing its config.
        self.pipe.scheduler = EulerDiscreteScheduler.from_config(
            self.pipe.scheduler.config
        )
        self.pipe.enable_model_cpu_offload()
        self.pipe.enable_xformers_memory_efficient_attention()

    def __call__(self, data: Dict[str, Any]) -> "Image.Image":
        """Inpaint the masked region of an image according to a text prompt.

        :param data: Request payload. Recognised keys (each is removed from
            ``data`` as it is read):

            * ``image`` -- base64-encoded image file (required).
            * ``mask_image`` -- base64-encoded mask file (required); pixels
              brighter than 50% grey are treated as the region to repaint.
            * ``prompt`` / ``negative_prompt`` -- text, default ``""``.
            * ``strength`` -- default ``0.2``.
            * ``guidance_scale`` -- default ``8.0``.
            * ``num_inference_steps`` -- default ``20``.
            * ``method`` -- accepted for backward compatibility, ignored.
        :return: The first image generated by the pipeline.
        :raises ValueError: If ``image`` or ``mask_image`` is missing, or if
            the two images differ in size.
        """
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)
        prompt = data.pop("prompt", "")
        negative_prompt = data.pop("negative_prompt", "")
        # Still consumed so older clients that send it keep working; only a
        # single pipeline exists now, so the value has no effect.
        data.pop("method", "slow")
        strength = data.pop("strength", 0.2)
        guidance_scale = data.pop("guidance_scale", 8.0)
        num_inference_steps = data.pop("num_inference_steps", 20)

        # Both inputs are mandatory for inpainting; reject the request up
        # front rather than failing later on a None image.
        if not encoded_image or not encoded_mask_image:
            raise ValueError(
                "both 'image' and 'mask_image' must be provided as "
                "base64-encoded images"
            )

        image = self.decode_base64_image(encoded_image).convert("RGB")
        mask_image = self.decode_base64_image(encoded_mask_image).convert("RGB")

        control_image = self.make_inpaint_condition(image, mask_image)

        # NOTE(review): diffusers documents `eta` as a DDIM-only parameter, so
        # it is probably ignored by the Euler scheduler -- confirm.
        result = self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            num_inference_steps=num_inference_steps,
            eta=1.0,
            image=image,
            mask_image=mask_image,
            control_image=control_image,
            guidance_scale=guidance_scale,
            strength=strength,
        )
        return result.images[0]

    def decode_base64_image(self, image_string):
        """Decode a base64-encoded image file into a PIL image.

        :param image_string: Base64 text (or bytes) of an encoded image file.
        :return: The image opened with ``PIL.Image.open``.
        """
        raw_bytes = base64.b64decode(image_string)
        return Image.open(BytesIO(raw_bytes))

    def make_inpaint_condition(self, image, image_mask) -> torch.Tensor:
        """Build the ControlNet conditioning tensor for inpainting.

        The image is scaled to ``[0, 1]`` and every pixel whose mask value
        exceeds 0.5 (after scaling the greyscale mask to ``[0, 1]``) is set to
        ``-1.0`` in all channels to mark it as masked.

        :param image: Image object exposing ``convert("RGB")``.
        :param image_mask: Mask object exposing ``convert("L")``.
        :return: ``float32`` tensor of shape ``(1, 3, height, width)``.
        :raises ValueError: If image and mask differ in height or width.
        """
        pixels = np.array(image.convert("RGB")).astype(np.float32) / 255.0
        mask = np.array(image_mask.convert("L")).astype(np.float32) / 255.0

        # Compare height AND width; the boolean indexing below needs both
        # spatial dimensions to match.
        if pixels.shape[:2] != mask.shape[:2]:
            raise ValueError("image and image_mask must have the same image size")

        pixels[mask > 0.5] = -1.0  # set as masked pixel
        # HWC -> NCHW with a leading batch dimension of 1.
        batched = np.expand_dims(pixels, 0).transpose(0, 3, 1, 2)
        return torch.from_numpy(batched)