Spaces:
Runtime error
Runtime error
File size: 10,145 Bytes
bc32eea |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 |
import math
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils import weight_norm
from models.util import concat_elu, WNConv2d
class NN(nn.Module):
    """Network that parametrizes the transformations of an MLCoupling.

    The network is a stack of blocks. Every block is made of the following
    two layers, each wired up as a residual connection:

      1. Conv: input -> nonlinearity -> conv3x3 -> nonlinearity -> gate
      2. Attn: input -> conv1x1 -> multihead self-attention -> gate,

    where "gate" is a 1x1 convolution that doubles the number of channels,
    followed by a gated linear unit (Dauphin et al., 2016).

    The convolutional layer follows PixelCNN++ (Salimans et al., 2017) and
    the multi-head self-attention follows the Transformer
    (Vaswani et al., 2017).

    Args:
        in_channels (int): Number of channels in the input.
        out_channels (int): Number of channels in the output.
        num_channels (int): Number of channels in each block of the network.
        num_blocks (int): Number of blocks in the network.
        num_components (int): Number of components in the mixture.
        drop_prob (float): Dropout probability.
        use_attn (bool): Use attention in each block.
        aux_channels (int): Number of channels in optional auxiliary input.
    """

    def __init__(self, in_channels, out_channels, num_channels, num_blocks,
                 num_components, drop_prob, use_attn=True, aux_channels=None):
        super().__init__()
        self.k = num_components  # number of mixture components
        self.in_conv = WNConv2d(in_channels, num_channels,
                                kernel_size=3, padding=1)

        # Weight sharing across blocks is hard-wired off; the shared path is
        # kept so that flipping this flag reuses one block `num_blocks` times.
        self.share_attn_params = False
        if not self.share_attn_params:
            blocks = [
                ConvAttnBlock(num_channels, drop_prob, use_attn, aux_channels)
                for _ in range(num_blocks)
            ]
            self.mid_convs = nn.ModuleList(blocks)
        else:
            self.mid_conv = ConvAttnBlock(num_channels, drop_prob,
                                          use_attn, aux_channels)
            self.num_blocks = num_blocks

        # For every output channel: one `s`, one `t`, and three groups of
        # `k` mixture parameters (pi, mu, scales).
        groups_per_channel = 2 + 3 * self.k
        self.out_conv = WNConv2d(num_channels,
                                 out_channels * groups_per_channel,
                                 kernel_size=3, padding=1)
        self.rescale = weight_norm(Rescale(out_channels))
        self.out_channels = out_channels

    def forward(self, x, aux=None):
        """Compute the coupling parameters for input `x`.

        Args:
            x (torch.Tensor): Input of shape (b, c, h, w).
            aux (torch.Tensor or None): Optional auxiliary input that is
                forwarded unchanged to every block.

        Returns:
            tuple: `(s, t, pi, mu, scales)` where `s` and `t` have shape
            (b, out_channels, h, w) and `pi`, `mu`, `scales` have shape
            (b, k, out_channels, h, w). `s` is passed through tanh and the
            rescale layer; `scales` is clamped from below at -7.
        """
        batch, _, height, width = x.shape

        hidden = self.in_conv(x)
        if self.share_attn_params:
            blocks = [self.mid_conv] * self.num_blocks
        else:
            blocks = self.mid_convs
        for block in blocks:
            hidden = block(hidden, aux)
        hidden = self.out_conv(hidden)

        # Split into components and post-process
        hidden = hidden.view(batch, -1, self.out_channels, height, width)
        group_sizes = (1, 1) + (self.k,) * 3
        s, t, pi, mu, scales = torch.split(hidden, group_sizes, dim=1)
        s = self.rescale(torch.tanh(s.squeeze(1)))
        t = t.squeeze(1)
        # Lower bound taken from the code of the original Flow++ paper
        scales = torch.clamp(scales, min=-7)

        return s, t, pi, mu, scales
class ConvAttnBlock(nn.Module):
    """Residual gated convolution with an optional residual gated attention.

    Each residual sum is followed by a `LayerNorm` over the channel
    dimension, applied in channels-last layout.

    Args:
        num_channels (int): Number of channels in the hidden activations.
        drop_prob (float): Dropout probability.
        use_attn (bool): Whether to add the attention sub-layer.
        aux_channels (int): Number of channels in optional auxiliary input.
    """

    def __init__(self, num_channels, drop_prob, use_attn, aux_channels):
        super().__init__()
        self.conv = GatedConv(num_channels, drop_prob, aux_channels)
        self.norm_1 = nn.LayerNorm(num_channels)
        if not use_attn:
            # No attention: `norm_2` is intentionally not created either.
            self.attn = None
        else:
            self.attn = GatedAttn(num_channels, drop_prob=drop_prob)
            self.norm_2 = nn.LayerNorm(num_channels)

    def forward(self, x, aux=None):
        """Apply the block to `x` of shape (b, c, h, w); same shape out."""
        summed = self.conv(x, aux) + x
        # LayerNorm normalizes the last dimension, so move channels last.
        channels_last = self.norm_1(summed.permute(0, 2, 3, 1))  # (b, h, w, c)

        if not self.attn:
            return channels_last.permute(0, 3, 1, 2)  # (b, c, h, w)

        attended = self.attn(channels_last) + channels_last
        attended = self.norm_2(attended)
        return attended.permute(0, 3, 1, 2)  # (b, c, h, w)
class GatedAttn(nn.Module):
    """Gated Multi-Head Self-Attention Block

    Based on the paper:
    "Attention Is All You Need"
    by Ashish Vaswani, Noam Shazeer, Niki Parmar, Jakob Uszkoreit, Llion Jones,
    Aidan N. Gomez, Lukasz Kaiser, Illia Polosukhin
    (https://arxiv.org/abs/1706.03762).

    Args:
        d_model (int): Number of channels in the input.
        num_heads (int): Number of attention heads. Must divide `d_model`.
        drop_prob (float): Dropout probability.

    Raises:
        ValueError: If `d_model` is not divisible by `num_heads`.
    """

    def __init__(self, d_model, num_heads=4, drop_prob=0.):
        super(GatedAttn, self).__init__()
        if d_model % num_heads != 0:
            # Fail early: otherwise the head split in `forward` fails later
            # with an opaque view-size error.
            raise ValueError(
                'd_model ({}) must be divisible by num_heads ({})'
                .format(d_model, num_heads))
        self.d_model = d_model
        self.num_heads = num_heads
        self.drop_prob = drop_prob
        self.in_proj = weight_norm(nn.Linear(d_model, 3 * d_model, bias=False))
        self.gate = weight_norm(nn.Linear(d_model, 2 * d_model))

    def forward(self, x):
        """Apply gated self-attention over all spatial positions.

        Args:
            x (torch.Tensor): Channels-last input of shape (b, h, w, c).

        Returns:
            torch.Tensor: Output of shape (b, h, w, c).
        """
        # Flatten and encode position
        batch, height, width, channels = x.size()
        seq_len = height * width
        # `reshape` rather than `view` so non-contiguous inputs also work.
        x = x.reshape(batch, seq_len, channels)
        x = x + self.get_pos_enc(seq_len, channels, x.device)

        # Compute q, k, v
        memory, query = torch.split(self.in_proj(x),
                                    (2 * channels, channels), dim=-1)
        q = self.split_last_dim(query, self.num_heads)
        k, v = [self.split_last_dim(tensor, self.num_heads)
                for tensor in torch.split(memory, self.d_model, dim=2)]

        # Compute attention and reshape
        key_depth_per_head = self.d_model // self.num_heads
        # Out-of-place on purpose: `q` is a view of a `torch.split` output,
        # and autograd forbids in-place modification of such views.
        q = q * key_depth_per_head ** -0.5
        x = self.dot_product_attention(q, k, v)
        x = self.combine_last_two_dim(x.permute(0, 2, 1, 3))  # (b, h*w, c)
        x = x.view(batch, height, width, channels)  # (b, h, w, c)

        # Gated linear unit: half the channels gate the other half.
        x = self.gate(x)
        values, gates = x.chunk(2, dim=-1)
        return values * torch.sigmoid(gates)

    def dot_product_attention(self, q, k, v, bias=False):
        """Dot-product attention.

        Args:
            q (torch.Tensor): Queries of shape (batch, heads, length_q, depth_k)
            k (torch.Tensor): Keys of shape (batch, heads, length_kv, depth_k)
            v (torch.Tensor): Values of shape (batch, heads, length_kv, depth_v)
            bias (bool): Add `self.bias` to the attention logits. This class
                does not define a `bias` attribute itself, so one must be
                set on the instance before enabling this flag.

        Returns:
            attn (torch.Tensor): Output of attention mechanism.
        """
        weights = torch.matmul(q, k.permute(0, 1, 3, 2))
        if bias:
            weights = weights + self.bias
        weights = F.softmax(weights, dim=-1)
        weights = F.dropout(weights, self.drop_prob, self.training)
        return torch.matmul(weights, v)

    @staticmethod
    def split_last_dim(x, n):
        """Reshape x so that the last dimension becomes two dimensions.

        The first of these two dimensions is n; it is then swapped with
        dimension 1, so `x` is expected to be 3-dimensional.

        Args:
            x (torch.Tensor): Tensor with shape (batch, length, m)
            n (int): Number of pieces the last dimension is split into.

        Returns:
            ret (torch.Tensor): Resulting tensor with shape
                (batch, n, length, m/n)
        """
        old_shape = list(x.size())
        new_shape = old_shape[:-1] + [n, old_shape[-1] // n]
        return x.view(new_shape).permute(0, 2, 1, 3)

    @staticmethod
    def combine_last_two_dim(x):
        """Merge the last two dimensions of `x`.

        Args:
            x (torch.Tensor): Tensor with shape (..., m, n)

        Returns:
            ret (torch.Tensor): Resulting tensor with shape (..., m * n)
        """
        old_shape = list(x.size())
        rows, cols = old_shape[-2:]
        new_shape = old_shape[:-2] + [rows * cols]
        # `contiguous` is required because callers pass permuted tensors.
        return x.contiguous().view(new_shape)

    @staticmethod
    def get_pos_enc(seq_len, num_channels, device):
        """Build a sinusoidal position encoding.

        The first `num_channels // 2` channels hold sines, the next
        `num_channels // 2` hold cosines, and an odd `num_channels` gets one
        trailing channel of zeros.

        Args:
            seq_len (int): Number of positions.
            num_channels (int): Number of channels of the encoding.
            device (torch.device or str): Device to create the encoding on.

        Returns:
            encoding (torch.Tensor): float32 tensor of shape
                (1, seq_len, num_channels).
        """
        position = torch.arange(seq_len, dtype=torch.float32, device=device)
        num_timescales = num_channels // 2
        # Clamp the divisor: with a single timescale (2 or 3 channels) the
        # unclamped `num_timescales - 1` would divide by zero.
        log_timescale_increment = (math.log(10000.)
                                   / max(num_timescales - 1, 1))
        inv_timescales = torch.arange(num_timescales,
                                      dtype=torch.float32,
                                      device=device)
        inv_timescales = torch.exp(inv_timescales * -log_timescale_increment)
        scaled_time = position.unsqueeze(1) * inv_timescales.unsqueeze(0)
        encoding = torch.cat([scaled_time.sin(), scaled_time.cos()], dim=1)
        encoding = F.pad(encoding, [0, num_channels % 2, 0, 0])
        return encoding.view(1, seq_len, num_channels)
class GatedConv(nn.Module):
    """Gated Convolution Block

    Originally used by PixelCNN++ (https://arxiv.org/pdf/1701.05517).

    The convolutions that follow the nonlinearity take twice as many input
    channels as the tensor fed to the nonlinearity (presumably because
    `concat_elu` doubles the channel count -- confirm in `models.util`).

    Args:
        num_channels (int): Number of channels in hidden activations.
        drop_prob (float): Dropout probability.
        aux_channels (int): Number of channels in optional auxiliary input.
    """

    def __init__(self, num_channels, drop_prob=0., aux_channels=None):
        super().__init__()
        doubled = 2 * num_channels
        self.nlin = concat_elu
        self.conv = WNConv2d(doubled, num_channels, kernel_size=3, padding=1)
        self.drop = nn.Dropout2d(drop_prob)
        self.gate = WNConv2d(doubled, doubled, kernel_size=1, padding=0)
        # The auxiliary projection only exists when an auxiliary input was
        # announced at construction time.
        self.aux_conv = (
            None if aux_channels is None
            else WNConv2d(2 * aux_channels, num_channels,
                          kernel_size=1, padding=0)
        )

    def forward(self, x, aux=None):
        """Run the gated convolution.

        Args:
            x (torch.Tensor): Input with channels on dimension 1.
            aux (torch.Tensor or None): Optional auxiliary input. When given,
                it is projected by `aux_conv` and added to the conv output,
                so the block must have been built with `aux_channels`.

        Returns:
            torch.Tensor: First half of the gate output's channels multiplied
            by the sigmoid of the second half.
        """
        hidden = self.conv(self.nlin(x))
        if aux is not None:
            hidden = hidden + self.aux_conv(self.nlin(aux))
        hidden = self.gate(self.drop(self.nlin(hidden)))

        # Gated linear unit over the channel dimension.
        value, gate = torch.chunk(hidden, 2, dim=1)
        return value * torch.sigmoid(gate)
class Rescale(nn.Module):
    """Learned per-channel scale factor.

    Implemented as a full `nn.Module` with a parameter named `weight` so
    that it can be wrapped with `torch.nn.utils.weight_norm`.

    Args:
        num_channels (int): Number of channels in the input.
    """

    def __init__(self, num_channels):
        super().__init__()
        # Shape (num_channels, 1, 1) broadcasts over the two trailing
        # dimensions of the input; every scale starts at one.
        initial_scale = torch.ones(num_channels, 1, 1)
        self.weight = nn.Parameter(initial_scale)

    def forward(self, x):
        """Return `x` multiplied by the per-channel weight."""
        return self.weight * x
|