File size: 3,852 Bytes
6bcacf9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
from typing import Any
from diffusers import DiffusionPipeline
from os import path
import torch
from backend.models.lcmdiffusion_setting import LCMDiffusionSetting
import numpy as np
from constants import DEVICE


if DEVICE == "cpu":
    from backend.lcmdiffusion.pipelines.openvino.lcm_ov_pipeline import (
        OVLatentConsistencyModelPipeline,
    )
    from backend.lcmdiffusion.pipelines.openvino.lcm_scheduler import (
        LCMScheduler,
    )


class LCMTextToImage:
    """Text-to-image generator backed by a Latent Consistency Model (LCM).

    Two execution paths:
      * OpenVINO pipeline (``OVLatentConsistencyModelPipeline``) when
        ``use_openvino`` is requested and the process-wide ``DEVICE`` is
        ``"cpu"`` (the OpenVINO classes are only imported in that case).
      * A standard diffusers ``DiffusionPipeline`` using the custom
        ``latent_consistency_txt2img.py`` pipeline file shipped alongside
        this module.

    The loaded pipeline is cached and only rebuilt when ``init()`` is called
    with a different ``model_id``.
    """

    def __init__(
        self,
        device: str = "cpu",
    ) -> None:
        """Create an uninitialized generator.

        Args:
            device: Torch device string used when moving the non-OpenVINO
                pipeline (may be overridden later by ``init()``).
        """
        self.pipeline = None
        self.use_openvino = False
        # Fix: the device argument was previously discarded
        # (self.device was hard-coded to None).
        self.device = device
        self.previous_model_id = None

    def _get_lcm_diffusion_pipeline_path(self) -> str:
        """Return the absolute path to the custom LCM txt2img pipeline file.

        The file lives next to this module under
        ``lcmdiffusion/pipelines/latent_consistency_txt2img.py`` and is
        passed to diffusers as ``custom_pipeline``.
        """
        script_path = path.dirname(path.abspath(__file__))
        return path.join(
            script_path,
            "lcmdiffusion",
            "pipelines",
            "latent_consistency_txt2img.py",
        )

    def init(
        self,
        model_id: str,
        use_openvino: bool = False,
        device: str = "cpu",
        use_local_model: bool = False,
    ) -> None:
        """Load (or reload) the generation pipeline.

        A new pipeline is built only on first use or when ``model_id``
        changes; otherwise the cached pipeline is kept.

        Args:
            model_id: Hugging Face model id or local model path.
            use_openvino: Prefer the OpenVINO pipeline (honored only when
                the global ``DEVICE`` is ``"cpu"``).
            device: Torch device for the diffusers pipeline.
            use_local_model: Pass ``local_files_only`` to ``from_pretrained``
                so no network download is attempted.
        """
        self.device = device
        self.use_openvino = use_openvino
        if self.pipeline is None or self.previous_model_id != model_id:
            # Release the old pipeline (if any) before loading a new one
            # to avoid holding two models in memory at once.
            if self.pipeline:
                del self.pipeline
                self.pipeline = None

            if self.use_openvino and DEVICE == "cpu":
                scheduler = LCMScheduler.from_pretrained(
                    model_id,
                    subfolder="scheduler",
                )
                # compile=False: compilation is deferred until generate()
                # reshapes the model for the requested image size.
                self.pipeline = OVLatentConsistencyModelPipeline.from_pretrained(
                    model_id,
                    scheduler=scheduler,
                    compile=False,
                    local_files_only=use_local_model,
                )
            else:
                self.pipeline = DiffusionPipeline.from_pretrained(
                    model_id,
                    custom_pipeline=self._get_lcm_diffusion_pipeline_path(),
                    custom_revision="main",
                    local_files_only=use_local_model,
                )
                self.pipeline.to(
                    torch_device=self.device,
                    torch_dtype=torch.float32,
                )
            self.previous_model_id = model_id

    def generate(
        self,
        lcm_diffusion_setting: LCMDiffusionSetting,
        reshape: bool = False,
    ) -> Any:
        """Run text-to-image generation and return a list of PIL images.

        Args:
            lcm_diffusion_setting: Prompt, image size, steps, guidance,
                seed and safety-checker options.
            reshape: When using OpenVINO, reshape and recompile the model
                for the requested image dimensions before generating
                (needed whenever the output size changes).
        """
        if lcm_diffusion_setting.use_seed:
            cur_seed = lcm_diffusion_setting.seed
            # The OpenVINO pipeline draws latents via NumPy; the diffusers
            # path uses torch's global RNG.
            if self.use_openvino:
                np.random.seed(cur_seed)
            else:
                torch.manual_seed(cur_seed)

        if self.use_openvino and DEVICE == "cpu":
            print("Using OpenVINO")
            if reshape:
                print("Reshape and compile")
                self.pipeline.reshape(
                    batch_size=1,
                    height=lcm_diffusion_setting.image_height,
                    width=lcm_diffusion_setting.image_width,
                    num_images_per_prompt=lcm_diffusion_setting.number_of_images,
                )
                self.pipeline.compile()

        if not lcm_diffusion_setting.use_safety_checker:
            self.pipeline.safety_checker = None

        result_images = self.pipeline(
            prompt=lcm_diffusion_setting.prompt,
            num_inference_steps=lcm_diffusion_setting.inference_steps,
            guidance_scale=lcm_diffusion_setting.guidance_scale,
            width=lcm_diffusion_setting.image_width,
            height=lcm_diffusion_setting.image_height,
            output_type="pil",
            num_images_per_prompt=lcm_diffusion_setting.number_of_images,
        ).images

        return result_images