|
import math |
|
|
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
|
|
|
|
class CausalSelfAttention(nn.Module): |
|
""" |
|
A vanilla multi-head masked self-attention layer with a projection at the end. |
|
It is possible to use torch.nn.MultiheadAttention here but I am including an |
|
explicit implementation here to show that there is nothing too scary here. |
|
""" |
|
|
|
def __init__(self, bert_n_emb, bert_n_head, attn_pdrop, resid_pdrop, |
|
latent_shape, sampler): |
|
super().__init__() |
|
assert bert_n_emb % bert_n_head == 0 |
|
|
|
self.key = nn.Linear(bert_n_emb, bert_n_emb) |
|
self.query = nn.Linear(bert_n_emb, bert_n_emb) |
|
self.value = nn.Linear(bert_n_emb, bert_n_emb) |
|
|
|
self.attn_drop = nn.Dropout(attn_pdrop) |
|
self.resid_drop = nn.Dropout(resid_pdrop) |
|
|
|
self.proj = nn.Linear(bert_n_emb, bert_n_emb) |
|
self.n_head = bert_n_head |
|
self.causal = True if sampler == 'autoregressive' else False |
|
if self.causal: |
|
block_size = np.prod(latent_shape) |
|
mask = torch.tril(torch.ones(block_size, block_size)) |
|
self.register_buffer("mask", mask.view(1, 1, block_size, |
|
block_size)) |
|
|
|
def forward(self, x, layer_past=None): |
|
B, T, C = x.size() |
|
|
|
|
|
k = self.key(x).view(B, T, self.n_head, |
|
C // self.n_head).transpose(1, |
|
2) |
|
q = self.query(x).view(B, T, self.n_head, |
|
C // self.n_head).transpose(1, |
|
2) |
|
v = self.value(x).view(B, T, self.n_head, |
|
C // self.n_head).transpose(1, |
|
2) |
|
|
|
present = torch.stack((k, v)) |
|
if self.causal and layer_past is not None: |
|
past_key, past_value = layer_past |
|
k = torch.cat((past_key, k), dim=-2) |
|
v = torch.cat((past_value, v), dim=-2) |
|
|
|
|
|
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1))) |
|
|
|
if self.causal and layer_past is None: |
|
att = att.masked_fill(self.mask[:, :, :T, :T] == 0, float('-inf')) |
|
|
|
att = F.softmax(att, dim=-1) |
|
att = self.attn_drop(att) |
|
y = att @ v |
|
|
|
y = y.transpose(1, 2).contiguous().view(B, T, C) |
|
|
|
|
|
y = self.resid_drop(self.proj(y)) |
|
return y, present |
|
|
|
|
|
class Block(nn.Module): |
|
""" an unassuming Transformer block """ |
|
|
|
def __init__(self, bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop, |
|
latent_shape, sampler): |
|
super().__init__() |
|
self.ln1 = nn.LayerNorm(bert_n_emb) |
|
self.ln2 = nn.LayerNorm(bert_n_emb) |
|
self.attn = CausalSelfAttention(bert_n_emb, bert_n_head, attn_pdrop, |
|
resid_pdrop, latent_shape, sampler) |
|
self.mlp = nn.Sequential( |
|
nn.Linear(bert_n_emb, 4 * bert_n_emb), |
|
nn.GELU(), |
|
nn.Linear(4 * bert_n_emb, bert_n_emb), |
|
nn.Dropout(resid_pdrop), |
|
) |
|
|
|
def forward(self, x, layer_past=None, return_present=False): |
|
|
|
attn, present = self.attn(self.ln1(x), layer_past) |
|
x = x + attn |
|
x = x + self.mlp(self.ln2(x)) |
|
|
|
if layer_past is not None or return_present: |
|
return x, present |
|
return x |
|
|
|
|
|
class Transformer(nn.Module): |
|
""" the full GPT language model, with a context size of block_size """ |
|
|
|
def __init__(self, |
|
codebook_size, |
|
segm_codebook_size, |
|
bert_n_emb, |
|
bert_n_layers, |
|
bert_n_head, |
|
block_size, |
|
latent_shape, |
|
embd_pdrop, |
|
resid_pdrop, |
|
attn_pdrop, |
|
sampler='absorbing'): |
|
super().__init__() |
|
|
|
self.vocab_size = codebook_size + 1 |
|
self.n_embd = bert_n_emb |
|
self.block_size = block_size |
|
self.n_layers = bert_n_layers |
|
self.codebook_size = codebook_size |
|
self.segm_codebook_size = segm_codebook_size |
|
self.causal = sampler == 'autoregressive' |
|
if self.causal: |
|
self.vocab_size = codebook_size |
|
|
|
self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd) |
|
self.pos_emb = nn.Parameter( |
|
torch.zeros(1, self.block_size, self.n_embd)) |
|
self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd) |
|
self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd)) |
|
self.drop = nn.Dropout(embd_pdrop) |
|
|
|
|
|
self.blocks = nn.Sequential(*[ |
|
Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop, |
|
latent_shape, sampler) for _ in range(self.n_layers) |
|
]) |
|
|
|
self.ln_f = nn.LayerNorm(self.n_embd) |
|
self.head = nn.Linear(self.n_embd, self.codebook_size, bias=False) |
|
|
|
def get_block_size(self): |
|
return self.block_size |
|
|
|
def _init_weights(self, module): |
|
if isinstance(module, (nn.Linear, nn.Embedding)): |
|
module.weight.data.normal_(mean=0.0, std=0.02) |
|
if isinstance(module, nn.Linear) and module.bias is not None: |
|
module.bias.data.zero_() |
|
elif isinstance(module, nn.LayerNorm): |
|
module.bias.data.zero_() |
|
module.weight.data.fill_(1.0) |
|
|
|
def forward(self, idx, segm_tokens, t=None): |
|
|
|
token_embeddings = self.tok_emb(idx) |
|
|
|
segm_embeddings = self.segm_emb(segm_tokens) |
|
|
|
if self.causal: |
|
token_embeddings = torch.cat((self.start_tok.repeat( |
|
token_embeddings.size(0), 1, 1), token_embeddings), |
|
dim=1) |
|
|
|
t = token_embeddings.shape[1] |
|
assert t <= self.block_size, "Cannot forward, model block size is exhausted." |
|
|
|
|
|
position_embeddings = self.pos_emb[:, :t, :] |
|
|
|
x = token_embeddings + position_embeddings + segm_embeddings |
|
x = self.drop(x) |
|
for block in self.blocks: |
|
x = block(x) |
|
x = self.ln_f(x) |
|
logits = self.head(x) |
|
|
|
return logits |
|
|
|
|
|
class TransformerMultiHead(nn.Module): |
|
""" the full GPT language model, with a context size of block_size """ |
|
|
|
def __init__(self, |
|
codebook_size, |
|
segm_codebook_size, |
|
texture_codebook_size, |
|
bert_n_emb, |
|
bert_n_layers, |
|
bert_n_head, |
|
block_size, |
|
latent_shape, |
|
embd_pdrop, |
|
resid_pdrop, |
|
attn_pdrop, |
|
num_head, |
|
sampler='absorbing'): |
|
super().__init__() |
|
|
|
self.vocab_size = codebook_size + 1 |
|
self.n_embd = bert_n_emb |
|
self.block_size = block_size |
|
self.n_layers = bert_n_layers |
|
self.codebook_size = codebook_size |
|
self.segm_codebook_size = segm_codebook_size |
|
self.texture_codebook_size = texture_codebook_size |
|
self.causal = sampler == 'autoregressive' |
|
if self.causal: |
|
self.vocab_size = codebook_size |
|
|
|
self.tok_emb = nn.Embedding(self.vocab_size, self.n_embd) |
|
self.pos_emb = nn.Parameter( |
|
torch.zeros(1, self.block_size, self.n_embd)) |
|
self.segm_emb = nn.Embedding(self.segm_codebook_size, self.n_embd) |
|
self.texture_emb = nn.Embedding(self.texture_codebook_size, |
|
self.n_embd) |
|
self.start_tok = nn.Parameter(torch.zeros(1, 1, self.n_embd)) |
|
self.drop = nn.Dropout(embd_pdrop) |
|
|
|
|
|
self.blocks = nn.Sequential(*[ |
|
Block(bert_n_emb, resid_pdrop, bert_n_head, attn_pdrop, |
|
latent_shape, sampler) for _ in range(self.n_layers) |
|
]) |
|
|
|
self.num_head = num_head |
|
self.head_class_num = codebook_size // self.num_head |
|
self.ln_f = nn.LayerNorm(self.n_embd) |
|
self.head_list = nn.ModuleList([ |
|
nn.Linear(self.n_embd, self.head_class_num, bias=False) |
|
for _ in range(self.num_head) |
|
]) |
|
|
|
def get_block_size(self): |
|
return self.block_size |
|
|
|
def _init_weights(self, module): |
|
if isinstance(module, (nn.Linear, nn.Embedding)): |
|
module.weight.data.normal_(mean=0.0, std=0.02) |
|
if isinstance(module, nn.Linear) and module.bias is not None: |
|
module.bias.data.zero_() |
|
elif isinstance(module, nn.LayerNorm): |
|
module.bias.data.zero_() |
|
module.weight.data.fill_(1.0) |
|
|
|
def forward(self, idx, segm_tokens, texture_tokens, t=None): |
|
|
|
token_embeddings = self.tok_emb(idx) |
|
segm_embeddings = self.segm_emb(segm_tokens) |
|
texture_embeddings = self.texture_emb(texture_tokens) |
|
|
|
if self.causal: |
|
token_embeddings = torch.cat((self.start_tok.repeat( |
|
token_embeddings.size(0), 1, 1), token_embeddings), |
|
dim=1) |
|
|
|
t = token_embeddings.shape[1] |
|
assert t <= self.block_size, "Cannot forward, model block size is exhausted." |
|
|
|
|
|
position_embeddings = self.pos_emb[:, :t, :] |
|
|
|
x = token_embeddings + position_embeddings + segm_embeddings + texture_embeddings |
|
x = self.drop(x) |
|
for block in self.blocks: |
|
x = block(x) |
|
x = self.ln_f(x) |
|
logits_list = [self.head_list[i](x) for i in range(self.num_head)] |
|
|
|
return logits_list |
|
|