import os
from typing import Optional

import clip
import pytorch_lightning as pl
import torch
import torch.nn as nn
import torch.nn.functional as F

from rewards.base_reward import BaseRewardLoss


class AestheticLoss(BaseRewardLoss):
    """CLIP reward loss function for optimization."""

    def __init__(
        self,
        weighting: float,
        dtype: torch.dtype,
        device: torch.device,
        cache_dir: str,
        memsave: bool = False,
    ):
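        """Load CLIP ViT-L/14 and the LAION aesthetic MLP head, freeze both, and set up the reward.

        Args:
            weighting: reward weight forwarded to BaseRewardLoss.
            dtype: parameter dtype for CLIP and the MLP.
            device: device both models are placed on.
            cache_dir: download root for the CLIP weights.
            memsave: if True, wrap both models with memsave_torch to reduce memory.
        """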
        self.clip_model, self.preprocess_fn = clip.load(
            "ViT-L/14", device=device, download_root=cache_dir
        )
        self.clip_model = self.clip_model.to(device, dtype=dtype)
        self.mlp = MLP(768).to(device, dtype=dtype)
        s = torch.load(
            f"{os.getcwd()}/ckpts/aesthetic-model.pth", map_location=device
        )  # load the aesthetic predictor you trained previously or the checkpoint shipped with this repo
        self.mlp.load_state_dict(s)
        self.clip_model.eval()
        self.mlp.eval()  # disable dropout: the MLP is used for inference only
        self.memsave = memsave  # remembered for __call__, which casts inputs for the memsave kernels
        if memsave:
            import memsave_torch.nn

            self.mlp = memsave_torch.nn.convert_to_memory_saving(self.mlp)
            self.clip_model = memsave_torch.nn.convert_to_memory_saving(
                self.clip_model
            ).to(device, dtype=dtype)

        self.freeze_parameters(self.clip_model.parameters())
        self.freeze_parameters(self.mlp.parameters())
        super().__init__("Aesthetic", weighting)

    def get_image_features(self, image: torch.Tensor) -> torch.Tensor:
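        """Encode the image with CLIP and L2-normalize the features, guarding against zero norms."""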
        with torch.autocast("cuda"):
            clip_img_features = self.clip_model.encode_image(image)
            l2 = torch.norm(clip_img_features, p=2, dim=-1, keepdim=True)
            l2 = torch.where(
                l2 == 0,
                torch.tensor(
                    1.0, device=clip_img_features.device, dtype=clip_img_features.dtype
                ),
                l2,
            )
            clip_img_features = clip_img_features / l2
        return clip_img_features

    def get_text_features(self, prompt: str) -> Optional[torch.Tensor]:
        """Unused: the aesthetic reward is text-free, so no text features are needed."""
        return None

    def compute_loss(
        self, image_features: torch.Tensor, text_features: Optional[torch.Tensor]
    ) -> Optional[torch.Tensor]:
        """Unused: the loss is computed directly in __call__ from the MLP score."""
        return None

    def __call__(self, image: torch.Tensor, prompt: str) -> torch.Tensor:
        # `prompt` is accepted only for interface compatibility; it is unused.
        if self.memsave:
            image = image.to(torch.float32)
        image_features = self.get_image_features(image)

        image_features_normed = self.process_features(image_features.to(torch.float16))

        # The predictor scores images on a roughly 1-10 aesthetic scale; subtracting
        # the mean score from 10 turns score maximization into loss minimization.
        aesthetic_loss = 10.0 - self.mlp(image_features_normed).mean()
        return aesthetic_loss


class MLP(pl.LightningModule):
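    """MLP head of the LAION aesthetic predictor; maps 768-d CLIP embeddings to a scalar score."""
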
    def __init__(self, input_size, xcol="emb", ycol="avg_rating"):
        super().__init__()
        self.input_size = input_size
        self.xcol = xcol
        self.ycol = ycol
        self.layers = nn.Sequential(
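            # NOTE: the ReLUs below are intentionally commented out to match the
            # upstream LAION predictor, whose released checkpoint was trained
            # without activations; uncommenting them would change the scores.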
            nn.Linear(self.input_size, 1024),
            # nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 128),
            # nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 64),
            # nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(64, 16),
            # nn.ReLU(),
            nn.Linear(16, 1),
        )

    def forward(self, x):
        return self.layers(x)

    def training_step(self, batch, batch_idx):
        x = batch[self.xcol]
        y = batch[self.ycol].reshape(-1, 1)
        x_hat = self.layers(x)
        loss = F.mse_loss(x_hat, y)
        return loss

    def validation_step(self, batch, batch_idx):
        x = batch[self.xcol]
        y = batch[self.ycol].reshape(-1, 1)
        x_hat = self.layers(x)
        loss = F.mse_loss(x_hat, y)
        return loss

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-3)
        return optimizer
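

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the module). It assumes
# a CUDA device, CLIP weights cached under ./cache, the LAION aesthetic
# checkpoint at ./ckpts/aesthetic-model.pth, and images already preprocessed
# to CLIP's 224x224 input:
#
#   loss_fn = AestheticLoss(
#       weighting=1.0,
#       dtype=torch.float16,
#       device=torch.device("cuda"),
#       cache_dir="cache",
#   )
#   images = torch.rand(4, 3, 224, 224, device="cuda", dtype=torch.float16)
#   loss = loss_fn(images, prompt="")  # lower loss == higher predicted aesthetic score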