import copy
import math

import torch


class Embedder(torch.nn.Module):
    """Map integer token ids to ``d_model``-dimensional embedding vectors."""

    def __init__(self, vocab_size, d_model):
        super().__init__()
        self.embed = torch.nn.Embedding(vocab_size, d_model)

    def forward(self, x):
        """Return the embedding of every token id in ``x``."""
        return self.embed(x)


class PositionalEncoder(torch.nn.Module):
    """Add fixed sinusoidal position encodings to a tensor, then apply dropout.

    Args:
        d_model: Size of the feature (last) dimension.
        dropout: Dropout probability applied after the addition.
        max_seq_len: Number of positions for which encodings are precomputed.
        batch_first: If ``False`` (default, the historical behaviour) the
            input is read as ``(seq_len, batch, d_model)``.  If ``True`` the
            input is read as ``(batch, seq_len, d_model)``, which is the
            layout ``MultiHeadAttention`` in this file works with.
    """

    def __init__(self, d_model, dropout=0.1, max_seq_len=80, batch_first=False):
        super().__init__()
        self.batch_first = batch_first
        self.dropout = torch.nn.Dropout(p=dropout)

        position = torch.arange(max_seq_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )
        pe = torch.zeros(max_seq_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one cosine column fewer than sine columns,
        # so drop the surplus frequency (a no-op when d_model is even).
        pe[:, 0, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # register_buffer: saved with the model state like a parameter, but
        # it receives no gradients.
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Return ``dropout(x + positional_encoding)``.

        Raises:
            RuntimeError: If the sequence is longer than ``max_seq_len``.
        """
        seq_len = x.size(1) if self.batch_first else x.size(0)
        if seq_len > self.pe.size(0):
            raise RuntimeError(
                f"sequence length {seq_len} exceeds max_seq_len "
                f"{self.pe.size(0)} of the positional encoder"
            )
        pe = self.pe[:seq_len]  # (seq_len, 1, d_model)
        if self.batch_first:
            # (1, seq_len, d_model): broadcast over the batch, not over time.
            pe = pe.transpose(0, 1)
        return self.dropout(x + pe)


class MultiHeadAttention(torch.nn.Module):
    """Multi-head scaled dot-product attention over batch-first tensors.

    Args:
        heads: Number of attention heads; must divide ``d_model``.
        d_model: Feature size of the queries, keys, values and the output.
        dropout: Dropout probability applied to the attention weights.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``heads``.
    """

    def __init__(self, heads, d_model, dropout=0.1):
        super().__init__()
        if d_model % heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by heads ({heads})"
            )

        self.d_model = d_model
        self.d_k = d_model // heads
        self.h = heads

        self.q_linear = torch.nn.Linear(d_model, d_model)
        self.v_linear = torch.nn.Linear(d_model, d_model)
        self.k_linear = torch.nn.Linear(d_model, d_model)
        self.dropout = torch.nn.Dropout(dropout)
        self.out = torch.nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` to ``k``/``v``.

        ``q``, ``k`` and ``v`` are ``(batch, seq_len, d_model)``; the result
        is ``(batch, q_seq_len, d_model)``.  ``mask`` (optional) marks
        positions to ignore with zeros; see ``attention``.
        """
        bs = q.size(0)

        # perform linear operation and split into h heads
        k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
        q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
        v = self.v_linear(v).view(bs, -1, self.h, self.d_k)

        # transpose to get dimensions bs * h * sl * d_k
        k = k.transpose(1, 2)
        q = q.transpose(1, 2)
        v = v.transpose(1, 2)

        # calculate attention using the function defined below
        scores = attention(q, k, v, self.d_k, mask, self.dropout)

        # concatenate heads and put through final linear layer
        concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
        return self.out(concat)


def attention(q, k, v, d_k, mask=None, dropout=None):
    """Scaled dot-product attention.

    Args:
        q, k, v: Tensors whose last two dimensions are ``(seq_len, d_k)``.
        d_k: Per-head feature size used to scale the dot products.
        mask: Optional tensor; a head dimension is inserted at index 1 and
            positions where the mask equals 0 are excluded from the softmax.
        dropout: Optional callable applied to the attention weights.

    Returns:
        The attention-weighted sum of ``v``.
    """
    scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)

    if mask is not None:
        mask = mask.unsqueeze(1)
        scores = scores.masked_fill(mask == 0, -1e9)

    scores = torch.nn.functional.softmax(scores, dim=-1)

    if dropout is not None:
        scores = dropout(scores)

    return torch.matmul(scores, v)


class FeedForward(torch.nn.Module):
    """Two-layer position-wise feed-forward network with ReLU and dropout."""

    def __init__(self, d_model, d_ff=2048, dropout=0.1):
        super().__init__()
        # We set d_ff as a default to 2048
        self.linear_1 = torch.nn.Linear(d_model, d_ff)
        self.dropout = torch.nn.Dropout(dropout)
        self.linear_2 = torch.nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return ``linear_2(dropout(relu(linear_1(x))))``."""
        x = self.dropout(torch.nn.functional.relu(self.linear_1(x)))
        return self.linear_2(x)


class Norm(torch.nn.Module):
    """Normalise the last dimension, with a learnable scale and shift."""

    def __init__(self, d_model, eps=1e-6):
        super().__init__()
        self.size = d_model

        # create two learnable parameters to calibrate normalization
        self.alpha = torch.nn.Parameter(torch.ones(self.size))
        self.bias = torch.nn.Parameter(torch.zeros(self.size))
        self.eps = eps

    def forward(self, x):
        """Return ``alpha * (x - mean) / (std + eps) + bias`` over the last dim."""
        centred = x - x.mean(dim=-1, keepdim=True)
        spread = x.std(dim=-1, keepdim=True) + self.eps
        return self.alpha * centred / spread + self.bias


# encoder layer with one multi-head attention layer and one
# feed-forward layer
class EncoderLayer(torch.nn.Module):
    """Pre-norm encoder layer: self-attention then feed-forward, each residual.

    ``dropout`` is applied on both residual branches and is also forwarded to
    the attention and feed-forward sub-modules.
    """

    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = Norm(d_model)
        self.norm_2 = Norm(d_model)
        self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)
        self.dropout_1 = torch.nn.Dropout(dropout)
        self.dropout_2 = torch.nn.Dropout(dropout)

    def forward(self, x, mask):
        """Apply self-attention (using ``mask``) and the feed-forward block."""
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn(x2, x2, x2, mask))
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.ff(x2))
        return x


# build a decoder layer with two multi-head attention layers and
# one feed-forward layer
class DecoderLayer(torch.nn.Module):
    """Pre-norm decoder layer: self-attention, encoder attention, feed-forward.

    ``dropout`` is applied on the three residual branches and is also
    forwarded to the attention and feed-forward sub-modules.
    """

    def __init__(self, d_model, heads, dropout=0.1):
        super().__init__()
        self.norm_1 = Norm(d_model)
        self.norm_2 = Norm(d_model)
        self.norm_3 = Norm(d_model)

        self.dropout_1 = torch.nn.Dropout(dropout)
        self.dropout_2 = torch.nn.Dropout(dropout)
        self.dropout_3 = torch.nn.Dropout(dropout)

        self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
        self.ff = FeedForward(d_model, dropout=dropout)

    def forward(self, x, e_outputs, src_mask, trg_mask):
        """Run the layer on ``x``.

        ``trg_mask`` is used for the self-attention over ``x``; ``src_mask``
        is used when attending to the encoder outputs ``e_outputs``.
        """
        x2 = self.norm_1(x)
        x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
        x2 = self.norm_2(x)
        x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs, src_mask))
        x2 = self.norm_3(x)
        x = x + self.dropout_3(self.ff(x2))
        return x


# generate multiple layers
def get_clones(module, N):
    """Return a ``ModuleList`` holding ``N`` independent deep copies of ``module``."""
    return torch.nn.ModuleList([copy.deepcopy(module) for _ in range(N)])


class Encoder(torch.nn.Module):
    """Embedding, positional encoding, ``N`` encoder layers and a final norm."""

    def __init__(self, vocab_size, d_model, N, heads):
        super().__init__()
        self.N = N
        self.embed = Embedder(vocab_size, d_model)
        # The attention layers treat dim 0 as the batch, so positions must be
        # added along dim 1.
        self.pe = PositionalEncoder(d_model, batch_first=True)
        self.layers = get_clones(EncoderLayer(d_model, heads), N)
        self.norm = Norm(d_model)

    def forward(self, src, mask):
        """Encode token ids ``src`` of shape ``(batch, seq_len)``."""
        x = self.pe(self.embed(src))
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)


class Decoder(torch.nn.Module):
    """Embedding, positional encoding, ``N`` decoder layers and a final norm."""

    def __init__(self, vocab_size, d_model, N, heads):
        super().__init__()
        self.N = N
        self.embed = Embedder(vocab_size, d_model)
        # The attention layers treat dim 0 as the batch, so positions must be
        # added along dim 1.
        self.pe = PositionalEncoder(d_model, batch_first=True)
        self.layers = get_clones(DecoderLayer(d_model, heads), N)
        self.norm = Norm(d_model)

    def forward(self, trg, e_outputs, src_mask, trg_mask):
        """Decode token ids ``trg`` of shape ``(batch, seq_len)`` given ``e_outputs``."""
        x = self.pe(self.embed(trg))
        for layer in self.layers:
            x = layer(x, e_outputs, src_mask, trg_mask)
        return self.norm(x)


class Transformer(torch.nn.Module):
    """Encoder-decoder model with a linear projection onto the target vocabulary."""

    def __init__(self, src_vocab, trg_vocab, d_model, N, heads):
        super().__init__()
        self.encoder = Encoder(src_vocab, d_model, N, heads)
        self.decoder = Decoder(trg_vocab, d_model, N, heads)
        self.out = torch.nn.Linear(d_model, trg_vocab)

    def forward(self, src, trg, src_mask, trg_mask):
        """Return unnormalised scores of shape ``(batch, trg_len, trg_vocab)``."""
        e_outputs = self.encoder(src, src_mask)
        d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
        return self.out(d_output)