# -*- coding: utf-8 -*-
"""Attention / transformer building blocks and weight-initialisation helpers."""

import math
import warnings
from typing import Optional

import torch
import torch.nn as nn

from michelangelo.models.modules.checkpoint import checkpoint


def _trunc_normal_(tensor, mean, std, a, b):
    """Fill ``tensor`` in place with truncated-normal samples and return it.

    Callers are expected to disable autograd themselves (see
    :func:`trunc_normal_`); this helper performs only in-place tensor ops.
    """
    # Cut & paste from PyTorch official master until it's in a few official releases - RW
    # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
    def norm_cdf(x):
        # Computes standard normal cumulative distribution function
        return (1. + math.erf(x / math.sqrt(2.))) / 2.

    if (mean < a - 2 * std) or (mean > b + 2 * std):
        warnings.warn("mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
                      "The distribution of values may be incorrect.",
                      stacklevel=2)

    # Values are generated by using a truncated uniform distribution and
    # then using the inverse CDF for the normal distribution.
    # Get upper and lower cdf values
    lower = norm_cdf((a - mean) / std)
    upper = norm_cdf((b - mean) / std)

    # Uniformly fill tensor with values from [lower, upper], then translate to
    # [2 * lower - 1, 2 * upper - 1].
    tensor.uniform_(2 * lower - 1, 2 * upper - 1)

    # Use inverse cdf transform for normal distribution to get truncated
    # standard normal
    tensor.erfinv_()

    # Transform to proper mean, std
    tensor.mul_(std * math.sqrt(2.))
    tensor.add_(mean)

    # Clamp to ensure it's in the proper range
    tensor.clamp_(min=a, max=b)
    return tensor


def trunc_normal_(
    tensor: torch.Tensor,
    mean: float = 0.,
    std: float = 1.,
    a: float = -2.,
    b: float = 2.,
) -> torch.Tensor:
    r"""Fills the input Tensor with values drawn from a truncated
    normal distribution. The values are effectively drawn from the
    normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`
    with values outside :math:`[a, b]` redrawn until they are within
    the bounds. The method used for generating the random values works
    best when :math:`a \leq \text{mean} \leq b`.

    NOTE: this impl is similar to the PyTorch trunc_normal_, the bounds [a, b] are
    applied while sampling the normal with mean/std applied, therefore a, b args
    should be adjusted to match the range of mean, std args.

    Args:
        tensor: an n-dimensional `torch.Tensor` (or `nn.Parameter`), modified in place
        mean: the mean of the normal distribution
        std: the standard deviation of the normal distribution
        a: the minimum cutoff value
        b: the maximum cutoff value

    Returns:
        The same ``tensor`` object, after in-place initialisation.

    Examples:
        >>> w = torch.empty(3, 5)
        >>> _ = trunc_normal_(w)
    """
    with torch.no_grad():
        return _trunc_normal_(tensor, mean, std, a, b)


def init_weights(m):
    """Initialise one sub-module; intended to be passed to ``nn.Module.apply``.

    * ``nn.Linear``: weight is truncated-normal with ``std=0.02``; bias (when
      present) is set to 0.
    * ``nn.LayerNorm``: bias is set to 0 and weight to 1, each only when the
      parameter exists (a LayerNorm built without affine parameters or
      without a bias exposes ``None`` for them).

    Any other module type is left untouched.
    """
    if isinstance(m, nn.Linear):
        trunc_normal_(m.weight, std=.02)
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
    elif isinstance(m, nn.LayerNorm):
        # Guard against parameter-free LayerNorms, whose weight/bias are None.
        if m.bias is not None:
            nn.init.constant_(m.bias, 0)
        if m.weight is not None:
            nn.init.constant_(m.weight, 1.0)


def _check_width_heads(width: int, heads: int) -> None:
    """Raise ``ValueError`` unless ``heads`` is positive and divides ``width``.

    The attention cores split the channel axis evenly across heads; an uneven
    split would otherwise only surface as an opaque error inside ``forward``.
    """
    if heads <= 0 or width % heads != 0:
        raise ValueError(
            f"width ({width}) must be divisible by a positive number of heads ({heads})"
        )


class MultiheadAttention(nn.Module):
    """Self-attention layer: fused QKV projection, attention core, output projection.

    Args:
        device: device passed to the ``nn.Linear`` layers.
        dtype: dtype passed to the ``nn.Linear`` layers.
        n_ctx: stored on the module and forwarded to the attention core; not
            otherwise used in this class.
        width: channel width of the input and output.
        heads: number of attention heads; must divide ``width``.
        qkv_bias: whether the fused QKV projection has a bias term.

    Raises:
        ValueError: if ``heads`` is not positive or does not divide ``width``.
    """

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        n_ctx: int,
        width: int,
        heads: int,
        qkv_bias: bool
    ):
        super().__init__()
        _check_width_heads(width, heads)
        self.n_ctx = n_ctx
        self.width = width
        self.heads = heads
        self.c_qkv = nn.Linear(width, width * 3, bias=qkv_bias, device=device, dtype=dtype)
        self.c_proj = nn.Linear(width, width, device=device, dtype=dtype)
        self.attention = QKVMultiheadAttention(device=device, dtype=dtype, heads=heads, n_ctx=n_ctx)

    def forward(self, x):
        """Project ``x`` to QKV, run the attention core, and project back to ``width``."""
        x = self.c_qkv(x)
        # The attention core is always routed through `checkpoint` with the
        # final argument fixed to True (presumably "enabled" -- confirm against
        # the helper's definition).
        x = checkpoint(self.attention, (x,), (), True)
        x = self.c_proj(x)
        return x


class QKVMultiheadAttention(nn.Module):
    """Parameter-free multi-head attention over a fused QKV tensor."""

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        heads: int,
        n_ctx: int
    ):
        super().__init__()
        # Stored for reference only; nothing in this class reads them.
        self.device = device
        self.dtype = dtype
        self.heads = heads
        self.n_ctx = n_ctx

    def forward(self, qkv):
        """Apply scaled dot-product attention.

        Args:
            qkv: rank-3 tensor ``(bs, n_ctx, width)`` whose last axis holds the
                per-head query, key and value channels.

        Returns:
            Tensor of shape ``(bs, n_ctx, width // 3)``.
        """
        bs, n_ctx, width = qkv.shape
        attn_ch = width // self.heads // 3
        scale = 1 / math.sqrt(attn_ch)
        qkv = qkv.view(bs, n_ctx, self.heads, -1)
        q, k, v = torch.split(qkv, attn_ch, dim=-1)
        # weight[b, h, t, s]: similarity of query position t to key position s.
        weight = torch.einsum("bthc,bshc->bhts", q, k) * scale
        wdtype = weight.dtype
        # Softmax is evaluated in float32 and cast back to the working dtype.
        weight = torch.softmax(weight.float(), dim=-1).type(wdtype)
        return torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1)


class ResidualAttentionBlock(nn.Module):
    """Pre-LayerNorm transformer block: self-attention and MLP, each with a residual add.

    Args:
        device: device passed to all sub-modules.
        dtype: dtype passed to all sub-modules.
        n_ctx: forwarded to :class:`MultiheadAttention`.
        width: channel width.
        heads: number of attention heads.
        qkv_bias: whether the QKV projection has a bias term.
        use_checkpoint: flag forwarded to ``checkpoint`` for the whole block.
    """

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        n_ctx: int,
        width: int,
        heads: int,
        qkv_bias: bool = True,
        use_checkpoint: bool = False
    ):
        super().__init__()

        self.use_checkpoint = use_checkpoint

        self.attn = MultiheadAttention(
            device=device,
            dtype=dtype,
            n_ctx=n_ctx,
            width=width,
            heads=heads,
            qkv_bias=qkv_bias
        )
        self.ln_1 = nn.LayerNorm(width, device=device, dtype=dtype)
        self.mlp = MLP(device=device, dtype=dtype, width=width)
        self.ln_2 = nn.LayerNorm(width, device=device, dtype=dtype)

    def _forward(self, x: torch.Tensor):
        """Run the two residual sub-layers (attention, then MLP)."""
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

    def forward(self, x: torch.Tensor):
        """Run the block through ``checkpoint``, gated by ``self.use_checkpoint``."""
        return checkpoint(self._forward, (x,), self.parameters(), self.use_checkpoint)


class MultiheadCrossAttention(nn.Module):
    """Cross-attention layer: queries from ``x``, keys/values from ``data``.

    Args:
        device: device passed to the ``nn.Linear`` layers.
        dtype: dtype passed to the ``nn.Linear`` layers.
        width: channel width of the queries and of the output.
        heads: number of attention heads; must divide ``width``.
        qkv_bias: whether the query and key/value projections have a bias term.
        n_data: stored on the module and forwarded to the attention core; not
            otherwise used in this class.
        data_width: channel width of ``data``; defaults to ``width``.

    Raises:
        ValueError: if ``heads`` is not positive or does not divide ``width``.
    """

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        width: int,
        heads: int,
        qkv_bias: bool = True,
        n_data: Optional[int] = None,
        data_width: Optional[int] = None,
    ):
        super().__init__()
        _check_width_heads(width, heads)
        self.n_data = n_data
        self.width = width
        self.heads = heads
        self.data_width = width if data_width is None else data_width
        self.c_q = nn.Linear(width, width, bias=qkv_bias, device=device, dtype=dtype)
        self.c_kv = nn.Linear(self.data_width, width * 2, bias=qkv_bias, device=device, dtype=dtype)
        self.c_proj = nn.Linear(width, width, device=device, dtype=dtype)
        self.attention = QKVMultiheadCrossAttention(
            device=device, dtype=dtype, heads=heads, n_data=n_data
        )

    def forward(self, x, data):
        """Attend from ``x`` over ``data`` and project the result back to ``width``."""
        x = self.c_q(x)
        data = self.c_kv(data)
        # As in MultiheadAttention, the core always goes through `checkpoint`
        # with the final argument fixed to True.
        x = checkpoint(self.attention, (x, data), (), True)
        x = self.c_proj(x)
        return x


class QKVMultiheadCrossAttention(nn.Module):
    """Parameter-free multi-head cross-attention over separate Q and fused KV tensors."""

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        heads: int,
        n_data: Optional[int] = None
    ):
        super().__init__()
        # Stored for reference only; nothing in this class reads them.
        self.device = device
        self.dtype = dtype
        self.heads = heads
        self.n_data = n_data

    def forward(self, q, kv):
        """Apply scaled dot-product cross-attention.

        Args:
            q: rank-3 query tensor ``(bs, n_ctx, width)``.
            kv: rank-3 tensor ``(bs, n_data, 2 * width)`` whose last axis holds
                the per-head key and value channels.

        Returns:
            Tensor of shape ``(bs, n_ctx, width)``.
        """
        _, n_ctx, _ = q.shape
        # The batch size is taken from `kv`; `q` is viewed with the same value.
        bs, n_data, width = kv.shape
        attn_ch = width // self.heads // 2
        scale = 1 / math.sqrt(attn_ch)
        q = q.view(bs, n_ctx, self.heads, -1)
        kv = kv.view(bs, n_data, self.heads, -1)
        k, v = torch.split(kv, attn_ch, dim=-1)
        # weight[b, h, t, s]: similarity of query position t to data position s.
        weight = torch.einsum("bthc,bshc->bhts", q, k) * scale
        wdtype = weight.dtype
        # Softmax is evaluated in float32 and cast back to the working dtype.
        weight = torch.softmax(weight.float(), dim=-1).type(wdtype)
        return torch.einsum("bhts,bshc->bthc", weight, v).reshape(bs, n_ctx, -1)


class ResidualCrossAttentionBlock(nn.Module):
    """Pre-LayerNorm block: cross-attention over ``data`` and an MLP, each with a residual add.

    Args:
        device: device passed to all sub-modules.
        dtype: dtype passed to all sub-modules.
        n_data: forwarded to :class:`MultiheadCrossAttention`.
        width: channel width of ``x``.
        heads: number of attention heads.
        data_width: channel width of ``data``; defaults to ``width``.
        qkv_bias: whether the query and key/value projections have a bias term.
    """

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        n_data: Optional[int] = None,
        width: int,
        heads: int,
        data_width: Optional[int] = None,
        qkv_bias: bool = True
    ):
        super().__init__()

        if data_width is None:
            data_width = width

        self.attn = MultiheadCrossAttention(
            device=device,
            dtype=dtype,
            n_data=n_data,
            width=width,
            heads=heads,
            data_width=data_width,
            qkv_bias=qkv_bias
        )
        self.ln_1 = nn.LayerNorm(width, device=device, dtype=dtype)
        self.ln_2 = nn.LayerNorm(data_width, device=device, dtype=dtype)
        self.mlp = MLP(device=device, dtype=dtype, width=width)
        self.ln_3 = nn.LayerNorm(width, device=device, dtype=dtype)

    def forward(self, x: torch.Tensor, data: torch.Tensor):
        """Normalise ``x`` and ``data`` separately, cross-attend, then apply the MLP."""
        x = x + self.attn(self.ln_1(x), self.ln_2(data))
        x = x + self.mlp(self.ln_3(x))
        return x


class MLP(nn.Module):
    """Two-layer feed-forward network: ``width -> 4 * width -> width`` with GELU between."""

    def __init__(self, *,
                 device: Optional[torch.device],
                 dtype: Optional[torch.dtype],
                 width: int):
        super().__init__()
        self.width = width
        self.c_fc = nn.Linear(width, width * 4, device=device, dtype=dtype)
        self.c_proj = nn.Linear(width * 4, width, device=device, dtype=dtype)
        self.gelu = nn.GELU()

    def forward(self, x):
        """Return ``c_proj(gelu(c_fc(x)))``."""
        return self.c_proj(self.gelu(self.c_fc(x)))


class Transformer(nn.Module):
    """Stack of ``layers`` :class:`ResidualAttentionBlock` modules applied in sequence.

    All sub-modules are initialised with :func:`init_weights` at construction.

    Args:
        device: device passed to every block.
        dtype: dtype passed to every block.
        n_ctx: forwarded to every block.
        width: channel width.
        layers: number of residual attention blocks.
        heads: number of attention heads per block.
        qkv_bias: whether the QKV projections have a bias term.
        use_checkpoint: flag forwarded to every block.
    """

    def __init__(
        self,
        *,
        device: Optional[torch.device],
        dtype: Optional[torch.dtype],
        n_ctx: int,
        width: int,
        layers: int,
        heads: int,
        qkv_bias: bool = True,
        use_checkpoint: bool = False
    ):
        super().__init__()
        self.n_ctx = n_ctx
        self.width = width
        self.layers = layers
        self.resblocks = nn.ModuleList(
            [
                ResidualAttentionBlock(
                    device=device,
                    dtype=dtype,
                    n_ctx=n_ctx,
                    width=width,
                    heads=heads,
                    qkv_bias=qkv_bias,
                    use_checkpoint=use_checkpoint
                )
                for _ in range(layers)
            ]
        )

        self.apply(init_weights)

    def forward(self, x: torch.Tensor):
        """Pass ``x`` through every residual block in order."""
        for block in self.resblocks:
            x = block(x)
        return x