# From PyTorch DDP to Accelerate to Trainer, mastery of distributed training with ease

## General Overview

This tutorial assumes you have a basic understanding of PyTorch and how to train a simple model. It will showcase training on multiple GPUs through a process called Distributed Data Parallelism (DDP) through three different levels of increasing abstraction:

• Native PyTorch DDP through the pytorch.distributed module
• Utilizing 🤗 Accelerate's light wrapper around pytorch.distributed that also helps ensure the code can be run on a single GPU and TPUs with zero code changes and miminimal code changes to the original code
• Utilizing 🤗 Transformer's high-level Trainer API which abstracts all the boilerplate code and supports various devices and distributed scenarios

## What is "Distributed" training and why does it matter?

Take some very basic PyTorch training code below, which sets up and trains a model on MNIST based on the official MNIST example

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
def __init__(self):
super().__init__()
self.conv1 = nn.Conv2d(1, 32, 3, 1)
self.conv2 = nn.Conv2d(32, 64, 3, 1)
self.dropout1 = nn.Dropout(0.25)
self.dropout2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(9216, 128)
self.fc2 = nn.Linear(128, 10)
self.act = F.relu

def forward(self, x):
x = self.act(self.conv1(x))
x = self.act(self.conv2(x))
x = F.max_pool2d(x, 2)
x = self.dropout1(x)
x = torch.flatten(x, 1)
x = self.act(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
output = F.log_softmax(x, dim=1)
return output


We define the training device (cuda):

device = "cuda"


transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])

test_dset = datasets.MNIST('data', train=False, transform=transform)



Move the model to the CUDA device:

model = BasicNet().to(device)


Build a PyTorch optimizer:

optimizer = optim.AdamW(model.parameters(), lr=1e-3)


Before finally creating a simplistic training and evaluation loop that performs one full iteration over the dataset and calculates the test accuracy:

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()

model.eval()
correct = 0
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')


Typically from here, one could either throw all of this into a python script or run it on a Jupyter Notebook.

However, how would you then get this script to run on say two GPUs or on multiple machines if these resources are available, which could improve training speed through distributed training? Just doing python myscript.py will only ever run the script using a single GPU. This is where torch.distributed comes into play

## PyTorch Distributed Data Parallelism

As the name implies, torch.distributed is meant to work on distributed setups. This can include multi-node, where you have a number of machines each with a single GPU, or multi-gpu where a single system has multiple GPUs, or some combination of both.

To convert our above code to work within a distributed setup, a few setup configurations must first be defined, detailed in the Getting Started with DDP Tutorial

First a setup and a cleanup function must be declared. This will open up a processing group that all of the compute processes can communicate through

Note: for this section of the tutorial it should be assumed these are sent in python script files. Later on a launcher using Accelerate will be discussed that removes this necessity

import os
import torch.distributed as dist

def setup(rank, world_size):
"Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
os.environ["MASTER_PORT"] = "12355"

# Initialize the process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
"Cleans up the distributed environment"
dist.destroy_process_group()


The last piece of the puzzle is how do I send my data and model to another GPU?

This is where the DistributedDataParallel module comes into play. It will copy your model onto each GPU, and when loss.backward() is called the backpropagation is performed and the resulting gradients across all these copies of the model will be averaged/reduced. This ensures each device has the same weights post the optimizer step.

Below is an example of our training setup, refactored as a function, with this capability:

Note: Here rank is the overall rank of the current GPU compared to all the other GPUs available, meaning they have a rank of 0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
setup(rank, world_size)
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])
# Train for one epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
cleanup()


The optimizer needs to be declared based on the model on the specific device (so ddp_model and not model) for all of the gradients to properly be calculated.

Lastly, to run the script PyTorch has a convenient torchrun command line module that can help. Just pass in the number of nodes it should use as well as the script to run and you are set:

torchrun --nproc_per_nodes=2 --nnodes=1 example_script.py


The above will run the training script on two GPUs that live on a single machine and this is the barebones for performing only distributed training with PyTorch.

Now let's talk about Accelerate, a library aimed to make this process more seameless and also help with a few best practices

## 🤗 Accelerate

Accelerate is a library designed to allow you to perform what we just did above, without needing to modify your code greatly. On top of this, the data pipeline innate to Accelerate can also improve performance to your code as well.

First, let's wrap all of the above code we just performed into a single function, to help us visualize the difference:

def train_ddp(rank, world_size):
setup(rank, world_size)
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])

test_dset = datasets.MNIST('data', train=False, transform=transform)

# Build model
model = model.to(rank)
ddp_model = DDP(model, device_ids=[rank])

# Build optimizer

# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
data, target = data.to(device), target.to(device)
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()

# Evaluate
model.eval()
correct = 0
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')


Next let's talk about how Accelerate can help. There's a few issues with the above code:

1. This is slightly inefficient, given that n dataloaders are made based on each device and pushed.
2. This code will only work for multi-GPU, so special care would need to be made for it to be ran on a single node again, or on TPU.

Accelerate helps this through the Accelerator class. Through it, the code remains much the same except for three lines of code when comparing a single node to multinode, as shown below:

def train_ddp_accelerate():
accelerator = Accelerator()
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307), (0.3081))
])

test_dset = datasets.MNIST('data', train=False, transform=transform)

# Build model
model = BasicModel()

# Build optimizer

# Send everything through accelerator.prepare
)

# Train for a single epoch
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
output = model(data)
loss = F.nll_loss(output, target)
accelerator.backward(loss)
optimizer.step()

# Evaluate
model.eval()
correct = 0
data, target = data.to(device), target.to(device)
output = model(data)
pred = output.argmax(dim=1, keepdim=True)
correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')


With this your PyTorch training loop is now setup to be ran on any distributed setup thanks to the Accelerator object. This code can then still be launched through the torchrun CLI or through Accelerate's own CLI interface, accelerate launch.

As a result its now trivialized to perform distributed training with Accelerate and keeping as much of the barebones PyTorch code the same as possible.

Earlier it was mentioned that Accelerate also makes the DataLoaders more efficient. This is through custom Samplers that can send parts of the batches automatically to different devices during training allowing for a single copy of the data to be known at one time, rather than four at once into memory depending on the configuration. Along with this, there is only a single full copy of the original dataset in memory total. Subsets of this dataset are split between all of the nodes that are utilized for training, allowing for much larger datasets to be trained on a single instance without an explosion in memory utilized.

### Using the notebook_launcher

Earlier it was mentioned you can start distributed code directly out of your Jupyter Notebook. This comes from Accelerate's notebook_launcher utility, which allows for starting multi-gpu training based on code inside of a Jupyter Notebook.

To use it is as trivial as importing the launcher:

from accelerate import notebook_launcher


And passing the training function we declared earlier, any arguments to be passed, and the number of processes to use (such as 8 on a TPU, or 2 for two GPUs). Both of the above training functions can be ran, but do note that after you start a single launch, the instance needs to be restarted before spawning another

notebook_launcher(train_ddp, args=(), num_processes=2)


Or:

notebook_launcher(train_accelerate_ddp, args=(), num_processes=2)


## Using 🤗 Trainer

Finally, we arrive at the highest level of API -- the Hugging Face Trainer.

This wraps as much training as possible while still being able to train on distributed systems without the user needing to do anything at all.

First we need to import the Trainer:

from transformers import Trainer


Then we define some TrainingArguments to control all the usual hyper-parameters. The trainer also works through dictionaries, so a custom collate function needs to be made.

Finally, we subclass the trainer and write our own compute_loss.

Afterwards, this code will also work on a distributed setup without any training code needing to be written whatsoever!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)

def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)

trainer.train()

    ***** Running training *****
Num examples = 60000
Num Epochs = 1
Instantaneous batch size per device = 64
Total train batch size (w. parallel, distributed & accumulation) = 64
Total optimization steps = 938

Epoch Training Loss Validation Loss
1 0.875700 0.282633

Similarly to the above examples with the notebook_launcher, this can be done again here by throwing it all into a training function:

def train_trainer_ddp():
model = BasicNet()

training_args = TrainingArguments(
"basic-trainer",
per_device_train_batch_size=64,
per_device_eval_batch_size=64,
num_train_epochs=1,
evaluation_strategy="epoch",
remove_unused_columns=False
)

def collate_fn(examples):
pixel_values = torch.stack([example[0] for example in examples])
labels = torch.tensor([example[1] for example in examples])
return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
def compute_loss(self, model, inputs, return_outputs=False):
outputs = model(inputs["x"])
target = inputs["labels"]
loss = F.nll_loss(outputs, target)
return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
model,
training_args,
train_dataset=train_dset,
eval_dataset=test_dset,
data_collator=collate_fn,
)

trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)