from typing import Dict, List, Any import torch from diffusers import DPMSolverMultistepScheduler, DiffusionPipeline, StableDiffusionImg2ImgPipeline, StableDiffusionInpaintPipelineLegacy from PIL import Image import base64 from io import BytesIO # set device device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') class EndpointHandler(): def __init__(self, path=""): # load StableDiffusionInpaintPipeline pipeline self.txt2img_pipe = DiffusionPipeline.from_pretrained(path, torch_dtype=torch.float16) # Set safety_checker self.txt2img_pipe.safety_checker = None # use DPMSolverMultistepScheduler self.txt2img_pipe.scheduler = DPMSolverMultistepScheduler.from_config(self.txt2img_pipe.scheduler.config) self.img2img_pipe = StableDiffusionImg2ImgPipeline( vae=self.txt2img_pipe.vae, text_encoder=self.txt2img_pipe.text_encoder, tokenizer=self.txt2img_pipe.tokenizer, unet=self.txt2img_pipe.unet, scheduler=self.txt2img_pipe.scheduler, safety_checker=self.txt2img_pipe.safety_checker, feature_extractor=self.txt2img_pipe.feature_extractor, ).to(device) self.inpaint_pipe = StableDiffusionInpaintPipelineLegacy( vae=self.txt2img_pipe.vae, text_encoder=self.txt2img_pipe.text_encoder, tokenizer=self.txt2img_pipe.tokenizer, unet=self.txt2img_pipe.unet, scheduler=self.txt2img_pipe.scheduler, safety_checker=self.txt2img_pipe.safety_checker, feature_extractor=self.txt2img_pipe.feature_extractor, ).to(device) def __call__(self, data: Any) -> List[List[Dict[str, float]]]: """ :param data: A dictionary contains `inputs` and optional `image` field. :return: A dictionary with `image` field contains image in base64. """ inputs = data.pop("inputs", data) encoded_image = data.pop("image", None) encoded_mask_image = data.pop("mask_image", None) # hyperparamters num_inference_steps = data.pop("num_inference_steps", 25) guidance_scale = data.pop("guidance_scale", 7.5) negative_prompt = data.pop("negative_prompt", None) height = data.pop("height", 512) width = data.pop("width", 512) strength = data.pop("strength", 0.8) # run inference pipeline if encoded_image is not None and encoded_mask_image is not None: image = self.decode_base64_image(encoded_image) mask_image = self.decode_base64_image(encoded_mask_image) out = self.inpaint_pipe(inputs, init_image=image, mask_image=mask_image, strength=strength, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, num_images_per_prompt=1, negative_prompt=negative_prompt ) return out.images[0] elif encoded_image is not None: image = self.decode_base64_image(encoded_image) out = self.img2img_pipe(inputs, init_image=image, strength=strength, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, num_images_per_prompt=1, negative_prompt=negative_prompt ) return out.images[0] else: out = self.txt2img_pipe(inputs, num_inference_steps=num_inference_steps, guidance_scale=guidance_scale, num_images_per_prompt=1, negative_prompt=negative_prompt, height=height, width=width ) # return first generate PIL image return out.images[0] # helper to decode input image def decode_base64_image(self, image_string): base64_image = base64.b64decode(image_string) buffer = BytesIO(base64_image) image = Image.open(buffer) return image