|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
GPT-style Language Model Architecture |
|
|
|
This module implements a standard GPT (Generative Pre-trained Transformer) architecture |
|
using pure PyTorch. The model is a decoder-only transformer designed for autoregressive |
|
language modeling (next-token prediction). |
|
|
|
ARCHITECTURE OVERVIEW: |
|
- Token Embedding: Maps token IDs to dense vectors |
|
- Positional Embedding: Adds position information to token embeddings |
|
- Transformer Blocks: Stack of multi-head attention + feed-forward layers |
|
- Layer Normalization: Pre-norm placement for training stability |
|
- Output Head: Linear projection to vocabulary for next-token prediction |
|
|
|
FEATURES: |
|
- Configurable model size (small/medium/large) |
|
- Dropout for regularization |
|
- Causal (autoregressive) attention masking |
|
- Compatible with our SentencePiece tokenizer |
|
- Memory-efficient implementation for training on limited hardware |
|
|
|
Usage: |
|
from model import GPTConfig, GPTModel |
|
|
|
config = GPTConfig(vocab_size=32000, n_layer=12, n_head=12, n_embd=768) |
|
model = GPTModel(config) |
|
|
|
# Forward pass |
|
logits = model(input_ids) # Shape: (batch_size, seq_len, vocab_size) |
|
|
|
Hardware Requirements: |
|
- Small Model (25M params): 4-8GB RAM, CPU/integrated GPU |
|
- Medium Model (117M params): 8-16GB RAM, dedicated GPU recommended |
|
- Large Model (350M params): 16GB+ RAM, high-end GPU required |
|
|
|
Author: Louis Chua Bean Chong |
|
License: GPLv3 |
|
""" |
|
|
|
import math |
|
from dataclasses import dataclass |
|
from typing import Optional, Tuple |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.nn.functional as F |
|
|
|
|
|
@dataclass |
|
class GPTConfig: |
|
""" |
|
Configuration class for GPT model hyperparameters. |
|
|
|
This class defines all the architectural parameters needed to instantiate |
|
a GPT model. Use the provided class methods to get pre-configured setups |
|
for different model sizes. |
|
""" |
|
|
|
|
|
vocab_size: int = 32000 |
|
n_layer: int = 12 |
|
n_head: int = 12 |
|
n_embd: int = 768 |
|
|
|
|
|
block_size: int = 1024 |
|
|
|
|
|
dropout: float = 0.1 |
|
bias: bool = True |
|
|
|
|
|
model_name: str = "gpt-medium" |
|
|
|
@classmethod |
|
def small(cls) -> "GPTConfig": |
|
"""Small model configuration (~25M parameters) - Good for CPU training""" |
|
return cls( |
|
vocab_size=32000, |
|
n_layer=6, |
|
n_head=8, |
|
n_embd=512, |
|
block_size=1024, |
|
dropout=0.1, |
|
model_name="gpt-small", |
|
) |
|
|
|
@classmethod |
|
def medium(cls) -> "GPTConfig": |
|
"""Medium model configuration (~117M parameters) - Balanced performance""" |
|
return cls( |
|
vocab_size=32000, |
|
n_layer=12, |
|
n_head=12, |
|
n_embd=768, |
|
block_size=2048, |
|
dropout=0.1, |
|
model_name="gpt-medium", |
|
) |
|
|
|
@classmethod |
|
def large(cls) -> "GPTConfig": |
|
"""Large model configuration (~350M parameters) - High performance""" |
|
return cls( |
|
vocab_size=32000, |
|
n_layer=24, |
|
n_head=16, |
|
n_embd=1024, |
|
block_size=2048, |
|
dropout=0.1, |
|
model_name="gpt-large", |
|
) |
|
|
|
def estimate_parameters(self) -> int: |
|
""" |
|
Estimate the total number of trainable parameters. |
|
|
|
Returns: |
|
int: Estimated parameter count |
|
""" |
|
|
|
token_emb = self.vocab_size * self.n_embd |
|
|
|
|
|
pos_emb = self.block_size * self.n_embd |
|
|
|
|
|
|
|
layer_params = self.n_layer * (12 * self.n_embd**2 + 4 * self.n_embd) |
|
|
|
|
|
output_head = self.vocab_size * self.n_embd |
|
|
|
total = token_emb + pos_emb + layer_params + output_head |
|
return total |
|
|
|
|
|
class CausalSelfAttention(nn.Module): |
|
""" |
|
Multi-head causal self-attention mechanism. |
|
|
|
This implements the core attention mechanism of the transformer, with causal |
|
masking to ensure autoregressive behavior (tokens can only attend to previous |
|
tokens, not future ones). |
|
""" |
|
|
|
def __init__(self, config: GPTConfig): |
|
super().__init__() |
|
assert ( |
|
config.n_embd % config.n_head == 0 |
|
), "Embedding dim must be divisible by number of heads" |
|
|
|
self.config = config |
|
self.n_head = config.n_head |
|
self.n_embd = config.n_embd |
|
self.head_dim = self.n_embd // self.n_head |
|
|
|
|
|
self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias) |
|
|
|
|
|
self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias) |
|
|
|
|
|
self.attn_dropout = nn.Dropout(config.dropout) |
|
self.resid_dropout = nn.Dropout(config.dropout) |
|
|
|
|
|
self.register_buffer( |
|
"bias", |
|
torch.tril(torch.ones(config.block_size, config.block_size)).view( |
|
1, 1, config.block_size, config.block_size |
|
), |
|
) |
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor: |
|
""" |
|
Forward pass of causal self-attention. |
|
|
|
This method implements the scaled dot-product attention mechanism with causal masking. |
|
The attention mechanism allows each token to attend to all previous tokens in the sequence, |
|
but not to future tokens, maintaining the autoregressive property essential for language modeling. |
|
|
|
Mathematical formulation: |
|
Attention(Q, K, V) = softmax(QK^T / sqrt(d_k))V |
|
where Q, K, V are query, key, value matrices derived from input x |
|
|
|
Implementation details: |
|
- Uses batch matrix multiplication for efficiency |
|
- Applies causal mask to prevent future token attention |
|
- Implements multi-head attention by reshaping and parallel processing |
|
- Applies dropout for regularization during training |
|
|
|
Args: |
|
x: Input tensor of shape (batch_size, seq_len, n_embd) |
|
Contains embedded token representations from previous layer |
|
|
|
Returns: |
|
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd) |
|
""" |
|
|
|
|
|
|
|
|
|
B, T, C = x.size() |
|
|
|
|
|
|
|
|
|
|
|
q, k, v = self.c_attn(x).split(self.n_embd, dim=2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2) |
|
k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2) |
|
v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2) |
|
|
|
|
|
|
|
|
|
|
|
|
|
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(self.head_dim)) |
|
|
|
|
|
|
|
|
|
|
|
att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float("-inf")) |
|
|
|
|
|
|
|
|
|
att = F.softmax(att, dim=-1) |
|
|
|
|
|
|
|
att = self.attn_dropout(att) |
|
|
|
|
|
|
|
|
|
|
|
y = att @ v |
|
|
|
|
|
|
|
|
|
|
|
y = y.transpose(1, 2).contiguous().view(B, T, C) |
|
|
|
|
|
|
|
|
|
y = self.resid_dropout(self.c_proj(y)) |
|
return y |
|
|
|
|
|
class MLP(nn.Module): |
|
""" |
|
Multi-Layer Perceptron (Feed-Forward Network) for Transformer. |
|
|
|
This implements the position-wise feed-forward network that appears in each transformer layer. |
|
The MLP provides additional non-linear transformation capacity beyond what attention provides. |
|
|
|
Architecture: |
|
Input -> Linear(n_embd -> 4*n_embd) -> GELU -> Linear(4*n_embd -> n_embd) -> Dropout -> Output |
|
|
|
Design rationale: |
|
- 4x expansion is standard in transformers (from "Attention Is All You Need") |
|
- GELU activation provides smoother gradients than ReLU for language modeling |
|
- Dropout prevents overfitting in the feed-forward layers |
|
- Two linear layers allow complex non-linear transformations of attention outputs |
|
|
|
Parameters: |
|
- First linear layer: n_embd * 4*n_embd parameters (expansion) |
|
- Second linear layer: 4*n_embd * n_embd parameters (projection back) |
|
- Total: 8 * n_embd^2 parameters (significant portion of model size) |
|
""" |
|
|
|
def __init__(self, config: GPTConfig): |
|
super().__init__() |
|
|
|
|
|
|
|
|
|
self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias) |
|
|
|
|
|
|
|
|
|
self.gelu = nn.GELU() |
|
|
|
|
|
|
|
self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias) |
|
|
|
|
|
|
|
self.dropout = nn.Dropout(config.dropout) |
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor: |
|
""" |
|
Forward pass of the feed-forward network. |
|
|
|
This method applies a two-layer MLP with GELU activation to transform |
|
the attention outputs. The MLP operates independently on each position |
|
in the sequence, providing position-wise non-linear transformations. |
|
|
|
Mathematical operation: |
|
MLP(x) = Dropout(Linear₂(GELU(Linear₁(x)))) |
|
where Linear₁: R^n_embd -> R^4*n_embd and Linear₂: R^4*n_embd -> R^n_embd |
|
|
|
Args: |
|
x: Input tensor of shape (batch_size, seq_len, n_embd) |
|
Contains attended representations from the attention layer |
|
|
|
Returns: |
|
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd) |
|
Contains transformed representations ready for residual connection |
|
""" |
|
|
|
|
|
|
|
x = self.c_fc(x) |
|
|
|
|
|
|
|
|
|
x = self.gelu(x) |
|
|
|
|
|
|
|
|
|
x = self.c_proj(x) |
|
|
|
|
|
|
|
|
|
x = self.dropout(x) |
|
|
|
return x |
|
|
|
|
|
class Block(nn.Module): |
|
""" |
|
Single Transformer block. |
|
|
|
Consists of: |
|
1. Layer normalization |
|
2. Multi-head causal self-attention |
|
3. Residual connection |
|
4. Layer normalization |
|
5. MLP (feed-forward network) |
|
6. Residual connection |
|
|
|
Uses pre-norm architecture for better training stability. |
|
""" |
|
|
|
def __init__(self, config: GPTConfig): |
|
super().__init__() |
|
self.ln_1 = nn.LayerNorm(config.n_embd) |
|
self.attn = CausalSelfAttention(config) |
|
self.ln_2 = nn.LayerNorm(config.n_embd) |
|
self.mlp = MLP(config) |
|
|
|
def forward(self, x: torch.Tensor) -> torch.Tensor: |
|
""" |
|
Forward pass of transformer block. |
|
|
|
Args: |
|
x: Input tensor of shape (batch_size, seq_len, n_embd) |
|
|
|
Returns: |
|
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd) |
|
""" |
|
|
|
x = x + self.attn(self.ln_1(x)) |
|
|
|
|
|
x = x + self.mlp(self.ln_2(x)) |
|
|
|
return x |
|
|
|
|
|
class GPTModel(nn.Module): |
|
""" |
|
Complete GPT Language Model. |
|
|
|
This is the main model class that combines all components: |
|
- Token and positional embeddings |
|
- Stack of transformer blocks |
|
- Final layer normalization |
|
- Language modeling head |
|
|
|
The model can be used for: |
|
- Training from scratch on text data |
|
- Fine-tuning on downstream tasks |
|
- Text generation (inference) |
|
""" |
|
|
|
def __init__(self, config: GPTConfig, use_checkpoint=True): |
|
super().__init__() |
|
assert config.vocab_size is not None, "vocab_size must be specified" |
|
assert config.block_size is not None, "block_size must be specified" |
|
|
|
self.config = config |
|
self.use_checkpoint = use_checkpoint |
|
|
|
|
|
self.transformer = nn.ModuleDict( |
|
dict( |
|
wte=nn.Embedding(config.vocab_size, config.n_embd), |
|
wpe=nn.Embedding(config.block_size, config.n_embd), |
|
drop=nn.Dropout(config.dropout), |
|
h=nn.ModuleList( |
|
[Block(config) for _ in range(config.n_layer)] |
|
), |
|
ln_f=nn.LayerNorm(config.n_embd), |
|
) |
|
) |
|
|
|
|
|
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False) |
|
|
|
|
|
self.transformer.wte.weight = self.lm_head.weight |
|
|
|
|
|
self.apply(self._init_weights) |
|
|
|
|
|
print(f"Model initialized: {self.config.model_name}") |
|
print(f"Parameters: {self.get_num_params():,}") |
|
print(f"Estimated: {self.config.estimate_parameters():,}") |
|
|
|
def _init_weights(self, module): |
|
"""Initialize model weights using standard practices.""" |
|
if isinstance(module, nn.Linear): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
if module.bias is not None: |
|
torch.nn.init.zeros_(module.bias) |
|
elif isinstance(module, nn.Embedding): |
|
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02) |
|
|
|
def get_num_params(self, non_embedding: bool = False) -> int: |
|
""" |
|
Count the number of parameters in the model. |
|
|
|
Args: |
|
non_embedding: If True, subtract embedding parameters |
|
|
|
Returns: |
|
int: Number of parameters |
|
""" |
|
n_params = sum(p.numel() for p in self.parameters()) |
|
if non_embedding: |
|
n_params -= self.transformer.wpe.weight.numel() |
|
n_params -= self.transformer.wte.weight.numel() |
|
return n_params |
|
|
|
def forward( |
|
self, idx: torch.Tensor, targets: Optional[torch.Tensor] = None |
|
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]: |
|
""" |
|
Forward pass of the GPT model. |
|
|
|
Args: |
|
idx: Input token indices of shape (batch_size, seq_len) |
|
targets: Optional target tokens for loss calculation (batch_size, seq_len) |
|
|
|
Returns: |
|
Tuple containing: |
|
- logits: Output logits of shape (batch_size, seq_len, vocab_size) |
|
- loss: Cross-entropy loss if targets provided, None otherwise |
|
""" |
|
device = idx.device |
|
b, t = idx.size() |
|
assert ( |
|
t <= self.config.block_size |
|
), f"Sequence length {t} exceeds block size {self.config.block_size}" |
|
|
|
|
|
tok_emb = self.transformer.wte(idx) |
|
|
|
|
|
pos = torch.arange(0, t, dtype=torch.long, device=device) |
|
pos_emb = self.transformer.wpe(pos) |
|
|
|
|
|
x = self.transformer.drop(tok_emb + pos_emb) |
|
|
|
|
|
if self.use_checkpoint and self.training: |
|
|
|
try: |
|
for block in self.transformer.h: |
|
x = torch.utils.checkpoint.checkpoint(block, x) |
|
except AttributeError: |
|
|
|
for block in self.transformer.h: |
|
x = block(x) |
|
else: |
|
|
|
for block in self.transformer.h: |
|
x = block(x) |
|
|
|
|
|
x = self.transformer.ln_f(x) |
|
|
|
|
|
|
|
logits = self.lm_head(x) |
|
|
|
if targets is not None: |
|
|
|
loss = F.cross_entropy( |
|
logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1 |
|
) |
|
else: |
|
|
|
loss = None |
|
|
|
return logits, loss |
|
|
|
def generate( |
|
self, |
|
idx: torch.Tensor, |
|
max_new_tokens: int = 100, |
|
temperature: float = 1.0, |
|
top_k: Optional[int] = None, |
|
) -> torch.Tensor: |
|
""" |
|
Generate new tokens autoregressively. |
|
|
|
Args: |
|
idx: Starting token indices (batch_size, seq_len) |
|
max_new_tokens: Maximum number of new tokens to generate |
|
temperature: Sampling temperature (higher = more random) |
|
top_k: If set, only sample from top-k most likely tokens |
|
|
|
Returns: |
|
torch.Tensor: Generated sequence (batch_size, seq_len + max_new_tokens) |
|
""" |
|
self.eval() |
|
with torch.no_grad(): |
|
for _ in range(max_new_tokens): |
|
|
|
idx_cond = ( |
|
idx |
|
if idx.size(1) <= self.config.block_size |
|
else idx[:, -self.config.block_size :] |
|
) |
|
|
|
|
|
logits, _ = self(idx_cond) |
|
|
|
|
|
logits = logits[:, -1, :] / temperature |
|
|
|
|
|
if top_k is not None: |
|
v, _ = torch.topk(logits, min(top_k, logits.size(-1))) |
|
logits[logits < v[:, [-1]]] = -float("inf") |
|
|
|
|
|
probs = F.softmax(logits, dim=-1) |
|
idx_next = torch.multinomial(probs, num_samples=1) |
|
|
|
|
|
idx = torch.cat((idx, idx_next), dim=1) |
|
|
|
self.train() |
|
return idx |
|
|
|
def estimate_memory_usage(self, batch_size: int = 1, seq_len: int = None) -> dict: |
|
""" |
|
Estimate memory usage for training and inference. |
|
|
|
Args: |
|
batch_size: Batch size for estimation |
|
seq_len: Sequence length (defaults to block_size) |
|
|
|
Returns: |
|
dict: Memory usage estimates in MB |
|
""" |
|
if seq_len is None: |
|
seq_len = self.config.block_size |
|
|
|
|
|
param_memory = self.get_num_params() * 4 / (1024**2) |
|
|
|
|
|
activation_memory = ( |
|
batch_size * seq_len * self.config.n_embd * self.config.n_layer * 8 |
|
) / (1024**2) |
|
|
|
|
|
gradient_memory = param_memory |
|
|
|
return { |
|
"parameters_mb": param_memory, |
|
"activations_mb": activation_memory, |
|
"gradients_mb": gradient_memory, |
|
"total_training_mb": param_memory + activation_memory + gradient_memory, |
|
"total_inference_mb": param_memory + activation_memory * 0.5, |
|
} |
|
|
|
|
|
def create_model(model_size: str = "medium") -> GPTModel: |
|
""" |
|
Factory function to create a GPT model with predefined configurations. |
|
|
|
Args: |
|
model_size: Size of model to create ("small", "medium", "large") |
|
|
|
Returns: |
|
GPTModel: Initialized model |
|
""" |
|
configs = { |
|
"small": GPTConfig.small(), |
|
"medium": GPTConfig.medium(), |
|
"large": GPTConfig.large(), |
|
} |
|
|
|
if model_size not in configs: |
|
raise ValueError(f"Unknown model size: {model_size}. Choose from {list(configs.keys())}") |
|
|
|
config = configs[model_size] |
|
model = GPTModel(config) |
|
|
|
return model |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
print("🧠 GPT Model Architecture") |
|
print("=" * 50) |
|
|
|
|
|
for size in ["small", "medium", "large"]: |
|
print(f"\n{size.upper()} MODEL:") |
|
model = create_model(size) |
|
|
|
|
|
memory = model.estimate_memory_usage(batch_size=4, seq_len=512) |
|
print( |
|
f"Memory (4 batch, 512 seq): {memory['total_training_mb']:.1f}MB training, {memory['total_inference_mb']:.1f}MB inference" |
|
) |
|
|
|
|
|
x = torch.randint(0, 32000, (2, 64)) |
|
with torch.no_grad(): |
|
logits, _ = model(x) |
|
print(f"Test forward pass: {x.shape} -> {logits.shape} ✓") |
|
|