|
import math |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
|
|
|
|
|
|
|
|
|
|
def get_timestep_embedding(timesteps, embedding_dim):
    """Build sinusoidal timestep embeddings.

    This matches the implementation in Denoising Diffusion Probabilistic
    Models, which is taken from Fairseq and matches tensor2tensor: all sine
    channels come first, followed by all cosine channels. That differs
    slightly from the description in Section 3.5 of "Attention Is All You
    Need".

    Args:
        timesteps: 1-D tensor holding one timestep per batch element.
        embedding_dim: Number of channels in the returned embedding.

    Returns:
        A float32 tensor of shape ``(len(timesteps), embedding_dim)`` on the
        same device as ``timesteps``. When ``embedding_dim`` is odd, the
        last channel is zero padding.

    Raises:
        AssertionError: If ``timesteps`` is not 1-D.
    """
    assert len(timesteps.shape) == 1, "timesteps must be a 1-D tensor"

    half_dim = embedding_dim // 2
    # The frequencies decay geometrically from 1 towards 1/10000 across the
    # half_dim channels. With a single frequency (embedding_dim of 2 or 3)
    # ``half_dim - 1`` is zero and used to raise ZeroDivisionError; the only
    # exponent is 0 in that case, so clamping the denominator yields the
    # expected frequency of exactly 1.
    log_step = math.log(10000) / max(half_dim - 1, 1)
    freqs = torch.exp(torch.arange(half_dim, dtype=torch.float32) * -log_step)
    freqs = freqs.to(device=timesteps.device)

    # Outer product: one row per timestep, one column per frequency.
    angles = timesteps.float()[:, None] * freqs[None, :]
    emb = torch.cat([torch.sin(angles), torch.cos(angles)], dim=1)

    if embedding_dim % 2 == 1:
        # Odd widths get one extra zero channel on the right.
        emb = torch.nn.functional.pad(emb, (0, 1, 0, 0))
    return emb
|
|
|
|
|
def nonlinearity(x):
    """Apply the swish activation, ``x * sigmoid(x)``, element-wise."""
    gate = torch.sigmoid(x)
    return x * gate
|
|
|
|
|
def Normalize(in_channels):
    """Return a 32-group ``GroupNorm`` layer over ``in_channels`` channels.

    The layer uses ``eps=1e-6`` and learnable affine parameters. GroupNorm
    requires the channel count to be divisible by the group count, so
    ``in_channels`` has to be a multiple of 32.
    """
    norm_kwargs = dict(
        num_groups=32,
        num_channels=in_channels,
        eps=1e-6,
        affine=True,
    )
    return torch.nn.GroupNorm(**norm_kwargs)
|
|
|
|
|
class Upsample(nn.Module):
    """Double the spatial size by nearest-neighbour interpolation.

    Args:
        in_channels: Channel count of the input; the optional conv keeps it.
        with_conv: When true, a 3x3 stride-1 convolution is applied after
            the interpolation.
    """

    def __init__(self, in_channels, with_conv):
        super().__init__()
        self.with_conv = with_conv
        if not self.with_conv:
            return
        self.conv = torch.nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=3,
            stride=1,
            padding=1,
        )

    def forward(self, x):
        """Return ``x`` upsampled by a factor of two along its last two axes."""
        upsampled = torch.nn.functional.interpolate(x, scale_factor=2.0, mode="nearest")
        return self.conv(upsampled) if self.with_conv else upsampled
|
|
|
|
|
class Downsample(nn.Module):
    """Halve the spatial size, with a strided conv or with average pooling.

    Args:
        in_channels: Channel count of the input; the output keeps it.
        with_conv: When true, use a 3x3 stride-2 convolution; otherwise use
            2x2 average pooling.
    """

    def __init__(self, in_channels, with_conv):
        super().__init__()
        self.with_conv = with_conv
        if self.with_conv:
            # The conv itself is unpadded; forward() adds the one-sided
            # (right/bottom) padding by hand before calling it.
            self.conv = torch.nn.Conv2d(
                in_channels,
                in_channels,
                kernel_size=3,
                stride=2,
                padding=0,
            )

    def forward(self, x):
        """Return ``x`` downsampled by a factor of two along its last two axes."""
        if not self.with_conv:
            return torch.nn.functional.avg_pool2d(x, kernel_size=2, stride=2)
        # One zero column on the right and one zero row at the bottom.
        padded = torch.nn.functional.pad(x, (0, 1, 0, 1), mode="constant", value=0)
        return self.conv(padded)
|
|
|
|
|
|
|
class ResnetBlock(nn.Module):
    """Residual block with a timestep-embedding bias between its two convs.

    Each half is GroupNorm -> swish -> 3x3 conv; the projected timestep
    embedding is added per channel after the first conv, and dropout is
    applied before the second conv.

    Args:
        in_channels: Channel count of the input feature map.
        out_channels: Channel count of the output; defaults to
            ``in_channels``.
        conv_shortcut: When the channel count changes, project the skip
            path with a 3x3 conv if true, otherwise with a 1x1 conv.
        dropout: Probability handed to ``torch.nn.Dropout``.
        temb_channels: Width of the timestep embedding given to ``forward``.
    """

    def __init__(self, *, in_channels, out_channels=None, conv_shortcut=False,
                 dropout, temb_channels=512):
        super().__init__()
        if out_channels is None:
            out_channels = in_channels
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.use_conv_shortcut = conv_shortcut

        self.norm1 = Normalize(in_channels)
        self.conv1 = torch.nn.Conv2d(
            in_channels, out_channels, kernel_size=3, stride=1, padding=1
        )
        self.temb_proj = torch.nn.Linear(temb_channels, out_channels)
        self.norm2 = Normalize(out_channels)
        self.dropout = torch.nn.Dropout(dropout)
        self.conv2 = torch.nn.Conv2d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1
        )

        # A skip projection is only needed when the channel count changes.
        if self.in_channels == self.out_channels:
            return
        if self.use_conv_shortcut:
            self.conv_shortcut = torch.nn.Conv2d(
                in_channels, out_channels, kernel_size=3, stride=1, padding=1
            )
        else:
            self.nin_shortcut = torch.nn.Conv2d(
                in_channels, out_channels, kernel_size=1, stride=1, padding=0
            )

    def forward(self, x, temb):
        """Return the residual sum of the (projected) input and the conv branch.

        Args:
            x: Feature map with ``in_channels`` channels on axis 1.
            temb: Embedding whose last axis has ``temb_channels`` entries;
                its projection is broadcast over the two spatial axes.
        """
        branch = self.conv1(nonlinearity(self.norm1(x)))

        temb_bias = self.temb_proj(nonlinearity(temb))
        branch = branch + temb_bias[:, :, None, None]

        branch = self.conv2(self.dropout(nonlinearity(self.norm2(branch))))

        if self.in_channels == self.out_channels:
            skip = x
        elif self.use_conv_shortcut:
            skip = self.conv_shortcut(x)
        else:
            skip = self.nin_shortcut(x)

        return skip + branch
|
|
|
|
|
class AttnBlock(nn.Module):
    """Single-head self-attention over the spatial positions of a feature map.

    Queries, keys, values and the output projection are 1x1 convolutions
    that keep the channel count; the attended features are added back to
    the input.

    Args:
        in_channels: Channel count of the input (and output) feature map.
    """

    def __init__(self, in_channels):
        super().__init__()
        self.in_channels = in_channels

        self.norm = Normalize(in_channels)
        pointwise = dict(kernel_size=1, stride=1, padding=0)
        self.q = torch.nn.Conv2d(in_channels, in_channels, **pointwise)
        self.k = torch.nn.Conv2d(in_channels, in_channels, **pointwise)
        self.v = torch.nn.Conv2d(in_channels, in_channels, **pointwise)
        self.proj_out = torch.nn.Conv2d(in_channels, in_channels, **pointwise)

    def forward(self, x):
        """Return ``x`` plus its self-attended, projected features."""
        normed = self.norm(x)
        query = self.q(normed)
        key = self.k(normed)
        value = self.v(normed)

        b, c, height, width = query.shape
        n_pos = height * width

        # scores[b, i, j] = <query at position i, key at position j>,
        # scaled by 1/sqrt(c) and normalised over the key positions j.
        query = query.reshape(b, c, n_pos).permute(0, 2, 1)
        key = key.reshape(b, c, n_pos)
        scores = torch.bmm(query, key) * (int(c) ** (-0.5))
        scores = torch.nn.functional.softmax(scores, dim=2)

        # out[b, c, i] = sum_j value[b, c, j] * scores[b, i, j]
        value = value.reshape(b, c, n_pos)
        attended = torch.bmm(value, scores.permute(0, 2, 1))
        attended = attended.reshape(b, c, height, width)

        return x + self.proj_out(attended)
|
|
|
class MultiHeadAttnBlock(nn.Module):
    """Multi-head self-attention over the spatial positions of a feature map.

    The channels are split into ``num_heads`` contiguous groups of
    ``head_dim`` channels; each head attends over all ``h * w`` positions
    independently. The heads are then concatenated, projected by a 1x1
    conv, and added back to the input.

    Args:
        in_channels: Channel count of the input (and output) feature map.
            Must be divisible by ``num_heads``.
        num_heads: Number of attention heads. Defaults to 8.
    """

    def __init__(self, in_channels, num_heads=8):
        super().__init__()
        assert in_channels % num_heads == 0, "in_channels must be divisible by num_heads"

        self.in_channels = in_channels
        self.num_heads = num_heads
        self.head_dim = in_channels // num_heads

        self.norm = Normalize(in_channels)
        self.q = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)
        self.k = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)
        self.v = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)
        self.proj_out = torch.nn.Conv2d(in_channels, in_channels, kernel_size=1, stride=1, padding=0)

    def _split_heads(self, t):
        """Reshape ``(b, c, h, w)`` to ``(b, num_heads, h * w, head_dim)``."""
        b, _, h, w = t.shape
        return t.reshape(b, self.num_heads, self.head_dim, h * w).permute(0, 1, 3, 2)

    def forward(self, x):
        """Return ``x`` plus its multi-head self-attended, projected features."""
        h_ = self.norm(x)

        q = self.q(h_)
        k = self.k(h_)
        v = self.v(h_)

        b, c, h, w = q.shape
        # q, k and v must share the (batch, head, position, head_dim) layout
        # so that the einsum labels below refer to the same axes in every
        # operand. Previously only q was permuted, so the 'd' label meant
        # head_dim on q but h*w on k and v.
        q = self._split_heads(q)
        k = self._split_heads(k)
        v = self._split_heads(v)

        # w_[b, n, i, j]: scaled similarity of query position i and key
        # position j within head n, normalised over the key positions.
        w_ = torch.einsum('bnqd,bnkd->bnqk', q, k)
        w_ = w_ * (self.head_dim ** -0.5)
        w_ = torch.nn.functional.softmax(w_, dim=-1)

        # Weighted sum of the values over the key positions. The contraction
        # has to run over the shared 'k' label; the former 'bnqk,bnvd'
        # spelling summed the weights and the values separately and never
        # combined them.
        h_ = torch.einsum('bnqk,bnkd->bnqd', w_, v)
        # (b, heads, h*w, head_dim) -> (b, heads, head_dim, h*w) -> (b, c, h, w)
        h_ = h_.permute(0, 1, 3, 2).reshape(b, c, h, w)

        h_ = self.proj_out(h_)

        return x + h_
|
|
|
class DiffusionUNet(nn.Module):
    """U-Net conditioned on a timestep embedding and a learned shear embedding.

    The architecture is fixed inside the constructor: six resolution levels
    with channel multipliers ``[1, 1, 2, 2, 4, 4]``, attention blocks at
    feature-map sizes 32 and 16, convolutional down/up-sampling, two input
    channels and one output channel.

    Args:
        ch: Base channel width; each level uses a multiple of it.
        num_res_blocks: Residual blocks per level on the down path (the up
            path builds one more per level).
        image_size: Height and width that ``forward`` requires of its input.
        drop_out: Dropout probability passed to every ``ResnetBlock``.
    """

    def __init__(self, ch, num_res_blocks, image_size, drop_out):
        super().__init__()

        # Fixed architecture hyper-parameters.
        ch_mult = [1, 1, 2, 2, 4, 4]
        attn_resolutions = [32, 16]
        resamp_with_conv = True
        in_channels = 2
        out_ch = 1

        self.dropout = drop_out
        self.ch = ch
        self.temb_ch = self.ch * 4
        self.num_resolutions = len(ch_mult)
        self.num_res_blocks = num_res_blocks
        self.resolution = image_size
        self.in_channels = in_channels

        # Keyword arguments shared by every residual block in the network.
        res_kwargs = dict(temb_channels=self.temb_ch, dropout=self.dropout)

        # Conditioning: a lookup table with three shear entries, plus a
        # two-layer MLP applied to the sinusoidal timestep embedding.
        self.shear_emb = nn.Embedding(num_embeddings=3, embedding_dim=self.temb_ch)

        self.temb = nn.Module()
        self.temb.dense = nn.ModuleList([
            torch.nn.Linear(self.ch, self.temb_ch),
            torch.nn.Linear(self.temb_ch, self.temb_ch),
        ])

        self.conv_in = torch.nn.Conv2d(in_channels, self.ch, kernel_size=3, stride=1, padding=1)

        # ---- down path ----
        # in_ch_mult[i] is the multiplier of the features entering level i.
        in_ch_mult = (1,) + tuple(ch_mult)
        last_level = self.num_resolutions - 1
        curr_res = image_size
        block_in = None
        self.down = nn.ModuleList()
        for i_level in range(self.num_resolutions):
            block_in = ch * in_ch_mult[i_level]
            block_out = ch * ch_mult[i_level]
            use_attn = curr_res in attn_resolutions

            stage = nn.Module()
            stage.block = nn.ModuleList()
            stage.attn = nn.ModuleList()
            for _ in range(self.num_res_blocks):
                stage.block.append(
                    ResnetBlock(in_channels=block_in, out_channels=block_out, **res_kwargs)
                )
                block_in = block_out
                if use_attn:
                    stage.attn.append(AttnBlock(block_in))

            if i_level != last_level:
                stage.downsample = Downsample(block_in, resamp_with_conv)
                curr_res = curr_res // 2
            self.down.append(stage)

        # ---- bottleneck ----
        self.mid = nn.Module()
        self.mid.block_1 = ResnetBlock(in_channels=block_in, out_channels=block_in, **res_kwargs)
        self.mid.attn_1 = AttnBlock(block_in)
        self.mid.block_2 = ResnetBlock(in_channels=block_in, out_channels=block_in, **res_kwargs)

        # ---- up path ----
        # Built from the coarsest level to the finest; each stage is
        # inserted at the front so that self.up[i] belongs to level i.
        self.up = nn.ModuleList()
        for i_level in reversed(range(self.num_resolutions)):
            block_out = ch * ch_mult[i_level]
            use_attn = curr_res in attn_resolutions

            stage = nn.Module()
            stage.block = nn.ModuleList()
            stage.attn = nn.ModuleList()
            for i_block in range(self.num_res_blocks + 1):
                # The final block of a level is sized for the skip that
                # entered the level; the others for the level's own width.
                is_final = i_block == self.num_res_blocks
                skip_in = ch * (in_ch_mult[i_level] if is_final else ch_mult[i_level])
                stage.block.append(
                    ResnetBlock(in_channels=block_in + skip_in, out_channels=block_out, **res_kwargs)
                )
                block_in = block_out
                if use_attn:
                    stage.attn.append(AttnBlock(block_in))

            if i_level != 0:
                stage.upsample = Upsample(block_in, resamp_with_conv)
                curr_res = curr_res * 2
            self.up.insert(0, stage)

        # ---- output head ----
        self.norm_out = Normalize(block_in)
        self.conv_out = torch.nn.Conv2d(block_in, out_ch, kernel_size=3, stride=1, padding=1)

    def forward(self, x, t, shear):
        """Run the U-Net.

        Args:
            x: Input feature map; ``x.shape[2]`` and ``x.shape[3]`` must both
                equal the ``image_size`` given at construction. The input
                conv expects two channels on axis 1.
            t: 1-D tensor of timesteps.
            shear: Integer indices into the three-entry shear embedding
                table. Presumably one index per batch element, so that the
                lookup can be added to the timestep embedding; confirm
                against the caller.

        Returns:
            The output of the final 3x3 conv, which has one channel.
        """
        assert x.shape[2] == x.shape[3] == self.resolution

        # Conditioning vector: timestep MLP output plus the shear embedding.
        dense_in, dense_out = self.temb.dense[0], self.temb.dense[1]
        temb = get_timestep_embedding(t, self.ch)
        temb = dense_out(nonlinearity(dense_in(temb)))
        temb = temb + self.shear_emb(shear)

        # Down path: keep every intermediate activation as a skip.
        hs = [self.conv_in(x)]
        last_level = self.num_resolutions - 1
        for i_level in range(self.num_resolutions):
            stage = self.down[i_level]
            for i_block in range(self.num_res_blocks):
                h = stage.block[i_block](hs[-1], temb)
                if len(stage.attn) > 0:
                    h = stage.attn[i_block](h)
                hs.append(h)
            if i_level != last_level:
                hs.append(stage.downsample(hs[-1]))

        # Bottleneck.
        h = hs[-1]
        h = self.mid.block_1(h, temb)
        h = self.mid.attn_1(h)
        h = self.mid.block_2(h, temb)

        # Up path: consume the skips in reverse order.
        for i_level in reversed(range(self.num_resolutions)):
            stage = self.up[i_level]
            for i_block in range(self.num_res_blocks + 1):
                merged = torch.cat([h, hs.pop()], dim=1)
                h = stage.block[i_block](merged, temb)
                if len(stage.attn) > 0:
                    h = stage.attn[i_block](h)
            if i_level != 0:
                h = stage.upsample(h)

        # Output head.
        return self.conv_out(nonlinearity(self.norm_out(h)))