from typing import Any, Dict
import torch
from diffusers import DPMSolverMultistepScheduler, DiffusionPipeline
from PIL import Image
import base64
from io import BytesIO


# set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type != "cuda":
    raise ValueError("This handler requires a CUDA-capable GPU")


class EndpointHandler:
    def __init__(self, path=""):
        # load the SDXL base pipeline
        self.base = DiffusionPipeline.from_pretrained(
            path, torch_dtype=torch.float16, variant="fp16", use_safetensors=True
        )
        # use DPMSolverMultistepScheduler
        self.base.scheduler = DPMSolverMultistepScheduler.from_config(
            self.base.scheduler.config
        )
        # move to device
        self.base = self.base.to(device)
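        # compile the UNet for faster repeated inference (the first call pays a
        # one-time compilation cost)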
        self.base.unet = torch.compile(self.base.unet, mode="reduce-overhead", fullgraph=True)

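        # load the SDXL refiner, reusing the base pipeline's second text
        # encoder and VAE so they are not duplicated in GPU memory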
        self.refiner = DiffusionPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-refiner-1.0",
            text_encoder_2=self.base.text_encoder_2,
            vae=self.base.vae,
            torch_dtype=torch.float16,
            use_safetensors=True,
            variant="fp16",
        )
        # use DPMSolverMultistepScheduler
        self.refiner.scheduler = DPMSolverMultistepScheduler.from_config(
            self.refiner.scheduler.config
        )
        self.refiner = self.refiner.to(device)
        self.refiner.unet = torch.compile(self.refiner.unet, mode="reduce-overhead", fullgraph=True)

    def __call__(self, data: Any) -> Dict[str, str]:
        """
        :param data: A dictionary containing an `inputs` prompt and optional
            generation parameters (`use_refiner`, `num_inference_steps`,
            `guidance_scale`, `negative_prompt`, `high_noise_frac`, `height`, `width`).
        :return: A dictionary whose `image` field contains the generated image
            encoded as base64.
        """
        prompt = data.pop("inputs", None)

        if prompt is None:
            return {"error": "Please provide a prompt"}

        # hyperparameters
        use_refiner = bool(data.pop("use_refiner", False))
        num_inference_steps = data.pop("num_inference_steps", 30)
        guidance_scale = data.pop("guidance_scale", 8)
        negative_prompt = data.pop("negative_prompt", None)
        high_noise_frac = data.pop("high_noise_frac", 0.8)
        height = data.pop("height", None)
        width = data.pop("width", None)

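        # Two-stage "ensemble of experts" generation: the base model denoises
        # the first `high_noise_frac` of the schedule and outputs latents,
        # which the refiner then denoises for the remaining steps.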
        if use_refiner:
            image = self.base(
                prompt=prompt,
                negative_prompt=negative_prompt,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
                height=height,
                width=width,
                denoising_end=high_noise_frac,
                output_type="latent",
            ).images
            out = self.refiner(
                prompt=prompt,
                negative_prompt=negative_prompt,
                num_inference_steps=num_inference_steps,
                denoising_start=high_noise_frac,
                image=image,
            )
        else:
            out = self.base(
                prompt=prompt,
                num_inference_steps=num_inference_steps,
                guidance_scale=guidance_scale,
                num_images_per_prompt=1,
                negative_prompt=negative_prompt,
                height=height,
                width=width,
            )
            
        # encode the generated image as base64
        buffered = BytesIO()
        out.images[0].save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue())

        # return the base64 string as the prediction
        return {"image": img_str.decode()}

        # alternatively, return the first generated PIL image directly:
        # return out.images[0]
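

# A minimal local smoke test (not part of the handler contract). The checkpoint
# path "./sdxl-base" is a hypothetical placeholder; point it at your SDXL base
# weights. Assumes a CUDA GPU is available, matching the check above.
if __name__ == "__main__":
    handler = EndpointHandler(path="./sdxl-base")

    # payload mirroring what the inference endpoint would receive
    payload = {
        "inputs": "a photo of an astronaut riding a horse on mars",
        "negative_prompt": "blurry, low quality",
        "num_inference_steps": 30,
        "guidance_scale": 8,
        "use_refiner": True,
    }
    result = handler(payload)

    # decode the base64 response back into a PIL image and save it
    image = Image.open(BytesIO(base64.b64decode(result["image"])))
    image.save("generation.jpeg")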