import torch | |
import torch.nn as nn | |
import torch.nn.functional as F | |
# Hyperparameters | |
d_model = 512 # Dimension of the embeddings and the token representations | |
# batch_size = 32 # Batch size for training | |
num_heads = 8 # Number of heads in multi-head attention | |
dim_feedforward = 2048 # Dimension of feedforward network in encoder and decoder | |
vocab_size = 25672 | |
seq_length = 10 # Length of the input and output sequences | |
# To generate text, we need to define several components: | |
# 1. Token Embedding: To convert token indices to vectors | |
# 2. Positional Encoding: To give the model information about the order of tokens | |
# 3. Transformer Block: Consisting of an encoder and a decoder | |
# 4. Output Layer: To convert the transformer output to token probabilities | |
# Token Embedding | |
class TokenEmbedding(nn.Module): | |
def __init__(self, vocab_size, d_model): | |
super().__init__() | |
self.embedding = nn.Embedding(vocab_size, d_model) | |
def forward(self, tokens): | |
return self.embedding(tokens) | |
# max_len=5000 | |
# pe = torch.zeros(max_len, d_model) | |
# position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) | |
# div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)) | |
# pe[:, 0::2] = torch.sin(position * div_term) | |
# pe[:, 1::2] = torch.cos(position * div_term) | |
# pe = pe.unsqueeze(0).transpose(0, 1) | |
# pe = pe[:max_len] | |
# Positional Encoding | |
class PositionalEncoding(nn.Module): | |
def __init__(self, d_model, max_len=5000): | |
super().__init__() | |
self.max_len = max_len | |
self.d_model = d_model | |
self.register_buffer('pe', self.generate_positional_encoding()) | |
def generate_positional_encoding(self): | |
# Create a long enough 'pe' for 'max_len' | |
pe = torch.zeros(self.max_len, self.d_model) | |
position = torch.arange(0, self.max_len, dtype=torch.float).unsqueeze(1) | |
div_term = torch.exp(torch.arange(0, self.d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / self.d_model)) | |
pe[:, 0::2] = torch.sin(position * div_term) | |
pe[:, 1::2] = torch.cos(position * div_term) | |
pe = pe.unsqueeze(0).transpose(0, 1) | |
return pe[:self.max_len] # Trim to match the desired max_len | |
def forward(self, x): | |
# Add positional encoding to the input tensor | |
# print("x: " + str(x.size())) | |
# print("pe: " + str(pe.size())) | |
# x is of shape [batch size, seq length, d_model] | |
# Select positional encoding up to the sequence length of x | |
pe = self.pe[:x.size(0), :] # pe is now of shape [seq length, d_model] | |
#pez = pe[:src.size(0), :] # pe is now of shape [seq length, d_model] | |
# Add a batch dimension to pe to make it [1, seq length, d_model] | |
# Then use expand to match the batch size of x | |
# pe = pe.unsqueeze(0).expand(x.size(0), -1, -1) # pe is now of shape [batch size, seq length, d_model] | |
# pe = pe.unsqueeze(0).expand(src.size(0), -1, -1) | |
# Broadcast the positional encoding to the entire batch | |
# Broadcasting automatically adjusts the dimensions to match x | |
x = x + pe | |
#x = src + pez | |
return x | |
# Transformer Block | |
class TransformerBlock(nn.Module): | |
def __init__(self, d_model, num_heads, dim_feedforward, dropout=0.1): | |
super().__init__() | |
# Encoder Layer | |
encoder_layer = nn.TransformerEncoderLayer( | |
d_model=d_model, | |
nhead=num_heads, | |
dim_feedforward=dim_feedforward, | |
dropout=dropout | |
) | |
self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=10) | |
# Decoder Layer | |
decoder_layer = nn.TransformerDecoderLayer( | |
d_model=d_model, | |
nhead=num_heads, | |
dim_feedforward=dim_feedforward, | |
dropout=dropout | |
) | |
self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=10) | |
def forward(self, src, tgt, src_mask, tgt_mask, src_key_padding_mask, tgt_key_padding_mask): | |
# Forward pass through the encoder and decoder | |
memory = self.encoder(src, mask=src_mask, src_key_padding_mask=src_key_padding_mask) | |
#memory = encoder(src, mask=src_mask, src_key_padding_mask=src_padding_mask) | |
# >>> src.size() | |
# torch.Size([32, 10, 512]) | |
# >>> src_mask.size() | |
# torch.Size([10, 10]) | |
# >>> src_padding_mask.size() | |
# torch.Size([10, 32]) | |
output = self.decoder(tgt, memory, tgt_mask=tgt_mask, memory_mask=None, | |
tgt_key_padding_mask=tgt_key_padding_mask, | |
memory_key_padding_mask=src_key_padding_mask) | |
# output = decoder(tgt, memory, tgt_mask=tgt_mask, memory_mask=None, | |
# tgt_key_padding_mask=tgt_padding_mask, | |
# memory_key_padding_mask=src_padding_mask) | |
return output | |
# Output Layer | |
class OutputLayer(nn.Module): | |
def __init__(self, d_model, vocab_size): | |
super().__init__() | |
self.linear = nn.Linear(d_model, vocab_size) | |
def forward(self, x): | |
return F.log_softmax(self.linear(x), dim=-1) | |
# src_batch, tgt_input, | |
# src_mask, tgt_mask, | |
# src_padding_mask, tgt_padding_mask | |
# src = embedding(src_batch) * torch.sqrt(torch.tensor(d_model)) | |
# tgt = embedding(tgt_input) * torch.sqrt(torch.tensor(d_model)) | |
# src = pos_encoder(src) | |
# tgt = pos_encoder(tgt) | |
# transformer_output = transformer_block(src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask) | |
# out = output_layer(transformer_output) | |
# Putting it all together | |
class TransformerModel(nn.Module): | |
def __init__(self, vocab_size, d_model, num_heads, dim_feedforward, max_seq_length): | |
super().__init__() | |
self.embedding = TokenEmbedding(vocab_size, d_model) | |
self.pos_encoder = PositionalEncoding(d_model, max_seq_length) | |
#pos_encoder = PositionalEncoding(d_model, seq_length) | |
self.transformer_block = TransformerBlock(d_model, num_heads, dim_feedforward) | |
self.output_layer = OutputLayer(d_model, vocab_size) | |
def forward(self, src, tgt, src_mask, tgt_mask, src_key_padding_mask, tgt_key_padding_mask): | |
src = self.embedding(src) * torch.sqrt(torch.tensor(d_model)) | |
tgt = self.embedding(tgt) * torch.sqrt(torch.tensor(d_model)) | |
src = self.pos_encoder(src) | |
tgt = self.pos_encoder(tgt) | |
# src = self.pos_encoder(src.transpose(0, 1)).transpose(0, 1) | |
# tgt = self.pos_encoder(tgt.transpose(0, 1)).transpose(0, 1) | |
# src = pos_encoder(src.transpose(0, 1)) | |
# tgt = pos_encoder(tgt.transpose(0, 1)) | |
transformer_output = self.transformer_block(src.transpose(0, 1), tgt.transpose(0, 1), src_mask, tgt_mask, src_key_padding_mask, tgt_key_padding_mask) | |
#transformer_output = transformer_block(src.transpose(0, 1), tgt.transpose(0, 1), src_mask, tgt_mask, src_padding_mask, tgt_padding_mask) | |
out = self.output_layer(transformer_output) | |
return out | |
# # Create a sample source and target batch | |
# src = torch.randint(low=0, high=vocab_size, size=(seq_length, batch_size)) | |
# tgt = torch.randint(low=0, high=vocab_size, size=(seq_length, batch_size)) | |
# # Masks and Padding | |
# src_mask = torch.zeros((seq_length, seq_length)).type(torch.bool) | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# tgt_mask = nn.Transformer.generate_square_subsequent_mask(seq_length, device=device) | |
# #tgt_mask = nn.Transformer.generate_square_subsequent_mask(None, seq_length) | |
# src_key_padding_mask = torch.zeros(batch_size, seq_length).type(torch.bool) # Assuming no padding | |
# tgt_key_padding_mask = torch.zeros(batch_size, seq_length).type(torch.bool) # Assuming no padding | |
# Initialize the model | |
transformer_model = TransformerModel(vocab_size, d_model, num_heads, dim_feedforward, seq_length) | |
# Forward pass | |
#output = transformer_model(src, tgt, src_mask, tgt_mask, src_key_padding_mask, tgt_key_padding_mask) | |
# Show the output size | |
#print(output.size()) # (seq_length, batch_size, vocab_size) | |
# In this code, we define a simple Transformer model for educational purposes. | |
# It can be expanded by increasing the number of layers, adding dropout, and including more complex masking. | |
# The actual training loop, loss calculation, and optimization steps are not shown. | |