# Scraped page header (not part of the module): khoicrtp's picture | init | 12001a9 | raw | history blame | 11 kB
"""Utility functions for training and inference."""
import functools
from pathlib import Path
import pickle
import warnings
from io import BytesIO
import torch
import torch.utils._device
from lightning.fabric.strategies import DeepSpeedStrategy, FSDPStrategy
from torch.distributed.fsdp import FullStateDictConfig
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import StateDictType
# Lookup table: embedding width (n_embd) of a LLaMA checkpoint -> model name.
llama_model_sizes = {
    4096: "7B",   # n_embd=4096
    5120: "13B",  # n_embd=5120
    6656: "30B",  # n_embd=6656
    8192: "65B",  # n_embd=8192
}
def llama_model_lookup(checkpoint: dict) -> str:
    """Return the LLaMA model name that matches a checkpoint.

    The second dimension of the ``lm_head.weight`` entry is used as the key
    into ``llama_model_sizes``; that width uniquely identifies the model.
    A width that is not in the table raises ``KeyError``.
    """
    lm_head_weight = checkpoint["lm_head.weight"]
    n_embd = lm_head_weight.shape[1]
    return llama_model_sizes[n_embd]
def find_multiple(n: int, k: int) -> int:
    """Round ``n`` up to a multiple of ``k``.

    ``n`` is returned unchanged when it is already divisible by ``k``;
    otherwise the gap to the next multiple is added (for positive ``k`` this
    is the smallest multiple of ``k`` that is not below ``n``).
    """
    remainder = n % k
    return n if remainder == 0 else n + k - remainder
def save_model_checkpoint(fabric, model, file_path):
    """Collect the model's state dict and write it to ``file_path``.

    Three cases are handled, depending on ``fabric.strategy``:

    * DeepSpeed: the checkpoint is written through ``fabric.save``; after a
      barrier, rank 0 converts it into a consolidated fp32 state dict stored
      next to it with a ``.pth`` suffix. The function then returns.
    * FSDP: the full state dict is gathered (rank 0 only, offloaded to CPU
      when more than one process participates).
    * Anything else: ``model.state_dict()`` is used directly.

    In the last two cases rank 0 writes the state dict with ``torch.save``
    and all ranks meet at a barrier afterwards.

    This will be upstreamed to Fabric soon.
    """
    checkpoint_path = Path(file_path)
    strategy = fabric.strategy

    if isinstance(strategy, DeepSpeedStrategy):
        # Imported lazily so that deepspeed is only required when it is actually used.
        from deepspeed.utils.zero_to_fp32 import convert_zero_checkpoint_to_fp32_state_dict

        fabric.save(checkpoint_path, {"model": model})
        fabric.barrier()
        if fabric.global_rank == 0:
            # Create a consolidated checkpoint with the same name next to the deepspeed checkpoint
            consolidated_path = checkpoint_path.with_suffix(".pth")
            convert_zero_checkpoint_to_fp32_state_dict(checkpoint_path, consolidated_path)
        return

    if not isinstance(strategy, FSDPStrategy):
        weights = model.state_dict()
    else:
        gather_config = FullStateDictConfig(
            offload_to_cpu=(fabric.world_size > 1),
            rank0_only=True,
        )
        with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT, gather_config):
            weights = model._forward_module.state_dict()

    if fabric.global_rank == 0:
        torch.save(weights, checkpoint_path)
    fabric.barrier()
class EmptyInitOnDevice(torch.overrides.TorchFunctionMode):
    """Create tensors with a given device and dtype and don't run initialization.

    While the mode is active:

    * calls into ``torch.nn.init`` return their tensor argument untouched, so
      parameters keep whatever (uninitialized) memory they were created with;
    * tensor constructors listed by ``torch.utils._device._device_constructors()``
      receive ``device`` / ``dtype`` defaults when the caller did not pass them;
    * if a quantization mode was selected, ``torch.nn.Linear`` is replaced by
      the quantized linear class and restored on exit.

    Example::

        with EmptyInitOnDevice("cuda", dtype=torch.bfloat16):
            model = LLaMA.from_name('7B')
        model.load_state_dict(torch.load('llama-lit/7B/lit-llama.pth'))
    """

    def __init__(self, device=None, dtype=None, quantization_mode=None):
        """Configure the mode.

        Args:
            device: `torch.device` (or anything `torch.device(...)` accepts,
                e.g. ``"cuda"``) to work with.
            dtype: `torch.dtype` to work with.
            quantization_mode: optional string, quantization mode to work with,
                default `None`. Available modes:
                `llm.int8` bitsandbytes LLM.int8 quantization (only on GPU),
                `gptq.int4`, `gptq.int8`: GPTQ pre-quantized models.

        Raises:
            ValueError: if `llm.int8` is requested for a non-CUDA (or missing) device.
            RuntimeError: if `quantization_mode` is not one of the known modes.
        """
        super().__init__()
        self.quantization_mode = quantization_mode
        self.quantized_linear_cls = None
        if self.quantization_mode == 'llm.int8':
            # Normalize through torch.device so that plain strings such as "cuda"
            # work too; a missing device cannot be a GPU either.
            if device is None or torch.device(device).type != "cuda":
                raise ValueError("Quantization is only supported on the GPU.")
            from .quantization import Linear8bitLt
            self.quantized_linear_cls = Linear8bitLt
        elif self.quantization_mode == 'gptq.int4':
            from .quantization import ColBlockQuantizedLinear
            self.quantized_linear_cls = functools.partial(ColBlockQuantizedLinear, bits=4, tile_cols=-1)
        elif self.quantization_mode == 'gptq.int8':
            from .quantization import ColBlockQuantizedLinear
            self.quantized_linear_cls = functools.partial(ColBlockQuantizedLinear, bits=8, tile_cols=-1)
        elif self.quantization_mode is not None:
            raise RuntimeError(f"unknown quantization mode {self.quantization_mode}")
        self.device = device
        self.dtype = dtype

    def __enter__(self):
        """Swap in the quantized linear class (if any) and activate the mode."""
        if self.quantized_linear_cls is not None:
            # Remember the real class so __exit__ can put it back.
            self.torch_linear_cls = torch.nn.Linear
            torch.nn.Linear = self.quantized_linear_cls
        return super().__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore ``torch.nn.Linear`` (if it was swapped) and deactivate the mode."""
        if self.quantized_linear_cls is not None:
            torch.nn.Linear = self.torch_linear_cls
        return super().__exit__(exc_type, exc_val, exc_tb)

    def __torch_function__(self, func, types, args=(), kwargs=None):
        """Intercept torch calls: skip ``torch.nn.init`` and inject device/dtype defaults."""
        # Work on a copy so the caller's kwargs dict is never mutated.
        kwargs = dict(kwargs) if kwargs else {}
        if getattr(func, "__module__", None) == "torch.nn.init":
            # Initializers are no-ops here: hand back the tensor they were given.
            if "tensor" in kwargs:
                return kwargs["tensor"]
            return args[0]
        wants_defaults = self.device is not None or self.dtype is not None
        if wants_defaults and func in torch.utils._device._device_constructors():
            # Only fill in values the caller did not provide explicitly.
            if self.device is not None and kwargs.get("device") is None:
                kwargs["device"] = self.device
            if self.dtype is not None and kwargs.get("dtype") is None:
                kwargs["dtype"] = self.dtype
        return func(*args, **kwargs)
# this is taken from torchhacks https://github.com/lernapparat/torchhacks
class NotYetLoadedTensor:
    """Stand-in for a checkpoint tensor whose data has not been read yet.

    The instance keeps a tensor built on a meta storage (``metatensor``) to
    answer metadata queries, plus the information needed to read the real
    data later: the unpickler (``archiveinfo``), the persistent id of the
    storage (``storageinfo``) and the remaining arguments for
    ``torch._utils._rebuild_tensor_v2`` (``rebuild_args``).
    """

    def __init__(self, metatensor, archiveinfo, storageinfo, rebuild_args):
        self.metatensor = metatensor
        self.archiveinfo = archiveinfo
        self.storageinfo = storageinfo
        self.rebuild_args = rebuild_args

    @classmethod
    def rebuild_from_type_v2(cls, func, new_type, args, state, *, archiveinfo=None):
        """Lazy counterpart of ``torch._tensor._rebuild_from_type_v2``.

        If ``func(*args)`` yields a lazy tensor, its loader is wrapped so that
        the type/state rebuild happens when the data is finally loaded.
        """
        rebuilt = func(*args)
        if not isinstance(rebuilt, NotYetLoadedTensor):
            return torch._tensor._rebuild_from_type_v2(func, new_type, args, state)

        inner_loader = rebuilt._load_tensor

        def _load_tensor():
            loaded = inner_loader()
            return torch._tensor._rebuild_from_type_v2(lambda: loaded, new_type, (), state)

        # Shadow the bound method on this instance with the wrapping loader.
        rebuilt._load_tensor = _load_tensor
        return rebuilt

    @classmethod
    def rebuild_parameter(cls, data, requires_grad, backward_hooks, *, archiveinfo=None):
        """Lazy counterpart of ``torch._utils._rebuild_parameter``.

        For a lazy tensor the parameter is only built once the data is loaded;
        anything else is rebuilt immediately.
        """
        if not isinstance(data, NotYetLoadedTensor):
            return torch._utils._rebuild_parameter(data, requires_grad, backward_hooks)

        inner_loader = data._load_tensor

        def _load_tensor():
            loaded = inner_loader()
            return torch._utils._rebuild_parameter(loaded, requires_grad, backward_hooks)

        data._load_tensor = _load_tensor
        return data

    @classmethod
    def rebuild_tensor_v2(
        cls,
        storage,
        storage_offset,
        size,
        stride,
        requires_grad,
        backward_hooks,
        metadata=None,
        *,
        archiveinfo=None,
    ):
        """Lazy counterpart of ``torch._utils._rebuild_tensor_v2``.

        The tensor is rebuilt on the given storage to serve as the metadata
        tensor; the real data is read later by ``_load_tensor``.
        """
        rebuild_args = (storage_offset, size, stride, requires_grad, backward_hooks, metadata)
        metatensor = torch._utils._rebuild_tensor_v2(storage, *rebuild_args)
        # ``archiveinfo`` on the storage is the persistent id attached during unpickling.
        return NotYetLoadedTensor(metatensor, archiveinfo, storage.archiveinfo, rebuild_args)

    def _load_tensor(self):
        """Read the storage record from the archive and build the actual tensor."""
        _, _, record_name, _, num_elements = self.storageinfo
        dtype = self.metatensor.dtype
        reader = self.archiveinfo.zipfile_context.zf
        num_bytes = num_elements * torch._utils._element_size(dtype)
        record = reader.get_storage_from_record(f"data/{record_name}", num_bytes, torch.UntypedStorage)
        untyped = record._typed_storage()._untyped_storage
        with warnings.catch_warnings():
            # Constructing a TypedStorage may emit warnings; silence them here.
            warnings.simplefilter("ignore")
            typed = torch.storage.TypedStorage(wrap_storage=untyped, dtype=dtype, _internal=True)
        return torch._utils._rebuild_tensor_v2(typed, *self.rebuild_args)

    @classmethod
    def __torch_function__(cls, func, types, args=(), kwargs=None):
        """Load every lazy positional argument, then run ``func`` on the real tensors."""
        call_kwargs = {} if kwargs is None else kwargs
        materialized = []
        for arg in args:
            if isinstance(arg, NotYetLoadedTensor):
                arg = arg._load_tensor()
            materialized.append(arg)
        # gc.collect would be costly here, maybe do it optionally
        return func(*materialized, **call_kwargs)

    def __getattr__(self, name):
        """Serve a fixed set of attributes; everything else raises ``AttributeError``."""
        # properties
        ## TODO: device, is_...??
        ## TODO: mH, mT, H, T, data, imag, real
        ## name ???
        answered_by_metatensor = {
            "dtype",
            "grad",
            "grad_fn",
            "layout",
            "names",
            "ndim",
            "output_nr",
            "requires_grad",
            "retains_grad",
            "shape",
            "volatile",
            "size",
        }
        if name in answered_by_metatensor:
            return getattr(self.metatensor, name)
        # materializing with contiguous is needed for quantization
        if name == "contiguous":
            return getattr(self._load_tensor(), name)
        raise AttributeError(f"{type(self)} does not have {name}")

    def __repr__(self):
        return f"NotYetLoadedTensor({repr(self.metatensor)})"
class LazyLoadingUnpickler(pickle.Unpickler):
    """Unpickler that swaps torch's tensor rebuild functions for lazy versions.

    Storages are created on the ``meta`` device and tagged with their
    persistent id, so no tensor data is read while unpickling.
    """

    def __init__(self, file, zipfile_context):
        super().__init__(file)
        # Object through which the archive is accessed when tensors get loaded.
        self.zipfile_context = zipfile_context

    def find_class(self, module, name):
        """Resolve ``module.name``, substituting lazy rebuild functions for torch's."""
        resolved = super().find_class(module, name)
        target = (module, name)
        if target == ("torch._utils", "_rebuild_tensor_v2"):
            lazy_rebuild = NotYetLoadedTensor.rebuild_tensor_v2
        elif target == ("torch._tensor", "_rebuild_from_type_v2"):
            lazy_rebuild = NotYetLoadedTensor.rebuild_from_type_v2
        elif target == ("torch._utils", "_rebuild_parameter"):
            lazy_rebuild = NotYetLoadedTensor.rebuild_parameter
        else:
            return resolved
        # Hand the unpickler itself to the lazy function as ``archiveinfo``.
        return functools.partial(lazy_rebuild, archiveinfo=self)

    def persistent_load(self, pid):
        """Return a meta-device storage of the right dtype, tagged with ``pid``."""
        _, storage_cls, _, _, _ = pid
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            meta_storage = torch.storage.TypedStorage(dtype=storage_cls().dtype, device="meta")
        meta_storage.archiveinfo = pid
        return meta_storage
class lazy_load:
    """Context manager that unpickles a checkpoint archive lazily.

    On construction the archive ``fn`` is opened and its ``data.pkl`` record
    is unpickled with ``LazyLoadingUnpickler``; entering the context returns
    the unpickled object and leaving it drops the archive reader.
    """

    def __init__(self, fn):
        self.zf = torch._C.PyTorchFileReader(str(fn))
        pickled = self.zf.get_record("data.pkl")
        with BytesIO(pickled) as stream:
            unpickler = LazyLoadingUnpickler(stream, self)
            self.sd = unpickler.load()

    def __enter__(self):
        return self.sd

    def __exit__(self, exc_type, exc_val, exc_tb):
        # I don't think there is a way to force closing...
        del self.zf
        self.zf = None