# englishToSpanish / transformer.py
# reputation's picture
# Upload 7 files
# 2189a68 verified
import numpy as np
import torch
import math
from torch import nn
import torch.nn.functional as F
def get_device():
    """Return the CUDA device when CUDA is available, otherwise the CPU device."""
    if torch.cuda.is_available():
        return torch.device('cuda')
    return torch.device('cpu')
def scaled_dot_product(q, k, v, mask=None):
    """Compute scaled dot-product attention.

    Args:
        q: Query tensor; its last dimension is used as the scaling size.
        k: Key tensor; its last two dimensions are transposed before the
            matrix product with ``q``.
        v: Value tensor, multiplied by the attention weights.
        mask: Optional additive mask. When given, the scores must be
            4-dimensional; the mask is added after the first two score
            dimensions are swapped, and the swap is then undone.
            (assumes a mask shaped (batch, query_len, key_len) against
            scores shaped (batch, heads, query_len, key_len) - TODO confirm
            against callers)

    Returns:
        A ``(values, attention)`` tuple: the attention-weighted values and
        the softmax attention weights.
    """
    key_dim = q.size(-1)
    scores = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(key_dim)
    if mask is not None:
        # Swap dims 0 and 1 so the mask broadcasts over what is then the
        # leading dimension, then swap back to the original layout.
        swap_first_two = (1, 0, 2, 3)
        scores = (scores.permute(*swap_first_two) + mask).permute(*swap_first_two)
    attention = F.softmax(scores, dim=-1)
    return torch.matmul(attention, v), attention
class PositionalEncoding(nn.Module):
    """Sinusoidal positional-encoding table with no learnable parameters."""

    def __init__(self, d_model, max_sequence_length):
        """Store the table dimensions.

        Args:
            d_model: Number of columns (features) in the encoding table.
            max_sequence_length: Number of rows (positions) in the table.
        """
        super().__init__()
        self.max_sequence_length = max_sequence_length
        self.d_model = d_model

    def forward(self):
        """Build and return the encoding table.

        Returns:
            A float tensor of shape ``(max_sequence_length, d_model)`` whose
            even columns hold ``sin(pos / 10000^(i / d_model))`` and whose odd
            columns hold the matching cosine, for ``i = 0, 2, 4, ...``.
        """
        even_i = torch.arange(0, self.d_model, 2).float()
        denominator = torch.pow(10000, even_i / self.d_model)
        position = torch.arange(self.max_sequence_length).reshape(
            self.max_sequence_length, 1
        )
        angles = position / denominator
        # Stacking on a new last axis and flattening interleaves the columns
        # as sin, cos, sin, cos, ...
        stacked = torch.stack([torch.sin(angles), torch.cos(angles)], dim=2)
        PE = torch.flatten(stacked, start_dim=1, end_dim=2)
        # For an odd d_model the interleaved table has d_model + 1 columns;
        # trim the surplus cosine column so the table always matches d_model.
        return PE[:, :self.d_model]
class SentenceEmbedding(nn.Module):
    """Turn a batch of raw sentences into position-aware embeddings.

    Each sentence is tokenized element by element (character by character
    when it is a string), padded to ``max_sequence_length``, embedded, summed
    with the positional encoding and passed through dropout.
    """

    def __init__(self, max_sequence_length, d_model, language_to_index, START_TOKEN, END_TOKEN, PADDING_TOKEN):
        """Create the embedding table and positional encoder.

        Args:
            max_sequence_length: Length every tokenized sentence is padded to.
            d_model: Embedding size.
            language_to_index: Mapping from token to integer index; its
                length is used as the vocabulary size.
            START_TOKEN: Token optionally inserted at the front of a sentence.
            END_TOKEN: Token optionally appended to a sentence.
            PADDING_TOKEN: Token used to pad up to ``max_sequence_length``.
        """
        super().__init__()
        self.vocab_size = len(language_to_index)
        self.max_sequence_length = max_sequence_length
        self.embedding = nn.Embedding(self.vocab_size, d_model)
        self.language_to_index = language_to_index
        self.position_encoder = PositionalEncoding(d_model, max_sequence_length)
        self.dropout = nn.Dropout(p=0.1)
        self.START_TOKEN = START_TOKEN
        self.END_TOKEN = END_TOKEN
        self.PADDING_TOKEN = PADDING_TOKEN

    def batch_tokenize(self, batch, start_token, end_token):
        """Convert a batch of sentences into a padded tensor of token indices.

        Args:
            batch: Sequence of sentences; each sentence is iterated token by
                token and every token must be a key of ``language_to_index``.
            start_token: When truthy, prepend the START token index.
            end_token: When truthy, append the END token index.

        Returns:
            An integer tensor stacked from one row per sentence, placed on
            the same device as the embedding weights.

        Raises:
            KeyError: If a token is missing from ``language_to_index``.
        """
        def tokenize(sentence, start_token, end_token):
            indices = [self.language_to_index[token] for token in sentence]
            if start_token:
                indices.insert(0, self.language_to_index[self.START_TOKEN])
            if end_token:
                indices.append(self.language_to_index[self.END_TOKEN])
            missing = self.max_sequence_length - len(indices)
            if missing > 0:
                # The padding index is only looked up when padding is needed.
                indices.extend([self.language_to_index[self.PADDING_TOKEN]] * missing)
            return torch.tensor(indices)

        tokenized = torch.stack(
            [tokenize(sentence, start_token, end_token) for sentence in batch]
        )
        # Follow the module's own device rather than the globally detected
        # one, so a CPU-resident model keeps working on a CUDA machine.
        return tokenized.to(self.embedding.weight.device)

    def forward(self, x, start_token, end_token):
        """Tokenize, embed and positionally encode a batch of sentences.

        Args:
            x: Batch of raw sentences, as accepted by ``batch_tokenize``.
            start_token: Whether to prepend the START token.
            end_token: Whether to append the END token.

        Returns:
            The embeddings plus positional encoding, after dropout.
        """
        x = self.batch_tokenize(x, start_token, end_token)
        x = self.embedding(x)
        # Keep the positional table on the same device as the embeddings.
        pos = self.position_encoder().to(x.device)
        return self.dropout(x + pos)
class MultiHeadAttention(nn.Module):
    """Multi-head self-attention using one fused query/key/value projection."""

    def __init__(self, d_model, num_heads):
        """Create the fused QKV projection and the output projection.

        Args:
            d_model: Feature size of the input and output.
            num_heads: Number of heads; each head gets
                ``d_model // num_heads`` features.
        """
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_layer = nn.Linear(d_model, 3 * d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, mask):
        """Apply self-attention to ``x``.

        Args:
            x: 3-dimensional input unpacked as
                ``(batch_size, sequence_length, d_model)``.
            mask: Additive mask forwarded to ``scaled_dot_product``, or None.

        Returns:
            Tensor of shape ``(batch_size, sequence_length, d_model)``.
        """
        batch_size, sequence_length, _ = x.size()
        projected = self.qkv_layer(x)
        # Split the fused projection per head, then move heads ahead of the
        # sequence axis: (batch, heads, seq, 3 * head_dim).
        per_head = projected.reshape(
            batch_size, sequence_length, self.num_heads, 3 * self.head_dim
        ).permute(0, 2, 1, 3)
        q, k, v = per_head.chunk(3, dim=-1)
        values, _ = scaled_dot_product(q, k, v, mask)
        # Put the sequence axis back in front of heads and merge the heads.
        merged = values.permute(0, 2, 1, 3).reshape(
            batch_size, sequence_length, self.num_heads * self.head_dim
        )
        return self.linear_layer(merged)
class LayerNormalization(nn.Module):
    """Layer normalization over the trailing dimensions, with learnable scale and shift."""

    def __init__(self, parameters_shape, eps=1e-5):
        """Create the scale (ones) and shift (zeros) parameters.

        Args:
            parameters_shape: Shape of the trailing dimensions to normalize
                over; also the shape of ``gamma`` and ``beta``.
            eps: Value added to the variance before taking the square root.
        """
        super().__init__()
        self.parameters_shape = parameters_shape
        self.eps = eps
        self.gamma = nn.Parameter(torch.ones(parameters_shape))
        self.beta = nn.Parameter(torch.zeros(parameters_shape))

    def forward(self, inputs):
        """Normalize ``inputs`` over the last ``len(parameters_shape)`` dimensions.

        Returns:
            ``gamma * (inputs - mean) / sqrt(var + eps) + beta``, where the
            mean and (biased) variance are taken over the trailing dimensions.
        """
        # Trailing axes as negative indices: -1, -2, ..., -len(parameters_shape).
        trailing_axes = list(range(-1, -len(self.parameters_shape) - 1, -1))
        mean = inputs.mean(dim=trailing_axes, keepdim=True)
        centered = inputs - mean
        variance = (centered ** 2).mean(dim=trailing_axes, keepdim=True)
        normalized = centered / (variance + self.eps).sqrt()
        return self.gamma * normalized + self.beta
class PositionwiseFeedForward(nn.Module):
    """Two-layer feed-forward block: linear, ReLU, dropout, linear."""

    def __init__(self, d_model, hidden, drop_prob=0.1):
        """Create the two linear layers, the activation and the dropout.

        Args:
            d_model: Input and output feature size.
            hidden: Feature size between the two linear layers.
            drop_prob: Dropout probability applied after the activation.
        """
        super().__init__()
        self.linear1 = nn.Linear(d_model, hidden)
        self.linear2 = nn.Linear(hidden, d_model)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=drop_prob)

    def forward(self, x):
        """Return ``linear2(dropout(relu(linear1(x))))``."""
        activated = self.relu(self.linear1(x))
        return self.linear2(self.dropout(activated))
class EncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward, each followed by
    dropout, a residual addition and layer normalization."""

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        """Create the attention, feed-forward, normalization and dropout parts.

        Args:
            d_model: Feature size of the layer input and output.
            ffn_hidden: Hidden size of the feed-forward block.
            num_heads: Number of attention heads.
            drop_prob: Dropout probability used throughout the layer.
        """
        super().__init__()
        self.attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)

    def forward(self, x, self_attention_mask):
        """Run the layer on ``x`` using ``self_attention_mask`` for attention."""
        # Attention sub-block with residual connection.
        skip = x.clone()
        attended = self.dropout1(self.attention(x, mask=self_attention_mask))
        x = self.norm1(attended + skip)

        # Feed-forward sub-block with residual connection.
        skip = x.clone()
        transformed = self.dropout2(self.ffn(x))
        return self.norm2(transformed + skip)
class SequentialEncoder(nn.Sequential):
    """Sequential container whose layers each receive the running output and the same mask."""

    def forward(self, *inputs):
        """Thread ``x`` through every layer, passing the mask to each one.

        Args:
            *inputs: Exactly two values, ``(x, self_attention_mask)``.

        Returns:
            The output of the last layer, or ``x`` unchanged when the
            container holds no layers.
        """
        hidden, self_attention_mask = inputs
        for layer in self:
            hidden = layer(hidden, self_attention_mask)
        return hidden
class Encoder(nn.Module):
    """Sentence embedding followed by a stack of encoder layers."""

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        """Create the embedding module and ``num_layers`` encoder layers.

        Args:
            d_model: Feature size used by the embedding and every layer.
            ffn_hidden: Hidden size of each layer's feed-forward block.
            num_heads: Number of attention heads per layer.
            drop_prob: Dropout probability inside the encoder layers.
            num_layers: Number of encoder layers to stack.
            max_sequence_length: Padded sentence length.
            language_to_index: Token-to-index mapping for the input language.
            START_TOKEN: Start-of-sentence token.
            END_TOKEN: End-of-sentence token.
            PADDING_TOKEN: Padding token.
        """
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(
            max_sequence_length,
            d_model,
            language_to_index,
            START_TOKEN,
            END_TOKEN,
            PADDING_TOKEN,
        )
        encoder_layers = [
            EncoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialEncoder(*encoder_layers)

    def forward(self, x, self_attention_mask, start_token, end_token):
        """Embed the raw sentences in ``x`` and run them through the layer stack.

        Args:
            x: Batch of raw sentences passed to the sentence embedding.
            self_attention_mask: Mask handed to every encoder layer.
            start_token: Whether the embedding prepends the START token.
            end_token: Whether the embedding appends the END token.
        """
        embedded = self.sentence_embedding(x, start_token, end_token)
        return self.layers(embedded, self_attention_mask)
class MultiHeadCrossAttention(nn.Module):
    """Multi-head attention whose keys/values come from ``x`` and queries from ``y``."""

    def __init__(self, d_model, num_heads):
        """Create the key/value, query and output projections.

        Args:
            d_model: Feature size of both inputs and of the output.
            num_heads: Number of heads; each head gets
                ``d_model // num_heads`` features.
        """
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.kv_layer = nn.Linear(d_model, 2 * d_model)
        self.q_layer = nn.Linear(d_model, d_model)
        self.linear_layer = nn.Linear(d_model, d_model)

    def forward(self, x, y, mask):
        """Attend from ``y`` (queries) over ``x`` (keys and values).

        Args:
            x: 3-dimensional tensor unpacked as
                ``(batch_size, source_length, d_model)``; projected to keys
                and values.
            y: Tensor projected to queries; its second dimension is the
                target length and may differ from ``source_length``.
            mask: Additive mask forwarded to ``scaled_dot_product``, or None.

        Returns:
            Tensor of shape ``(batch_size, target_length, d_model)``.
        """
        batch_size, source_length, d_model = x.size()
        # Queries follow y's own length; reusing x's length here breaks the
        # reshape whenever the two sequences differ in length.
        target_length = y.size(1)

        kv = self.kv_layer(x).reshape(
            batch_size, source_length, self.num_heads, 2 * self.head_dim
        ).permute(0, 2, 1, 3)
        q = self.q_layer(y).reshape(
            batch_size, target_length, self.num_heads, self.head_dim
        ).permute(0, 2, 1, 3)
        k, v = kv.chunk(2, dim=-1)

        values, _ = scaled_dot_product(q, k, v, mask)
        # The attention output has one row per query position.
        values = values.permute(0, 2, 1, 3).reshape(batch_size, target_length, d_model)
        return self.linear_layer(values)
class DecoderLayer(nn.Module):
    """One decoder block: self-attention, cross-attention and feed-forward,
    each followed by dropout, a residual addition and layer normalization."""

    def __init__(self, d_model, ffn_hidden, num_heads, drop_prob):
        """Create the three sub-blocks with their normalization and dropout.

        Args:
            d_model: Feature size of the layer input and output.
            ffn_hidden: Hidden size of the feed-forward block.
            num_heads: Number of attention heads in both attention modules.
            drop_prob: Dropout probability used throughout the layer.
        """
        super().__init__()
        self.self_attention = MultiHeadAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm1 = LayerNormalization(parameters_shape=[d_model])
        self.dropout1 = nn.Dropout(p=drop_prob)
        self.encoder_decoder_attention = MultiHeadCrossAttention(d_model=d_model, num_heads=num_heads)
        self.layer_norm2 = LayerNormalization(parameters_shape=[d_model])
        self.dropout2 = nn.Dropout(p=drop_prob)
        self.ffn = PositionwiseFeedForward(d_model=d_model, hidden=ffn_hidden, drop_prob=drop_prob)
        self.layer_norm3 = LayerNormalization(parameters_shape=[d_model])
        self.dropout3 = nn.Dropout(p=drop_prob)

    def forward(self, x, y, self_attention_mask, cross_attention_mask):
        """Update ``y`` using self-attention and attention over ``x``.

        Args:
            x: Tensor passed as the key/value source to cross-attention.
            y: Tensor that is refined and returned.
            self_attention_mask: Mask for the self-attention over ``y``.
            cross_attention_mask: Mask for the cross-attention.
        """
        # Self-attention over y with residual connection.
        skip = y.clone()
        attended = self.dropout1(self.self_attention(y, mask=self_attention_mask))
        y = self.layer_norm1(attended + skip)

        # Cross-attention: queries from y, keys/values from x.
        skip = y.clone()
        crossed = self.dropout2(
            self.encoder_decoder_attention(x, y, mask=cross_attention_mask)
        )
        y = self.layer_norm2(crossed + skip)

        # Feed-forward with residual connection.
        skip = y.clone()
        transformed = self.dropout3(self.ffn(y))
        return self.layer_norm3(transformed + skip)
class SequentialDecoder(nn.Sequential):
    """Sequential container whose layers each receive ``x``, the running ``y`` and both masks."""

    def forward(self, *inputs):
        """Thread ``y`` through every layer while ``x`` and the masks stay fixed.

        Args:
            *inputs: Exactly four values,
                ``(x, y, self_attention_mask, cross_attention_mask)``.

        Returns:
            The output of the last layer, or ``y`` unchanged when the
            container holds no layers.
        """
        x, hidden, self_attention_mask, cross_attention_mask = inputs
        for layer in self:
            hidden = layer(x, hidden, self_attention_mask, cross_attention_mask)
        return hidden
class Decoder(nn.Module):
    """Sentence embedding followed by a stack of decoder layers."""

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 language_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN):
        """Create the embedding module and ``num_layers`` decoder layers.

        Args:
            d_model: Feature size used by the embedding and every layer.
            ffn_hidden: Hidden size of each layer's feed-forward block.
            num_heads: Number of attention heads per layer.
            drop_prob: Dropout probability inside the decoder layers.
            num_layers: Number of decoder layers to stack.
            max_sequence_length: Padded sentence length.
            language_to_index: Token-to-index mapping for the decoder input.
            START_TOKEN: Start-of-sentence token.
            END_TOKEN: End-of-sentence token.
            PADDING_TOKEN: Padding token.
        """
        super().__init__()
        self.sentence_embedding = SentenceEmbedding(
            max_sequence_length,
            d_model,
            language_to_index,
            START_TOKEN,
            END_TOKEN,
            PADDING_TOKEN,
        )
        decoder_layers = [
            DecoderLayer(d_model, ffn_hidden, num_heads, drop_prob)
            for _ in range(num_layers)
        ]
        self.layers = SequentialDecoder(*decoder_layers)

    def forward(self, x, y, self_attention_mask, cross_attention_mask, start_token, end_token):
        """Embed the raw sentences in ``y`` and decode them against ``x``.

        Args:
            x: Tensor handed to every decoder layer as the cross-attention source.
            y: Batch of raw sentences passed to the sentence embedding.
            self_attention_mask: Mask for the decoder self-attention.
            cross_attention_mask: Mask for the cross-attention.
            start_token: Whether the embedding prepends the START token.
            end_token: Whether the embedding appends the END token.
        """
        embedded = self.sentence_embedding(y, start_token, end_token)
        return self.layers(x, embedded, self_attention_mask, cross_attention_mask)
class Transformer(nn.Module):
    """Encoder-decoder model with a final linear projection onto the output vocabulary."""

    def __init__(self,
                 d_model,
                 ffn_hidden,
                 num_heads,
                 drop_prob,
                 num_layers,
                 max_sequence_length,
                 kn_vocab_size,
                 english_to_index,
                 kannada_to_index,
                 START_TOKEN,
                 END_TOKEN,
                 PADDING_TOKEN
                 ):
        """Build the encoder, the decoder and the output projection.

        Args:
            d_model: Feature size shared by encoder, decoder and projection input.
            ffn_hidden: Hidden size of the feed-forward blocks.
            num_heads: Number of attention heads.
            drop_prob: Dropout probability inside the layers.
            num_layers: Number of layers in both the encoder and the decoder.
            max_sequence_length: Padded sentence length for both sides.
            kn_vocab_size: Output size of the final linear layer.
            english_to_index: Token-to-index mapping used by the encoder.
            kannada_to_index: Token-to-index mapping used by the decoder.
            START_TOKEN: Start-of-sentence token.
            END_TOKEN: End-of-sentence token.
            PADDING_TOKEN: Padding token.
        """
        super().__init__()
        # Settings the encoder and decoder have in common; only the
        # vocabulary mapping differs between them.
        stack_args = (d_model, ffn_hidden, num_heads, drop_prob, num_layers, max_sequence_length)
        special_tokens = (START_TOKEN, END_TOKEN, PADDING_TOKEN)
        self.encoder = Encoder(*stack_args, english_to_index, *special_tokens)
        self.decoder = Decoder(*stack_args, kannada_to_index, *special_tokens)
        self.linear = nn.Linear(d_model, kn_vocab_size)
        if torch.cuda.is_available():
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')

    def forward(self,
                x,
                y,
                encoder_self_attention_mask=None,
                decoder_self_attention_mask=None,
                decoder_cross_attention_mask=None,
                enc_start_token=False,
                enc_end_token=False,
                dec_start_token=False,
                dec_end_token=False):
        """Encode ``x``, decode ``y`` against it and project onto the output vocabulary.

        Args:
            x: Batch of raw source sentences for the encoder.
            y: Batch of raw target sentences for the decoder.
            encoder_self_attention_mask: Mask for encoder self-attention.
            decoder_self_attention_mask: Mask for decoder self-attention.
            decoder_cross_attention_mask: Mask for decoder cross-attention.
            enc_start_token: Whether the encoder input gets a START token.
            enc_end_token: Whether the encoder input gets an END token.
            dec_start_token: Whether the decoder input gets a START token.
            dec_end_token: Whether the decoder input gets an END token.

        Returns:
            The output of the final linear layer applied to the decoder output.
        """
        encoded = self.encoder(
            x,
            encoder_self_attention_mask,
            start_token=enc_start_token,
            end_token=enc_end_token,
        )
        decoded = self.decoder(
            encoded,
            y,
            decoder_self_attention_mask,
            decoder_cross_attention_mask,
            start_token=dec_start_token,
            end_token=dec_end_token,
        )
        return self.linear(decoded)