Spaces:
Running
Running
File size: 5,115 Bytes
265ae36 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the Apache License, Version 2.0
# found in the LICENSE file in the root directory of this source tree.
import os
from typing import Any
import torch
import dinov2.distributed as distributed
from functools import partial
from fvcore.common.checkpoint import Checkpointer
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy
from torch.distributed.fsdp import MixedPrecision
from torch.distributed.fsdp import StateDictType
from torch.distributed.fsdp.sharded_grad_scaler import ShardedGradScaler
from torch.distributed.fsdp.wrap import ModuleWrapPolicy
from torch.distributed.fsdp._runtime_utils import _reshard
def get_fsdp_wrapper(model_cfg, modules_to_wrap=None):
    """Build a partially-applied FSDP constructor from a model config.

    Args:
        model_cfg: config object exposing ``sharding_strategy`` (one of
            "NO_SHARD", "SHARD_GRAD_OP", "FULL_SHARD") and
            ``mixed_precision.param_dtype`` / ``reduce_dtype`` /
            ``buffer_dtype`` (each one of "fp32", "fp16", "bf16").
        modules_to_wrap: collection of module classes handed to
            ``ModuleWrapPolicy``. ``None`` (the default) is treated as an
            empty set.

    Returns:
        A ``functools.partial`` of ``FSDP`` with the sharding strategy, mixed
        precision, device id and auto-wrap policy already bound; call it with
        the module to wrap.

    Raises:
        KeyError: if a config value is not one of the names listed above.
    """
    # A `set()` default would be one shared object across every call; use a
    # None sentinel and create a fresh set per call instead.
    if modules_to_wrap is None:
        modules_to_wrap = set()

    sharding_strategy_dict = {
        "NO_SHARD": ShardingStrategy.NO_SHARD,
        "SHARD_GRAD_OP": ShardingStrategy.SHARD_GRAD_OP,
        "FULL_SHARD": ShardingStrategy.FULL_SHARD,
    }

    dtype_dict = {
        "fp32": torch.float32,
        "fp16": torch.float16,
        "bf16": torch.bfloat16,
    }

    mixed_precision_config = MixedPrecision(
        param_dtype=dtype_dict[model_cfg.mixed_precision.param_dtype],
        reduce_dtype=dtype_dict[model_cfg.mixed_precision.reduce_dtype],
        buffer_dtype=dtype_dict[model_cfg.mixed_precision.buffer_dtype],
    )

    sharding_strategy_config = sharding_strategy_dict[model_cfg.sharding_strategy]

    # The local rank is passed to FSDP as its `device_id`.
    local_rank = distributed.get_local_rank()

    fsdp_wrapper = partial(
        FSDP,
        sharding_strategy=sharding_strategy_config,
        mixed_precision=mixed_precision_config,
        device_id=local_rank,
        sync_module_states=True,
        use_orig_params=True,
        auto_wrap_policy=ModuleWrapPolicy(modules_to_wrap),
    )
    return fsdp_wrapper
def is_fsdp(x):
    """Return True when ``x`` is a ``FullyShardedDataParallel`` instance."""
    wrapped = isinstance(x, FSDP)
    return wrapped
def is_sharded_fsdp(x):
    """Return True when ``x`` is an FSDP module whose strategy is not NO_SHARD."""
    # Anything that is not an FSDP instance is rejected before its
    # `sharding_strategy` attribute is touched.
    if not is_fsdp(x):
        return False
    return x.sharding_strategy is not ShardingStrategy.NO_SHARD
def free_if_fsdp(x):
    """Call FSDP's private ``_reshard`` on ``x`` when it is a sharded FSDP module.

    Does nothing for any other input.
    """
    if not is_sharded_fsdp(x):
        return
    # NOTE(review): `_handles` and `_reshard` are private torch APIs; presumably
    # this releases the unsharded parameters -- confirm against the torch
    # version in use.
    shard_handles = x._handles
    # One True flag per handle is passed alongside the handles.
    free_flags = [True for _ in shard_handles]
    _reshard(x, shard_handles, free_flags)
def get_fsdp_modules(x):
    """Return whatever ``FSDP.fsdp_modules`` reports for module ``x``."""
    found = FSDP.fsdp_modules(x)
    return found
def reshard_fsdp_model(x):
    """Apply ``free_if_fsdp`` to every module returned by ``get_fsdp_modules(x)``."""
    for fsdp_module in get_fsdp_modules(x):
        free_if_fsdp(fsdp_module)
def rankstr():
    """Return the string ``rank_<N>`` where ``<N>`` is the global rank."""
    global_rank = distributed.get_global_rank()
    return f"rank_{global_rank}"
class FSDPCheckpointer(Checkpointer):
    """Checkpointer that writes and tracks a separate checkpoint file per rank.

    Model state is collected under ``StateDictType.LOCAL_STATE_DICT``, and both
    the checkpoint file and the ``last_checkpoint`` tag file carry the
    ``rankstr()`` suffix in their names.
    """

    def save(self, name: str, **kwargs: Any) -> None:
        """
        Dump model and checkpointables to a rank-specific file.

        Args:
            name (str): name of the file, before the rank suffix and
                ``.pth`` extension are appended.
            kwargs (dict): extra arbitrary data to save.
        """
        if not (self.save_dir and self.save_to_disk):
            return

        # The model state dict is taken inside the LOCAL_STATE_DICT context.
        with FSDP.state_dict_type(self.model, StateDictType.LOCAL_STATE_DICT):
            payload = {"model": self.model.state_dict()}
        payload.update({key: obj.state_dict() for key, obj in self.checkpointables.items()})
        payload.update(kwargs)

        filename = f"{name}.{rankstr()}.pth"
        target_path = os.path.join(self.save_dir, filename)
        # Fails when `name` contains a path separator, i.e. when joining
        # changed the final path component.
        assert os.path.basename(target_path) == filename, filename
        self.logger.info("Saving checkpoint to {}".format(target_path))
        with self.path_manager.open(target_path, "wb") as stream:
            torch.save(payload, stream)
        self.tag_last_checkpoint(filename)

    def load(self, *args, **kwargs):
        """Run the parent ``load`` inside the LOCAL_STATE_DICT context.

        All arguments are forwarded unchanged and the parent's result is
        returned.
        """
        local_state_ctx = FSDP.state_dict_type(self.model, StateDictType.LOCAL_STATE_DICT)
        with local_state_ctx:
            return super().load(*args, **kwargs)

    def has_checkpoint(self) -> bool:
        """
        Returns:
            bool: whether this rank's ``last_checkpoint`` tag file exists in
                the target directory.
        """
        tag_name = f"last_checkpoint.{rankstr()}"
        tag_path = os.path.join(self.save_dir, tag_name)
        return self.path_manager.exists(tag_path)

    def get_checkpoint_file(self) -> str:
        """
        Returns:
            str: path of the checkpoint named in this rank's tag file, or an
                empty string when the tag file cannot be read.
        """
        tag_path = os.path.join(self.save_dir, f"last_checkpoint.{rankstr()}")
        try:
            with self.path_manager.open(tag_path, "r") as stream:
                latest_name = stream.read().strip()
        except IOError:
            # The tag file may be missing, e.g. because a separate process
            # has just deleted it.
            return ""
        # pyre-fixme[6]: For 2nd param expected `Union[PathLike[str], str]` but got
        #  `Union[bytes, str]`.
        return os.path.join(self.save_dir, latest_name)

    def tag_last_checkpoint(self, last_filename_basename: str) -> None:
        """
        Record the given basename in this rank's ``last_checkpoint`` tag file.

        Args:
            last_filename_basename (str): the basename of the last filename.
        """
        # When distributed mode is enabled, wait on a barrier before writing
        # the tag.
        if distributed.is_enabled():
            torch.distributed.barrier()
        tag_path = os.path.join(self.save_dir, f"last_checkpoint.{rankstr()}")
        with self.path_manager.open(tag_path, "w") as stream:
            stream.write(last_filename_basename)  # pyre-ignore
# Rebinds the imported ShardedGradScaler under the same name at module level.
# NOTE(review): presumably an explicit re-export so other code can import it
# from this module -- confirm intent.
ShardedGradScaler = ShardedGradScaler
|