File size: 6,596 Bytes
4fb3c5e
 
 
 
cb229bd
4fb3c5e
af2a8f5
4fb3c5e
af2a8f5
cb229bd
4fb3c5e
 
af2a8f5
6f3a230
 
4fb3c5e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb229bd
4fb3c5e
80b7378
 
6f3a230
4fb3c5e
 
 
 
 
 
 
bbe671a
 
4fb3c5e
 
 
bbe671a
 
4fb3c5e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6f3a230
4fb3c5e
 
 
cb229bd
 
 
 
4fb3c5e
 
 
 
 
cb229bd
 
4fb3c5e
cb229bd
4fb3c5e
cb229bd
4fb3c5e
 
 
 
 
 
af2a8f5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4fb3c5e
 
 
26f3e39
 
af2a8f5
 
 
 
 
 
cb229bd
 
 
 
 
 
 
eab3b8e
cb229bd
b6c80e7
 
cb229bd
 
 
 
0dc864d
cb229bd
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from __future__ import annotations

import logging
import os
import random
import sys
import tempfile

import imageio
import numpy as np
import PIL.Image
import torch
import tqdm.auto
from diffusers import (DDIMPipeline, DDIMScheduler, DDPMPipeline,
                       DiffusionPipeline, PNDMPipeline, PNDMScheduler)

# Hugging Face access token, read once at import time. Indexing os.environ
# raises KeyError when the variable is unset, so importing this module fails
# without it. The value is passed as `use_auth_token` when loading pipelines.
HF_TOKEN = os.environ['HF_TOKEN']

# Module logger: INFO and above is written to stdout with a timestamped
# "[time] name LEVEL: message" layout.
formatter = logging.Formatter(
    '[%(asctime)s] %(name)s %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S')
stream_handler = logging.StreamHandler(stream=sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Do not hand records to ancestor loggers; only the handler attached below
# emits them.
logger.propagate = False
logger.addHandler(stream_handler)


class Model:
    """Loads the anime-face diffusion pipelines and samples images from them.

    The instance keeps one active pipeline, identified by ``model_name`` and
    ``scheduler_type``, and swaps it on demand via :meth:`set_pipeline`.
    """

    # Suffixes appended to 'hysts/diffusers-anime-faces-' to form a repo id.
    MODEL_NAMES = [
        'ddpm-128-exp000',
    ]

    def __init__(self, device: str | torch.device):
        """Load every listed model once, then activate the default pipeline.

        Args:
            device: Device used when running the pipeline / UNet.
        """
        self.device = torch.device(device)
        self._download_all_models()

        self.model_name = self.MODEL_NAMES[0]
        self.scheduler_type = 'DDIM'
        self.pipeline = self._load_pipeline(self.model_name,
                                            self.scheduler_type)
        self.rng = random.Random()

    @staticmethod
    def _load_pipeline(model_name: str,
                       scheduler_type: str) -> DiffusionPipeline:
        """Build the pipeline for ``model_name`` with the given scheduler.

        Args:
            model_name: Suffix of the ``hysts/diffusers-anime-faces-*`` repo.
            scheduler_type: ``'DDPM'``, ``'DDIM'`` or ``'PNDM'``.

        Returns:
            The loaded pipeline. For DDIM and PNDM the pipeline's scheduler
            is replaced by one created from the repo's ``scheduler``
            subfolder.

        Raises:
            ValueError: If ``scheduler_type`` is not one of the supported
                values.
        """
        repo_id = f'hysts/diffusers-anime-faces-{model_name}'
        if scheduler_type == 'DDPM':
            pipeline = DDPMPipeline.from_pretrained(repo_id,
                                                    use_auth_token=HF_TOKEN)
        elif scheduler_type == 'DDIM':
            pipeline = DDIMPipeline.from_pretrained(repo_id,
                                                    use_auth_token=HF_TOKEN)
            pipeline.scheduler = DDIMScheduler.from_config(
                repo_id, subfolder='scheduler', use_auth_token=HF_TOKEN)
        elif scheduler_type == 'PNDM':
            pipeline = PNDMPipeline.from_pretrained(repo_id,
                                                    use_auth_token=HF_TOKEN)
            pipeline.scheduler = PNDMScheduler.from_config(
                repo_id, subfolder='scheduler', use_auth_token=HF_TOKEN)
        else:
            raise ValueError(f'Unknown scheduler type: {scheduler_type!r}')
        return pipeline

    def set_pipeline(self, model_name: str, scheduler_type: str) -> None:
        """Make ``(model_name, scheduler_type)`` the active pipeline.

        Does nothing when the requested combination is already active.

        Args:
            model_name: Suffix of the model repo to load.
            scheduler_type: ``'DDPM'``, ``'DDIM'`` or ``'PNDM'``.

        Raises:
            ValueError: If ``scheduler_type`` is not supported. In that case
                the previously active pipeline and its labels are kept.
        """
        logger.info('--- set_pipeline ---')
        logger.info(f'{model_name=}, {scheduler_type=}')

        if model_name == self.model_name and scheduler_type == self.scheduler_type:
            logger.info('Skipping')
            logger.info('--- done ---')
            return
        # Load first and commit afterwards: if loading raises, the recorded
        # name/scheduler must still describe the pipeline that is actually
        # held, otherwise a retry with the same arguments would be skipped.
        pipeline = self._load_pipeline(model_name, scheduler_type)
        self.model_name = model_name
        self.scheduler_type = scheduler_type
        self.pipeline = pipeline

        logger.info('--- done ---')

    def _download_all_models(self) -> None:
        """Load each entry of ``MODEL_NAMES`` once and discard the result.

        NOTE(review): presumably this only warms the local download cache;
        confirm against the hub caching behaviour.
        """
        for name in self.MODEL_NAMES:
            self._load_pipeline(name, 'DDPM')

    def generate(self,
                 seed: int,
                 num_steps: int,
                 num_images: int = 1) -> list[PIL.Image.Image]:
        """Sample ``num_images`` images with the active pipeline.

        Args:
            seed: Value passed to ``torch.manual_seed`` before sampling.
            num_steps: Number of inference steps; not forwarded for DDPM.
            num_images: Batch size requested from the pipeline.

        Returns:
            The ``'sample'`` entry of the pipeline output.

        Raises:
            ValueError: If the active scheduler type is not supported.
        """
        logger.info('--- generate ---')
        logger.info(f'{seed=}, {num_steps=}')

        torch.manual_seed(seed)
        if self.scheduler_type == 'DDPM':
            res = self.pipeline(batch_size=num_images,
                                torch_device=self.device)['sample']
        elif self.scheduler_type in ['DDIM', 'PNDM']:
            res = self.pipeline(batch_size=num_images,
                                torch_device=self.device,
                                num_inference_steps=num_steps)['sample']
        else:
            raise ValueError(
                f'Unknown scheduler type: {self.scheduler_type!r}')

        logger.info('--- done ---')
        return res

    @staticmethod
    def postprocess(sample: torch.Tensor) -> np.ndarray:
        """Convert a batch of model outputs to ``uint8`` arrays.

        Values are mapped with ``x / 2 + 0.5``, clamped to ``[0, 1]``, scaled
        by 255 and cast to ``uint8`` (the cast truncates, it does not round).
        Axis 1 of the 4-D input is moved to the last position.

        Args:
            sample: 4-D tensor; assumed to be (batch, channel, height, width)
                with values nominally in ``[-1, 1]`` — TODO confirm.

        Returns:
            Array with the input's axes reordered to ``(0, 2, 3, 1)``.
        """
        res = (sample / 2 + 0.5).clamp(0, 1)
        res = (res * 255).to(torch.uint8)
        res = res.cpu().permute(0, 2, 3, 1).numpy()
        return res

    @torch.inference_mode()
    def generate_with_video(self, seed: int,
                            num_steps: int) -> tuple[np.ndarray, str]:
        """Sample one image and record every denoising step to an MP4 file.

        Args:
            seed: Value passed to ``torch.manual_seed`` before sampling.
            num_steps: Number of scheduler steps. Replaced by 1000 when the
                active scheduler type is DDPM.

        Returns:
            ``(frame, path)``: the post-processed frame of the last step and
            the path of the video file. The file is created with
            ``delete=False``, so removing it is left to the caller.
        """
        logger.info('--- generate_with_video ---')
        if self.scheduler_type == 'DDPM':
            num_steps = 1000
            fps = 100
        else:
            fps = 10
        logger.info(f'{seed=}, {num_steps=}')

        model = self.pipeline.unet.to(self.device)
        scheduler = self.pipeline.scheduler
        scheduler.set_timesteps(num_inference_steps=num_steps)
        input_shape = (1, model.config.in_channels, model.config.sample_size,
                       model.config.sample_size)
        torch.manual_seed(seed)

        # Only a unique path is needed: close the handle right away so it is
        # not leaked and the writer can open the file itself.
        with tempfile.NamedTemporaryFile(suffix='.mp4',
                                         delete=False) as out_file:
            out_path = out_file.name

        writer = imageio.get_writer(out_path, fps=fps)
        try:
            sample = torch.randn(input_shape).to(self.device)
            for t in tqdm.auto.tqdm(scheduler.timesteps):
                out = model(sample, t)['sample']
                sample = scheduler.step(out, t, sample)['prev_sample']
                res = self.postprocess(sample)[0]
                writer.append_data(res)
        finally:
            # Release the writer even when a step raises.
            writer.close()

        logger.info('--- done ---')
        return res, out_path

    def run(self, model_name: str, scheduler_type: str, num_steps: int,
            randomize_seed: bool, seed: int, visualize_denoising: bool
            ) -> tuple[PIL.Image.Image | np.ndarray, int, str | None]:
        """Select a pipeline, pick a seed and generate a single image.

        Args:
            model_name: Model to activate.
            scheduler_type: Scheduler to activate.
            num_steps: Requested step count; limited to ``[4, 100]`` for PNDM.
            randomize_seed: When true, ``seed`` is replaced by a random
                integer in ``[0, 100000]``.
            seed: Seed used when ``randomize_seed`` is false.
            visualize_denoising: When true, also record a denoising video.

        Returns:
            ``(image, seed, video_path)``. ``video_path`` is ``None`` unless
            ``visualize_denoising`` is set; in that case ``image`` is the
            array returned by :meth:`generate_with_video`.
        """
        self.set_pipeline(model_name, scheduler_type)
        if scheduler_type == 'PNDM':
            num_steps = max(4, min(num_steps, 100))
        if randomize_seed:
            seed = self.rng.randint(0, 100000)

        if not visualize_denoising:
            return self.generate(seed, num_steps)[0], seed, None

        res, filename = self.generate_with_video(seed, num_steps)
        return res, seed, filename

    @staticmethod
    def to_grid(images: list[PIL.Image.Image],
                ncols: int = 2) -> PIL.Image.Image:
        """Tile equally sized images row by row into one image.

        Missing cells in the last row are filled with the value 255. Any
        trailing channel layout is supported (e.g. grayscale, RGB, RGBA) as
        long as all images share the same shape.

        Args:
            images: Non-empty list of images of identical size and mode.
            ncols: Number of columns of the grid.

        Returns:
            The tiled image with ``ceil(len(images) / ncols)`` rows.
        """
        arrays = [np.asarray(image) for image in images]
        nrows = (len(arrays) + ncols - 1) // ncols
        h, w = arrays[0].shape[:2]
        # Empty for 2-D (grayscale) arrays, ``(c,)`` for multi-channel ones.
        channels = arrays[0].shape[2:]
        if (d := nrows * ncols - len(arrays)) > 0:
            arrays += [np.full_like(arrays[0], 255)] * d
        # (n, h, w, ...) -> (rows, cols, h, w, ...) -> (rows, h, cols, w, ...)
        grid = np.stack(arrays).reshape(nrows, ncols, h, w, *channels)
        grid = grid.swapaxes(1, 2).reshape(nrows * h, ncols * w, *channels)
        return PIL.Image.fromarray(grid)

    def run_simple(self) -> PIL.Image.Image:
        """Generate four images with the first model and DDIM as a 2x2 grid.

        Uses 10 inference steps and a random seed in ``[0, 1000000]``.
        """
        self.set_pipeline(self.MODEL_NAMES[0], 'DDIM')
        seed = self.rng.randint(0, 1000000)
        images = self.generate(seed, num_steps=10, num_images=4)
        return self.to_grid(images, 2)