Text Generation
English
research
leaf / model.py
boyuia's picture
Added model source.
f4c3c19 verified
import torch
import torch.nn as nn
from torch.nn import functional as F
import json
import os
# --- Hyperparameters ---
# These are the settings for our model. You can experiment with these values.
batch_size = 32 # How many sequences to process in parallel
block_size = 8 # Maximum context length for predictions
max_iters = 3000 # Number of training iterations
eval_interval = 300 # How often to evaluate the model
learning_rate = 1e-2 # The learning rate for the optimizer
device = 'cuda' if torch.cuda.is_available() else 'cpu' # Use GPU if available
eval_iters = 200 # Number of iterations for evaluation
n_embd = 32 # The dimension of the token embeddings
n_head = 4 # The number of attention heads in the Multi-Head Attention block
n_layer = 4 # The number of Transformer blocks
dropout = 0.0 # Dropout rate for regularization
# --- Data Preparation ---
# To use this code, you need to create a file named 'dataset.jsonl'
# in the same directory as this script. Each line of the file should be a JSON object
# with 'header' and 'formal_statement' keys, like the example you provided.
file_path = 'dataset.jsonl'
# Process the JSONL data from the file.
corpus = ""
try:
with open(file_path, 'r') as f:
for line in f:
data_point = json.loads(line)
# Combine the 'header' and 'formal_statement' fields.
# We add a newline character to separate the two parts of the text.
corpus += data_point['header'] + '\n' + data_point['formal_statement'] + '\n'
except FileNotFoundError:
print(f"Error: The file '{file_path}' was not found. Please create it and add your data.")
exit()
except json.JSONDecodeError:
print(f"Error: There was a problem parsing a line in '{file_path}'. Make sure each line is a valid JSON object.")
exit()
except KeyError:
print(f"Error: A line in '{file_path}' does not have the 'header' or 'formal_statement' keys. Please check your JSONL file format.")
exit()
# Check if the corpus is empty after loading the file.
if not corpus:
print(f"Error: The corpus is empty. This could be because '{file_path}' is empty or contains no valid text.")
exit()
# Here we create a simple character-level tokenizer.
# The vocabulary consists of all unique characters in the text.
chars = sorted(list(set(corpus)))
vocab_size = len(chars)
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}
# Fix the bug in the encode function. The loop variable was 's' instead of 'c'.
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: ''.join([itos[i] for i in l])
# Convert the entire text into a PyTorch tensor.
data = torch.tensor(encode(corpus), dtype=torch.long)
# Create a simple train/validation split.
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]
# --- Helper Functions ---
# This function gets a random batch of data from either the training or validation set.
def get_batch(split):
data = train_data if split == 'train' else val_data
# Generate random starting indices for each sequence in the batch.
ix = torch.randint(len(data) - block_size, (batch_size,))
# Stack the sequences to create a batch.
x = torch.stack([data[i:i + block_size] for i in ix])
y = torch.stack([data[i + 1:i + block_size + 1] for i in ix])
x, y = x.to(device), y.to(device)
return x, y
# This function is used to estimate the model's loss on both the train and validation sets.
# It uses torch.no_grad() to make the process more efficient as we're not training.
@torch.no_grad()
def estimate_loss():
out = {}
model.eval() # Set the model to evaluation mode.
for split in ['train', 'val']:
losses = torch.zeros(eval_iters)
for k in range(eval_iters):
X, Y = get_batch(split)
logits, loss = model(X, Y)
losses[k] = loss.item()
out[split] = losses.mean()
model.train() # Set the model back to training mode.
return out
# --- The Self-Attention Mechanism ---
# This is a single attention head.
class Head(nn.Module):
def __init__(self, head_size):
super().__init__()
# Linear layers to project the input into key, query, and value vectors.
self.key = nn.Linear(n_embd, head_size, bias=False)
self.query = nn.Linear(n_embd, head_size, bias=False)
self.value = nn.Linear(n_embd, head_size, bias=False)
# A buffer to store a lower-triangular matrix, which prevents future tokens from
# "seeing" past tokens (decoder-style attention).
self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))
# Dropout layer for regularization.
self.dropout = nn.Dropout(dropout)
def forward(self, x):
B, T, C = x.shape
k = self.key(x) # (B, T, head_size)
q = self.query(x) # (B, T, head_size)
# Compute the affinity scores (weights).
# (q @ k.transpose(-2, -1)) is matrix multiplication of q and k transpose.
wei = q @ k.transpose(-2, -1) * C**-0.5 # (B, T, head_size) @ (B, head_size, T) -> (B, T, T)
# Apply the lower-triangular mask to enforce causality.
wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
# Apply softmax to get the attention weights.
wei = F.softmax(wei, dim=-1)
self.dropout(wei)
v = self.value(x) # (B, T, head_size)
out = wei @ v # (B, T, T) @ (B, T, head_size) -> (B, T, head_size)
return out
# This combines multiple attention heads in parallel.
class MultiHeadAttention(nn.Module):
def __init__(self, num_heads, head_size):
super().__init__()
# Create a list of `Head` modules.
self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
# A final linear layer to project the concatenated output of all heads.
self.proj = nn.Linear(num_heads * head_size, n_embd)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# Concatenate the output from each head.
out = torch.cat([h(x) for h in self.heads], dim=-1)
out = self.dropout(self.proj(out))
return out
# This is a simple feed-forward network.
class FeedFoward(nn.Module):
def __init__(self, n_embd):
super().__init__()
# A simple linear-ReLU-linear stack.
self.net = nn.Sequential(
nn.Linear(n_embd, 4 * n_embd),
nn.ReLU(),
nn.Linear(4 * n_embd, n_embd),
nn.Dropout(dropout),
)
def forward(self, x):
return self.net(x)
# This is a single Transformer block, composed of Multi-Head Attention and a Feed-Forward network.
class TransformerBlock(nn.Module):
def __init__(self, n_embd, n_head):
super().__init__()
head_size = n_embd // n_head
# The attention mechanism.
self.sa = MultiHeadAttention(n_head, head_size)
# The feed-forward network.
self.ffwd = FeedFoward(n_embd)
# Layer normalization layers.
self.ln1 = nn.LayerNorm(n_embd)
self.ln2 = nn.LayerNorm(n_embd)
def forward(self, x):
# Apply self-attention with a residual connection and layer normalization.
x = x + self.sa(self.ln1(x))
# Apply feed-forward with another residual connection and layer normalization.
x = x + self.ffwd(self.ln2(x))
return x
# --- The Main Language Model ---
class LanguageModel(nn.Module):
def __init__(self):
super().__init__()
# A token embedding table: each integer token gets a vector representation.
self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
# A positional embedding table: each position gets a vector representation.
self.position_embedding_table = nn.Embedding(block_size, n_embd)
# A sequence of Transformer blocks.
self.blocks = nn.Sequential(*[TransformerBlock(n_embd, n_head) for _ in range(n_layer)])
# A final layer normalization.
self.ln_f = nn.LayerNorm(n_embd)
# A linear layer to project the final embeddings to the vocabulary size.
self.lm_head = nn.Linear(n_embd, vocab_size)
def forward(self, idx, targets=None):
B, T = idx.shape
# Get token embeddings and positional embeddings.
tok_emb = self.token_embedding_table(idx) # (B, T, C)
pos_emb = self.position_embedding_table(torch.arange(T, device=device)) # (T, C)
# Add them together to get the final embeddings.
x = tok_emb + pos_emb # (B, T, C)
# Pass through the Transformer blocks.
x = self.blocks(x)
x = self.ln_f(x)
# Project to the vocabulary size.
logits = self.lm_head(x) # (B, T, vocab_size)
loss = None
if targets is not None:
# Reshape for cross-entropy loss calculation.
B, T, C = logits.shape
logits = logits.view(B * T, C)
targets = targets.view(B * T)
loss = F.cross_entropy(logits, targets)
return logits, loss
# A function to generate text.
def generate(self, idx, max_new_tokens):
# idx is (B, T) tensor of indices in the current context.
for _ in range(max_new_tokens):
# Crop idx to block_size, as the model has a limited context.
idx_cond = idx[:, -block_size:]
# Get predictions.
logits, loss = self(idx_cond)
# Focus only on the last time step.
logits = logits[:, -1, :]
# Apply softmax to get probabilities.
probs = F.softmax(logits, dim=-1)
# Sample from the distribution.
idx_next = torch.multinomial(probs, num_samples=1)
# Append the new token to the sequence.
idx = torch.cat((idx, idx_next), dim=1)
return idx
# --- Training and Generation ---
model = LanguageModel()
m = model.to(device)
# Create a PyTorch optimizer.
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
# Main training loop.
for iter in range(max_iters):
# Every few iterations, evaluate the loss on both splits.
if iter % eval_interval == 0:
losses = estimate_loss()
print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
# Sample a batch of data.
xb, yb = get_batch('train')
# Forward pass: compute loss.
logits, loss = model(xb, yb)
# Backward pass: compute gradients.
optimizer.zero_grad(set_to_none=True)
loss.backward()
# Update the model parameters.
optimizer.step()
# --- Generate new text from the trained model ---
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_text_indices = m.generate(context, max_new_tokens=20)
print("\nGenerated text:")
print(decode(generated_text_indices[0].tolist()))