This training script can be run both on a single gpu in debug mode,
and also in a larger training run with distributed data parallel (ddp).
To run on a single GPU small debug run, example:
$ python -m --compile=False --eval_iters=10 --batch_size=8
To run with DDP on 4 gpus on 1 node, example:
$ torchrun --standalone --nproc_per_node=4
To run with DDP on 8 gpus per node across 2 nodes (16 gpus total), example:
- Run on the first (master) node with example IP 123.456.123.456:
$ torchrun --nproc_per_node=8 --nnodes=2 --node_rank=0 --master_addr=123.456.123.456 --master_port=1234
- Run on the worker node:
$ torchrun --nproc_per_node=8 --nnodes=2 --node_rank=1 --master_addr=123.456.123.456 --master_port=1234
(If your cluster does not have Infiniband interconnect prepend NCCL_IB_DISABLE=1)
import math
import os
import time
from contextlib import nullcontext
from datetime import datetime
from functools import partial
import inspect
import torch
from torch.distributed import destroy_process_group, init_process_group
from torch.nn.parallel import DistributedDataParallel as DDP
from tinystories import Task
from model import MambaLMHeadModel
# -----------------------------------------------------------------------------
# Default hyperparameters. Every int/float/bool/str defined at module level is
# later collected into `config` for logging and checkpointing.
# I/O
out_dir = "out/768-8"
eval_interval = 2000
log_interval = 1
eval_iters = 100
eval_only = False  # if True, script exits right after the first eval
always_save_checkpoint = True  # if True, always save a checkpoint after each eval
init_from = "resume"  # 'scratch' or 'resume'
# wandb logging
wandb_log = True  # set to False to disable Weights & Biases logging
wandb_project = "tiny-mambas"
# timestamped so that every launch gets a distinct run name
wandb_run_name = "run" + datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
# data
batch_size = 128  # if gradient_accumulation_steps > 1, this is the micro-batch size
max_seq_len = 256
vocab_size = 4096  # size of the custom vocabulary selected by vocab_source below
vocab_source = "custom"
# model
d_model = 768
n_layer = 8
# adamw optimizer
gradient_accumulation_steps = 4  # used to simulate larger batch sizes
learning_rate = 5e-4  # max learning rate
max_iters = 100000  # total number of training iterations
weight_decay = 1e-1
beta1 = 0.9
beta2 = 0.95
grad_clip = 1.0  # clip gradients at this value, or disable if == 0.0
# learning rate decay settings
decay_lr = True  # whether to decay the learning rate
warmup_iters = 1000  # how many steps to warm up for
# system
device = "cuda"  # examples: 'cpu', 'cuda', 'cuda:0', 'cuda:1' etc., or try 'mps' on macbooks
dtype = "float16"  # float32|bfloat16|float16
compile = False  # use PyTorch 2.0 to compile the model to be faster
class mambaConfig:
    """Model hyperparameters handed to ``MambaLMHeadModel``.

    The class object itself (not an instance) is passed to the model
    constructor, so every field is a class attribute. The first three fields
    mirror the module-level settings of the same name; the remaining ones are
    interpreted by ``MambaLMHeadModel``, whose definition is not in this file.
    """

    # sizes taken from the module-level hyperparameters
    d_model: int = d_model
    n_layer: int = n_layer
    vocab_size: int = vocab_size
    # extra SSM settings; None leaves the choice to the model implementation
    ssm_cfg: dict = None
    # normalization / residual options forwarded to the model as-is
    rms_norm: bool = True
    residual_in_fp32: bool = True
    fused_add_norm: bool = True
    # presumably rounds the vocabulary size up to a multiple of this value;
    # confirm against the model implementation
    pad_vocab_size_multiple: int = 8
# Names of all public module-level settings with a primitive value. Iterate
# over a snapshot so that the globals dict is never walked while it could change.
config_keys = [
    k
    for k, v in list(globals().items())
    if not k.startswith("_") and isinstance(v, (int, float, bool, str))
]
config = {k: globals()[k] for k in config_keys}  # will be useful for logging
# fixing some hyperparams to sensible defaults
# (defined after `config` was captured, so they are not part of the logged config)
lr_decay_iters = max_iters  # should be ~= max_iters per Chinchilla
min_lr = 5e-5  # minimum learning rate, should be ~= learning_rate/10 per Chinchilla
# -----------------------------------------------------------------------------
torch.backends.cuda.matmul.allow_tf32 = True  # allow tf32 on matmul
torch.backends.cudnn.allow_tf32 = True  # allow tf32 on cudnn
device_type = "cuda" if "cuda" in device else "cpu"  # for later use in torch.autocast
# note: float16 data type will automatically use a GradScaler
ptdtype = {"float32": torch.float32, "bfloat16": torch.bfloat16, "float16": torch.float16}[dtype]
# context manager wrapped around every forward pass: a no-op on CPU,
# mixed-precision autocast otherwise
ctx = (
    nullcontext()
    if device_type == "cpu"
    else torch.amp.autocast(device_type=device_type, dtype=ptdtype)
)
# task-specific setup
# NOTE(review): the argument list of this partial was missing from the file and
# has been rebuilt from the data settings above; confirm the keyword names
# against the signature of Task.iter_batches in tinystories.py.
iter_batches = partial(
    Task.iter_batches,
    batch_size=batch_size,
    max_seq_len=max_seq_len,
    vocab_size=vocab_size,
    vocab_source=vocab_source,
    device=device,
    num_workers=0,
)
# init these up here, can override if init_from='resume' (i.e. from a checkpoint)
iter_num = 0
best_val_loss = 1e9
# model init
# These are the keys the resume path reads back from a checkpoint, so they must
# be present in what gets saved under "model_args".
model_args = dict(
    d_model=d_model,
    n_layer=n_layer,
    vocab_size=vocab_size,
    max_seq_len=max_seq_len,
)
# tokens consumed per optimizer step (the literal 1 is the number of processes)
tokens_per_iter = gradient_accumulation_steps * 1 * batch_size * max_seq_len
# start with model_args from command line
if init_from == "scratch":
    # init a new model from scratch
    print("Initializing a new model from scratch")
    model = MambaLMHeadModel(mambaConfig)
    model.last_loss = None
elif init_from == "resume":
    print(f"Resuming training from {out_dir}")
    # resume training from a checkpoint.
    # NOTE(review): the checkpoint file name was empty in the original (the path
    # resolved to the directory itself); "ckpt.pt" must match the name used
    # when the checkpoint is saved.
    ckpt_path = os.path.join(out_dir, "ckpt.pt")
    checkpoint = torch.load(ckpt_path, map_location=device)
    checkpoint_model_args = checkpoint["model_args"]
    # force these config attributes to be equal otherwise we can't even resume training
    # the rest of the attributes (e.g. dropout) can stay as desired from command line
    for k in ["d_model", "n_layer", "vocab_size", "max_seq_len"]:
        model_args[k] = checkpoint_model_args[k]
        # the model is built from mambaConfig, so the forced value has to land
        # there as well or it would have no effect on the architecture
        if hasattr(mambaConfig, k):
            setattr(mambaConfig, k, model_args[k])
    # create the model
    model = MambaLMHeadModel(mambaConfig)
    model.last_loss = None
    state_dict = checkpoint["model"]
    # strip the "_orig_mod." prefix that some checkpoints carry on their keys
    unwanted_prefix = "_orig_mod."
    for k in list(state_dict):
        if k.startswith(unwanted_prefix):
            state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
    model.load_state_dict(state_dict)
    iter_num = checkpoint["iter_num"]
    best_val_loss = checkpoint["best_val_loss"]
else:
    # fail early instead of hitting a NameError on `model` further down
    raise ValueError(f"unknown init_from: {init_from!r} (expected 'scratch' or 'resume')")
model.to(device)
# initialize a GradScaler. If enabled=False scaler is a no-op
scaler = torch.cuda.amp.GradScaler(enabled=(dtype == "float16"))
# optimizer
# start with all of the candidate parameters, keeping only those that require grad
param_dict = {pn: p for pn, p in model.named_parameters() if p.requires_grad}
# create optim groups. Any parameter that is 2D (or higher) will be weight decayed, otherwise no.
# i.e. all weight tensors in matmuls + embeddings decay, all biases and norm weights don't.
betas = (beta1, beta2)
decay_params = [p for p in param_dict.values() if p.dim() >= 2]
nodecay_params = [p for p in param_dict.values() if p.dim() < 2]
optim_groups = [
    {"params": decay_params, "weight_decay": weight_decay},
    {"params": nodecay_params, "weight_decay": 0.0},
]
num_decay_params = sum(p.numel() for p in decay_params)
num_nodecay_params = sum(p.numel() for p in nodecay_params)
print(f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters")
print(f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters")
# Create AdamW optimizer and use the fused version if it is available
fused_available = "fused" in inspect.signature(torch.optim.AdamW).parameters
use_fused = fused_available and device_type == "cuda"
extra_args = dict(fused=True) if use_fused else dict()
optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)
print(f"using fused AdamW: {use_fused}")
# when resuming, continue from the saved optimizer state if the checkpoint has one
if init_from == "resume" and "optimizer" in checkpoint:
    optimizer.load_state_dict(checkpoint["optimizer"])
checkpoint = None  # free up memory
# compile the model
if compile:
    print("compiling the model... (takes a ~minute)")
    unoptimized_model = model  # keep a handle on the eager model
    model = torch.compile(model)  # requires PyTorch 2.0
# NOTE: no DDP wrapping is performed in the code visible here; the model is used directly.
# helps estimate an arbitrarily accurate loss over either split using many batches
@torch.no_grad()
def estimate_loss():
    """Return the mean loss over ``eval_iters`` batches of each split.

    Reads the module-level ``model``, ``raw_model``, ``iter_batches``, ``ctx``
    and ``eval_iters``. Gradients are disabled and the model is put in eval
    mode for the duration of the call; training mode is restored afterwards.

    Returns:
        dict mapping ``"train"`` and ``"val"`` to a 0-dim CPU tensor holding
        the mean loss for that split.
    """
    out = {}
    model.eval()
    try:
        for split in ["train", "val"]:
            batch_iter = iter_batches(split=split)
            losses = torch.zeros(eval_iters)  # keep on CPU
            for k in range(eval_iters):
                X, Y = next(batch_iter)
                with ctx:
                    # the forward pass stores its loss on the model; the
                    # returned logits are not needed here
                    model(X, Y)
                    loss = raw_model.last_loss
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # always hand the model back in training mode, even if a batch fails
        model.train()
    return out
# learning rate decay scheduler (cosine with warmup)
def get_lr(it):
    """Return the learning rate to use at iteration ``it``.

    Reads the module-level ``learning_rate``, ``min_lr``, ``warmup_iters`` and
    ``lr_decay_iters``.

    Args:
        it: zero-based training iteration.

    Returns:
        A linear ramp from 0 to ``learning_rate`` over the first
        ``warmup_iters`` iterations, then a cosine decay that reaches
        ``min_lr`` at ``lr_decay_iters`` and stays there.
    """
    # 1) linear warmup for warmup_iters steps
    if it < warmup_iters:
        return learning_rate * it / warmup_iters
    # 2) from lr_decay_iters onwards, return min learning rate. The cosine
    #    term is exactly zero at it == lr_decay_iters, so including the
    #    boundary here yields the same value and avoids a division by zero
    #    when warmup_iters == lr_decay_iters.
    if it >= lr_decay_iters:
        return min_lr
    # 3) in between, use cosine decay down to min learning rate
    decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters)
    coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))  # coeff ranges 0..1
    return min_lr + coeff * (learning_rate - min_lr)
# logging
if wandb_log:
    # imported lazily so that wandb is only required when logging is enabled
    import wandb

    wandb.init(project=wandb_project, name=wandb_run_name, config=config)
# training loop
train_batch_iter = iter_batches(split="train")
X, Y = next(train_batch_iter)  # fetch the very first batch
t0 = time.time()
local_iter_num = 0  # number of iterations in the lifetime of this process
raw_model = model  # unwrap DDP container if needed
running_mfu = -1.0
while True:
    # determine and set the learning rate for this iteration
    lr = get_lr(iter_num) if decay_lr else learning_rate
    for param_group in optimizer.param_groups:
        param_group["lr"] = lr

    # evaluate the loss on train/val sets and write checkpoints
    if iter_num % eval_interval == 0:
        losses = estimate_loss()
        print(f"step {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        if wandb_log:
            # logging is best-effort: a wandb failure must not stop training
            try:
                wandb.log(
                    {
                        "iter": iter_num,
                        "tokens": iter_num * tokens_per_iter,
                        "loss/train": losses["train"],
                        "loss/val": losses["val"],
                        "lr": lr,
                    },
                    step=iter_num,
                )
            except Exception as e:
                print(f"logging to wandb failed: {e}")
        if losses["val"] < best_val_loss or always_save_checkpoint:
            # tracks the validation loss of the checkpoint that is on disk
            best_val_loss = losses["val"]
            if iter_num > 0:
                checkpoint = {
                    "model": raw_model.state_dict(),
                    "optimizer": optimizer.state_dict(),
                    "model_args": model_args,
                    "iter_num": iter_num,
                    "best_val_loss": best_val_loss,
                    "config": config,
                }
                print(f"saving checkpoint to {out_dir}")
                os.makedirs(out_dir, exist_ok=True)
                # NOTE(review): the file name was empty in the original; it
                # must match the name the resume path loads from.
                torch.save(checkpoint, os.path.join(out_dir, "ckpt.pt"))
                #model_export(raw_model, os.path.join(out_dir, "model.bin"), version=0)
    if iter_num == 0 and eval_only:
        break

    # forward backward update, with optional gradient accumulation to simulate larger batch size
    # and using the GradScaler if data type is float16
    for micro_step in range(gradient_accumulation_steps):
        with ctx:
            # the forward pass stores its loss on the model
            logits = model(X, Y)
            loss = raw_model.last_loss
            # average over micro-steps so the accumulated gradient matches one large batch
            loss = loss / gradient_accumulation_steps
        # immediately async prefetch next batch while model is doing the forward pass on the GPU
        X, Y = next(train_batch_iter)
        # backward pass, with gradient scaling if training in fp16
        scaler.scale(loss).backward()
    # clip the gradient
    if grad_clip != 0.0:
        # gradients must be unscaled first so the threshold applies to their true norm
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
    # step the optimizer and scaler if training in fp16
    scaler.step(optimizer)
    scaler.update()
    # flush the gradients as soon as we can, no need for this memory anymore
    optimizer.zero_grad(set_to_none=True)

    # timing and logging
    t1 = time.time()
    dt = t1 - t0
    t0 = t1
    if iter_num % log_interval == 0:
        # get loss as float, scale up due to the divide above. note: this is a CPU-GPU sync point
        lossf = loss.item() * gradient_accumulation_steps
        print(f"{iter_num} | loss {lossf:.4f} | lr {lr:e} | {dt*1000:.2f}ms |")
    iter_num += 1
    local_iter_num += 1

    # termination conditions
    if iter_num > max_iters:
        break