import base64 |
|
import json |
|
import os |
|
from copy import deepcopy |
|
|
|
from torch import optim |
|
|
|
from ..optimizer import AcceleratedOptimizer |
|
from ..scheduler import AcceleratedScheduler |
|
from .dataclasses import DistributedType |
|
from .imports import is_bnb_available |
|
from .versions import compare_versions |
|
|
|
|
|
def map_pytorch_optim_to_deepspeed(optimizer): |
|
""" |
|
Args: |
|
optimizer: torch.optim.Optimizer |
|
|
|
    Returns the DeepSpeedCPUOptimizer (deepspeed.ops) version of the optimizer.
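
    A minimal usage sketch (assumes `deepspeed` is installed; the tiny `model` below is just a placeholder):

    ```python
    import torch
    from torch import optim

    model = torch.nn.Linear(8, 8)
    adamw = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-2)
    ds_optimizer = map_pytorch_optim_to_deepspeed(adamw)  # -> DeepSpeedCPUAdam with adamw_mode=True
    ```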
|
""" |
|
|
|
    defaults = {k: v for k, v in optimizer.defaults.items() if k in ["lr", "weight_decay"]}

    # Select the DeepSpeed CPU optimizer class based on the original optimizer's class;
    # DeepSpeedCPUAdam is the default.
    from deepspeed.ops.adam import DeepSpeedCPUAdam

    optimizer_class = DeepSpeedCPUAdam
|
|
|
|
|
if compare_versions("deepspeed", ">=", "0.3.1"): |
|
defaults["adamw_mode"] = False |
|
is_adaw = isinstance(optimizer, optim.AdamW) |
|
|
|
if is_bnb_available() and not is_adaw: |
|
import bitsandbytes.optim as bnb_opt |
|
|
|
if isinstance(optimizer, (bnb_opt.AdamW, bnb_opt.AdamW32bit)): |
|
try: |
|
is_adaw = optimizer.optim_bits == 32 |
|
except AttributeError: |
|
is_adaw = optimizer.args.optim_bits == 32 |
|
else: |
|
is_adaw = False |
|
|
|
if is_adaw: |
|
defaults["adamw_mode"] = True |
|
|
|
|
|
if compare_versions("deepspeed", ">=", "0.5.5"): |
|
|
|
is_ada = isinstance(optimizer, optim.Adagrad) |
|
|
|
|
|
if is_bnb_available() and not is_ada: |
|
import bitsandbytes.optim as bnb_opt |
|
|
|
if isinstance(optimizer, (bnb_opt.Adagrad, bnb_opt.Adagrad32bit)): |
|
try: |
|
is_ada = optimizer.optim_bits == 32 |
|
except AttributeError: |
|
is_ada = optimizer.args.optim_bits == 32 |
|
if is_ada: |
|
from deepspeed.ops.adagrad import DeepSpeedCPUAdagrad |
|
|
|
optimizer_class = DeepSpeedCPUAdagrad |
|
|
|
|
|
    # DeepSpeedCPULion
    if is_bnb_available(min_version="0.38.0") and compare_versions("deepspeed", ">=", "0.11.0"):
|
from bitsandbytes.optim import Lion, Lion32bit |
|
|
|
if isinstance(optimizer, (Lion, Lion32bit)): |
|
try: |
|
is_bnb_32bits = optimizer.optim_bits == 32 |
|
except AttributeError: |
|
is_bnb_32bits = optimizer.args.optim_bits == 32 |
|
if is_bnb_32bits: |
|
from deepspeed.ops.lion import DeepSpeedCPULion |
|
|
|
optimizer_class = DeepSpeedCPULion |
|
|
|
return optimizer_class(optimizer.param_groups, **defaults) |
|
|
|
|
|
def get_active_deepspeed_plugin(state): |
|
""" |
|
Returns the currently active DeepSpeedPlugin. |
|
|
|
Raises: |
|
ValueError: If DeepSpeed was not enabled and this function is called. |
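
    A minimal sketch (assumes an `Accelerator` was already created with a DeepSpeed config):

    ```python
    from accelerate.state import AcceleratorState

    plugin = get_active_deepspeed_plugin(AcceleratorState())
    ```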
|
""" |
|
if state.distributed_type != DistributedType.DEEPSPEED: |
|
raise ValueError( |
|
"Couldn't retrieve the active `DeepSpeedPlugin` as none were enabled. " |
|
"Please make sure that either `Accelerator` is configured for `deepspeed` " |
|
"or make sure that the desired `DeepSpeedPlugin` has been enabled (`AcceleratorState().select_deepspeed_plugin(name)`) " |
|
"before calling this function." |
|
) |
|
if not isinstance(state.deepspeed_plugins, dict): |
|
return state.deepspeed_plugins |
|
return next(plugin for plugin in state.deepspeed_plugins.values() if plugin.selected) |
|
|
|
|
|
class HfDeepSpeedConfig: |
|
""" |
|
This object contains a DeepSpeed configuration dictionary and can be quickly queried for things like zero stage. |
|
|
|
A `weakref` of this object is stored in the module's globals to be able to access the config from areas where |
|
    things like the Trainer object are not available (e.g. `from_pretrained` and `_get_resized_embeddings`). Therefore
|
it's important that this object remains alive while the program is still running. |
|
|
|
[`Trainer`] uses the `HfTrainerDeepSpeedConfig` subclass instead. That subclass has logic to sync the configuration |
|
with values of [`TrainingArguments`] by replacing special placeholder values: `"auto"`. Without this special logic |
|
the DeepSpeed configuration is not modified in any way. |
|
|
|
Args: |
|
config_file_or_dict (`Union[str, Dict]`): path to DeepSpeed config file or dict. |
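
    A minimal sketch of how the config can be queried (the config values below are illustrative):

    ```python
    ds_config = {"zero_optimization": {"stage": 3, "offload_param": {"device": "cpu"}}}
    hf_ds_config = HfDeepSpeedConfig(ds_config)
    hf_ds_config.is_zero3()  # True
    hf_ds_config.is_offload()  # True
    hf_ds_config.get_value("zero_optimization.stage")  # 3
    ```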
|
|
|
""" |
|
|
|
def __init__(self, config_file_or_dict): |
|
        if isinstance(config_file_or_dict, dict):
            # Don't modify user's data should they want to reuse it (e.g. in tests), because once we
            # modify it, it would not be accepted here again, since `auto` values would have been overridden.
            config = deepcopy(config_file_or_dict)
        elif os.path.exists(config_file_or_dict):
            with open(config_file_or_dict, encoding="utf-8") as f:
                config = json.load(f)
        else:
            try:
                try:
                    # first, try to parse it as a plain JSON string
                    config = json.loads(config_file_or_dict)
                except json.JSONDecodeError:
                    # otherwise, treat it as a base64-encoded JSON string
                    config_decoded = base64.urlsafe_b64decode(config_file_or_dict).decode("utf-8")
                    config = json.loads(config_decoded)
            except (UnicodeDecodeError, AttributeError, ValueError):
                raise ValueError(
                    f"Expected a string path to an existing deepspeed config, or a dictionary, or a base64 encoded string. Received: {config_file_or_dict}"
                )
|
|
|
self.config = config |
|
|
|
self.set_stage_and_offload() |
|
|
|
    def set_stage_and_offload(self):
        # zero stage - resolved as early as possible so that `is_zero2`/`is_zero3` queries work
        # before the model is created.
        self._stage = self.get_value("zero_optimization.stage", -1)

        # offload
        self._offload = False
        if self.is_zero2() or self.is_zero3():
            offload_devices_valid = {"cpu", "nvme"}
            offload_devices = {
                self.get_value("zero_optimization.offload_optimizer.device"),
                self.get_value("zero_optimization.offload_param.device"),
            }
            if len(offload_devices & offload_devices_valid) > 0:
                self._offload = True
|
|
|
def find_config_node(self, ds_key_long): |
|
        config = self.config

        # find the config node of interest if it exists
        nodes = ds_key_long.split(".")
|
ds_key = nodes.pop() |
|
for node in nodes: |
|
config = config.get(node) |
|
if config is None: |
|
return None, ds_key |
|
|
|
return config, ds_key |
|
|
|
def get_value(self, ds_key_long, default=None): |
|
""" |
|
Returns the set value or `default` if no value is set |
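
        Illustrative usage (the config values below are made up):

        ```python
        ds_config = HfDeepSpeedConfig({"zero_optimization": {"stage": 2}})
        ds_config.get_value("zero_optimization.stage")  # 2
        ds_config.get_value("zero_optimization.offload_param.device", "none")  # "none"
        ```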
|
""" |
|
config, ds_key = self.find_config_node(ds_key_long) |
|
if config is None: |
|
return default |
|
return config.get(ds_key, default) |
|
|
|
def del_config_sub_tree(self, ds_key_long, must_exist=False): |
|
""" |
|
Deletes a sub-section of the config file if it's found. |
|
|
|
Unless `must_exist` is `True` the section doesn't have to exist. |
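
        Illustrative usage (the config values below are made up):

        ```python
        cfg = HfDeepSpeedConfig({"optimizer": {"type": "AdamW"}, "zero_optimization": {"stage": 2}})
        cfg.del_config_sub_tree("optimizer")
        "optimizer" in cfg.config  # False
        ```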
|
""" |
|
        config = self.config

        # find the config node and if it exists
        nodes = ds_key_long.split(".")
|
for node in nodes: |
|
parent_config = config |
|
config = config.get(node) |
|
if config is None: |
|
if must_exist: |
|
raise ValueError(f"Can't find {ds_key_long} entry in the config: {self.config}") |
|
else: |
|
return |
|
|
|
|
|
        # if found, remove it
        if parent_config is not None:
            parent_config.pop(node)
|
|
|
def is_true(self, ds_key_long): |
|
""" |
|
        Returns `True`/`False` only if the value is set, always `False` otherwise. So use this method to ask the very
        specific question of whether the value is set to `True` (and it's not set to `False` or isn't set).
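
        Illustrative usage (the config values below are made up):

        ```python
        cfg = HfDeepSpeedConfig({"fp16": {"enabled": False}})
        cfg.is_true("fp16.enabled")  # False
        cfg.is_false("fp16.enabled")  # True
        cfg.is_true("fp16.loss_scale")  # False (unset keys always return False)
        ```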
|
|
|
""" |
|
value = self.get_value(ds_key_long) |
|
return False if value is None else bool(value) |
|
|
|
def is_false(self, ds_key_long): |
|
""" |
|
        Returns `True`/`False` only if the value is set, always `False` otherwise. So use this method to ask the very
        specific question of whether the value is set to `False` (and it's not set to `True` or isn't set).
|
""" |
|
value = self.get_value(ds_key_long) |
|
return False if value is None else not bool(value) |
|
|
|
def is_zero2(self): |
|
return self._stage == 2 |
|
|
|
def is_zero3(self): |
|
return self._stage == 3 |
|
|
|
def is_offload(self): |
|
return self._offload |
|
|
|
|
|
class DeepSpeedEngineWrapper: |
|
""" |
|
    Internal wrapper for `deepspeed.runtime.engine.DeepSpeedEngine`. This is used to follow the conventional training
    loop.
|
|
|
Args: |
|
engine (deepspeed.runtime.engine.DeepSpeedEngine): deepspeed engine to wrap |
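
    A minimal sketch (`engine` and `loss` are assumed to come from `deepspeed.initialize` and the training loop):

    ```python
    wrapper = DeepSpeedEngineWrapper(engine)
    wrapper.backward(loss)  # calls engine.backward(loss) and engine.step() at the sync boundary
    ```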
|
""" |
|
|
|
def __init__(self, engine): |
|
self.engine = engine |
|
|
|
    def backward(self, loss, sync_gradients=True, **kwargs):
        # Tell DeepSpeed whether this call is a gradient accumulation boundary, i.e. whether
        # gradients should be synchronized and the optimizer stepped on this call.
        self.engine.set_gradient_accumulation_boundary(is_boundary=sync_gradients)

        # Runs backpropagation and handles mixed precision.
        self.engine.backward(loss, **kwargs)

        if sync_gradients:
            # DeepSpeed's `engine.step()` performs the following operations:
            # - gradient accumulation check
            # - gradient clipping
            # - optimizer step
            # - zero grad
            # - checking overflow
            # - lr_scheduler step (only if engine.lr_scheduler is not None)
            self.engine.step()
|
|
|
|
|
|
|
|
|
def get_global_grad_norm(self): |
|
"""Get the global gradient norm from DeepSpeed engine.""" |
|
        grad_norm = self.engine.get_global_grad_norm()
        # The engine may return a tensor here; convert it to a plain Python number if so.
        if hasattr(grad_norm, "item"):
|
return grad_norm.item() |
|
return grad_norm |
|
|
|
|
|
class DeepSpeedOptimizerWrapper(AcceleratedOptimizer): |
|
""" |
|
Internal wrapper around a deepspeed optimizer. |
|
|
|
Args: |
|
optimizer (`torch.optim.optimizer.Optimizer`): |
|
The optimizer to wrap. |
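
    Illustrative usage (`ds_optimizer` is assumed to be the optimizer returned by `deepspeed.initialize`):

    ```python
    wrapped = DeepSpeedOptimizerWrapper(ds_optimizer)
    wrapped.step_was_skipped  # True when the wrapped optimizer reports a gradient overflow
    ```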
|
""" |
|
|
|
def __init__(self, optimizer): |
|
super().__init__(optimizer, device_placement=False, scaler=None) |
|
self.__has_overflow__ = hasattr(self.optimizer, "overflow") |
|
|
|
    def zero_grad(self, set_to_none=None):
        pass  # `accelerator.backward(loss)` handles gradient zeroing via the DeepSpeed engine

    def step(self):
        pass  # `accelerator.backward(loss)` handles the optimizer step via the DeepSpeed engine
|
|
|
@property |
|
def step_was_skipped(self): |
|
"""Whether or not the optimizer step was done, or skipped because of gradient overflow.""" |
|
if self.__has_overflow__: |
|
return self.optimizer.overflow |
|
return False |
|
|
|
|
|
class DeepSpeedSchedulerWrapper(AcceleratedScheduler): |
|
""" |
|
Internal wrapper around a deepspeed scheduler. |
|
|
|
Args: |
|
scheduler (`torch.optim.lr_scheduler.LambdaLR`): |
|
The scheduler to wrap. |
|
        optimizers (one or a list of `torch.optim.Optimizer`):
            The optimizer(s) associated with the scheduler.
|
""" |
|
|
|
def __init__(self, scheduler, optimizers): |
|
super().__init__(scheduler, optimizers) |
|
|
|
def step(self): |
|
pass |
|
|
|
|
|
class DummyOptim: |
|
""" |
|
    Dummy optimizer that holds model parameters or param groups. It is primarily used to follow the conventional
    training loop when the optimizer config is specified in the DeepSpeed config file.
|
|
|
Args: |
|
lr (float): |
|
Learning rate. |
|
        params (iterable):
            Iterable of parameters to optimize or dicts defining parameter groups.
|
weight_decay (float): |
|
Weight decay. |
|
**kwargs (additional keyword arguments, *optional*): |
|
Other arguments. |
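
    A minimal sketch (assumes the real optimizer is fully specified in the DeepSpeed config file; `model` is a
    placeholder):

    ```python
    optimizer = DummyOptim(params=model.parameters(), lr=5e-5)
    ```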
|
""" |
|
|
|
def __init__(self, params, lr=0.001, weight_decay=0, **kwargs): |
|
self.params = params |
|
self.lr = lr |
|
self.weight_decay = weight_decay |
|
self.kwargs = kwargs |
|
|
|
|
|
class DummyScheduler: |
|
""" |
|
    Dummy scheduler that wraps an optimizer. It is primarily used to follow the conventional training loop when the
    scheduler config is specified in the DeepSpeed config file.
|
|
|
Args: |
|
optimizer (`torch.optim.optimizer.Optimizer`): |
|
The optimizer to wrap. |
|
total_num_steps (int, *optional*): |
|
Total number of steps. |
|
warmup_num_steps (int, *optional*): |
|
Number of steps for warmup. |
|
lr_scheduler_callable (callable, *optional*): |
|
A callable function that creates an LR Scheduler. It accepts only one argument `optimizer`. |
|
**kwargs (additional keyword arguments, *optional*): |
|
Other arguments. |
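
    A minimal sketch (assumes the real scheduler is fully specified in the DeepSpeed config file; `optimizer` can be a
    `DummyOptim` or a regular optimizer):

    ```python
    lr_scheduler = DummyScheduler(optimizer, total_num_steps=1000, warmup_num_steps=100)
    ```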
|
""" |
|
|
|
def __init__(self, optimizer, total_num_steps=None, warmup_num_steps=0, lr_scheduler_callable=None, **kwargs): |
|
self.optimizer = optimizer |
|
self.total_num_steps = total_num_steps |
|
self.warmup_num_steps = warmup_num_steps |
|
self.lr_scheduler_callable = lr_scheduler_callable |
|
self.kwargs = kwargs |
|
|