"""A small encoder-decoder Transformer for text generation (educational).

Components:
  1. Token Embedding: converts token indices to vectors.
  2. Positional Encoding: gives the model information about token order.
  3. Transformer Block: an encoder stack followed by a decoder stack.
  4. Output Layer: converts the transformer output to token log-probabilities.

Shape convention used throughout this module: token tensors handed to
``TransformerModel`` are batch-first (``[batch, seq]``).  They are transposed
to sequence-first right before entering the ``nn.Transformer*`` stacks, which
are built with PyTorch's default ``batch_first=False``, so the model output is
``[tgt_len, batch, vocab_size]``.

The training loop, loss calculation and optimisation steps are not included.
"""

import math
from typing import Optional

import torch
import torch.nn as nn
import torch.nn.functional as F

# Hyperparameters
d_model = 512  # Dimension of the embeddings and the token representations
# batch_size = 32  # Batch size for training (only used by the example below)
num_heads = 8  # Number of heads in multi-head attention
dim_feedforward = 2048  # Dimension of feedforward network in encoder and decoder
vocab_size = 25672
seq_length = 10  # Length of the input and output sequences


# Token Embedding
class TokenEmbedding(nn.Module):
    """Look up a learned ``d_model``-dimensional vector for every token index."""

    def __init__(self, vocab_size: int, d_model: int) -> None:
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)

    def forward(self, tokens: torch.Tensor) -> torch.Tensor:
        """Return embeddings of shape ``tokens.shape + (d_model,)``."""
        return self.embedding(tokens)


# Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to batch-first embeddings.

    The table is stored in the non-trainable buffer ``pe`` with shape
    ``[max_len, 1, d_model]``.
    """

    def __init__(self, d_model: int, max_len: int = 5000) -> None:
        super().__init__()
        self.max_len = max_len
        self.d_model = d_model
        self.register_buffer('pe', self.generate_positional_encoding())

    def generate_positional_encoding(self) -> torch.Tensor:
        """Build the sinusoidal table of shape ``[max_len, 1, d_model]``.

        Even feature indices hold ``sin(pos * freq)`` and odd indices hold
        ``cos(pos * freq)``.
        """
        pe = torch.zeros(self.max_len, self.d_model)
        position = torch.arange(0, self.max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, self.d_model, 2).float()
            * (-torch.log(torch.tensor(10000.0)) / self.d_model)
        )
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # For an odd d_model there is one fewer cosine column than sine
        # columns, so trim the frequencies to fit instead of failing.
        pe[:, 1::2] = torch.cos(angles[:, : self.d_model // 2])
        # [max_len, d_model] -> [max_len, 1, d_model]
        return pe.unsqueeze(0).transpose(0, 1)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``x`` plus the positional encoding.

        Args:
            x: Embeddings of shape ``[batch, seq_len, d_model]``.

        Returns:
            A new tensor with the same shape as ``x``.

        Raises:
            RuntimeError: From broadcasting, if ``seq_len`` exceeds ``max_len``.
        """
        # Slice by the *sequence* axis (dim 1), then move the singleton axis
        # to the front so the table broadcasts over the batch:
        # [seq_len, 1, d_model] -> [1, seq_len, d_model].
        pe = self.pe[: x.size(1)].transpose(0, 1)
        return x + pe


# Transformer Block
class TransformerBlock(nn.Module):
    """An encoder stack followed by a decoder stack (sequence-first tensors).

    Args:
        d_model: Width of the token representations.
        num_heads: Number of attention heads.
        dim_feedforward: Hidden width of the position-wise feed-forward nets.
        dropout: Dropout probability used inside every layer.
        num_layers: Number of layers in the encoder and in the decoder.
    """

    def __init__(
        self,
        d_model: int,
        num_heads: int,
        dim_feedforward: int,
        dropout: float = 0.1,
        num_layers: int = 10,
    ) -> None:
        super().__init__()
        # Encoder
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        # Decoder
        decoder_layer = nn.TransformerDecoderLayer(
            d_model=d_model,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
        )
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

    def forward(
        self,
        src: torch.Tensor,
        tgt: torch.Tensor,
        src_mask: Optional[torch.Tensor],
        tgt_mask: Optional[torch.Tensor],
        src_key_padding_mask: Optional[torch.Tensor],
        tgt_key_padding_mask: Optional[torch.Tensor],
    ) -> torch.Tensor:
        """Encode ``src`` and decode ``tgt`` against the encoder memory.

        Args:
            src: ``[src_len, batch, d_model]``.
            tgt: ``[tgt_len, batch, d_model]``.
            src_mask: ``[src_len, src_len]`` attention mask for the encoder.
            tgt_mask: ``[tgt_len, tgt_len]`` attention mask for the decoder.
            src_key_padding_mask: ``[batch, src_len]``; also used as the
                decoder's memory key padding mask.
            tgt_key_padding_mask: ``[batch, tgt_len]``.

        Returns:
            Decoder output of shape ``[tgt_len, batch, d_model]``.
        """
        memory = self.encoder(
            src, mask=src_mask, src_key_padding_mask=src_key_padding_mask
        )
        return self.decoder(
            tgt,
            memory,
            tgt_mask=tgt_mask,
            memory_mask=None,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=src_key_padding_mask,
        )


# Output Layer
class OutputLayer(nn.Module):
    """Project hidden states to log-probabilities over the vocabulary."""

    def __init__(self, d_model: int, vocab_size: int) -> None:
        super().__init__()
        self.linear = nn.Linear(d_model, vocab_size)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``log_softmax`` over the last dimension of ``linear(x)``."""
        return F.log_softmax(self.linear(x), dim=-1)


# Putting it all together
class TransformerModel(nn.Module):
    """Embedding + positional encoding + encoder/decoder + output projection.

    Source and target tokens share one embedding table.

    Args:
        vocab_size: Number of distinct tokens.
        d_model: Width of the token representations.
        num_heads: Number of attention heads.
        dim_feedforward: Hidden width of the feed-forward nets.
        max_seq_length: Longest sequence the positional table supports.
        num_layers: Number of encoder layers and of decoder layers.
        dropout: Dropout probability inside the transformer layers.
    """

    def __init__(
        self,
        vocab_size: int,
        d_model: int,
        num_heads: int,
        dim_feedforward: int,
        max_seq_length: int,
        num_layers: int = 10,
        dropout: float = 0.1,
    ) -> None:
        super().__init__()
        # Remember the width so forward() scales by this model's own d_model
        # rather than by the module-level hyperparameter.
        self.d_model = d_model
        self.embedding = TokenEmbedding(vocab_size, d_model)
        self.pos_encoder = PositionalEncoding(d_model, max_seq_length)
        self.transformer_block = TransformerBlock(
            d_model,
            num_heads,
            dim_feedforward,
            dropout=dropout,
            num_layers=num_layers,
        )
        self.output_layer = OutputLayer(d_model, vocab_size)

    def forward(
        self,
        src: torch.Tensor,
        tgt: torch.Tensor,
        src_mask: Optional[torch.Tensor],
        tgt_mask: Optional[torch.Tensor],
        src_key_padding_mask: Optional[torch.Tensor],
        tgt_key_padding_mask: Optional[torch.Tensor],
    ) -> torch.Tensor:
        """Compute next-token log-probabilities.

        Args:
            src: Source token indices, ``[batch, src_len]``.
            tgt: Target token indices, ``[batch, tgt_len]``.
            src_mask: ``[src_len, src_len]`` encoder attention mask.
            tgt_mask: ``[tgt_len, tgt_len]`` decoder attention mask.
            src_key_padding_mask: ``[batch, src_len]``.
            tgt_key_padding_mask: ``[batch, tgt_len]``.

        Returns:
            Log-probabilities of shape ``[tgt_len, batch, vocab_size]``.
        """
        scale = math.sqrt(self.d_model)
        src = self.pos_encoder(self.embedding(src) * scale)
        tgt = self.pos_encoder(self.embedding(tgt) * scale)
        # The transformer stacks are sequence-first, so swap batch and
        # sequence axes on the way in.
        transformer_output = self.transformer_block(
            src.transpose(0, 1),
            tgt.transpose(0, 1),
            src_mask,
            tgt_mask,
            src_key_padding_mask,
            tgt_key_padding_mask,
        )
        return self.output_layer(transformer_output)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Initialize the model.  NOTE: this runs at import time and the model is left
# on the CPU; call ``transformer_model.to(device)`` before feeding it tensors
# that live on ``device``.
transformer_model = TransformerModel(
    vocab_size, d_model, num_heads, dim_feedforward, seq_length
)

# Example forward pass (batch-first token tensors):
#
# batch_size = 32
# src = torch.randint(low=0, high=vocab_size, size=(batch_size, seq_length))
# tgt = torch.randint(low=0, high=vocab_size, size=(batch_size, seq_length))
#
# # Masks and padding (True = position may NOT be attended to)
# src_mask = torch.zeros((seq_length, seq_length), dtype=torch.bool)
# tgt_mask = torch.triu(
#     torch.ones((seq_length, seq_length), dtype=torch.bool), diagonal=1
# )
# src_key_padding_mask = torch.zeros(batch_size, seq_length, dtype=torch.bool)  # no padding
# tgt_key_padding_mask = torch.zeros(batch_size, seq_length, dtype=torch.bool)  # no padding
#
# output = transformer_model(
#     src, tgt, src_mask, tgt_mask, src_key_padding_mask, tgt_key_padding_mask
# )
# print(output.size())  # (seq_length, batch_size, vocab_size)