from typing import List, Optional

import torch
from torch.nn import Module
from torch.nn.modules.loss import _Loss
from torch.optim.lr_scheduler import _LRScheduler

from internlm.core.gradient_handler import BaseGradientHandler
from internlm.solver.beta2_scheduler import Beta2Scheduler
from internlm.solver.optimizer.hybrid_zero_optim import BaseOptimizer
from internlm.utils.common import get_batch_size, move_to_device


class Engine:
    """
    The Engine class is responsible for managing the training and evaluation process of a neural network model.
    It handles the forward and backward passes, parameter updates, gradient handling, and mode switching between
    training and evaluation.

    Args:
        model (torch.nn.Module): The neural network model to be trained or evaluated.
        optimizer (BaseOptimizer): The optimizer used for updating the parameters of the model.
        lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional): The learning rate scheduler for the
            optimizer. Default is None.
        beta2_scheduler (internlm.solver.beta2_scheduler.Beta2Scheduler, optional): The beta2 scheduler for the
            optimizer. Default is None.
        criterion (torch.nn.modules.loss._Loss, optional): The loss function used for calculating the loss during
            training. Default is None.
        gradient_handlers (List[BaseGradientHandler], optional): A list of gradient handlers used in the backward
            pass. Default is None.
        clip_grad_norm (float, optional): The norm value for gradient clipping. Default is 0.0.

    Examples:
        >>> # define model, criterion, optimizer, lr_scheduler, train_dataloader for your training
        >>> model = ...
        >>> criterion = ...
        >>> optimizer = ...
        >>> train_dataloader = ...
        >>> engine, _, _, _ = internlm.initialize_engine(model, optimizer, criterion)
        >>> engine.train()
        >>> for inputs, labels in train_dataloader:
        >>>     # set gradients to zero
        >>>     engine.zero_grad()
        >>>     # run a forward pass
        >>>     outputs = engine(inputs)
        >>>     # compute the loss value and run a backward pass
        >>>     loss = engine.criterion(outputs, labels)
        >>>     engine.backward(loss)
        >>>     # update parameters
        >>>     engine.step()
    """

    def __init__(
        self,
        model: Module,
        optimizer: BaseOptimizer,
        lr_scheduler: Optional[_LRScheduler] = None,
        beta2_scheduler: Optional[Beta2Scheduler] = None,
        criterion: Optional[_Loss] = None,
        gradient_handlers: Optional[List[BaseGradientHandler]] = None,
        clip_grad_norm: float = 0.0,
    ):
        self._model = model
        self._optimizer = optimizer
        self._lr_scheduler = lr_scheduler
        self._beta2_scheduler = beta2_scheduler
        self._criterion = criterion
        self._clip_grad_norm = clip_grad_norm

        self.training = True

        self._gradient_handlers = gradient_handlers if gradient_handlers else []

    @property
    def model(self):
        """Returns the model attached to the engine."""
        return self._model

    @property
    def optimizer(self):
        """Returns the optimizer attached to the engine."""
        return self._optimizer

    @property
    def criterion(self):
        """Returns the criterion (loss function) attached to the engine."""
        return self._criterion

    def _all_reduce_gradients(self):
        """Handles all-reduce operations of gradients across different parallel groups."""
        for handler in self._gradient_handlers:
            handler.handle_gradient()

    def zero_grad(self):
        """Sets the gradient of all parameters in the model to zero."""
        self.optimizer.zero_grad()

    def step(self):
        """
        Executes the parameter update step. This includes all-reduce operations of gradients, gradient clipping,
        and the parameter update itself. On success, it also steps the learning rate scheduler and the beta2
        scheduler if they exist.

        Returns:
            success (bool): Whether the parameter update was successful.
            grad_norm (float): The norm of the gradient after clipping.
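
        Example (illustrative; the optimizer may skip the update, for example on
        gradient overflow in mixed-precision training):
            >>> success, grad_norm = engine.step()
            >>> if not success:
            >>>     print(f"update skipped, grad norm: {grad_norm}")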
| """ |
| self._all_reduce_gradients() |
| self.optimizer.clip_grad_norm(self.model, self._clip_grad_norm) |
|
|
| success, grad_norm = self.optimizer.step() |
|
|
| if success and self._lr_scheduler is not None: |
| self._lr_scheduler.step() |
|
|
| if success and self._beta2_scheduler is not None: |
| self._beta2_scheduler.step() |
|
|
| return success, grad_norm |
|
|
    def train(self):
        """Sets the model to training mode."""
        self.training = True
        self._model.train()

    def eval(self):
        """Sets the model to evaluation mode."""
        self.training = False
        self._model.eval()

    def backward(self, loss: torch.Tensor):
        """
        Starts the backward propagation given the loss value computed by a loss function.

        Args:
            loss (torch.Tensor): The loss value computed by a loss function.
        """
        return self.optimizer.backward(loss)

    def backward_by_grad(self, tensor, grad):
        """
        Starts the backward propagation given the gradient of the output tensor.

        Args:
            tensor (torch.Tensor): The output tensor.
            grad (torch.Tensor): The gradient passed back to the output tensor.
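
        Example (illustrative sketch; ``grad_from_next_stage`` is a hypothetical
        tensor, e.g. the gradient received from the next pipeline stage):
            >>> output = engine(inputs)
            >>> engine.backward_by_grad(output, grad_from_next_stage)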
| """ |
| return self.optimizer.backward_by_grad(tensor, grad) |
|
|
| def __call__(self, *args, **kwargs): |
| """ |
| Runs the forward step for the model. |
| |
| Returns: |
| torch.Tensor: The output of the model. |
| """ |
| return self.model(*args, **kwargs) |
|
|
| def load_batch(self, data_iter, to_gpu=True): |
| """ |
| Loads a batch from the data iterator. It returns the data and labels which are |
| already in the same GPU as where the model is. |
| |
| Args: |
| data_iter (Iterable): The data iterator from which to get a batch of data, obtained by calling |
| iter(dataloader). |
| to_gpu (bool, optional): Whether the data should be moved to the GPU. Default is True. |
| |
| Returns: |
| Tuple (torch.Tensor, torch.Tensor): A tuple of (data, label). |
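
        Example (illustrative sketch, assuming ``train_dataloader`` is defined):
            >>> data_iter = iter(train_dataloader)
            >>> batch_data, batch_size = engine.load_batch(data_iter)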
| """ |
| if data_iter is None: |
| raise RuntimeError("Dataloader is not defined.") |
| try: |
| batch_data = next(data_iter) |
| except TypeError: |
| batch_data = data_iter |
|
|
| if to_gpu: |
| batch_data = move_to_device(batch_data) |
| batch_size = get_batch_size(batch_data) |
|
|
| return batch_data, batch_size |
|
|
|
|
| class KDEngine(Engine): |
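    """
    An Engine variant for knowledge distillation that additionally holds a teacher
    model and a distillation criterion.

    Args:
        model (torch.nn.Module): The student model to be trained.
        teacher (torch.nn.Module): The teacher model whose outputs supervise the student.
        optimizer (BaseOptimizer): The optimizer used for updating the parameters of the student model.
        lr_scheduler (torch.optim.lr_scheduler._LRScheduler, optional): The learning rate scheduler for the
            optimizer. Default is None.
        beta2_scheduler (internlm.solver.beta2_scheduler.Beta2Scheduler, optional): The beta2 scheduler for the
            optimizer. Default is None.
        criterion (torch.nn.modules.loss._Loss, optional): The task loss computed against the labels.
            Default is None.
        kd_criterion (torch.nn.modules.loss._Loss, optional): The distillation loss computed between student
            and teacher outputs. Default is None.
        gradient_handlers (List[BaseGradientHandler], optional): A list of gradient handlers used in the backward
            pass. Default is None.
        clip_grad_norm (float, optional): The norm value for gradient clipping. Default is 0.0.

    Examples:
        >>> # illustrative sketch, assuming kd_criterion takes (student_outputs, teacher_outputs)
        >>> # and that the two losses are simply summed; adapt the weighting to your setup
        >>> engine.train()
        >>> for inputs, labels in train_dataloader:
        >>>     engine.zero_grad()
        >>>     outputs = engine(inputs)
        >>>     with torch.no_grad():
        >>>         teacher_outputs = engine.teacher(inputs)
        >>>     loss = engine.criterion(outputs, labels) + engine.kd_criterion(outputs, teacher_outputs)
        >>>     engine.backward(loss)
        >>>     engine.step()
    """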
    def __init__(
        self,
        model: Module,
        teacher: Module,
        optimizer: BaseOptimizer,
        lr_scheduler: Optional[_LRScheduler] = None,
        beta2_scheduler: Optional[Beta2Scheduler] = None,
        criterion: Optional[_Loss] = None,
        kd_criterion: Optional[_Loss] = None,
        gradient_handlers: Optional[List[BaseGradientHandler]] = None,
        clip_grad_norm: float = 0.0,
    ):
        self._teacher = teacher
        self._kd_criterion = kd_criterion

        super().__init__(
            model=model,
            optimizer=optimizer,
            lr_scheduler=lr_scheduler,
            beta2_scheduler=beta2_scheduler,
            criterion=criterion,
            gradient_handlers=gradient_handlers,
            clip_grad_norm=clip_grad_norm,
        )

    @property
    def teacher(self):
        """Returns the teacher model attached to the engine."""
        return self._teacher

    @property
    def kd_criterion(self):
        """Returns the knowledge-distillation criterion attached to the engine."""
        return self._kd_criterion