File size: 3,112 Bytes
4efc065
 
0129d6f
4efc065
 
 
 
 
 
 
 
 
 
 
 
 
 
0129d6f
4efc065
 
 
 
 
 
 
 
 
0129d6f
 
 
 
 
 
4efc065
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0129d6f
75d29d6
4efc065
 
 
0129d6f
 
 
75d29d6
0129d6f
9d60b6e
0129d6f
 
 
 
 
 
 
 
 
75d29d6
0129d6f
9d60b6e
0129d6f
 
 
 
 
 
4efc065
 
0129d6f
4efc065
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
from typing import  Dict, List, Any
import torch
from diffusers import DPMSolverMultistepScheduler, StableDiffusionInpaintPipeline, AutoPipelineForInpainting, AutoPipelineForImage2Image
from PIL import Image
import base64
from io import BytesIO


# Resolve the compute device once at import time: CUDA when present, else CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# The handler below places its pipelines on the GPU, so refuse to load without one.
if not device.type == "cuda":
    raise ValueError("need to run on GPU")

class EndpointHandler():
    """Inference handler that inpaints an image and then refines the result.

    Three pipelines are chained on every request:

    1. ``pipe``  - Stable Diffusion inpainting on the supplied image and mask.
    2. ``pipe2`` - SDXL refiner loaded as an inpainting pipeline, run on the
       first-stage output resized to ``REFINER_SIZE``; its output is kept
       in latent form.
    3. ``pipe3`` - image-to-image pipeline built from ``pipe2`` via
       ``from_pipe``, which turns the latents into the final image.
    """

    # Size the first-stage output is resized to before the SDXL stages.
    REFINER_SIZE = (1024, 1024)
    # Sampling settings shared by both refinement passes (pipe2 and pipe3).
    REFINER_GUIDANCE_SCALE = 8.0
    REFINER_NUM_INFERENCE_STEPS = 100
    REFINER_STRENGTH = 0.2

    def __init__(self, path=""):
        """Load all pipelines, move them to the GPU and enable xformers attention.

        :param path: Accepted for interface compatibility; not used, because the
            model identifiers are hard-coded below.
        """
        # load StableDiffusionInpaintPipeline pipeline
        self.pipe = AutoPipelineForInpainting.from_pretrained(
            "runwayml/stable-diffusion-inpainting",
            revision="fp16",
            torch_dtype=torch.float16,
        )
        # use DPMSolverMultistepScheduler
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(self.pipe.scheduler.config)
        # move to device
        self.pipe = self.pipe.to(device)

        self.pipe2 = AutoPipelineForInpainting.from_pretrained(
            "stabilityai/stable-diffusion-xl-refiner-1.0",
            torch_dtype=torch.float16,
            variant="fp16",
            use_safetensors=True,
        )
        # Use the same module-level device as `pipe` instead of a literal "cuda".
        self.pipe2.to(device)

        self.pipe3 = AutoPipelineForImage2Image.from_pipe(self.pipe2)

        # Enabling memory-efficient attention does not depend on the request,
        # so do it once here rather than on every call.
        for pipeline in (self.pipe, self.pipe2, self.pipe3):
            pipeline.enable_xformers_memory_efficient_attention()

    def __call__(self, data: Dict[str, Any]) -> "Image.Image":
        """Run the inpaint-then-refine chain for one request.

        :param data: Request payload. Recognised keys (each is popped from
            ``data``): ``image`` and ``mask_image`` (base64-encoded image
            files, both required) and ``prompt`` (text, defaults to ``""``).
        :return: The final refined image as a PIL image.
        :raises ValueError: If ``image`` or ``mask_image`` is missing.
        """
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)
        prompt = data.pop("prompt", "")

        # Inpainting cannot run without both inputs; fail with a clear message
        # instead of handing None to the pipeline.
        if encoded_image is None or encoded_mask_image is None:
            raise ValueError(
                "Both `image` and `mask_image` (base64-encoded) are required."
            )

        image = self.decode_base64_image(encoded_image)
        mask_image = self.decode_base64_image(encoded_mask_image)

        # Stage 1: inpaint with the base pipeline.
        out = self.pipe(prompt=prompt, image=image, mask_image=mask_image)
        image = out.images[0].resize(self.REFINER_SIZE)

        # Stage 2: refine with the SDXL inpainting pipeline.
        image = self.pipe2(
            prompt=prompt,
            image=image,
            mask_image=mask_image,
            guidance_scale=self.REFINER_GUIDANCE_SCALE,
            num_inference_steps=self.REFINER_NUM_INFERENCE_STEPS,
            strength=self.REFINER_STRENGTH,
            output_type="latent",  # let's keep in latent to save some VRAM
        ).images[0]

        # Stage 3: image-to-image pass over the stage-2 latents.
        image = self.pipe3(
            prompt=prompt,
            image=image,
            guidance_scale=self.REFINER_GUIDANCE_SCALE,
            num_inference_steps=self.REFINER_NUM_INFERENCE_STEPS,
            strength=self.REFINER_STRENGTH,
        ).images[0]

        # return first generated PIL image
        return image

    # helper to decode input image
    def decode_base64_image(self, image_string):
        """Decode a base64-encoded image file into a PIL image.

        :param image_string: Base64 text (or bytes) of an encoded image file.
        :return: The image opened with ``PIL.Image.open``.
        """
        base64_image = base64.b64decode(image_string)
        buffer = BytesIO(base64_image)
        image = Image.open(buffer)
        return image