"""Utility functions for training and inference.""" import functools from pathlib import Path import pickle import warnings from io import BytesIO import torch import torch.utils._device from lightning.fabric.strategies import DeepSpeedStrategy, FSDPStrategy from torch.distributed.fsdp import FullStateDictConfig from torch.distributed.fsdp import FullyShardedDataParallel as FSDP from torch.distributed.fsdp import StateDictType llama_model_sizes = { 4096: "7B", # 7B n_embd=4096 5120: "13B", # 13B n_embd=5120 6656: "30B", # 30B n_embd=6656 8192: "65B", # 65B n_embd=8192 } def llama_model_lookup(checkpoint: dict) -> str: """Returns the LLaMA model name from the checkpoint. Checks the width of the lm_head.weight matrix, as these uniquely identify the model. """ embedding_size = checkpoint["lm_head.weight"].shape[1] return llama_model_sizes[embedding_size] def find_multiple(n: int, k: int) -> int: if n % k == 0: return n return n + k - (n % k) def save_model_checkpoint(fabric, model, file_path): """Handles boilerplate logic for retrieving and saving the state_dict. This will be upstreamed to Fabric soon. """ file_path = Path(file_path) if isinstance(fabric.strategy, DeepSpeedStrategy): from deepspeed.utils.zero_to_fp32 import convert_zero_checkpoint_to_fp32_state_dict fabric.save(file_path, {"model": model}) fabric.barrier() if fabric.global_rank == 0: # Create a consolidated checkpoint with the same name next to the deepspeed checkpoint convert_zero_checkpoint_to_fp32_state_dict(file_path, file_path.with_suffix(".pth")) return if isinstance(fabric.strategy, FSDPStrategy): save_policy = FullStateDictConfig(offload_to_cpu=(fabric.world_size > 1), rank0_only=True) with FSDP.state_dict_type(model, StateDictType.FULL_STATE_DICT, save_policy): state_dict = model._forward_module.state_dict() else: state_dict = model.state_dict() if fabric.global_rank == 0: torch.save(state_dict, file_path) fabric.barrier() class EmptyInitOnDevice(torch.overrides.TorchFunctionMode): def __init__(self, device=None, dtype=None, quantization_mode=None): """ Create tensors with given device and dtype and don't run initialization (but instead use "empty tensors", i.e. uninitialized memory). device: `torch.device` to work with dtype: `torch.dtype` to work with quantization_mode: optional string, quantization mode to work with, default `None`. Available modes: `llm.int8` bitsnbytes LLM.int8 quantization (only on GPU) `qptq.int4`, `gptq.int8`: GPTQ pre-quantized models Example:: with EmptyInitOnDevice("cuda", dtype=torch.bfloat16): model = LLaMA.from_name('7B') model.load_state_dict(torch.load('llama-lit/7B/lit-llama.pth'))""" self.quantization_mode = quantization_mode self.quantized_linear_cls = None if self.quantization_mode == 'llm.int8': if device.type != "cuda": raise ValueError("Quantization is only supported on the GPU.") from .quantization import Linear8bitLt self.quantized_linear_cls = Linear8bitLt elif self.quantization_mode == 'gptq.int4': from .quantization import ColBlockQuantizedLinear self.quantized_linear_cls = functools.partial(ColBlockQuantizedLinear, bits=4, tile_cols=-1) elif self.quantization_mode == 'gptq.int8': from .quantization import ColBlockQuantizedLinear self.quantized_linear_cls = functools.partial(ColBlockQuantizedLinear, bits=8, tile_cols=-1) elif self.quantization_mode is not None: raise RuntimeError(f"unknown quantization mode {self.quantization_mode}") self.device = device self.dtype = dtype def __enter__(self): if self.quantized_linear_cls != None: self.torch_linear_cls = torch.nn.Linear torch.nn.Linear = self.quantized_linear_cls return super().__enter__() def __exit__(self, exc_type, exc_val, exc_tb): if self.quantized_linear_cls != None: torch.nn.Linear = self.torch_linear_cls return super().__exit__(exc_type, exc_val, exc_tb) def __torch_function__(self, func, types, args=(), kwargs=None): kwargs = kwargs or {} if getattr(func, "__module__", None) == "torch.nn.init": if "tensor" in kwargs: return kwargs["tensor"] else: return args[0] if ( self.device is not None and func in torch.utils._device._device_constructors() and kwargs.get("device") is None ): kwargs["device"] = self.device if ( self.dtype is not None and func in torch.utils._device._device_constructors() and kwargs.get("dtype") is None ): kwargs["dtype"] = self.dtype return func(*args, **kwargs) # this is taken from torchhacks https://github.com/lernapparat/torchhacks class NotYetLoadedTensor: def __init__(self, metatensor, archiveinfo, storageinfo, rebuild_args): self.metatensor = metatensor self.archiveinfo = archiveinfo self.storageinfo = storageinfo self.rebuild_args = rebuild_args @classmethod def rebuild_from_type_v2(cls, func, new_type, args, state, *, archiveinfo=None): ret = func(*args) if isinstance(ret, NotYetLoadedTensor): old_lt = ret._load_tensor def _load_tensor(): t = old_lt() return torch._tensor._rebuild_from_type_v2( lambda: t, new_type, (), state ) ret._load_tensor = _load_tensor return ret return torch._tensor._rebuild_from_type_v2(func, new_type, args, state) @classmethod def rebuild_parameter( cls, data, requires_grad, backward_hooks, *, archiveinfo=None ): if isinstance(data, NotYetLoadedTensor): old_lt = data._load_tensor def _load_tensor(): t = old_lt() return torch._utils._rebuild_parameter(t, requires_grad, backward_hooks) data._load_tensor = _load_tensor return data return torch._utils._rebuild_parameter(data, requires_grad, backward_hooks) @classmethod def rebuild_tensor_v2( cls, storage, storage_offset, size, stride, requires_grad, backward_hooks, metadata=None, *, archiveinfo=None, ): rebuild_args = ( storage_offset, size, stride, requires_grad, backward_hooks, metadata, ) metatensor = torch._utils._rebuild_tensor_v2( storage, storage_offset, size, stride, requires_grad, backward_hooks, metadata, ) storageinfo = storage.archiveinfo return NotYetLoadedTensor(metatensor, archiveinfo, storageinfo, rebuild_args) def _load_tensor(self): name, storage_cls, fn, device, size = self.storageinfo dtype = self.metatensor.dtype uts = ( self.archiveinfo.zipfile_context.zf.get_storage_from_record( f"data/{fn}", size * torch._utils._element_size(dtype), torch.UntypedStorage, ) ._typed_storage() ._untyped_storage ) with warnings.catch_warnings(): warnings.simplefilter("ignore") storage = torch.storage.TypedStorage( wrap_storage=uts, dtype=self.metatensor.dtype, _internal=True ) tensor = torch._utils._rebuild_tensor_v2(storage, *self.rebuild_args) return tensor @classmethod def __torch_function__(cls, func, types, args=(), kwargs=None): if kwargs is None: kwargs = {} loaded_args = [ (a._load_tensor() if isinstance(a, NotYetLoadedTensor) else a) for a in args ] res = func(*loaded_args, **kwargs) # gc.collect would be costly here, maybe do it optionally return res def __getattr__(self, name): # properties ## TODO: device, is_...?? ## TODO: mH, mT, H, T, data, imag, real ## name ??? if name in { "dtype", "grad", "grad_fn", "layout", "names", "ndim", "output_nr", "requires_grad", "retains_grad", "shape", "volatile", }: return getattr(self.metatensor, name) if name in {"size"}: return getattr(self.metatensor, name) # materializing with contiguous is needed for quantization if name in {"contiguous"}: return getattr(self._load_tensor(), name) raise AttributeError(f"{type(self)} does not have {name}") def __repr__(self): return f"NotYetLoadedTensor({repr(self.metatensor)})" class LazyLoadingUnpickler(pickle.Unpickler): def __init__(self, file, zipfile_context): super().__init__(file) self.zipfile_context = zipfile_context def find_class(self, module, name): res = super().find_class(module, name) if module == "torch._utils" and name == "_rebuild_tensor_v2": return functools.partial( NotYetLoadedTensor.rebuild_tensor_v2, archiveinfo=self ) elif module == "torch._tensor" and name == "_rebuild_from_type_v2": return functools.partial( NotYetLoadedTensor.rebuild_from_type_v2, archiveinfo=self ) elif module == "torch._utils" and name == "_rebuild_parameter": return functools.partial( NotYetLoadedTensor.rebuild_parameter, archiveinfo=self ) return res def persistent_load(self, pid): name, cls, fn, device, size = pid with warnings.catch_warnings(): warnings.simplefilter("ignore") s = torch.storage.TypedStorage(dtype=cls().dtype, device="meta") s.archiveinfo = pid return s class lazy_load: def __init__(self, fn): self.zf = torch._C.PyTorchFileReader(str(fn)) with BytesIO(self.zf.get_record("data.pkl")) as pkl: mup = LazyLoadingUnpickler(pkl, self) self.sd = mup.load() def __enter__(self): return self.sd def __exit__(self, exc_type, exc_val, exc_tb): del self.zf # I don't think there is a way to force closing... self.zf = None