llm / model.py
lemms's picture
Upload model.py with huggingface_hub
d253f09 verified
#!/usr/bin/env python3
# Copyright (C) 2024 Louis Chua Bean Chong
#
# This file is part of OpenLLM.
#
# OpenLLM is dual-licensed:
# 1. For open source use: GNU General Public License v3.0
# 2. For commercial use: Commercial License (contact for details)
#
# See LICENSE and docs/LICENSES.md for full license information.
"""
GPT-style Language Model Architecture
This module implements a standard GPT (Generative Pre-trained Transformer) architecture
using pure PyTorch. The model is a decoder-only transformer designed for autoregressive
language modeling (next-token prediction).
ARCHITECTURE OVERVIEW:
- Token Embedding: Maps token IDs to dense vectors
- Positional Embedding: Adds position information to token embeddings
- Transformer Blocks: Stack of multi-head attention + feed-forward layers
- Layer Normalization: Pre-norm placement for training stability
- Output Head: Linear projection to vocabulary for next-token prediction
FEATURES:
- Configurable model size (small/medium/large)
- Dropout for regularization
- Causal (autoregressive) attention masking
- Compatible with our SentencePiece tokenizer
- Memory-efficient implementation for training on limited hardware
Usage:
from model import GPTConfig, GPTModel
config = GPTConfig(vocab_size=32000, n_layer=12, n_head=12, n_embd=768)
model = GPTModel(config)
# Forward pass
logits = model(input_ids) # Shape: (batch_size, seq_len, vocab_size)
Hardware Requirements:
- Small Model (25M params): 4-8GB RAM, CPU/integrated GPU
- Medium Model (117M params): 8-16GB RAM, dedicated GPU recommended
- Large Model (350M params): 16GB+ RAM, high-end GPU required
Author: Louis Chua Bean Chong
License: GPLv3
"""
import math
from dataclasses import dataclass
from typing import Optional, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F
@dataclass
class GPTConfig:
"""
Configuration class for GPT model hyperparameters.
This class defines all the architectural parameters needed to instantiate
a GPT model. Use the provided class methods to get pre-configured setups
for different model sizes.
"""
# Model architecture
vocab_size: int = 32000 # Vocabulary size (from tokenizer)
n_layer: int = 12 # Number of transformer layers
n_head: int = 12 # Number of attention heads
n_embd: int = 768 # Embedding dimension
# Sequence and context
block_size: int = 1024 # Maximum sequence length
# Training hyperparameters
dropout: float = 0.1 # Dropout probability
bias: bool = True # Use bias in linear layers
# Model size identifier
model_name: str = "gpt-medium" # Human-readable model identifier
@classmethod
def small(cls) -> "GPTConfig":
"""Small model configuration (~25M parameters) - Good for CPU training"""
return cls(
vocab_size=32000,
n_layer=6,
n_head=8,
n_embd=512,
block_size=1024,
dropout=0.1,
model_name="gpt-small",
)
@classmethod
def medium(cls) -> "GPTConfig":
"""Medium model configuration (~117M parameters) - Balanced performance"""
return cls(
vocab_size=32000,
n_layer=12,
n_head=12,
n_embd=768,
block_size=2048,
dropout=0.1,
model_name="gpt-medium",
)
@classmethod
def large(cls) -> "GPTConfig":
"""Large model configuration (~350M parameters) - High performance"""
return cls(
vocab_size=32000,
n_layer=24,
n_head=16,
n_embd=1024,
block_size=2048,
dropout=0.1,
model_name="gpt-large",
)
def estimate_parameters(self) -> int:
"""
Estimate the total number of trainable parameters.
Returns:
int: Estimated parameter count
"""
# Token embeddings
token_emb = self.vocab_size * self.n_embd
# Position embeddings
pos_emb = self.block_size * self.n_embd
# Transformer layers
# Each layer: attention (4 * n_embd^2) + mlp (8 * n_embd^2) + layer_norms
layer_params = self.n_layer * (12 * self.n_embd**2 + 4 * self.n_embd)
# Output head
output_head = self.vocab_size * self.n_embd
total = token_emb + pos_emb + layer_params + output_head
return total
class CausalSelfAttention(nn.Module):
"""
Multi-head causal self-attention mechanism.
This implements the core attention mechanism of the transformer, with causal
masking to ensure autoregressive behavior (tokens can only attend to previous
tokens, not future ones).
"""
def __init__(self, config: GPTConfig):
super().__init__()
assert (
config.n_embd % config.n_head == 0
), "Embedding dim must be divisible by number of heads"
self.config = config
self.n_head = config.n_head
self.n_embd = config.n_embd
self.head_dim = self.n_embd // self.n_head
# Key, query, value projections for all heads (batched)
self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
# Output projection
self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
# Dropout
self.attn_dropout = nn.Dropout(config.dropout)
self.resid_dropout = nn.Dropout(config.dropout)
# Causal mask - lower triangular matrix
self.register_buffer(
"bias",
torch.tril(torch.ones(config.block_size, config.block_size)).view(
1, 1, config.block_size, config.block_size
),
)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of causal self-attention.
This method implements the scaled dot-product attention mechanism with causal masking.
The attention mechanism allows each token to attend to all previous tokens in the sequence,
but not to future tokens, maintaining the autoregressive property essential for language modeling.
Mathematical formulation:
Attention(Q, K, V) = softmax(QK^T / sqrt(d_k))V
where Q, K, V are query, key, value matrices derived from input x
Implementation details:
- Uses batch matrix multiplication for efficiency
- Applies causal mask to prevent future token attention
- Implements multi-head attention by reshaping and parallel processing
- Applies dropout for regularization during training
Args:
x: Input tensor of shape (batch_size, seq_len, n_embd)
Contains embedded token representations from previous layer
Returns:
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
"""
# Extract tensor dimensions for clear variable naming and validation
# B = batch size (number of sequences processed in parallel)
# T = sequence length (number of tokens in each sequence)
# C = embedding dimensionality (n_embd from config)
B, T, C = x.size()
# Generate query, key, and value projections for all attention heads
# The c_attn linear layer outputs 3 * n_embd features, which we split into Q, K, V
# This batched approach is more efficient than separate linear layers
# Input shape: (B, T, C) -> Output shape: (B, T, 3*C) -> Split to 3x (B, T, C)
q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
# Reshape tensors for multi-head attention computation
# Transform from (B, T, C) to (B, nh, T, hs) where:
# - nh = number of heads (self.n_head)
# - hs = head size (self.head_dim = C // nh)
# The transpose(1, 2) moves the head dimension before sequence dimension for efficient computation
q = q.view(B, T, self.n_head, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
k = k.view(B, T, self.n_head, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
v = v.view(B, T, self.n_head, self.head_dim).transpose(1, 2) # (B, nh, T, hs)
# Compute scaled dot-product attention scores
# Matrix multiplication: Q @ K^T gives attention affinities between all token pairs
# Scaling by 1/sqrt(head_dim) prevents softmax saturation for large embedding dimensions
# Shape: (B, nh, T, hs) @ (B, nh, hs, T) -> (B, nh, T, T)
# The resulting (T, T) matrix represents attention weights from each token to every other token
att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(self.head_dim))
# Apply causal masking to enforce autoregressive property
# The causal mask ensures that token i can only attend to tokens j where j <= i
# This prevents the model from "cheating" by looking at future tokens during training
# We use -inf for masked positions so they become 0 after softmax
att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float("-inf"))
# Convert attention scores to probabilities using softmax
# Each row of the attention matrix now sums to 1, representing a probability distribution
# over which tokens to attend to for each query position
att = F.softmax(att, dim=-1)
# Apply dropout to attention weights for regularization
# This randomly zeros some attention connections during training to prevent overfitting
att = self.attn_dropout(att)
# Apply attention weights to value vectors
# This weighted combination produces the actual output of the attention mechanism
# Shape: (B, nh, T, T) @ (B, nh, T, hs) -> (B, nh, T, hs)
# Each output position is a weighted sum of all value vectors, with weights from attention
y = att @ v
# Concatenate multi-head outputs back to original embedding dimension
# Transform from (B, nh, T, hs) back to (B, T, C) where C = nh * hs
# The transpose moves head dimension back, and contiguous() ensures memory layout efficiency
# This combines information from all attention heads into a single representation
y = y.transpose(1, 2).contiguous().view(B, T, C)
# Apply final output projection and residual dropout
# The output projection allows the model to learn how to best combine multi-head information
# Residual dropout provides additional regularization before the residual connection
y = self.resid_dropout(self.c_proj(y))
return y
class MLP(nn.Module):
"""
Multi-Layer Perceptron (Feed-Forward Network) for Transformer.
This implements the position-wise feed-forward network that appears in each transformer layer.
The MLP provides additional non-linear transformation capacity beyond what attention provides.
Architecture:
Input -> Linear(n_embd -> 4*n_embd) -> GELU -> Linear(4*n_embd -> n_embd) -> Dropout -> Output
Design rationale:
- 4x expansion is standard in transformers (from "Attention Is All You Need")
- GELU activation provides smoother gradients than ReLU for language modeling
- Dropout prevents overfitting in the feed-forward layers
- Two linear layers allow complex non-linear transformations of attention outputs
Parameters:
- First linear layer: n_embd * 4*n_embd parameters (expansion)
- Second linear layer: 4*n_embd * n_embd parameters (projection back)
- Total: 8 * n_embd^2 parameters (significant portion of model size)
"""
def __init__(self, config: GPTConfig):
super().__init__()
# First linear layer: expand embedding dimension by 4x
# This expansion gives the network more representational capacity
# The 4x factor is a standard choice that balances capacity vs efficiency
self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
# GELU (Gaussian Error Linear Unit) activation function
# GELU provides smoother gradients compared to ReLU and works better for language modeling
# It's approximately: GELU(x) = x * Φ(x) where Φ is the CDF of standard normal distribution
self.gelu = nn.GELU()
# Second linear layer: project back to original embedding dimension
# This projection allows the network to combine information from the expanded representation
self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
# Dropout for regularization in the feed-forward network
# Applied after the final projection to prevent overfitting
self.dropout = nn.Dropout(config.dropout)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of the feed-forward network.
This method applies a two-layer MLP with GELU activation to transform
the attention outputs. The MLP operates independently on each position
in the sequence, providing position-wise non-linear transformations.
Mathematical operation:
MLP(x) = Dropout(Linear₂(GELU(Linear₁(x))))
where Linear₁: R^n_embd -> R^4*n_embd and Linear₂: R^4*n_embd -> R^n_embd
Args:
x: Input tensor of shape (batch_size, seq_len, n_embd)
Contains attended representations from the attention layer
Returns:
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
Contains transformed representations ready for residual connection
"""
# First linear transformation: expand from n_embd to 4*n_embd dimensions
# This expansion provides the network with a higher-dimensional space for computation
# Shape: (batch_size, seq_len, n_embd) -> (batch_size, seq_len, 4*n_embd)
x = self.c_fc(x)
# Apply GELU activation function for non-linearity
# GELU is smoother than ReLU and provides better gradients for language modeling
# It introduces non-linearity while maintaining differentiability everywhere
x = self.gelu(x)
# Second linear transformation: project back to original n_embd dimensions
# This projection combines information from the expanded representation
# Shape: (batch_size, seq_len, 4*n_embd) -> (batch_size, seq_len, n_embd)
x = self.c_proj(x)
# Apply dropout for regularization before residual connection
# Dropout randomly zeros some neurons during training to prevent overfitting
# This is particularly important in the feed-forward layers which have many parameters
x = self.dropout(x)
return x
class Block(nn.Module):
"""
Single Transformer block.
Consists of:
1. Layer normalization
2. Multi-head causal self-attention
3. Residual connection
4. Layer normalization
5. MLP (feed-forward network)
6. Residual connection
Uses pre-norm architecture for better training stability.
"""
def __init__(self, config: GPTConfig):
super().__init__()
self.ln_1 = nn.LayerNorm(config.n_embd)
self.attn = CausalSelfAttention(config)
self.ln_2 = nn.LayerNorm(config.n_embd)
self.mlp = MLP(config)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""
Forward pass of transformer block.
Args:
x: Input tensor of shape (batch_size, seq_len, n_embd)
Returns:
torch.Tensor: Output tensor of shape (batch_size, seq_len, n_embd)
"""
# Pre-norm attention with residual connection
x = x + self.attn(self.ln_1(x))
# Pre-norm MLP with residual connection
x = x + self.mlp(self.ln_2(x))
return x
class GPTModel(nn.Module):
"""
Complete GPT Language Model.
This is the main model class that combines all components:
- Token and positional embeddings
- Stack of transformer blocks
- Final layer normalization
- Language modeling head
The model can be used for:
- Training from scratch on text data
- Fine-tuning on downstream tasks
- Text generation (inference)
"""
def __init__(self, config: GPTConfig, use_checkpoint=True):
super().__init__()
assert config.vocab_size is not None, "vocab_size must be specified"
assert config.block_size is not None, "block_size must be specified"
self.config = config
self.use_checkpoint = use_checkpoint
# Embeddings
self.transformer = nn.ModuleDict(
dict(
wte=nn.Embedding(config.vocab_size, config.n_embd), # Token embeddings
wpe=nn.Embedding(config.block_size, config.n_embd), # Position embeddings
drop=nn.Dropout(config.dropout),
h=nn.ModuleList(
[Block(config) for _ in range(config.n_layer)]
), # Transformer blocks
ln_f=nn.LayerNorm(config.n_embd), # Final layer norm
)
)
# Language modeling head (maps hidden states to vocabulary)
self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)
# Tie weights between token embeddings and output head (common practice)
self.transformer.wte.weight = self.lm_head.weight
# Initialize weights
self.apply(self._init_weights)
# Report parameter count
print(f"Model initialized: {self.config.model_name}")
print(f"Parameters: {self.get_num_params():,}")
print(f"Estimated: {self.config.estimate_parameters():,}")
def _init_weights(self, module):
"""Initialize model weights using standard practices."""
if isinstance(module, nn.Linear):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
if module.bias is not None:
torch.nn.init.zeros_(module.bias)
elif isinstance(module, nn.Embedding):
torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
def get_num_params(self, non_embedding: bool = False) -> int:
"""
Count the number of parameters in the model.
Args:
non_embedding: If True, subtract embedding parameters
Returns:
int: Number of parameters
"""
n_params = sum(p.numel() for p in self.parameters())
if non_embedding:
n_params -= self.transformer.wpe.weight.numel()
n_params -= self.transformer.wte.weight.numel()
return n_params
def forward(
self, idx: torch.Tensor, targets: Optional[torch.Tensor] = None
) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
"""
Forward pass of the GPT model.
Args:
idx: Input token indices of shape (batch_size, seq_len)
targets: Optional target tokens for loss calculation (batch_size, seq_len)
Returns:
Tuple containing:
- logits: Output logits of shape (batch_size, seq_len, vocab_size)
- loss: Cross-entropy loss if targets provided, None otherwise
"""
device = idx.device
b, t = idx.size()
assert (
t <= self.config.block_size
), f"Sequence length {t} exceeds block size {self.config.block_size}"
# Token embeddings
tok_emb = self.transformer.wte(idx) # (b, t, n_embd)
# Position embeddings
pos = torch.arange(0, t, dtype=torch.long, device=device) # (t,)
pos_emb = self.transformer.wpe(pos) # (t, n_embd)
# Combine embeddings and apply dropout
x = self.transformer.drop(tok_emb + pos_emb)
# Pass through transformer blocks with optional gradient checkpointing
if self.use_checkpoint and self.training:
# Use gradient checkpointing to save memory during training
for block in self.transformer.h:
x = torch.utils.checkpoint.checkpoint(block, x)
else:
# Standard forward pass
for block in self.transformer.h:
x = block(x)
# Final layer normalization
x = self.transformer.ln_f(x)
# Language modeling head
# Always compute full logits for training and evaluation
logits = self.lm_head(x)
if targets is not None:
# If we have targets, compute loss
loss = F.cross_entropy(
logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1
)
else:
# If no targets, no loss computation
loss = None
return logits, loss
def generate(
self,
idx: torch.Tensor,
max_new_tokens: int = 100,
temperature: float = 1.0,
top_k: Optional[int] = None,
) -> torch.Tensor:
"""
Generate new tokens autoregressively.
Args:
idx: Starting token indices (batch_size, seq_len)
max_new_tokens: Maximum number of new tokens to generate
temperature: Sampling temperature (higher = more random)
top_k: If set, only sample from top-k most likely tokens
Returns:
torch.Tensor: Generated sequence (batch_size, seq_len + max_new_tokens)
"""
self.eval()
with torch.no_grad():
for _ in range(max_new_tokens):
# Crop sequence if it exceeds block size
idx_cond = (
idx
if idx.size(1) <= self.config.block_size
else idx[:, -self.config.block_size :]
)
# Forward pass
logits, _ = self(idx_cond)
# Get logits for the last token and apply temperature
logits = logits[:, -1, :] / temperature
# Optionally crop to top-k most likely tokens
if top_k is not None:
v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
logits[logits < v[:, [-1]]] = -float("inf")
# Apply softmax and sample
probs = F.softmax(logits, dim=-1)
idx_next = torch.multinomial(probs, num_samples=1)
# Append to sequence
idx = torch.cat((idx, idx_next), dim=1)
self.train() # Return to training mode
return idx
def estimate_memory_usage(self, batch_size: int = 1, seq_len: int = None) -> dict:
"""
Estimate memory usage for training and inference.
Args:
batch_size: Batch size for estimation
seq_len: Sequence length (defaults to block_size)
Returns:
dict: Memory usage estimates in MB
"""
if seq_len is None:
seq_len = self.config.block_size
# Model parameters (weights)
param_memory = self.get_num_params() * 4 / (1024**2) # 4 bytes per float32
# Activations (rough estimate)
activation_memory = (
batch_size * seq_len * self.config.n_embd * self.config.n_layer * 8 # Rough estimate
) / (1024**2)
# Gradients (same size as parameters during training)
gradient_memory = param_memory
return {
"parameters_mb": param_memory,
"activations_mb": activation_memory,
"gradients_mb": gradient_memory,
"total_training_mb": param_memory + activation_memory + gradient_memory,
"total_inference_mb": param_memory + activation_memory * 0.5, # No gradients needed
}
def create_model(model_size: str = "medium") -> GPTModel:
"""
Factory function to create a GPT model with predefined configurations.
Args:
model_size: Size of model to create ("small", "medium", "large")
Returns:
GPTModel: Initialized model
"""
configs = {
"small": GPTConfig.small(),
"medium": GPTConfig.medium(),
"large": GPTConfig.large(),
}
if model_size not in configs:
raise ValueError(f"Unknown model size: {model_size}. Choose from {list(configs.keys())}")
config = configs[model_size]
model = GPTModel(config)
return model
if __name__ == "__main__":
# Example usage
print("🧠 GPT Model Architecture")
print("=" * 50)
# Create models of different sizes
for size in ["small", "medium", "large"]:
print(f"\n{size.upper()} MODEL:")
model = create_model(size)
# Show memory estimates
memory = model.estimate_memory_usage(batch_size=4, seq_len=512)
print(
f"Memory (4 batch, 512 seq): {memory['total_training_mb']:.1f}MB training, {memory['total_inference_mb']:.1f}MB inference"
)
# Test forward pass
x = torch.randint(0, 32000, (2, 64)) # Batch size 2, sequence length 64
with torch.no_grad():
logits, _ = model(x)
print(f"Test forward pass: {x.shape} -> {logits.shape} ✓")