# -*- coding: utf-8 -*-
# Copyright (c) Alibaba, Inc. and its affiliates.
import math
import warnings

import torch
import torch.nn as nn

from .pos_embed import rope_apply_multires as rope_apply

try:
    from flash_attn import (flash_attn_varlen_func)
    FLASHATTN_IS_AVAILABLE = True
except ImportError as e:
    FLASHATTN_IS_AVAILABLE = False
    flash_attn_varlen_func = None
    warnings.warn(f'{e}')

# Every name listed here must exist in this module, otherwise
# ``from <module> import *`` raises AttributeError.
__all__ = [
    "drop_path",
    "modulate",
    "PatchEmbed",
    "DropPath",
    "RMSNorm",
    "Mlp",
    "TimestepEmbedder",
    "DiTACEBlock",
    "MultiHeadAttention",
    "T2IFinalLayer",
]


def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Drop paths (Stochastic Depth) per sample.

    Intended for the main path of residual blocks. This is the same idea as
    the "DropConnect" implementation used for EfficientNet-style networks;
    the name 'drop path' is used because 'Drop Connect' refers to a different
    form of dropout in a separate paper. See discussion:
    https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956

    Args:
        x: Input tensor; the first dimension is treated as the sample axis.
        drop_prob: Probability of zeroing a whole sample. ``None`` and ``0``
            both mean "never drop".
        training: Paths are only dropped when this is True.

    Returns:
        ``x`` unchanged when dropping is disabled; otherwise a tensor of the
        same shape where each sample is either zeroed or scaled by
        ``1 / (1 - drop_prob)``.
    """
    # ``DropPath()`` defaults to ``drop_prob=None``; treat that as disabled
    # instead of failing on ``1 - None`` below.
    if drop_prob is None or drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    if keep_prob <= 0.:
        # Everything is dropped; dividing by a non-positive keep_prob would
        # produce NaN/garbage instead of zeros.
        return torch.zeros_like(x)
    # One random draw per sample, broadcast over all remaining dimensions
    # (works with tensors of any rank, not just 2D ConvNets).
    shape = (x.shape[0], ) + (1, ) * (x.ndim - 1)
    random_tensor = keep_prob + torch.rand(
        shape, dtype=x.dtype, device=x.device)
    random_tensor.floor_()  # binarize
    output = x.div(keep_prob) * random_tensor
    return output


def modulate(x, shift, scale, unsqueeze=False):
    """Apply an affine modulation ``x * (1 + scale) + shift``.

    Args:
        x: Tensor to modulate.
        shift: Additive term.
        scale: Multiplicative term (applied as ``1 + scale``).
        unsqueeze: When True, a dimension is inserted at index 1 of both
            ``shift`` and ``scale`` before broadcasting against ``x``.

    Returns:
        The modulated tensor.
    """
    if unsqueeze:
        return x * (1 + scale.unsqueeze(1)) + shift.unsqueeze(1)
    else:
        return x * (1 + scale) + shift


class PatchEmbed(nn.Module):
    """ 2D Image to Patch Embedding
    """
    def __init__(
        self,
        patch_size=16,
        in_chans=3,
        embed_dim=768,
        norm_layer=None,
        flatten=True,
        bias=True,
    ):
        """
        Args:
            patch_size: Kernel size and stride of the projection convolution.
            in_chans: Number of input channels.
            embed_dim: Number of output channels of the projection.
            norm_layer: Optional callable building a norm layer from
                ``embed_dim``; ``nn.Identity`` is used when falsy.
            flatten: Whether to flatten the spatial dims in ``forward``.
            bias: Whether the projection convolution has a bias.
        """
        super().__init__()
        self.flatten = flatten
        self.proj = nn.Conv2d(in_chans,
                              embed_dim,
                              kernel_size=patch_size,
                              stride=patch_size,
                              bias=bias)
        self.norm = norm_layer(embed_dim) if norm_layer else nn.Identity()

    def forward(self, x):
        """Project ``x`` to patches, optionally flatten, then normalize."""
        x = self.proj(x)
        if self.flatten:
            x = x.flatten(2).transpose(1, 2)  # BCHW -> BNC
        x = self.norm(x)
        return x


class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).
    """
    def __init__(self, drop_prob=None):
        """
        Args:
            drop_prob: Drop probability forwarded to :func:`drop_path`.
        """
        super(DropPath, self).__init__()
        self.drop_prob = drop_prob

    def forward(self, x):
        """Apply :func:`drop_path` using the module's training flag."""
        return drop_path(x, self.drop_prob, self.training)


class RMSNorm(nn.Module):
    """Root-mean-square normalization over the last dimension with a
    learnable per-channel weight."""
    def __init__(self, dim, eps=1e-6):
        """
        Args:
            dim: Size of the learnable weight vector.
            eps: Constant added to the mean square before ``rsqrt``.
        """
        super().__init__()
        self.dim = dim
        self.eps = eps
        self.weight = nn.Parameter(torch.ones(dim))

    def forward(self, x):
        """Normalize in float32, cast back to ``x``'s type, then scale."""
        return self._norm(x.float()).type_as(x) * self.weight

    def _norm(self, x):
        """Divide ``x`` by the RMS of its last dimension."""
        return x * torch.rsqrt(x.pow(2).mean(dim=-1, keepdim=True) + self.eps)


class Mlp(nn.Module):
    """ MLP as used in Vision Transformer, MLP-Mixer and related networks
    """
    def __init__(self,
                 in_features,
                 hidden_features=None,
                 out_features=None,
                 act_layer=nn.GELU,
                 drop=0.):
        """
        Args:
            in_features: Input feature size.
            hidden_features: Hidden size; defaults to ``in_features``.
            out_features: Output size; defaults to ``in_features``.
            act_layer: Zero-argument callable returning the activation module.
            drop: Dropout probability used after the activation and after
                the second linear layer.
        """
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop = nn.Dropout(drop)

    def forward(self, x):
        """Run ``fc1 -> act -> drop -> fc2 -> drop``."""
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.drop(x)
        return x


class TimestepEmbedder(nn.Module):
    """
    Embeds scalar timesteps into vector representations.
    """
    def __init__(self, hidden_size, frequency_embedding_size=256):
        """
        Args:
            hidden_size: Output size of the embedding MLP.
            frequency_embedding_size: Size of the sinusoidal embedding fed
                into the MLP.
        """
        super().__init__()
        self.mlp = nn.Sequential(
            nn.Linear(frequency_embedding_size, hidden_size, bias=True),
            nn.SiLU(),
            nn.Linear(hidden_size, hidden_size, bias=True),
        )
        self.frequency_embedding_size = frequency_embedding_size

    @staticmethod
    def timestep_embedding(t, dim, max_period=10000):
        """
        Create sinusoidal timestep embeddings.
        :param t: a 1-D Tensor of N indices, one per batch element.
                          These may be fractional.
        :param dim: the dimension of the output.
        :param max_period: controls the minimum frequency of the embeddings.
        :return: an (N, D) Tensor of positional embeddings.
        """
        # https://github.com/openai/glide-text2im/blob/main/glide_text2im/nn.py
        half = dim // 2
        freqs = torch.exp(
            -math.log(max_period) *
            torch.arange(start=0, end=half, dtype=torch.float32) /
            half).to(device=t.device)
        args = t[:, None].float() * freqs[None]
        embedding = torch.cat([torch.cos(args), torch.sin(args)], dim=-1)
        if dim % 2:
            # Odd ``dim``: pad one zero column so the output width is ``dim``.
            embedding = torch.cat(
                [embedding, torch.zeros_like(embedding[:, :1])], dim=-1)
        return embedding

    def forward(self, t):
        """Embed timesteps ``t`` sinusoidally and pass them through the MLP."""
        t_freq = self.timestep_embedding(t, self.frequency_embedding_size)
        t_emb = self.mlp(t_freq)
        return t_emb


class DiTACEBlock(nn.Module):
    """Transformer block: modulated self-attention, optional cross-attention
    on a context, and a modulated MLP, each added residually."""
    def __init__(self,
                 hidden_size,
                 num_heads,
                 mlp_ratio=4.0,
                 drop_path=0.,
                 window_size=0,
                 backend=None,
                 use_condition=True,
                 qk_norm=False,
                 **block_kwargs):
        """
        Args:
            hidden_size: Feature size of the block.
            num_heads: Number of attention heads.
            mlp_ratio: Hidden size of the MLP as a multiple of
                ``hidden_size``.
            drop_path: Stochastic-depth probability; ``nn.Identity`` is used
                when it is not greater than 0.
            window_size: Stored on the module as ``self.window_size``.
            backend: Attention backend forwarded to
                :class:`MultiHeadAttention`.
            use_condition: Whether to build and apply the cross-attention.
            qk_norm: Forwarded to :class:`MultiHeadAttention`.
            **block_kwargs: Extra options forwarded to
                :class:`MultiHeadAttention`.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.use_condition = use_condition
        self.norm1 = nn.LayerNorm(hidden_size,
                                  elementwise_affine=False,
                                  eps=1e-6)
        self.attn = MultiHeadAttention(hidden_size,
                                       num_heads=num_heads,
                                       qkv_bias=True,
                                       backend=backend,
                                       qk_norm=qk_norm,
                                       **block_kwargs)
        if self.use_condition:
            self.cross_attn = MultiHeadAttention(
                hidden_size,
                context_dim=hidden_size,
                num_heads=num_heads,
                qkv_bias=True,
                backend=backend,
                qk_norm=qk_norm,
                **block_kwargs)
        self.norm2 = nn.LayerNorm(hidden_size,
                                  elementwise_affine=False,
                                  eps=1e-6)
        # to be compatible with lower version pytorch
        approx_gelu = lambda: nn.GELU(approximate='tanh')
        self.mlp = Mlp(in_features=hidden_size,
                       hidden_features=int(hidden_size * mlp_ratio),
                       act_layer=approx_gelu,
                       drop=0)
        self.drop_path = DropPath(
            drop_path) if drop_path > 0. else nn.Identity()
        self.window_size = window_size
        self.scale_shift_table = nn.Parameter(
            torch.randn(6, hidden_size) / hidden_size**0.5)

    def forward(self, x, y, t, **kwargs):
        """Run the block.

        Args:
            x: Input features; ``x.size(0)`` is used as the batch size.
            y: Context passed to the cross-attention when
                ``use_condition`` is set.
            t: Conditioning tensor, reshaped to ``(B, 6, -1)`` and added to
                the learned scale/shift table.
            **kwargs: Forwarded to both attention modules.

        Returns:
            The updated features.
        """
        B = x.size(0)
        # Six modulation vectors: shift/scale/gate for the attention branch
        # and shift/scale/gate for the MLP branch.
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
            self.scale_shift_table[None] + t.reshape(B, 6, -1)).chunk(6, dim=1)
        shift_msa, scale_msa, gate_msa, shift_mlp, scale_mlp, gate_mlp = (
            shift_msa.squeeze(1), scale_msa.squeeze(1), gate_msa.squeeze(1),
            shift_mlp.squeeze(1), scale_mlp.squeeze(1), gate_mlp.squeeze(1))
        x = x + self.drop_path(gate_msa * self.attn(
            modulate(self.norm1(x), shift_msa, scale_msa, unsqueeze=False),
            **kwargs))
        if self.use_condition:
            x = x + self.cross_attn(x, context=y, **kwargs)

        x = x + self.drop_path(gate_mlp * self.mlp(
            modulate(self.norm2(x), shift_mlp, scale_mlp, unsqueeze=False)))
        return x


class MultiHeadAttention(nn.Module):
    """Multi-head self/cross attention over variable-length sequences.

    Only the ``flash_attn`` backend is implemented; constructing the module
    with any other backend, or without the ``flash_attn`` package installed,
    raises ``NotImplementedError``.
    """
    def __init__(self,
                 dim,
                 context_dim=None,
                 num_heads=None,
                 head_dim=None,
                 attn_drop=0.0,
                 qkv_bias=False,
                 dropout=0.0,
                 backend=None,
                 qk_norm=False,
                 eps=1e-6,
                 **block_kwargs):
        """
        Args:
            dim: Feature size of queries and of the output.
            context_dim: Feature size of the key/value input; defaults to
                ``dim``.
            num_heads: Number of heads (ignored when ``head_dim`` is given).
            head_dim: Size of each head; takes priority over ``num_heads``.
            attn_drop: Dropout probability passed to the attention kernel.
            qkv_bias: Whether the q/k/v projections have a bias.
            dropout: Dropout probability applied to the output projection.
            backend: One of ``'flash_attn'``, ``'xformer_attn'``,
                ``'pytorch_attn'`` or ``None``.
            qk_norm: Whether to build ``RMSNorm`` layers for q/k
                (``nn.Identity`` otherwise).
            eps: Epsilon of the q/k ``RMSNorm`` layers.
            **block_kwargs: Optional ``softmax_scale``, ``causal``,
                ``window_size`` and ``deterministic`` settings for the
                flash-attention call.

        Raises:
            NotImplementedError: If the requested backend is not
                ``flash_attn``/``None`` or ``flash_attn`` is not installed.
        """
        super().__init__()
        # consider head_dim first, then num_heads
        num_heads = dim // head_dim if head_dim else num_heads
        head_dim = dim // num_heads
        assert num_heads * head_dim == dim
        context_dim = context_dim or dim
        self.dim = dim
        self.context_dim = context_dim
        self.num_heads = num_heads
        self.head_dim = head_dim
        self.scale = math.pow(head_dim, -0.25)
        # layers
        self.q = nn.Linear(dim, dim, bias=qkv_bias)
        self.k = nn.Linear(context_dim, dim, bias=qkv_bias)
        self.v = nn.Linear(context_dim, dim, bias=qkv_bias)
        self.o = nn.Linear(dim, dim)
        self.norm_q = RMSNorm(dim, eps=eps) if qk_norm else nn.Identity()
        self.norm_k = RMSNorm(dim, eps=eps) if qk_norm else nn.Identity()

        self.dropout = nn.Dropout(dropout)
        self.attention_op = None
        self.attn_drop = nn.Dropout(attn_drop)
        self.backend = backend
        assert self.backend in ('flash_attn', 'xformer_attn', 'pytorch_attn',
                                None)
        if FLASHATTN_IS_AVAILABLE and self.backend in ('flash_attn', None):
            self.backend = 'flash_attn'
            self.softmax_scale = block_kwargs.get('softmax_scale', None)
            self.causal = block_kwargs.get('causal', False)
            self.window_size = block_kwargs.get('window_size', (-1, -1))
            self.deterministic = block_kwargs.get('deterministic', False)
        else:
            raise NotImplementedError(
                f'Attention backend {backend!r} is not available: only '
                f"'flash_attn' is implemented and it requires the "
                f'flash_attn package to be installed.')

    def flash_attn(self, x, context=None, **kwargs):
        '''Compute attention with ``flash_attn_varlen_func``.

        Tokens of all samples are flattened to shape
        ``(-1, num_heads, head_dim)`` and separated through cumulative
        sequence lengths.

        Args:
            x: Query input.
            context: Key/value input. When None, self-attention on ``x`` is
                computed and ``rope_apply`` is applied to q and k.
            **kwargs: Must contain ``x_shapes``, ``freqs``, ``self_x_len``,
                ``cross_x_len`` and ``txt_lens`` (all are read regardless of
                the branch taken). Optional ``dtype`` (default
                ``torch.float16``) is the dtype q/k/v are cast to when they
                are not already float16/bfloat16.

        Returns:
            Tensor of shape ``(-1, num_heads * head_dim)`` after the output
            projection and dropout.
        '''
        dtype = kwargs.get('dtype', torch.float16)

        def half(x):
            # The kernel inputs must be half precision; leave tensors that
            # already are fp16/bf16 untouched.
            return x if x.dtype in [torch.float16, torch.bfloat16
                                    ] else x.to(dtype)

        x_shapes = kwargs['x_shapes']
        freqs = kwargs['freqs']
        self_x_len = kwargs['self_x_len']
        cross_x_len = kwargs['cross_x_len']
        txt_lens = kwargs['txt_lens']
        n, d = self.num_heads, self.head_dim

        # NOTE(review): both branches normalize the key with ``norm_q`` and
        # ``norm_k`` is never used. This looks unintended, but it is kept
        # as-is because existing checkpoints may have been trained with this
        # behavior -- confirm before switching to ``norm_k``.
        if context is None:
            # self-attn
            q = self.norm_q(self.q(x)).view(-1, n, d)
            k = self.norm_q(self.k(x)).view(-1, n, d)
            v = self.v(x).view(-1, n, d)
            q = rope_apply(q, self_x_len, x_shapes, freqs, pad=False)
            k = rope_apply(k, self_x_len, x_shapes, freqs, pad=False)
            q_lens = k_lens = self_x_len
        else:
            # cross-attn
            q = self.norm_q(self.q(x)).view(-1, n, d)
            k = self.norm_q(self.k(context)).view(-1, n, d)
            v = self.v(context).view(-1, n, d)
            q_lens = cross_x_len
            k_lens = txt_lens

        # Cumulative sequence boundaries, starting at 0, as int32.
        cu_seqlens_q = torch.cat([q_lens.new_zeros([1]),
                                  q_lens]).cumsum(0, dtype=torch.int32)
        cu_seqlens_k = torch.cat([k_lens.new_zeros([1]),
                                  k_lens]).cumsum(0, dtype=torch.int32)
        max_seqlen_q = q_lens.max()
        max_seqlen_k = k_lens.max()

        out_dtype = q.dtype
        q, k, v = half(q), half(k), half(v)
        x = flash_attn_varlen_func(q,
                                   k,
                                   v,
                                   cu_seqlens_q=cu_seqlens_q,
                                   cu_seqlens_k=cu_seqlens_k,
                                   max_seqlen_q=max_seqlen_q,
                                   max_seqlen_k=max_seqlen_k,
                                   dropout_p=self.attn_drop.p,
                                   softmax_scale=self.softmax_scale,
                                   causal=self.causal,
                                   window_size=self.window_size,
                                   deterministic=self.deterministic)

        # Restore the dtype q had before the half-precision cast.
        x = x.type(out_dtype)
        x = x.reshape(-1, n * d)
        x = self.o(x)
        x = self.dropout(x)
        return x

    def forward(self, x, context=None, **kwargs):
        """Dispatch to the method named by ``self.backend``."""
        x = getattr(self, self.backend)(x, context=context, **kwargs)
        return x


class T2IFinalLayer(nn.Module):
    """
    The final layer of PixArt.
    """
    def __init__(self, hidden_size, patch_size, out_channels):
        """
        Args:
            hidden_size: Feature size of the input.
            patch_size: Patch edge length; the output width is
                ``patch_size * patch_size * out_channels``.
            out_channels: Number of output channels per patch pixel.
        """
        super().__init__()
        self.norm_final = nn.LayerNorm(hidden_size,
                                       elementwise_affine=False,
                                       eps=1e-6)
        self.linear = nn.Linear(hidden_size,
                                patch_size * patch_size * out_channels,
                                bias=True)
        self.scale_shift_table = nn.Parameter(
            torch.randn(2, hidden_size) / hidden_size**0.5)
        self.out_channels = out_channels

    def forward(self, x, t):
        """Normalize ``x``, modulate it with shift/scale derived from ``t``
        plus the learned table, and apply the output projection."""
        shift, scale = (self.scale_shift_table[None] + t[:, None]).chunk(2,
                                                                         dim=1)
        shift, scale = shift.squeeze(1), scale.squeeze(1)
        x = modulate(self.norm_final(x), shift, scale)
        x = self.linear(x)
        return x