# from einops._torch_specific import allow_ops_in_compiled_graph # allow_ops_in_compiled_graph() import einops import torch import torch as th import torch.nn as nn from einops import rearrange, repeat from sgm.modules.diffusionmodules.util import ( avg_pool_nd, checkpoint, conv_nd, linear, normalization, timestep_embedding, zero_module, ) from sgm.modules.diffusionmodules.openaimodel import Downsample, Upsample, UNetModel, Timestep, \ TimestepEmbedSequential, ResBlock, AttentionBlock, TimestepBlock from sgm.modules.attention import SpatialTransformer, MemoryEfficientCrossAttention, CrossAttention from sgm.util import default, log_txt_as_img, exists, instantiate_from_config import re import torch from functools import partial try: import xformers import xformers.ops XFORMERS_IS_AVAILBLE = True except: XFORMERS_IS_AVAILBLE = False # dummy replace def convert_module_to_f16(x): pass def convert_module_to_f32(x): pass class ZeroConv(nn.Module): def __init__(self, label_nc, norm_nc, mask=False): super().__init__() self.zero_conv = zero_module(conv_nd(2, label_nc, norm_nc, 1, 1, 0)) self.mask = mask def forward(self, c, h, h_ori=None): # with torch.cuda.amp.autocast(enabled=False, dtype=torch.float32): if not self.mask: h = h + self.zero_conv(c) else: h = h + self.zero_conv(c) * torch.zeros_like(h) if h_ori is not None: h = th.cat([h_ori, h], dim=1) return h class ZeroSFT(nn.Module): def __init__(self, label_nc, norm_nc, concat_channels=0, norm=True, mask=False): super().__init__() # param_free_norm_type = str(parsed.group(1)) ks = 3 pw = ks // 2 self.norm = norm if self.norm: self.param_free_norm = normalization(norm_nc + concat_channels) else: self.param_free_norm = nn.Identity() nhidden = 128 self.mlp_shared = nn.Sequential( nn.Conv2d(label_nc, nhidden, kernel_size=ks, padding=pw), nn.SiLU() ) self.zero_mul = zero_module(nn.Conv2d(nhidden, norm_nc + concat_channels, kernel_size=ks, padding=pw)) self.zero_add = zero_module(nn.Conv2d(nhidden, norm_nc + concat_channels, kernel_size=ks, padding=pw)) # self.zero_mul = nn.Conv2d(nhidden, norm_nc + concat_channels, kernel_size=ks, padding=pw) # self.zero_add = nn.Conv2d(nhidden, norm_nc + concat_channels, kernel_size=ks, padding=pw) self.zero_conv = zero_module(conv_nd(2, label_nc, norm_nc, 1, 1, 0)) self.pre_concat = bool(concat_channels != 0) self.mask = mask def forward(self, c, h, h_ori=None, control_scale=1): assert self.mask is False if h_ori is not None and self.pre_concat: h_raw = th.cat([h_ori, h], dim=1) else: h_raw = h if self.mask: h = h + self.zero_conv(c) * torch.zeros_like(h) else: h = h + self.zero_conv(c) if h_ori is not None and self.pre_concat: h = th.cat([h_ori, h], dim=1) actv = self.mlp_shared(c) gamma = self.zero_mul(actv) beta = self.zero_add(actv) if self.mask: gamma = gamma * torch.zeros_like(gamma) beta = beta * torch.zeros_like(beta) h = self.param_free_norm(h) * (gamma + 1) + beta if h_ori is not None and not self.pre_concat: h = th.cat([h_ori, h], dim=1) return h * control_scale + h_raw * (1 - control_scale) class ZeroCrossAttn(nn.Module): ATTENTION_MODES = { "softmax": CrossAttention, # vanilla attention "softmax-xformers": MemoryEfficientCrossAttention } def __init__(self, context_dim, query_dim, zero_out=True, mask=False): super().__init__() attn_mode = "softmax-xformers" if XFORMERS_IS_AVAILBLE else "softmax" assert attn_mode in self.ATTENTION_MODES attn_cls = self.ATTENTION_MODES[attn_mode] self.attn = attn_cls(query_dim=query_dim, context_dim=context_dim, heads=query_dim//64, dim_head=64) self.norm1 = normalization(query_dim) self.norm2 = normalization(context_dim) self.mask = mask # if zero_out: # # for p in self.attn.to_out.parameters(): # # p.detach().zero_() # self.attn.to_out = zero_module(self.attn.to_out) def forward(self, context, x, control_scale=1): assert self.mask is False x_in = x x = self.norm1(x) context = self.norm2(context) b, c, h, w = x.shape x = rearrange(x, 'b c h w -> b (h w) c').contiguous() context = rearrange(context, 'b c h w -> b (h w) c').contiguous() x = self.attn(x, context) x = rearrange(x, 'b (h w) c -> b c h w', h=h, w=w).contiguous() if self.mask: x = x * torch.zeros_like(x) x = x_in + x * control_scale return x class GLVControl(nn.Module): def __init__( self, in_channels, model_channels, out_channels, num_res_blocks, attention_resolutions, dropout=0, channel_mult=(1, 2, 4, 8), conv_resample=True, dims=2, num_classes=None, use_checkpoint=False, use_fp16=False, num_heads=-1, num_head_channels=-1, num_heads_upsample=-1, use_scale_shift_norm=False, resblock_updown=False, use_new_attention_order=False, use_spatial_transformer=False, # custom transformer support transformer_depth=1, # custom transformer support context_dim=None, # custom transformer support n_embed=None, # custom support for prediction of discrete ids into codebook of first stage vq model legacy=True, disable_self_attentions=None, num_attention_blocks=None, disable_middle_self_attn=False, use_linear_in_transformer=False, spatial_transformer_attn_type="softmax", adm_in_channels=None, use_fairscale_checkpoint=False, offload_to_cpu=False, transformer_depth_middle=None, input_upscale=1, ): super().__init__() from omegaconf.listconfig import ListConfig if use_spatial_transformer: assert ( context_dim is not None ), "Fool!! You forgot to include the dimension of your cross-attention conditioning..." if context_dim is not None: assert ( use_spatial_transformer ), "Fool!! You forgot to use the spatial transformer for your cross-attention conditioning..." if type(context_dim) == ListConfig: context_dim = list(context_dim) if num_heads_upsample == -1: num_heads_upsample = num_heads if num_heads == -1: assert ( num_head_channels != -1 ), "Either num_heads or num_head_channels has to be set" if num_head_channels == -1: assert ( num_heads != -1 ), "Either num_heads or num_head_channels has to be set" self.in_channels = in_channels self.model_channels = model_channels self.out_channels = out_channels if isinstance(transformer_depth, int): transformer_depth = len(channel_mult) * [transformer_depth] elif isinstance(transformer_depth, ListConfig): transformer_depth = list(transformer_depth) transformer_depth_middle = default( transformer_depth_middle, transformer_depth[-1] ) if isinstance(num_res_blocks, int): self.num_res_blocks = len(channel_mult) * [num_res_blocks] else: if len(num_res_blocks) != len(channel_mult): raise ValueError( "provide num_res_blocks either as an int (globally constant) or " "as a list/tuple (per-level) with the same length as channel_mult" ) self.num_res_blocks = num_res_blocks # self.num_res_blocks = num_res_blocks if disable_self_attentions is not None: # should be a list of booleans, indicating whether to disable self-attention in TransformerBlocks or not assert len(disable_self_attentions) == len(channel_mult) if num_attention_blocks is not None: assert len(num_attention_blocks) == len(self.num_res_blocks) assert all( map( lambda i: self.num_res_blocks[i] >= num_attention_blocks[i], range(len(num_attention_blocks)), ) ) print( f"Constructor of UNetModel received num_attention_blocks={num_attention_blocks}. " f"This option has LESS priority than attention_resolutions {attention_resolutions}, " f"i.e., in cases where num_attention_blocks[i] > 0 but 2**i not in attention_resolutions, " f"attention will still not be set." ) # todo: convert to warning self.attention_resolutions = attention_resolutions self.dropout = dropout self.channel_mult = channel_mult self.conv_resample = conv_resample self.num_classes = num_classes self.use_checkpoint = use_checkpoint if use_fp16: print("WARNING: use_fp16 was dropped and has no effect anymore.") # self.dtype = th.float16 if use_fp16 else th.float32 self.num_heads = num_heads self.num_head_channels = num_head_channels self.num_heads_upsample = num_heads_upsample self.predict_codebook_ids = n_embed is not None assert use_fairscale_checkpoint != use_checkpoint or not ( use_checkpoint or use_fairscale_checkpoint ) self.use_fairscale_checkpoint = False checkpoint_wrapper_fn = ( partial(checkpoint_wrapper, offload_to_cpu=offload_to_cpu) if self.use_fairscale_checkpoint else lambda x: x ) time_embed_dim = model_channels * 4 self.time_embed = checkpoint_wrapper_fn( nn.Sequential( linear(model_channels, time_embed_dim), nn.SiLU(), linear(time_embed_dim, time_embed_dim), ) ) if self.num_classes is not None: if isinstance(self.num_classes, int): self.label_emb = nn.Embedding(num_classes, time_embed_dim) elif self.num_classes == "continuous": print("setting up linear c_adm embedding layer") self.label_emb = nn.Linear(1, time_embed_dim) elif self.num_classes == "timestep": self.label_emb = checkpoint_wrapper_fn( nn.Sequential( Timestep(model_channels), nn.Sequential( linear(model_channels, time_embed_dim), nn.SiLU(), linear(time_embed_dim, time_embed_dim), ), ) ) elif self.num_classes == "sequential": assert adm_in_channels is not None self.label_emb = nn.Sequential( nn.Sequential( linear(adm_in_channels, time_embed_dim), nn.SiLU(), linear(time_embed_dim, time_embed_dim), ) ) else: raise ValueError() self.input_blocks = nn.ModuleList( [ TimestepEmbedSequential( conv_nd(dims, in_channels, model_channels, 3, padding=1) ) ] ) self._feature_size = model_channels input_block_chans = [model_channels] ch = model_channels ds = 1 for level, mult in enumerate(channel_mult): for nr in range(self.num_res_blocks[level]): layers = [ checkpoint_wrapper_fn( ResBlock( ch, time_embed_dim, dropout, out_channels=mult * model_channels, dims=dims, use_checkpoint=use_checkpoint, use_scale_shift_norm=use_scale_shift_norm, ) ) ] ch = mult * model_channels if ds in attention_resolutions: if num_head_channels == -1: dim_head = ch // num_heads else: num_heads = ch // num_head_channels dim_head = num_head_channels if legacy: # num_heads = 1 dim_head = ( ch // num_heads if use_spatial_transformer else num_head_channels ) if exists(disable_self_attentions): disabled_sa = disable_self_attentions[level] else: disabled_sa = False if ( not exists(num_attention_blocks) or nr < num_attention_blocks[level] ): layers.append( checkpoint_wrapper_fn( AttentionBlock( ch, use_checkpoint=use_checkpoint, num_heads=num_heads, num_head_channels=dim_head, use_new_attention_order=use_new_attention_order, ) ) if not use_spatial_transformer else checkpoint_wrapper_fn( SpatialTransformer( ch, num_heads, dim_head, depth=transformer_depth[level], context_dim=context_dim, disable_self_attn=disabled_sa, use_linear=use_linear_in_transformer, attn_type=spatial_transformer_attn_type, use_checkpoint=use_checkpoint, ) ) ) self.input_blocks.append(TimestepEmbedSequential(*layers)) self._feature_size += ch input_block_chans.append(ch) if level != len(channel_mult) - 1: out_ch = ch self.input_blocks.append( TimestepEmbedSequential( checkpoint_wrapper_fn( ResBlock( ch, time_embed_dim, dropout, out_channels=out_ch, dims=dims, use_checkpoint=use_checkpoint, use_scale_shift_norm=use_scale_shift_norm, down=True, ) ) if resblock_updown else Downsample( ch, conv_resample, dims=dims, out_channels=out_ch ) ) ) ch = out_ch input_block_chans.append(ch) ds *= 2 self._feature_size += ch if num_head_channels == -1: dim_head = ch // num_heads else: num_heads = ch // num_head_channels dim_head = num_head_channels if legacy: # num_heads = 1 dim_head = ch // num_heads if use_spatial_transformer else num_head_channels self.middle_block = TimestepEmbedSequential( checkpoint_wrapper_fn( ResBlock( ch, time_embed_dim, dropout, dims=dims, use_checkpoint=use_checkpoint, use_scale_shift_norm=use_scale_shift_norm, ) ), checkpoint_wrapper_fn( AttentionBlock( ch, use_checkpoint=use_checkpoint, num_heads=num_heads, num_head_channels=dim_head, use_new_attention_order=use_new_attention_order, ) ) if not use_spatial_transformer else checkpoint_wrapper_fn( SpatialTransformer( # always uses a self-attn ch, num_heads, dim_head, depth=transformer_depth_middle, context_dim=context_dim, disable_self_attn=disable_middle_self_attn, use_linear=use_linear_in_transformer, attn_type=spatial_transformer_attn_type, use_checkpoint=use_checkpoint, ) ), checkpoint_wrapper_fn( ResBlock( ch, time_embed_dim, dropout, dims=dims, use_checkpoint=use_checkpoint, use_scale_shift_norm=use_scale_shift_norm, ) ), ) self.input_upscale = input_upscale self.input_hint_block = TimestepEmbedSequential( zero_module(conv_nd(dims, in_channels, model_channels, 3, padding=1)) ) def convert_to_fp16(self): """ Convert the torso of the model to float16. """ self.input_blocks.apply(convert_module_to_f16) self.middle_block.apply(convert_module_to_f16) def convert_to_fp32(self): """ Convert the torso of the model to float32. """ self.input_blocks.apply(convert_module_to_f32) self.middle_block.apply(convert_module_to_f32) def forward(self, x, timesteps, xt, context=None, y=None, **kwargs): # with torch.cuda.amp.autocast(enabled=False, dtype=torch.float32): # x = x.to(torch.float32) # timesteps = timesteps.to(torch.float32) # xt = xt.to(torch.float32) # context = context.to(torch.float32) # y = y.to(torch.float32) # print(x.dtype) xt, context, y = xt.to(x.dtype), context.to(x.dtype), y.to(x.dtype) if self.input_upscale != 1: x = nn.functional.interpolate(x, scale_factor=self.input_upscale, mode='bilinear', antialias=True) assert (y is not None) == ( self.num_classes is not None ), "must specify y if and only if the model is class-conditional" hs = [] t_emb = timestep_embedding(timesteps, self.model_channels, repeat_only=False).to(x.dtype) # import pdb # pdb.set_trace() emb = self.time_embed(t_emb) if self.num_classes is not None: assert y.shape[0] == xt.shape[0] emb = emb + self.label_emb(y) guided_hint = self.input_hint_block(x, emb, context) # h = x.type(self.dtype) h = xt for module in self.input_blocks: if guided_hint is not None: h = module(h, emb, context) h += guided_hint guided_hint = None else: h = module(h, emb, context) hs.append(h) # print(module) # print(h.shape) h = self.middle_block(h, emb, context) hs.append(h) return hs class LightGLVUNet(UNetModel): def __init__(self, mode='', project_type='ZeroSFT', project_channel_scale=1, *args, **kwargs): super().__init__(*args, **kwargs) if mode == 'XL-base': cond_output_channels = [320] * 4 + [640] * 3 + [1280] * 3 project_channels = [160] * 4 + [320] * 3 + [640] * 3 concat_channels = [320] * 2 + [640] * 3 + [1280] * 4 + [0] cross_attn_insert_idx = [6, 3] self.progressive_mask_nums = [0, 3, 7, 11] elif mode == 'XL-refine': cond_output_channels = [384] * 4 + [768] * 3 + [1536] * 6 project_channels = [192] * 4 + [384] * 3 + [768] * 6 concat_channels = [384] * 2 + [768] * 3 + [1536] * 7 + [0] cross_attn_insert_idx = [9, 6, 3] self.progressive_mask_nums = [0, 3, 6, 10, 14] else: raise NotImplementedError project_channels = [int(c * project_channel_scale) for c in project_channels] self.project_modules = nn.ModuleList() for i in range(len(cond_output_channels)): # if i == len(cond_output_channels) - 1: # _project_type = 'ZeroCrossAttn' # else: # _project_type = project_type _project_type = project_type if _project_type == 'ZeroSFT': self.project_modules.append(ZeroSFT(project_channels[i], cond_output_channels[i], concat_channels=concat_channels[i])) elif _project_type == 'ZeroCrossAttn': self.project_modules.append(ZeroCrossAttn(cond_output_channels[i], project_channels[i])) else: raise NotImplementedError for i in cross_attn_insert_idx: self.project_modules.insert(i, ZeroCrossAttn(cond_output_channels[i], concat_channels[i])) # print(self.project_modules[i]) def step_progressive_mask(self): if len(self.progressive_mask_nums) > 0: mask_num = self.progressive_mask_nums.pop() for i in range(len(self.project_modules)): if i < mask_num: self.project_modules[i].mask = True else: self.project_modules[i].mask = False return # print(f'step_progressive_mask, current masked layers: {mask_num}') else: return # print('step_progressive_mask, no more masked layers') # for i in range(len(self.project_modules)): # print(self.project_modules[i].mask) def forward(self, x, timesteps=None, context=None, y=None, control=None, control_scale=1, **kwargs): """ Apply the model to an input batch. :param x: an [N x C x ...] Tensor of inputs. :param timesteps: a 1-D batch of timesteps. :param context: conditioning plugged in via crossattn :param y: an [N] Tensor of labels, if class-conditional. :return: an [N x C x ...] Tensor of outputs. """ assert (y is not None) == ( self.num_classes is not None ), "must specify y if and only if the model is class-conditional" hs = [] _dtype = control[0].dtype x, context, y = x.to(_dtype), context.to(_dtype), y.to(_dtype) with torch.no_grad(): t_emb = timestep_embedding(timesteps, self.model_channels, repeat_only=False).to(x.dtype) emb = self.time_embed(t_emb) if self.num_classes is not None: assert y.shape[0] == x.shape[0] emb = emb + self.label_emb(y) # h = x.type(self.dtype) h = x for module in self.input_blocks: h = module(h, emb, context) hs.append(h) adapter_idx = len(self.project_modules) - 1 control_idx = len(control) - 1 h = self.middle_block(h, emb, context) h = self.project_modules[adapter_idx](control[control_idx], h, control_scale=control_scale) adapter_idx -= 1 control_idx -= 1 for i, module in enumerate(self.output_blocks): _h = hs.pop() h = self.project_modules[adapter_idx](control[control_idx], _h, h, control_scale=control_scale) adapter_idx -= 1 # h = th.cat([h, _h], dim=1) if len(module) == 3: assert isinstance(module[2], Upsample) for layer in module[:2]: if isinstance(layer, TimestepBlock): h = layer(h, emb) elif isinstance(layer, SpatialTransformer): h = layer(h, context) else: h = layer(h) # print('cross_attn_here') h = self.project_modules[adapter_idx](control[control_idx], h, control_scale=control_scale) adapter_idx -= 1 h = module[2](h) else: h = module(h, emb, context) control_idx -= 1 # print(module) # print(h.shape) h = h.type(x.dtype) if self.predict_codebook_ids: assert False, "not supported anymore. what the f*** are you doing?" else: return self.out(h) if __name__ == '__main__': from omegaconf import OmegaConf # refiner # opt = OmegaConf.load('../../options/train/debug_p2_xl.yaml') # # model = instantiate_from_config(opt.model.params.control_stage_config) # hint = model(torch.randn([1, 4, 64, 64]), torch.randn([1]), torch.randn([1, 4, 64, 64])) # hint = [h.cuda() for h in hint] # print(sum(map(lambda hint: hint.numel(), model.parameters()))) # # unet = instantiate_from_config(opt.model.params.network_config) # unet = unet.cuda() # # _output = unet(torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1]).cuda(), torch.randn([1, 77, 1280]).cuda(), # torch.randn([1, 2560]).cuda(), hint) # print(sum(map(lambda _output: _output.numel(), unet.parameters()))) # base with torch.no_grad(): opt = OmegaConf.load('../../options/dev/SUPIR_tmp.yaml') model = instantiate_from_config(opt.model.params.control_stage_config) model = model.cuda() hint = model(torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1]).cuda(), torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1, 77, 2048]).cuda(), torch.randn([1, 2816]).cuda()) for h in hint: print(h.shape) # unet = instantiate_from_config(opt.model.params.network_config) unet = unet.cuda() _output = unet(torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1]).cuda(), torch.randn([1, 77, 2048]).cuda(), torch.randn([1, 2816]).cuda(), hint) # model = instantiate_from_config(opt.model.params.control_stage_config) # model = model.cuda() # # hint = model(torch.randn([1, 4, 64, 64]), torch.randn([1]), torch.randn([1, 4, 64, 64])) # hint = model(torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1]).cuda(), torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1, 77, 1280]).cuda(), # torch.randn([1, 2560]).cuda()) # # hint = [h.cuda() for h in hint] # # for h in hint: # print(h.shape) # # unet = instantiate_from_config(opt.model.params.network_config) # unet = unet.cuda() # _output = unet(torch.randn([1, 4, 64, 64]).cuda(), torch.randn([1]).cuda(), torch.randn([1, 77, 1280]).cuda(), # torch.randn([1, 2560]).cuda(), hint)