|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""
|
|
GPT-style Language Model Architecture
|
|
|
|
This module implements a standard GPT (Generative Pre-trained Transformer) architecture
|
|
using pure PyTorch. The model is a decoder-only transformer designed for autoregressive
|
|
language modeling (next-token prediction).
|
|
|
|
ARCHITECTURE OVERVIEW:
|
|
- Token Embedding: Maps token IDs to dense vectors
|
|
- Positional Embedding: Adds position information to token embeddings
|
|
- Transformer Blocks: Stack of multi-head attention + feed-forward layers
|
|
- Layer Normalization: Pre-norm placement for training stability
|
|
- Output Head: Linear projection to vocabulary for next-token prediction
|
|
|
|
FEATURES:
|
|
- Configurable model size (small/medium/large)
|
|
- Dropout for regularization
|
|
- Causal (autoregressive) attention masking
|
|
- Compatible with our SentencePiece tokenizer
|
|
- Memory-efficient implementation for training on limited hardware
|
|
|
|
Usage:
|
|
from model import GPTConfig, GPTModel
|
|
|
|
config = GPTConfig(vocab_size=32000, n_layer=12, n_head=12, n_embd=768)
|
|
model = GPTModel(config)
|
|
|
|
# Forward pass
|
|
logits = model(input_ids) # Shape: (batch_size, seq_len, vocab_size)
|
|
|
|
Hardware Requirements:
|
|
- Small Model (25M params): 4-8GB RAM, CPU/integrated GPU
|
|
- Medium Model (117M params): 8-16GB RAM, dedicated GPU recommended
|
|
- Large Model (350M params): 16GB+ RAM, high-end GPU required
|
|
|
|
Author: Louis Chua Bean Chong
|
|
License: GPLv3
|
|
"""
|
|
|
|
import math
|
|
from dataclasses import dataclass
|
|
from typing import Optional, Tuple
|
|
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
|
|
|
|
@dataclass
|
|
class GPTConfig:
|
|
"""
|
|
Configuration class for GPT model hyperparameters.
|
|
|
|
This class defines all the architectural parameters needed to instantiate
|
|
a GPT model. Use the provided class methods to get pre-configured setups
|
|
for different model sizes.
|
|
"""
|
|
|
|
|
|
vocab_size: int = 32000
|
|
n_layer: int = 12
|
|
n_head: int = 12
|
|
n_embd: int = 768
|
|
|
|
|
|
block_size: int = 1024
|
|
|
|
|
|
dropout: float = 0.1
|
|
bias: bool = True
|
|
|
|
|
|
model_name: str = "gpt-medium"
|
|
|
|
@classmethod
|
|
def small(cls) -> "GPTConfig":
|
|
"""Small model configuration (~25M parameters) - Good for CPU training"""
|
|
return cls(
|
|
vocab_size=32000,
|
|
n_layer=6,
|
|
n_head=8,
|
|
n_embd=512,
|
|
block_size=1024,
|
|
dropout=0.1,
|
|
model_name="gpt-small",
|
|
)
|
|
|
|
@classmethod
|
|
def medium(cls) -> "GPTConfig":
|
|
"""Medium model configuration (~117M parameters) - Balanced performance"""
|
|
return cls(
|
|
vocab_size=32000,
|
|
n_layer=12,
|
|
n_head=12,
|
|
n_embd=768,
|
|
block_size=2048,
|
|
dropout=0.1,
|
|
model_name="gpt-medium",
|
|
)
|
|
|
|
@classmethod
|
|
def large(cls) -> "GPTConfig":
|
|
"""Large model configuration (~350M parameters) - High performance"""
|
|
return cls(
|
|
vocab_size=32000,
|
|
n_layer=24,
|
|
n_head=16,
|
|
n_embd=1024,
|
|
block_size=2048,
|
|
dropout=0.1,
|
|
model_name="gpt-large",
|
|
)
|
|
|
|
def estimate_parameters(self) -> int:
|
|
"""
|
|
Estimate the total number of trainable parameters.
|
|
|
|
Returns:
|
|
int: Estimated parameter count
|
|
"""
|
|
|
|
token_emb = self.vocab_size * self.n_embd
|
|
|
|
|
|
pos_emb = self.block_size * self.n_embd
|
|
|
|
|
|
|
|
layer_params = self.n_layer * (12 * self.n_embd**2 + 4 * self.n_embd)
|
|
|
|
|
|
output_head = self.vocab_size * self.n_embd
|
|
|
|
total = token_emb + pos_emb + layer_params + output_head
|
|
return total
|
|
|
|
|
|
class CausalSelfAttention(nn.Module):
|
|
"""
|
|
Multi-head causal self-attention mechanism.
|
|
|
|
This implements the core attention mechanism of the transformer, with causal
|
|
masking to ensure autoregressive behavior (tokens can only attend to previous
|
|
tokens, not future ones).
|
|
"""
|
|
|
|
def __init__(self, config: GPTConfig):
|
|
super().__init__()
|
|
assert (
|
|
config.n_embd % config.n_head == 0
|
|
), "Embedding dim must be divisible by number of heads"
|
|
|
|
self.config = config
|
|
self.n_head = config.n_head
|
|
self.n_embd = config.n_embd
|
|
self.head_dim = self.n_embd // self.n_head
|
|
|
|
|
|
self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
|
|
|
|
|
|
self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
|
|
|
|
|
|
self.attn_dropout = nn.Dropout(config.dropout)
|
|
self.resid_dropout = nn.Dropout(config.dropout)
|
|
|
|
|
|
self.register_buffer(
|
|
"bias",
|
|
torch.tril(torch.ones(config.block_size, config.block_size)).view(
|
|
1, 1, config.block_size, config.block_size
|
|
),
|
|
)
|
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
|
"""
|
|
Forward pass of causal self-attention.
|
|
|
|
This method implements the scaled dot-product attention mechanism with causal masking.
|
|
The attention mechanism allows each token to attend to all previous tokens in the sequence,
|
|
but not to future tokens, maintaining the autoregressive property essential for language modeling.
|
|
|
|
Mathematical formulation:
|
|
Attention(Q, K, V) = softmax(QK^T / sqrt(d_k))V
|
|
where Q, K, V are query, key, value matrices derived from input x
|
|
|
|
Implementation details:
|
|
- Uses batch matrix multiplication for efficiency
|
|
- Applies causal mask to prevent future token attention
|
|
- Implements multi-head attention by reshaping and parallel processing
|
|
- Applies dropout for regularization during training
|
|
|
|
Args:
|
|
x: Input tensor of shape (batch_size, seq_len, n_embd)
|
|
Contains embedded token representations from previous layer
|
|
|
|
Returns:
|
|
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
|
|
"""
|
|
|
|
|
|
|
|
|
|
B, T, C = x.size()
|
|
|
|
|
|
|
|
|
|
|
|
q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
|
|
k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
|
|
v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(self.head_dim))
|
|
|
|
|
|
|
|
|
|
|
|
att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float("-inf"))
|
|
|
|
|
|
|
|
|
|
att = F.softmax(att, dim=-1)
|
|
|
|
|
|
|
|
att = self.attn_dropout(att)
|
|
|
|
|
|
|
|
|
|
|
|
y = att @ v
|
|
|
|
|
|
|
|
|
|
|
|
y = y.transpose(1, 2).contiguous().view(B, T, C)
|
|
|
|
|
|
|
|
|
|
y = self.resid_dropout(self.c_proj(y))
|
|
return y
|
|
|
|
|
|
class MLP(nn.Module):
|
|
"""
|
|
Multi-Layer Perceptron (Feed-Forward Network) for Transformer.
|
|
|
|
This implements the position-wise feed-forward network that appears in each transformer layer.
|
|
The MLP provides additional non-linear transformation capacity beyond what attention provides.
|
|
|
|
Architecture:
|
|
Input -> Linear(n_embd -> 4*n_embd) -> GELU -> Linear(4*n_embd -> n_embd) -> Dropout -> Output
|
|
|
|
Design rationale:
|
|
- 4x expansion is standard in transformers (from "Attention Is All You Need")
|
|
- GELU activation provides smoother gradients than ReLU for language modeling
|
|
- Dropout prevents overfitting in the feed-forward layers
|
|
- Two linear layers allow complex non-linear transformations of attention outputs
|
|
|
|
Parameters:
|
|
- First linear layer: n_embd * 4*n_embd parameters (expansion)
|
|
- Second linear layer: 4*n_embd * n_embd parameters (projection back)
|
|
- Total: 8 * n_embd^2 parameters (significant portion of model size)
|
|
"""
|
|
|
|
def __init__(self, config: GPTConfig):
|
|
super().__init__()
|
|
|
|
|
|
|
|
|
|
self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
|
|
|
|
|
|
|
|
|
|
self.gelu = nn.GELU()
|
|
|
|
|
|
|
|
self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
|
|
|
|
|
|
|
|
self.dropout = nn.Dropout(config.dropout)
|
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
|
"""
|
|
Forward pass of the feed-forward network.
|
|
|
|
This method applies a two-layer MLP with GELU activation to transform
|
|
the attention outputs. The MLP operates independently on each position
|
|
in the sequence, providing position-wise non-linear transformations.
|
|
|
|
Mathematical operation:
|
|
MLP(x) = Dropout(Linear₂(GELU(Linear₁(x))))
|
|
where Linear₁: R^n_embd -> R^4*n_embd and Linear₂: R^4*n_embd -> R^n_embd
|
|
|
|
Args:
|
|
x: Input tensor of shape (batch_size, seq_len, n_embd)
|
|
Contains attended representations from the attention layer
|
|
|
|
Returns:
|
|
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
|
|
Contains transformed representations ready for residual connection
|
|
"""
|
|
|
|
|
|
|
|
x = self.c_fc(x)
|
|
|
|
|
|
|
|
|
|
x = self.gelu(x)
|
|
|
|
|
|
|
|
|
|
x = self.c_proj(x)
|
|
|
|
|
|
|
|
|
|
x = self.dropout(x)
|
|
|
|
return x
|
|
|
|
|
|
class Block(nn.Module):
|
|
"""
|
|
Single Transformer block.
|
|
|
|
Consists of:
|
|
1. Layer normalization
|
|
2. Multi-head causal self-attention
|
|
3. Residual connection
|
|
4. Layer normalization
|
|
5. MLP (feed-forward network)
|
|
6. Residual connection
|
|
|
|
Uses pre-norm architecture for better training stability.
|
|
"""
|
|
|
|
def __init__(self, config: GPTConfig):
|
|
super().__init__()
|
|
self.ln_1 = nn.LayerNorm(config.n_embd)
|
|
self.attn = CausalSelfAttention(config)
|
|
self.ln_2 = nn.LayerNorm(config.n_embd)
|
|
self.mlp = MLP(config)
|
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor:
|
|
"""
|
|
Forward pass of transformer block.
|
|
|
|
Args:
|
|
x: Input tensor of shape (batch_size, seq_len, n_embd)
|
|
|
|
Returns:
|
|
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
|
|
"""
|
|
|
|
x = x + self.attn(self.ln_1(x))
|
|
|
|
|
|
x = x + self.mlp(self.ln_2(x))
|
|
|
|
return x
|
|
|
|
|
|
class GPTModel(nn.Module):
|
|
"""
|
|
Complete GPT Language Model.
|
|
|
|
This is the main model class that combines all components:
|
|
- Token and positional embeddings
|
|
- Stack of transformer blocks
|
|
- Final layer normalization
|
|
- Language modeling head
|
|
|
|
The model can be used for:
|
|
- Training from scratch on text data
|
|
- Fine-tuning on downstream tasks
|
|
- Text generation (inference)
|
|
"""
|
|
|
|
def __init__(self, config: GPTConfig, use_checkpoint=True):
|
|
super().__init__()
|
|
assert config.vocab_size is not None, "vocab_size must be specified"
|
|
assert config.block_size is not None, "block_size must be specified"
|
|
|
|
self.config = config
|
|
self.use_checkpoint = use_checkpoint
|
|
|
|
|
|
self.transformer = nn.ModuleDict(
|
|
dict(
|
|
wte=nn.Embedding(config.vocab_size, config.n_embd),
|
|
wpe=nn.Embedding(config.block_size, config.n_embd),
|
|
drop=nn.Dropout(config.dropout),
|
|
h=nn.ModuleList(
|
|
[Block(config) for _ in range(config.n_layer)]
|
|
),
|
|
ln_f=nn.LayerNorm(config.n_embd),
|
|
)
|
|
)
|
|
|
|
|
|
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
|
|
|
|
|
|
self.transformer.wte.weight = self.lm_head.weight
|
|
|
|
|
|
self.apply(self._init_weights)
|
|
|
|
|
|
print(f"Model initialized: {self.config.model_name}")
|
|
print(f"Parameters: {self.get_num_params():,}")
|
|
print(f"Estimated: {self.config.estimate_parameters():,}")
|
|
|
|
def _init_weights(self, module):
|
|
"""Initialize model weights using standard practices."""
|
|
if isinstance(module, nn.Linear):
|
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
|
|
if module.bias is not None:
|
|
torch.nn.init.zeros_(module.bias)
|
|
elif isinstance(module, nn.Embedding):
|
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
|
|
|
|
def get_num_params(self, non_embedding: bool = False) -> int:
|
|
"""
|
|
Count the number of parameters in the model.
|
|
|
|
Args:
|
|
non_embedding: If True, subtract embedding parameters
|
|
|
|
Returns:
|
|
int: Number of parameters
|
|
"""
|
|
n_params = sum(p.numel() for p in self.parameters())
|
|
if non_embedding:
|
|
n_params -= self.transformer.wpe.weight.numel()
|
|
n_params -= self.transformer.wte.weight.numel()
|
|
return n_params
|
|
|
|
def forward(
|
|
self, idx: torch.Tensor, targets: Optional[torch.Tensor] = None
|
|
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
|
|
"""
|
|
Forward pass of the GPT model.
|
|
|
|
Args:
|
|
idx: Input token indices of shape (batch_size, seq_len)
|
|
targets: Optional target tokens for loss calculation (batch_size, seq_len)
|
|
|
|
Returns:
|
|
Tuple containing:
|
|
- logits: Output logits of shape (batch_size, seq_len, vocab_size)
|
|
- loss: Cross-entropy loss if targets provided, None otherwise
|
|
"""
|
|
device = idx.device
|
|
b, t = idx.size()
|
|
assert (
|
|
t <= self.config.block_size
|
|
), f"Sequence length {t} exceeds block size {self.config.block_size}"
|
|
|
|
|
|
tok_emb = self.transformer.wte(idx)
|
|
|
|
|
|
pos = torch.arange(0, t, dtype=torch.long, device=device)
|
|
pos_emb = self.transformer.wpe(pos)
|
|
|
|
|
|
x = self.transformer.drop(tok_emb + pos_emb)
|
|
|
|
|
|
if self.use_checkpoint and self.training:
|
|
|
|
for block in self.transformer.h:
|
|
x = torch.utils.checkpoint.checkpoint(block, x)
|
|
else:
|
|
|
|
for block in self.transformer.h:
|
|
x = block(x)
|
|
|
|
|
|
x = self.transformer.ln_f(x)
|
|
|
|
|
|
|
|
logits = self.lm_head(x)
|
|
|
|
if targets is not None:
|
|
|
|
loss = F.cross_entropy(
|
|
logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1
|
|
)
|
|
else:
|
|
|
|
loss = None
|
|
|
|
return logits, loss
|
|
|
|
def generate(
|
|
self,
|
|
idx: torch.Tensor,
|
|
max_new_tokens: int = 100,
|
|
temperature: float = 1.0,
|
|
top_k: Optional[int] = None,
|
|
) -> torch.Tensor:
|
|
"""
|
|
Generate new tokens autoregressively.
|
|
|
|
Args:
|
|
idx: Starting token indices (batch_size, seq_len)
|
|
max_new_tokens: Maximum number of new tokens to generate
|
|
temperature: Sampling temperature (higher = more random)
|
|
top_k: If set, only sample from top-k most likely tokens
|
|
|
|
Returns:
|
|
torch.Tensor: Generated sequence (batch_size, seq_len + max_new_tokens)
|
|
"""
|
|
self.eval()
|
|
with torch.no_grad():
|
|
for _ in range(max_new_tokens):
|
|
|
|
idx_cond = (
|
|
idx
|
|
if idx.size(1) <= self.config.block_size
|
|
else idx[:, -self.config.block_size :]
|
|
)
|
|
|
|
|
|
logits, _ = self(idx_cond)
|
|
|
|
|
|
logits = logits[:, -1, :] / temperature
|
|
|
|
|
|
if top_k is not None:
|
|
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
|
|
logits[logits < v[:, [-1]]] = -float("inf")
|
|
|
|
|
|
probs = F.softmax(logits, dim=-1)
|
|
idx_next = torch.multinomial(probs, num_samples=1)
|
|
|
|
|
|
idx = torch.cat((idx, idx_next), dim=1)
|
|
|
|
self.train()
|
|
return idx
|
|
|
|
def estimate_memory_usage(self, batch_size: int = 1, seq_len: int = None) -> dict:
|
|
"""
|
|
Estimate memory usage for training and inference.
|
|
|
|
Args:
|
|
batch_size: Batch size for estimation
|
|
seq_len: Sequence length (defaults to block_size)
|
|
|
|
Returns:
|
|
dict: Memory usage estimates in MB
|
|
"""
|
|
if seq_len is None:
|
|
seq_len = self.config.block_size
|
|
|
|
|
|
param_memory = self.get_num_params() * 4 / (1024**2)
|
|
|
|
|
|
activation_memory = (
|
|
batch_size * seq_len * self.config.n_embd * self.config.n_layer * 8
|
|
) / (1024**2)
|
|
|
|
|
|
gradient_memory = param_memory
|
|
|
|
return {
|
|
"parameters_mb": param_memory,
|
|
"activations_mb": activation_memory,
|
|
"gradients_mb": gradient_memory,
|
|
"total_training_mb": param_memory + activation_memory + gradient_memory,
|
|
"total_inference_mb": param_memory + activation_memory * 0.5,
|
|
}
|
|
|
|
|
|
def create_model(model_size: str = "medium") -> GPTModel:
|
|
"""
|
|
Factory function to create a GPT model with predefined configurations.
|
|
|
|
Args:
|
|
model_size: Size of model to create ("small", "medium", "large")
|
|
|
|
Returns:
|
|
GPTModel: Initialized model
|
|
"""
|
|
configs = {
|
|
"small": GPTConfig.small(),
|
|
"medium": GPTConfig.medium(),
|
|
"large": GPTConfig.large(),
|
|
}
|
|
|
|
if model_size not in configs:
|
|
raise ValueError(f"Unknown model size: {model_size}. Choose from {list(configs.keys())}")
|
|
|
|
config = configs[model_size]
|
|
model = GPTModel(config)
|
|
|
|
return model
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
print("🧠 GPT Model Architecture")
|
|
print("=" * 50)
|
|
|
|
|
|
for size in ["small", "medium", "large"]:
|
|
print(f"\n{size.upper()} MODEL:")
|
|
model = create_model(size)
|
|
|
|
|
|
memory = model.estimate_memory_usage(batch_size=4, seq_len=512)
|
|
print(
|
|
f"Memory (4 batch, 512 seq): {memory['total_training_mb']:.1f}MB training, {memory['total_inference_mb']:.1f}MB inference"
|
|
)
|
|
|
|
|
|
x = torch.randint(0, 32000, (2, 64))
|
|
with torch.no_grad():
|
|
logits, _ = model(x)
|
|
print(f"Test forward pass: {x.shape} -> {logits.shape} ✓")
|
|
|