#!/usr/bin/env python3
import collections.abc
import math
import warnings
from itertools import repeat
from typing import List, Optional, Tuple

import torch
import torch.nn as nn
import torch.nn.functional as F

try:
    from torch import _assert
except ImportError:

    def _assert(condition: bool, message: str):
        assert condition, message


def _drop_block_gamma(drop_prob: float, gamma_scale: float, h: int, w: int, clipped_block_size: int) -> float:
    """Return the DropBlock seed probability (gamma) for an ``h`` x ``w`` map.

    The number of admissible block centres is computed from the *clipped*
    block size so the denominator is always >= 1, even when the requested
    block is larger than the feature map. The result is capped at 1.0
    because it is used as a Bernoulli probability.
    """
    valid_centres = (w - clipped_block_size + 1) * (h - clipped_block_size + 1)
    gamma = gamma_scale * drop_prob * (w * h) / clipped_block_size**2 / valid_centres
    return min(gamma, 1.0)


def _crop_pooled_mask(mask: torch.Tensor, h: int, w: int) -> torch.Tensor:
    """Trim a stride-1 pooled mask back to ``h`` x ``w``.

    With ``padding = k // 2`` an even kernel ``k`` produces one extra row and
    column. The leading row/column is dropped; for odd kernels this is a no-op.
    """
    off_h = mask.shape[-2] - h
    off_w = mask.shape[-1] - w
    return mask[..., off_h:, off_w:]


def drop_block_2d(
    x,
    drop_prob: float = 0.1,
    block_size: int = 7,
    gamma_scale: float = 1.0,
    with_noise: bool = False,
    inplace: bool = False,
    batchwise: bool = False,
):
    """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf

    DropBlock with an experimental gaussian noise option. This layer has been tested on a few training
    runs with success, but needs further validation and possibly optimization for lower runtime impact.

    Args:
        x: 4-D input tensor (unpacked as ``b, c, h, w``).
        drop_prob: target fraction of activations to drop.
        block_size: side length of each dropped square; clipped to ``min(h, w)``.
        gamma_scale: multiplier applied to the computed seed probability.
        with_noise: replace dropped activations with gaussian noise instead of
            zeroing and rescaling.
        inplace: modify ``x`` in place.
        batchwise: draw one mask of shape ``(1, c, h, w)`` shared by the whole batch.

    Returns:
        Tensor with the same shape as ``x``.
    """
    _, c, h, w = x.shape
    clipped_block_size = min(block_size, min(w, h))
    # seed_drop_rate, the gamma parameter
    gamma = _drop_block_gamma(drop_prob, gamma_scale, h, w, clipped_block_size)

    # Forces the block to be inside the feature map. The mask is built directly
    # in (h, w) layout via broadcasting so that non-square inputs are handled.
    lead = clipped_block_size // 2
    trail = (clipped_block_size - 1) // 2
    h_i = torch.arange(h, device=x.device).view(h, 1)
    w_i = torch.arange(w, device=x.device).view(1, w)
    valid_block = ((w_i >= lead) & (w_i < w - trail)) & ((h_i >= lead) & (h_i < h - trail))
    valid_block = torch.reshape(valid_block, (1, 1, h, w)).to(dtype=x.dtype)

    if batchwise:
        # one mask for whole batch, quite a bit faster
        uniform_noise = torch.rand((1, c, h, w), dtype=x.dtype, device=x.device)
    else:
        uniform_noise = torch.rand_like(x)
    # 1 where the activation is kept, 0 at a block seed.
    block_mask = ((2 - gamma - valid_block + uniform_noise) >= 1).to(dtype=x.dtype)
    # Min-pool (negated max-pool) grows every seed into a full block.
    block_mask = -F.max_pool2d(
        -block_mask, kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2
    )
    block_mask = _crop_pooled_mask(block_mask, h, w)

    if with_noise:
        normal_noise = torch.randn((1, c, h, w), dtype=x.dtype, device=x.device) if batchwise else torch.randn_like(x)
        if inplace:
            x.mul_(block_mask).add_(normal_noise * (1 - block_mask))
        else:
            x = x * block_mask + normal_noise * (1 - block_mask)
    else:
        # Rescale by (total / kept); the epsilon guards against an all-zero mask.
        normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-7)).to(x.dtype)
        if inplace:
            x.mul_(block_mask * normalize_scale)
        else:
            x = x * block_mask * normalize_scale
    return x


def drop_block_fast_2d(
    x: torch.Tensor,
    drop_prob: float = 0.1,
    block_size: int = 7,
    gamma_scale: float = 1.0,
    with_noise: bool = False,
    inplace: bool = False,
):
    """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf

    DropBlock with an experimental gaussian noise option. Simplified from above without concern for valid
    block mask at edges.

    Args:
        x: 4-D input tensor (unpacked as ``b, c, h, w``).
        drop_prob: target fraction of activations to drop.
        block_size: side length of each dropped square; clipped to ``min(h, w)``.
        gamma_scale: multiplier applied to the computed seed probability.
        with_noise: replace dropped activations with gaussian noise instead of
            zeroing and rescaling.
        inplace: modify ``x`` in place.

    Returns:
        Tensor with the same shape as ``x``.
    """
    _, _, h, w = x.shape
    clipped_block_size = min(block_size, min(w, h))
    gamma = _drop_block_gamma(drop_prob, gamma_scale, h, w, clipped_block_size)

    # 1 at block seeds; max-pool grows each seed into a full block.
    block_mask = torch.empty_like(x).bernoulli_(gamma)
    block_mask = F.max_pool2d(
        block_mask.to(x.dtype), kernel_size=clipped_block_size, stride=1, padding=clipped_block_size // 2
    )
    block_mask = _crop_pooled_mask(block_mask, h, w)

    if with_noise:
        normal_noise = torch.empty_like(x).normal_()
        if inplace:
            x.mul_(1.0 - block_mask).add_(normal_noise * block_mask)
        else:
            x = x * (1.0 - block_mask) + normal_noise * block_mask
    else:
        block_mask = 1 - block_mask
        normalize_scale = (block_mask.numel() / block_mask.to(dtype=torch.float32).sum().add(1e-6)).to(dtype=x.dtype)
        if inplace:
            x.mul_(block_mask * normalize_scale)
        else:
            x = x * block_mask * normalize_scale
    return x


class DropBlock2d(nn.Module):
    """DropBlock. See https://arxiv.org/pdf/1810.12890.pdf

    Module wrapper that dispatches to ``drop_block_fast_2d`` (``fast=True``) or
    ``drop_block_2d``. It is the identity in eval mode or when ``drop_prob`` is falsy.
    """

    def __init__(
        self, drop_prob=0.1, block_size=7, gamma_scale=1.0, with_noise=False, inplace=False, batchwise=False, fast=True
    ):
        super().__init__()
        self.drop_prob = drop_prob
        self.gamma_scale = gamma_scale
        self.block_size = block_size
        self.with_noise = with_noise
        self.inplace = inplace
        self.batchwise = batchwise  # only used by the non-fast path
        self.fast = fast  # FIXME finish comparisons of fast vs not

    def forward(self, x):
        if not self.training or not self.drop_prob:
            return x
        if self.fast:
            return drop_block_fast_2d(
                x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace
            )
        return drop_block_2d(
            x, self.drop_prob, self.block_size, self.gamma_scale, self.with_noise, self.inplace, self.batchwise
        )


def drop_path(x, drop_prob: float = 0.0, training: bool = False, scale_by_keep: bool = True):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks).

    This is the same as the DropConnect impl I created for EfficientNet, etc networks, however,
    the original name is misleading as 'Drop Connect' is a different form of dropout in a separate paper...
    See discussion: https://github.com/tensorflow/tpu/issues/494#issuecomment-532968956 ... I've opted for
    changing the layer and argument names to 'drop path' rather than mix DropConnect as a layer name and use
    'survival rate' as the argument.

    Args:
        x: input tensor; dimension 0 is treated as the sample dimension.
        drop_prob: probability of zeroing a sample. ``None`` or ``0`` disables dropping.
        training: dropping is only applied when this is true.
        scale_by_keep: divide surviving samples by the keep probability.

    Returns:
        ``x`` unchanged when disabled, otherwise ``x`` times a per-sample mask.
    """
    # `not drop_prob` also covers None (the DropPath default), which would
    # otherwise fail in `1 - drop_prob`.
    if not drop_prob or not training:
        return x
    keep_prob = 1 - drop_prob
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)  # work with diff dim tensors, not just 2D ConvNets
    random_tensor = x.new_empty(shape).bernoulli_(keep_prob)
    if keep_prob > 0.0 and scale_by_keep:
        random_tensor.div_(keep_prob)
    return x * random_tensor


class DropPath(nn.Module):
    """Drop paths (Stochastic Depth) per sample (when applied in main path of residual blocks)."""

    def __init__(self, drop_prob=None, scale_by_keep=True):
        super().__init__()
        self.drop_prob = drop_prob
        self.scale_by_keep = scale_by_keep

    def forward(self, x):
        return drop_path(x, self.drop_prob, self.training, self.scale_by_keep)


def create_conv3d(in_channels, out_channels, kernel_size, **kwargs):
    """Select a 3d convolution implementation based on arguments.

    Creates and returns either ``torch.nn.Conv3d`` or ``Conv3dSame`` via
    ``create_conv3d_pad``.

    Keyword Args:
        depthwise: when true, ``groups`` is forced to ``in_channels``.
        groups: convolution groups (ignored when ``depthwise`` is true).
        Remaining keyword arguments are forwarded to ``create_conv3d_pad``.
    """
    depthwise = kwargs.pop("depthwise", False)
    # Always pop `groups` so it cannot be forwarded twice when depthwise=True.
    groups = kwargs.pop("groups", 1)
    if depthwise:
        # for DW out_channels must be multiple of in_channels as must have out_channels % groups == 0
        groups = in_channels
    return create_conv3d_pad(in_channels, out_channels, kernel_size, groups=groups, **kwargs)


def conv3d_same(
    x,
    weight: torch.Tensor,
    bias: Optional[torch.Tensor] = None,
    stride: Tuple[int, int, int] = (1, 1, 1),
    padding: Tuple[int, int, int] = (0, 0, 0),
    dilation: Tuple[int, int, int] = (1, 1, 1),
    groups: int = 1,
):
    """Apply a 3D convolution with TensorFlow-like 'SAME' padding.

    ``padding`` is accepted for signature compatibility but ignored: the input
    is padded by ``pad_same`` and the convolution itself runs with zero padding.
    """
    x = pad_same(x, weight.shape[-3:], stride, dilation)
    return F.conv3d(x, weight, bias, stride, (0, 0, 0), dilation, groups)


class Conv3dSame(nn.Conv3d):
    """Tensorflow like 'SAME' convolution wrapper for 3D convolutions"""

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True):
        # `padding` is ignored; padding is computed dynamically in forward().
        super().__init__(in_channels, out_channels, kernel_size, stride, 0, dilation, groups, bias)

    def forward(self, x):
        return conv3d_same(x, self.weight, self.bias, self.stride, self.padding, self.dilation, self.groups)


def create_conv3d_pad(in_chs, out_chs, kernel_size, **kwargs):
    """Build a 3D conv, choosing ``Conv3dSame`` when dynamic 'SAME' padding is required.

    ``padding`` (default ``""``) is resolved by ``get_padding_value``; ``bias``
    defaults to ``False``.
    """
    padding = kwargs.pop("padding", "")
    kwargs.setdefault("bias", False)
    padding, is_dynamic = get_padding_value(padding, kernel_size, **kwargs)
    if is_dynamic:
        return Conv3dSame(in_chs, out_chs, kernel_size, **kwargs)
    return nn.Conv3d(in_chs, out_chs, kernel_size, padding=padding, **kwargs)


# Calculate symmetric padding for a convolution
def get_padding(kernel_size: int, stride: int = 1, dilation: int = 1, **_) -> int:
    padding = ((stride - 1) + dilation * (kernel_size - 1)) // 2
    return padding


# Calculate asymmetric TensorFlow-like 'SAME' padding for a convolution
def get_same_padding(x: int, k: int, s: int, d: int):
    return max((math.ceil(x / s) - 1) * s + (k - 1) * d + 1 - x, 0)


# Can SAME padding for given args be done statically?
def is_static_pad(kernel_size: int, stride: int = 1, dilation: int = 1, **_):
    return stride == 1 and (dilation * (kernel_size - 1)) % 2 == 0


# Dynamically pad input x with 'SAME' padding for conv with specified args
def pad_same(x, k: List[int], s: List[int], d: List[int] = (1, 1, 1), value: float = 0):
    """Pad the last three dimensions of ``x`` (depth, height, width) for 'SAME' output size.

    Args:
        x: tensor whose trailing three dimensions are padded.
        k: kernel size per dimension, ordered (depth, height, width).
        s: stride per dimension, same order.
        d: dilation per dimension, same order.
        value: fill value for the padded region.

    Returns:
        The padded tensor, or ``x`` itself when no padding is needed.
    """
    in_d, in_h, in_w = x.size()[-3:]
    pad_d = get_same_padding(in_d, k[0], s[0], d[0])
    pad_h = get_same_padding(in_h, k[1], s[1], d[1])
    pad_w = get_same_padding(in_w, k[2], s[2], d[2])
    if pad_d > 0 or pad_h > 0 or pad_w > 0:
        # F.pad expects pairs starting from the LAST dimension: width, height, depth.
        x = F.pad(
            x,
            [
                pad_w // 2,
                pad_w - pad_w // 2,
                pad_h // 2,
                pad_h - pad_h // 2,
                pad_d // 2,
                pad_d - pad_d // 2,
            ],
            value=value,
        )
    return x


def get_padding_value(padding, kernel_size, **kwargs) -> Tuple[Tuple, bool]:
    """Resolve a padding spec into ``(padding, is_dynamic)``.

    String specs: ``"same"`` gives static symmetric padding when possible,
    otherwise ``(0, True)`` to request dynamic padding; ``"valid"`` gives 0;
    any other string gives symmetric padding from ``get_padding``.
    Non-string values are returned unchanged with ``is_dynamic=False``.
    """
    dynamic = False
    if isinstance(padding, str):
        # for any string padding, the padding will be calculated for you, one of three ways
        padding = padding.lower()
        if padding == "same":
            # TF compatible 'SAME' padding, has a performance and GPU memory allocation impact
            if is_static_pad(kernel_size, **kwargs):
                # static case, no extra overhead
                padding = get_padding(kernel_size, **kwargs)
            else:
                # dynamic 'SAME' padding, has runtime/GPU memory overhead
                padding = 0
                dynamic = True
        elif padding == "valid":
            # 'VALID' padding, same as padding=0
            padding = 0
        else:
            # Default to PyTorch style 'same'-ish symmetric padding
            padding = get_padding(kernel_size, **kwargs)
    return padding, dynamic


# From PyTorch internals
def _ntuple(n):
    """Return a function that repeats a scalar ``n`` times; iterables pass through unchanged."""

    def parse(x):
        if isinstance(x, collections.abc.Iterable):
            return x
        return tuple(repeat(x, n))

    return parse


to_1tuple = _ntuple(1)
to_2tuple = _ntuple(2)
to_3tuple = _ntuple(3)
to_4tuple = _ntuple(4)
to_ntuple = _ntuple


def make_divisible(v, divisor=8, min_value=None, round_limit=0.9):
    """Round ``v`` to the nearest multiple of ``divisor``, at least ``min_value`` (default ``divisor``)."""
    min_value = min_value or divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < round_limit * v:
        new_v += divisor
    return new_v


class Linear(nn.Linear):
    r"""Applies a linear transformation to the incoming data: :math:`y = xA^T + b`

    Wraps torch.nn.Linear to support AMP + torchscript usage by manually casting
    weight & bias to input.dtype to work around an issue w/ torch.addmm in this use case.
    """

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        if torch.jit.is_scripting():
            bias = self.bias.to(dtype=input.dtype) if self.bias is not None else None
            return F.linear(input, self.weight.to(dtype=input.dtype), bias=bias)
        return F.linear(input, self.weight, self.bias)


class Mlp(nn.Module):
    """MLP as used in Vision Transformer, MLP-Mixer and related networks"""

    def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.0):
        super().__init__()
        out_features = out_features or in_features
        hidden_features = hidden_features or in_features
        # A scalar `drop` is used for both dropout layers.
        drop_probs = to_2tuple(drop)

        self.fc1 = nn.Linear(in_features, hidden_features)
        self.act = act_layer()
        self.drop1 = nn.Dropout(drop_probs[0])
        self.fc2 = nn.Linear(hidden_features, out_features)
        self.drop2 = nn.Dropout(drop_probs[1])

    def forward(self, x):
        x = self.fc1(x)
        x = self.act(x)
        x = self.drop1(x)
        x = self.fc2(x)
        x = self.drop2(x)
        return x


def avg_pool3d_same(
    x,
    kernel_size: List[int],
    stride: List[int],
    padding: List[int] = (0, 0, 0),
    ceil_mode: bool = False,
    count_include_pad: bool = True,
):
    """3D average pooling with 'SAME' padding; ``padding`` is accepted but ignored."""
    # FIXME how to deal with count_include_pad vs not for external padding?
    x = pad_same(x, kernel_size, stride)
    return F.avg_pool3d(x, kernel_size, stride, (0, 0, 0), ceil_mode, count_include_pad)


class AvgPool3dSame(nn.AvgPool3d):
    """Tensorflow like 'SAME' wrapper for 3D average pooling"""

    def __init__(self, kernel_size: int, stride=None, padding=0, ceil_mode=False, count_include_pad=True):
        kernel_size = to_3tuple(kernel_size)
        # pad_same needs concrete strides, so resolve the "stride = kernel_size" default here.
        stride = kernel_size if stride is None else to_3tuple(stride)
        # `padding` is ignored; padding is computed dynamically in forward().
        super().__init__(kernel_size, stride, (0, 0, 0), ceil_mode, count_include_pad)

    def forward(self, x):
        x = pad_same(x, self.kernel_size, self.stride)
        return F.avg_pool3d(x, self.kernel_size, self.stride, self.padding, self.ceil_mode, self.count_include_pad)


def max_pool3d_same(
    x,
    kernel_size: List[int],
    stride: List[int],
    padding: List[int] = (0, 0, 0),
    dilation: List[int] = (1, 1, 1),
    ceil_mode: bool = False,
):
    """3D max pooling with 'SAME' padding (filled with -inf); ``padding`` is accepted but ignored."""
    x = pad_same(x, kernel_size, stride, value=-float("inf"))
    return F.max_pool3d(x, kernel_size, stride, (0, 0, 0), dilation, ceil_mode)


class MaxPool3dSame(nn.MaxPool3d):
    """Tensorflow like 'SAME' wrapper for 3D max pooling"""

    def __init__(self, kernel_size: int, stride=None, padding=0, dilation=1, ceil_mode=False):
        kernel_size = to_3tuple(kernel_size)
        # pad_same needs concrete strides, so resolve the "stride = kernel_size" default here.
        stride = kernel_size if stride is None else to_3tuple(stride)
        dilation = to_3tuple(dilation)
        # ceil_mode goes by keyword: the fifth positional parameter of
        # nn.MaxPool3d is return_indices, not ceil_mode.
        super().__init__(kernel_size, stride, (0, 0, 0), dilation, ceil_mode=ceil_mode)

    def forward(self, x):
        x = pad_same(x, self.kernel_size, self.stride, value=-float("inf"))
        return F.max_pool3d(x, self.kernel_size, self.stride, (0, 0, 0), self.dilation, self.ceil_mode)


def create_pool3d(pool_type, kernel_size, stride=None, **kwargs):
    """Create a 3D ``"avg"`` or ``"max"`` pooling layer, using the 'SAME' variants when needed.

    Args:
        pool_type: ``"avg"`` or ``"max"``.
        kernel_size: pooling kernel size.
        stride: pooling stride; defaults to ``kernel_size``.
        **kwargs: ``padding`` is resolved via ``get_padding_value``; the rest is
            forwarded to the pooling layer.

    Raises:
        AssertionError: if ``pool_type`` is not supported.
    """
    stride = stride or kernel_size
    padding = kwargs.pop("padding", "")
    padding, is_dynamic = get_padding_value(padding, kernel_size, stride=stride, **kwargs)
    if is_dynamic:
        if pool_type == "avg":
            return AvgPool3dSame(kernel_size, stride=stride, **kwargs)
        if pool_type == "max":
            return MaxPool3dSame(kernel_size, stride=stride, **kwargs)
    else:
        if pool_type == "avg":
            return nn.AvgPool3d(kernel_size, stride=stride, padding=padding, **kwargs)
        if pool_type == "max":
            return nn.MaxPool3d(kernel_size, stride=stride, padding=padding, **kwargs)
    raise AssertionError(f"Unsupported pool type {pool_type}")


def _float_to_int(x: float) -> int:
    """
    Symbolic tracing helper to substitute for inbuilt `int`.
    Hint: Inbuilt `int` can't accept an argument of type `Proxy`
    """
    return int(x)


def _no_grad_trunc_normal_(tensor, mean, std, a, b):
    # Cut & paste from PyTorch official master until it's in a few official releases - RW
    # Method based on https://people.sc.fsu.edu/~jburkardt/presentations/truncated_normal.pdf
    def norm_cdf(x):
        # Computes standard normal cumulative distribution function
        return (1.0 + math.erf(x / math.sqrt(2.0))) / 2.0

    if (mean < a - 2 * std) or (mean > b + 2 * std):
        warnings.warn(
            "mean is more than 2 std from [a, b] in nn.init.trunc_normal_. "
            "The distribution of values may be incorrect.",
            stacklevel=2,
        )

    with torch.no_grad():
        # Values are generated by using a truncated uniform distribution and
        # then using the inverse CDF for the normal distribution.
        # Get upper and lower cdf values
        lower = norm_cdf((a - mean) / std)
        upper = norm_cdf((b - mean) / std)

        # Uniformly fill tensor with values from [lower, upper], then translate to
        # [2*lower-1, 2*upper-1].
        tensor.uniform_(2 * lower - 1, 2 * upper - 1)

        # Use inverse cdf transform for normal distribution to get truncated
        # standard normal
        tensor.erfinv_()

        # Transform to proper mean, std
        tensor.mul_(std * math.sqrt(2.0))
        tensor.add_(mean)

        # Clamp to ensure it's in the proper range
        tensor.clamp_(min=a, max=b)
        return tensor


def trunc_normal_(tensor, mean=0.0, std=1.0, a=-2.0, b=2.0):
    r"""Fills the input Tensor with values drawn from a truncated
    normal distribution. The values are effectively drawn from the
    normal distribution :math:`\mathcal{N}(\text{mean}, \text{std}^2)`
    with values outside :math:`[a, b]` redrawn until they are within
    the bounds. The method used for generating the random values works
    best when :math:`a \leq \text{mean} \leq b`.

    Args:
        tensor: an n-dimensional `torch.Tensor`
        mean: the mean of the normal distribution
        std: the standard deviation of the normal distribution
        a: the minimum cutoff value
        b: the maximum cutoff value

    Examples:
        >>> w = torch.empty(3, 5)
        >>> nn.init.trunc_normal_(w)
    """
    return _no_grad_trunc_normal_(tensor, mean, std, a, b)