# Copyright (c) 2023 Amphion.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import functools

import torch
import torch.nn as nn
import torch.nn.functional as F


def hinge_d_loss(logits_real, logits_fake):
    """Return the hinge discriminator loss.

    Computes ``0.5 * (mean(relu(1 - logits_real)) + mean(relu(1 + logits_fake)))``.
    """
    loss_real = torch.mean(F.relu(1.0 - logits_real))
    loss_fake = torch.mean(F.relu(1.0 + logits_fake))
    return 0.5 * (loss_real + loss_fake)


def vanilla_d_loss(logits_real, logits_fake):
    """Return the softplus-based discriminator loss.

    Computes ``0.5 * (mean(softplus(-logits_real)) + mean(softplus(logits_fake)))``.
    """
    return 0.5 * (
        torch.mean(F.softplus(-logits_real)) + torch.mean(F.softplus(logits_fake))
    )


def adopt_weight(weight, global_step, threshold=0, value=0.0):
    """Return ``value`` while ``global_step < threshold``, otherwise ``weight``."""
    if global_step < threshold:
        return value
    return weight


class ActNorm(nn.Module):
    """Per-channel affine layer ``scale * (x + loc)`` with data-dependent init.

    On the first forward pass in training mode, ``loc`` and ``scale`` are set
    from the per-channel mean and standard deviation of that batch. The
    ``initialized`` buffer records that this has happened.

    Parameters:
        num_features (int)        -- number of channels (dimension 1 of the input)
        logdet (bool)             -- if True, ``forward`` also returns a
                                     per-sample log-determinant term
        affine (bool)             -- must be truthy; anything else is rejected
        allow_reverse_init (bool) -- permit the data-dependent initialization
                                     to be triggered from ``reverse``
    """

    def __init__(
        self, num_features, logdet=False, affine=True, allow_reverse_init=False
    ):
        assert affine, "ActNorm only supports affine=True"
        super().__init__()
        self.logdet = logdet
        self.loc = nn.Parameter(torch.zeros(1, num_features, 1, 1))
        self.scale = nn.Parameter(torch.ones(1, num_features, 1, 1))
        self.allow_reverse_init = allow_reverse_init

        # Stored as a buffer so the flag is part of the module's state_dict.
        self.register_buffer("initialized", torch.tensor(0, dtype=torch.uint8))

    def initialize(self, input):
        """Set ``loc``/``scale`` from the per-channel statistics of ``input``.

        ``input`` must be 4-D with channels in dimension 1. After this call
        ``loc = -mean`` and ``scale = 1 / (std + 1e-6)``.
        """
        with torch.no_grad():
            # Bring channels to the front and flatten everything else so the
            # statistics are taken over all remaining dimensions.
            flatten = input.permute(1, 0, 2, 3).contiguous().view(input.shape[1], -1)
            mean = flatten.mean(1).view(1, -1, 1, 1)
            std = flatten.std(1).view(1, -1, 1, 1)

            self.loc.data.copy_(-mean)
            self.scale.data.copy_(1 / (std + 1e-6))

    def forward(self, input, reverse=False):
        """Apply ``scale * (input + loc)``; delegate to ``reverse`` if requested.

        A 2-D input is temporarily expanded with two trailing singleton
        dimensions and squeezed back before returning.

        Returns:
            ``h`` alone, or ``(h, logdet)`` when ``self.logdet`` is set, where
            ``logdet`` has one entry per sample (dimension 0 of the input).
        """
        if reverse:
            return self.reverse(input)

        squeeze = len(input.shape) == 2
        if squeeze:
            input = input[:, :, None, None]

        _, _, height, width = input.shape

        if self.training and self.initialized.item() == 0:
            self.initialize(input)
            self.initialized.fill_(1)

        h = self.scale * (input + self.loc)

        if squeeze:
            h = h.squeeze(-1).squeeze(-1)

        if self.logdet:
            log_abs = torch.log(torch.abs(self.scale))
            logdet = height * width * torch.sum(log_abs)
            # Broadcast the scalar to one value per sample, matching the
            # dtype and device of the input.
            logdet = logdet * torch.ones(input.shape[0]).to(input)
            return h, logdet

        return h

    def reverse(self, output):
        """Invert ``forward``: return ``output / scale - loc``.

        Raises:
            RuntimeError: if the layer is uninitialized, in training mode, and
                ``allow_reverse_init`` is False.
        """
        if self.training and self.initialized.item() == 0:
            if not self.allow_reverse_init:
                raise RuntimeError(
                    "Initializing ActNorm in reverse direction is "
                    "disabled by default. Use allow_reverse_init=True to enable."
                )
            self.initialize(output)
            self.initialized.fill_(1)

        squeeze = len(output.shape) == 2
        if squeeze:
            output = output[:, :, None, None]

        h = output / self.scale - self.loc

        if squeeze:
            h = h.squeeze(-1).squeeze(-1)
        return h


def weights_init(m):
    """Initialize a module in place based on its class name.

    Modules whose class name contains ``"Conv"`` get weights drawn from
    N(0, 0.02). Modules whose class name contains ``"BatchNorm"`` get weights
    drawn from N(1, 0.02) and zero bias. Other modules are left untouched.
    Intended for use with ``nn.Module.apply``.
    """
    classname = m.__class__.__name__
    if "Conv" in classname:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
    elif "BatchNorm" in classname:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)


class NLayerDiscriminator(nn.Module):
    """Defines a PatchGAN discriminator as in Pix2Pix
    --> see https://github.com/junyanz/pytorch-CycleGAN-and-pix2pix/blob/master/models/networks.py
    """

    def __init__(self, input_nc=3, ndf=64, n_layers=3, use_actnorm=False):
        """Construct a PatchGAN discriminator

        Parameters:
            input_nc (int)     -- the number of channels in input images
            ndf (int)          -- the number of filters in the first conv layer
            n_layers (int)     -- the number of conv layers in the discriminator
            use_actnorm (bool) -- use ActNorm instead of BatchNorm2d as the
                                  normalization layer
        """
        super().__init__()
        norm_layer = ActNorm if use_actnorm else nn.BatchNorm2d

        # no need to use bias as BatchNorm2d has affine parameters
        if isinstance(norm_layer, functools.partial):
            use_bias = norm_layer.func != nn.BatchNorm2d
        else:
            use_bias = norm_layer != nn.BatchNorm2d

        kw = 4
        padw = 1
        sequence = [
            nn.Conv2d(input_nc, ndf, kernel_size=kw, stride=2, padding=padw),
            nn.LeakyReLU(0.2, True),
        ]

        nf_mult = 1
        for n in range(1, n_layers):  # gradually increase the number of filters
            nf_mult_prev = nf_mult
            nf_mult = min(2**n, 8)
            sequence += [
                nn.Conv2d(
                    ndf * nf_mult_prev,
                    ndf * nf_mult,
                    kernel_size=kw,
                    stride=2,
                    padding=padw,
                    bias=use_bias,
                ),
                norm_layer(ndf * nf_mult),
                nn.LeakyReLU(0.2, True),
            ]

        # One more conv/norm/activation stage, this time with stride 1.
        nf_mult_prev = nf_mult
        nf_mult = min(2**n_layers, 8)
        sequence += [
            nn.Conv2d(
                ndf * nf_mult_prev,
                ndf * nf_mult,
                kernel_size=kw,
                stride=1,
                padding=padw,
                bias=use_bias,
            ),
            norm_layer(ndf * nf_mult),
            nn.LeakyReLU(0.2, True),
        ]

        # output 1 channel prediction map
        sequence += [
            nn.Conv2d(ndf * nf_mult, 1, kernel_size=kw, stride=1, padding=padw)
        ]
        self.main = nn.Sequential(*sequence)

    def forward(self, input):
        """Standard forward."""
        return self.main(input)


class AutoencoderLossWithDiscriminator(nn.Module):
    """Autoencoder training loss: L1 reconstruction + KL + adversarial term.

    Owns the ``NLayerDiscriminator`` and a learnable scalar ``logvar`` used to
    scale the reconstruction term.

    ``cfg`` must provide the attributes ``kl_weight``, ``logvar_init``,
    ``disc_in_channels``, ``disc_num_layers``, ``use_actnorm``, ``disc_start``,
    ``disc_weight``, ``disc_factor``, ``min_adapt_d_weight`` and
    ``max_adapt_d_weight``.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.kl_weight = cfg.kl_weight
        self.logvar = nn.Parameter(torch.ones(size=()) * cfg.logvar_init)

        self.discriminator = NLayerDiscriminator(
            input_nc=cfg.disc_in_channels,
            n_layers=cfg.disc_num_layers,
            use_actnorm=cfg.use_actnorm,
        ).apply(weights_init)

        self.discriminator_iter_start = cfg.disc_start
        self.discriminator_weight = cfg.disc_weight
        self.disc_factor = cfg.disc_factor
        self.disc_loss = hinge_d_loss

    def calculate_adaptive_weight(self, nll_loss, g_loss, last_layer):
        """Return the weight that balances the adversarial and NLL gradients.

        The weight is ``||d nll / d last_layer|| / (||d g / d last_layer|| + 1e-4)``,
        clamped to ``[cfg.min_adapt_d_weight, cfg.max_adapt_d_weight]``,
        detached, and then multiplied by ``self.discriminator_weight``.

        Raises:
            RuntimeError: propagated from ``torch.autograd.grad`` when a loss
                cannot be differentiated with respect to ``last_layer``.
        """
        nll_grads = torch.autograd.grad(nll_loss, last_layer, retain_graph=True)[0]
        g_grads = torch.autograd.grad(g_loss, last_layer, retain_graph=True)[0]

        d_weight = torch.norm(nll_grads) / (torch.norm(g_grads) + 1e-4)
        d_weight = torch.clamp(
            d_weight, self.cfg.min_adapt_d_weight, self.cfg.max_adapt_d_weight
        ).detach()
        return d_weight * self.discriminator_weight

    def forward(
        self,
        inputs,
        reconstructions,
        posteriors,
        optimizer_idx,
        global_step,
        last_layer,
        split="train",
        weights=None,
    ):
        """Compute the generator (``optimizer_idx == 0``) or discriminator
        (``optimizer_idx == 1``) loss.

        Parameters:
            inputs          -- target tensor fed to the discriminator as "real"
            reconstructions -- autoencoder output, same shape as ``inputs``
            posteriors      -- object exposing ``kl()``; only used by the
                               generator step (presumably returns one KL value
                               per sample -- confirm against the caller)
            optimizer_idx   -- 0 for the generator step, 1 for the
                               discriminator step
            global_step     -- current step; the adversarial factor is zero
                               until it reaches ``cfg.disc_start``
            last_layer      -- tensor with respect to which the adaptive
                               weight gradients are taken
            split           -- accepted but not used by this method
            weights         -- optional multiplicative weights for the NLL term

        Returns:
            dict of tensors. Generator step: ``loss``, ``kl_loss``,
            ``rec_loss``, ``nll_loss``, ``g_loss``, ``d_weight``,
            ``disc_factor``. Discriminator step: ``d_loss``, ``logits_real``,
            ``logits_fake``.

        Raises:
            ValueError: if ``optimizer_idx`` is neither 0 nor 1.
            RuntimeError: in training mode, if the adaptive weight gradients
                cannot be computed.
        """
        if optimizer_idx not in (0, 1):
            raise ValueError(
                f"optimizer_idx must be 0 (generator) or 1 (discriminator), "
                f"got {optimizer_idx!r}"
            )

        disc_factor = adopt_weight(
            self.disc_factor, global_step, threshold=self.discriminator_iter_start
        )

        # Discriminator step: handled first because it needs none of the
        # reconstruction / KL terms computed below.
        if optimizer_idx == 1:
            logits_real = self.discriminator(inputs.contiguous().detach())
            logits_fake = self.discriminator(reconstructions.contiguous().detach())
            d_loss = disc_factor * self.disc_loss(logits_real, logits_fake)
            return {
                "d_loss": d_loss,
                "logits_real": logits_real.mean(),
                "logits_fake": logits_fake.mean(),
            }

        # Generator step.
        rec_loss = torch.abs(
            inputs.contiguous() - reconstructions.contiguous()
        )  # l1 loss
        nll_loss = rec_loss / torch.exp(self.logvar) + self.logvar
        weighted_nll_loss = nll_loss if weights is None else weights * nll_loss
        weighted_nll_loss = torch.mean(weighted_nll_loss)
        nll_loss = torch.mean(nll_loss)

        # KL is summed over every element and divided by the size of
        # dimension 0, rather than averaged over all elements.
        kl_loss = posteriors.kl()
        kl_loss = torch.sum(kl_loss) / kl_loss.shape[0]

        # now the GAN part
        logits_fake = self.discriminator(reconstructions.contiguous())
        g_loss = -torch.mean(logits_fake)

        if self.disc_factor > 0.0:
            try:
                d_weight = self.calculate_adaptive_weight(
                    nll_loss, g_loss, last_layer=last_layer
                )
            except RuntimeError:
                # Gradients are legitimately unavailable during evaluation
                # (e.g. under torch.no_grad()); while training this is a real
                # error and must surface rather than silently zero the weight.
                if self.training:
                    raise
                d_weight = torch.tensor(0.0)
        else:
            d_weight = torch.tensor(0.0)

        total_loss = (
            weighted_nll_loss
            + self.kl_weight * kl_loss
            + d_weight * disc_factor * g_loss
        )

        return {
            "loss": total_loss,
            "kl_loss": kl_loss,
            "rec_loss": rec_loss.mean(),
            "nll_loss": nll_loss,
            "g_loss": g_loss,
            "d_weight": d_weight,
            "disc_factor": torch.tensor(disc_factor),
        }