File size: 9,586 Bytes
2a7e546
d751051
88deab4
2a7e546
d751051
 
 
 
 
 
 
2a7e546
 
d751051
2a7e546
d751051
 
 
 
 
 
 
 
 
 
 
88deab4
 
 
 
04d70cd
d751051
f6f5f48
2a7e546
d751051
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88deab4
 
d751051
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
04d70cd
88deab4
 
 
 
 
 
d751051
 
 
 
88deab4
d751051
 
88deab4
d751051
 
 
88deab4
d751051
 
 
 
 
 
88deab4
 
 
 
 
 
d751051
 
 
 
 
 
 
 
 
2a7e546
d751051
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from diffusers import DiffusionPipeline
from diffusers import DDPMPipeline
from diffusers import DDPMScheduler, UNet2DConditionModel
import torch
import torchvision.transforms as T
from PIL import Image
from transformers import AutoTokenizer
from datasets import load_dataset
import numpy as np
import pandas as pd
from tqdm.auto import tqdm

class RCTDiffusionPipeline(DiffusionPipeline):
    def __init__(self):
        """Build the label dictionaries, the DDPM scheduler and the conditional UNet.

        The label dictionaries are filled from the dataset first because the
        width of the conditioning vector (and therefore the UNet's
        cross-attention dimension) is the total number of known classes.
        """
        super().__init__()

        # dictionaries that map each known object description, color1, color2
        # and color3 class name to its index inside its own weight table
        self.object_description_dict = {}
        self.color1_dict = {}
        self.color2_dict = {}
        self.color3_dict = {}
        self.load_dictionaries_from_dataset()

        self.scheduler = DDPMScheduler()

        # the number of hidden features is dependent on the loaded dictionaries:
        # __call__ feeds a conditioning tensor whose last dimension is
        # get_class_labels_size(), so the cross-attention layers must expect
        # exactly that width instead of a hard-coded constant
        hidden_dim = self.get_class_labels_size()
        self.unet = UNet2DConditionModel(
            sample_size=256,
            # 12 channels: __call__ stacks 4 RGB views of 3 channels each
            in_channels=12,
            out_channels=12,
            down_block_types=('CrossAttnDownBlock2D', 'CrossAttnDownBlock2D', 'DownBlock2D'),
            up_block_types=('UpBlock2D', 'CrossAttnUpBlock2D', 'CrossAttnUpBlock2D'),
            cross_attention_dim=hidden_dim,
            block_out_channels=(64, 128, 256),
            norm_num_groups=32,
        )

        self.unet.to(dtype=torch.float16)
    
    def load_dictionaries_from_dataset(self):
        """Fill the four class dictionaries from the 'train' split of the dataset.

        Every distinct value is assigned the next free index of its dictionary,
        in order of first appearance. The literal color value 'none' is never
        registered as a color class; object descriptions are not filtered.
        """
        rows = load_dataset('frutiemax/rct_dataset')['train']

        # dataset column name -> dictionary that indexes its classes
        color_tables = (
            ('color1', self.color1_dict),
            ('color2', self.color2_dict),
            ('color3', self.color3_dict),
        )

        for entry in rows:
            description = entry['object_description']
            if description not in self.object_description_dict:
                self.object_description_dict[description] = len(self.object_description_dict)

            for column, table in color_tables:
                color = entry[column]
                if color != 'none' and color not in table:
                    table[color] = len(table)
    
    # helper functions to know the classes
    def print_class_tokens_to_csv(self):
        object_descriptions = pd.DataFrame(self.object_description_dict.items())
        object_descriptions.to_csv('object_descriptions_tokens.csv')

        color1 = pd.DataFrame(self.color1_dict.items())
        color1.to_csv('color1_tokens.csv')

        color2 = pd.DataFrame(self.color2_dict.items())
        color2.to_csv('color2_tokens.csv')

        color3 = pd.DataFrame(self.color3_dict.items())
        color3.to_csv('color3_tokens.csv')
    
    # helper functions to build weight tables
    def get_object_description_weights(self, classifiers : list[tuple[str, float]]) -> np.array:
        result = np.zeros(len(self.object_description_dict.items()))

        for classifier in classifiers:
            id, weight = classifier
            if id in self.object_description_dict:
                weight_index = self.object_description_dict[id]
                result[weight_index] = weight
        return result
    
    def get_color1_weights(self, classifiers : list[tuple[str, float]]) -> np.array:
        result = np.zeros(len(self.color1_dict.items()))

        for classifier in classifiers:
            id, weight = classifier
            if id in self.color1_dict:
                weight_index = self.color1_dict[id]
                result[weight_index] = weight
        return result

    def get_color2_weights(self, classifiers : list[tuple[str, float]]) -> np.array:
        result = np.zeros(len(self.color2_dict.items()))

        for classifier in classifiers:
            id, weight = classifier
            if id in self.color2_dict:
                weight_index = self.color2_dict[id]
                result[weight_index] = weight
        return result

    def get_color3_weights(self, classifiers : list[tuple[str, float]]) -> np.array:
        result = np.zeros(len(self.color3_dict.items()))

        for classifier in classifiers:
            id, weight = classifier
            if id in self.color3_dict:
                weight_index = self.color3_dict[id]
                result[weight_index] = weight
        return result
    
    def get_class_labels_size(self):
        return len(self.object_description_dict.items()) + len(self.color1_dict.items()) + len(self.color2_dict.items()) + len(self.color3_dict.items())

    def pack_labels_to_tensor(self, num_images, object_descriptions : np.array, colors1: np.array, colors2 : np.array, colors3 : np.array) -> torch.Tensor:
        num_labels = self.get_class_labels_size()
        class_labels = torch.Tensor(size=(num_images, num_labels))

        for batch_index in range(num_images):
            offset = 0
            class_labels[batch_index, offset:offset + len(self.object_description_dict)] = torch.from_numpy(object_descriptions[batch_index])

            offset += len(self.object_description_dict.items())
            class_labels[batch_index, offset:offset + len(self.color1_dict)] = torch.from_numpy(colors1[batch_index])

            offset += len(self.color1_dict.items())
            class_labels[batch_index, offset:offset + len(self.color2_dict)] = torch.from_numpy(colors2[batch_index])

            offset += len(self.color2_dict.items())
            class_labels[batch_index, offset:offset + len(self.color3_dict)] = torch.from_numpy(colors3[batch_index])
        
        class_labels = torch.reshape(class_labels, (num_images, 1, self.get_class_labels_size()))
        return class_labels
        
    def __call__(self, object_description : list[list[tuple[str, float]]], color1 : list[list[tuple[str, float]]],
                 color2 : list[list[tuple[str, float]]] = None, color3 : list[list[tuple[str, float]]] = None,
                 batch_size=1, num_inference_steps=20, generator=None):
        """Generate 4 RGB views per requested image from weighted class labels.

        Args:
            object_description: one list of (class name, weight) pairs per image.
            color1: one list of (color name, weight) pairs per image.
            color2: optional, same layout as color1; all-zero weights when omitted.
            color3: optional, same layout as color1; all-zero weights when omitted.
            batch_size: number of images to generate; every provided label list
                must have exactly this many entries.
            num_inference_steps: number of scheduler steps to run.
            generator: optional torch.Generator used for the initial noise and
                forwarded to the scheduler step. When None the global torch RNG
                is used.

        Returns:
            A flat list of 4 * batch_size PIL images (the 4 views of image 0,
            then the 4 views of image 1, ...), or None when a label list does
            not contain batch_size entries.
        """
        # check if the labels are the correct size
        if len(object_description) != batch_size or len(color1) != batch_size:
            return None
        if color2 is not None and len(color2) != batch_size:
            return None
        if color3 is not None and len(color3) != batch_size:
            return None

        # run on whatever device/dtype the UNet currently uses instead of
        # assuming 'cuda' / float16
        device = self.unet.device
        dtype = self.unet.dtype

        # build the weight vectors of every image; a missing optional color
        # contributes an all-zero vector
        object_descriptions = []
        colors1 = []
        colors2 = []
        colors3 = []
        for batch_index in range(batch_size):
            object_descriptions.append(self.get_object_description_weights(object_description[batch_index]))
            colors1.append(self.get_color1_weights(color1[batch_index]))
            colors2.append(self.get_color2_weights(color2[batch_index] if color2 is not None else []))
            colors3.append(self.get_color3_weights(color3[batch_index] if color3 is not None else []))

        # now put those weights into a (batch_size, 1, num_labels) tensor
        class_labels = self.pack_labels_to_tensor(
            batch_size, object_descriptions, colors1, colors2, colors3
        ).to(device=device, dtype=dtype)

        # set the inference steps
        self.scheduler.set_timesteps(num_inference_steps)

        # draw the starting noise one RGB view at a time; sampling happens on
        # the generator's device (CPU when no generator is given) before the
        # result is moved to the model's device/dtype
        noise_device = generator.device if generator is not None else 'cpu'
        noise_batches = torch.empty((batch_size, 4, 3, 256, 256), dtype=dtype, device=device)
        for batch_index in range(batch_size):
            for view_index in range(4):
                noise = torch.randn(3, 256, 256, generator=generator, device=noise_device)
                noise_batches[batch_index, view_index] = noise.to(device=device, dtype=dtype)

        # merge the 4 views into 12 channels and keep a singleton batch axis so
        # that noise_batches[i] is directly a (1, 12, 256, 256) UNet input
        noise_batches = torch.reshape(noise_batches, (batch_size, 1, 12, 256, 256))

        # now call the model for the n iterations
        with tqdm(total=num_inference_steps) as progress_bar, torch.no_grad():
            for step_index, t in enumerate(self.scheduler.timesteps):
                progress_bar.set_description(f'Inference step {step_index}')

                for batch_index in range(batch_size):
                    sample = noise_batches[batch_index]
                    # condition each image on its own labels only: the sample has
                    # a batch axis of 1, so the conditioning must as well
                    labels = class_labels[batch_index:batch_index + 1]
                    noise_residual = self.unet(sample, t, encoder_hidden_states=labels).sample
                    noise_batches[batch_index] = self.scheduler.step(
                        noise_residual, t, sample, generator=generator
                    ).prev_sample
                progress_bar.update(1)

        # reshape the data so we get back 4 RGB images per batch entry, flattened
        # in (image, view) order
        views = torch.reshape(noise_batches, (batch_size * 4, 3, 256, 256)).to('cpu')

        # convert those tensors to PIL images
        # NOTE(review): the raw model output is converted without any rescaling;
        # confirm the value range matches what ToPILImage expects
        tensor_to_pil = T.ToPILImage('RGB')
        return [tensor_to_pil(view) for view in views]