import torch
import torch.nn.functional as F
from torch import nn

from taming.modules.diffusionmodules.model import Encoder, Decoder
from taming.modules.vqvae.quantize import VectorQuantizer2 as VectorQuantizer


class VQModel(nn.Module):
    def __init__(self,
                 ddconfig,
                 lossconfig,  # accepted for config compatibility; no loss module is built (inference only)
                 n_embed,
                 embed_dim,
                 ckpt_path=None,
                 ignore_keys=[],
                 image_key="image",
                 colorize_nlabels=None,
                 monitor=None,
                 remap=None,
                 sane_index_shape=False,  # tell vector quantizer to return indices as bhw
                 ):
        super().__init__()
        self.n_embed = n_embed
        self.embed_dim = embed_dim
        self.image_key = image_key
        self.encoder = Encoder(**ddconfig)
        self.decoder = Decoder(**ddconfig)
        self.quantize = VectorQuantizer(n_embed, embed_dim, beta=0.25,
                                        remap=remap, sane_index_shape=sane_index_shape)
        if ckpt_path is not None:
            self.init_from_ckpt(ckpt_path, ignore_keys=ignore_keys)
        if colorize_nlabels is not None:
            assert isinstance(colorize_nlabels, int)
            self.register_buffer("colorize", torch.randn(3, colorize_nlabels, 1, 1))
        if monitor is not None:
            self.monitor = monitor
        # Used as a frozen tokenizer: inference mode, no gradients.
        self.eval()
        self.requires_grad_(False)

    def init_from_ckpt(self, path, ignore_keys=list()):
        sd = torch.load(path, map_location="cpu")
        if "state_dict" in sd.keys():
            sd = sd["state_dict"]
        keys = list(sd.keys())
        for k in keys:
            for ik in ignore_keys:
                if k.startswith(ik):
                    print("Deleting key {} from state_dict.".format(k))
                    del sd[k]
                    break  # key is gone; avoid a double delete on overlapping prefixes
        print("Strict load")
        self.load_state_dict(sd, strict=True)
        print(f"Restored from {path}")

    def encode(self, x):
        # Returns the quantized latents, the codebook (commitment) loss,
        # and the quantizer info tuple containing the code indices.
        h = self.encoder(x)
        quant, emb_loss, info = self.quantize(h)
        return quant, emb_loss, info

    def decode(self, quant):
        dec = self.decoder(quant)
        return dec

    def decode_code(self, code_b):
        # Decode directly from discrete code indices by looking up codebook entries.
        quant_b = self.quantize.get_codebook_entry(code_b, [*code_b.shape, self.embed_dim])
        dec = self.decode(quant_b)
        return dec

    def forward(self, input):
        quant, diff, info = self.encode(input)
        return quant, diff, info

    def get_input(self, batch, k):
        # Convert a (B, H, W[, C]) batch entry to a contiguous float (B, C, H, W) tensor.
        x = batch[k]
        if len(x.shape) == 3:
            x = x[..., None]
        x = x.permute(0, 3, 1, 2).to(memory_format=torch.contiguous_format)
        return x.float()

    def get_last_layer(self):
        return self.decoder.conv_out.weight

    def log_images(self, batch, **kwargs):
        log = dict()
        x = self.get_input(batch, self.image_key)
        # Plain nn.Module has no .device attribute; infer it from the parameters.
        x = x.to(next(self.parameters()).device)
        # forward() returns (quant, diff, info), so decode explicitly for the reconstruction.
        quant, _, _ = self(x)
        xrec = self.decode(quant)
        if x.shape[1] > 3:
            # colorize with random projection
            assert xrec.shape[1] > 3
            x = self.to_rgb(x)
            xrec = self.to_rgb(xrec)
        log["inputs"] = x
        log["reconstructions"] = xrec
        return log

    def to_rgb(self, x):
        assert self.image_key == "segmentation"
        if not hasattr(self, "colorize"):
            self.register_buffer("colorize", torch.randn(3, x.shape[1], 1, 1).to(x))
        x = F.conv2d(x, weight=self.colorize)
        x = 2. * (x - x.min()) / (x.max() - x.min()) - 1.
        return x


def get_model(config_file='vq-f16-jax.yaml'):
    from omegaconf import OmegaConf
    config = OmegaConf.load(f'configs/vae_configs/{config_file}').model
    return VQModel(ddconfig=config.params.ddconfig,
                   lossconfig=config.params.lossconfig,
                   n_embed=config.params.n_embed,
                   embed_dim=config.params.embed_dim,
                   ckpt_path='assets/vqgan_jax_strongaug.ckpt')
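

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module):
# round-trips a dummy batch through the frozen autoencoder. It assumes the
# config file and checkpoint referenced in get_model() exist locally; the
# random tensor stands in for a real image batch normalized to [-1, 1].
if __name__ == "__main__":
    model = get_model('vq-f16-jax.yaml')
    x = torch.randn(1, 3, 256, 256)  # dummy (B, C, H, W) image batch
    with torch.no_grad():
        quant, emb_loss, info = model.encode(x)  # quantized latents, codebook loss, code indices
        xrec = model.decode(quant)               # reconstruction with the same spatial size as x
    print(quant.shape, xrec.shape)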