| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
|
|
| import logging |
|
|
| from tqdm import tqdm |
|
|
# Module-level logger. Note that basicConfig() is invoked when this module is
# imported, so importing it requests INFO-level root logging as a side effect.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
|
|
| from utils.device_utils import clean_memory_on_device |
|
|
|
|
def calculate_fp8_maxval(exp_bits=4, mantissa_bits=3, sign_bits=1):
    """
    Calculate the maximum finite value representable in an FP8 format.

    Default is E4M3 format (4-bit exponent, 3-bit mantissa, 1-bit sign), for
    which the result is 448.0. E5M2 yields 57344.0.

    Args:
        exp_bits (int): Number of exponent bits
        mantissa_bits (int): Number of mantissa bits
        sign_bits (int): Number of sign bits (0 or 1)

    Returns:
        float: Maximum value representable in FP8 format

    Raises:
        AssertionError: If the three bit widths do not sum to 8
    """
    assert exp_bits + mantissa_bits + sign_bits == 8, "Total bits must be 8"

    # Exponent bias
    bias = 2 ** (exp_bits - 1) - 1

    if exp_bits == 5 and mantissa_bits == 2:
        # E5M2 keeps the IEEE-style encoding: the all-ones exponent is reserved
        # for inf/NaN, so the largest finite value uses exponent field
        # 2**exp_bits - 2 together with an all-ones mantissa (1.75 * 2**15).
        mantissa_max = 2.0 - 2.0 ** -mantissa_bits
        return mantissa_max * 2.0 ** (2**exp_bits - 2 - bias)

    # "fn"-style layout (E4M3FN): the top exponent is usable for finite values,
    # but the all-ones mantissa there is not, so the last mantissa bit is
    # excluded from the maximum.
    mantissa_max = 1.0 + sum(2.0 ** -(i + 1) for i in range(mantissa_bits - 1))
    return mantissa_max * 2.0 ** (2**exp_bits - 1 - bias)
|
|
|
|
def quantize_tensor_to_fp8(tensor, scale, exp_bits=4, mantissa_bits=3, sign_bits=1, max_value=None, min_value=None):
    """
    Quantize a tensor to the value grid of an FP8 format.

    The result keeps the dtype of ``tensor / scale``; it is only rounded to
    values representable in the requested format and is NOT cast to an FP8
    dtype here.

    Args:
        tensor (torch.Tensor): Tensor to quantize
        scale (float or torch.Tensor): Scale factor; the tensor is divided by it before rounding
        exp_bits (int): Number of exponent bits
        mantissa_bits (int): Number of mantissa bits
        sign_bits (int): Number of sign bits
        max_value (float, optional): Upper clamp bound. If None, it is computed from the
            bit layout and ``min_value`` is derived from it (any passed ``min_value`` is ignored).
        min_value (float, optional): Lower clamp bound. If None, it is derived from
            ``max_value``: ``-max_value`` for signed formats, ``0.0`` otherwise.

    Returns:
        tuple: (quantized_tensor, scale_factor) where scale_factor is ``scale`` unchanged
    """
    scaled_tensor = tensor / scale

    # Exponent bias
    bias = 2 ** (exp_bits - 1) - 1

    if max_value is None:
        max_value = calculate_fp8_maxval(exp_bits, mantissa_bits, sign_bits)
        min_value = None  # always re-derived together with max_value
    if min_value is None:
        # Without this, passing only max_value left the lower side unclamped.
        min_value = -max_value if sign_bits > 0 else 0.0

    clamped_tensor = torch.clamp(scaled_tensor, min_value, max_value)

    # Biased exponent of every element; zeros get 0 (log2(0) is never selected).
    abs_values = torch.abs(clamped_tensor)
    log_scales = torch.where(
        abs_values > 0,
        torch.floor(torch.log2(abs_values) + bias),
        torch.zeros_like(abs_values),
    ).detach()

    # Elements whose biased exponent is below 1 share the step size of exponent 1.
    log_scales = torch.clamp(log_scales, min=1.0)
    quant_factor = 2.0 ** (log_scales - mantissa_bits - bias)

    # Round to the nearest multiple of the per-element step size.
    quantized = torch.round(clamped_tensor / quant_factor) * quant_factor

    return quantized, scale
|
|
|
|
def optimize_state_dict_with_fp8(
    state_dict, calc_device, target_layer_keys=None, exclude_layer_keys=None, exp_bits=4, mantissa_bits=3, move_to_device=False
):
    """
    Optimize Linear layer weights in a model's state dict to FP8 format.

    Every selected ``<prefix>.weight`` tensor is replaced by its FP8 version and a
    companion ``<prefix>.scale_weight`` tensor of shape ``(1,)`` (in the weight's
    original dtype) is added, such that ``fp8_weight * scale_weight`` approximates
    the original weight.

    Args:
        state_dict (dict): State dict to optimize, replaced in-place
        calc_device (str): Device to quantize tensors on (None to quantize where the tensor lives)
        target_layer_keys (list, optional): Layer key patterns to target (None for all Linear layers)
        exclude_layer_keys (list, optional): Layer key patterns to exclude
        exp_bits (int): Number of exponent bits
        mantissa_bits (int): Number of mantissa bits
        move_to_device (bool): Move optimized tensors to the calculating device

    Returns:
        dict: FP8 optimized state dict (the same object as ``state_dict``)

    Raises:
        ValueError: If the format is neither E4M3 nor E5M2
    """
    if exp_bits == 4 and mantissa_bits == 3:
        fp8_dtype = torch.float8_e4m3fn
    elif exp_bits == 5 and mantissa_bits == 2:
        fp8_dtype = torch.float8_e5m2
    else:
        raise ValueError(f"Unsupported FP8 format: E{exp_bits}M{mantissa_bits}")

    # Representable range of the chosen FP8 format
    max_value = calculate_fp8_maxval(exp_bits, mantissa_bits)
    min_value = -max_value

    weight_suffix = ".weight"

    def _is_target(key):
        """Return True when `key` names a weight matched by the include/exclude patterns."""
        if not key.endswith(weight_suffix):
            return False
        if target_layer_keys is not None and not any(pattern in key for pattern in target_layer_keys):
            return False
        if exclude_layer_keys is not None and any(pattern in key for pattern in exclude_layer_keys):
            return False
        return isinstance(state_dict[key], torch.Tensor)

    # Collect keys first: the dict gains new entries while tensors are processed.
    target_state_dict_keys = [key for key in state_dict if _is_target(key)]

    optimized_count = 0
    for key in tqdm(target_state_dict_keys):
        value = state_dict[key]

        # Remember where the tensor came from so it can be restored afterwards
        original_device = value.device
        original_dtype = value.dtype

        if calc_device is not None:
            value = value.to(calc_device)

        # Per-tensor scale mapping the largest magnitude onto the FP8 maximum
        scale = torch.max(torch.abs(value.flatten())) / max_value
        # An all-zero weight would give scale == 0 and turn the tensor into NaN
        # through division by zero; a unit scale keeps it exactly zero.
        scale = torch.where(scale == 0, torch.ones_like(scale), scale)

        quantized_weight, _ = quantize_tensor_to_fp8(value, scale, exp_bits, mantissa_bits, 1, max_value, min_value)

        # Replace only the trailing ".weight"; str.replace would also rewrite
        # earlier occurrences (e.g. "a.weight_proj.weight").
        fp8_key = key
        scale_key = key[: -len(weight_suffix)] + ".scale_weight"

        quantized_weight = quantized_weight.to(fp8_dtype)

        if not move_to_device:
            quantized_weight = quantized_weight.to(original_device)

        # Shape (1,) tensor in the weight's original dtype, on the same device as the FP8 weight
        scale_tensor = scale.detach().reshape(1).to(dtype=original_dtype, device=quantized_weight.device)

        state_dict[fp8_key] = quantized_weight
        state_dict[scale_key] = scale_tensor

        optimized_count += 1

        if calc_device is not None:
            # Free memory on the calculation device after each tensor
            clean_memory_on_device(calc_device)

    logger.info(f"Number of optimized Linear layers: {optimized_count}")
    return state_dict
|
|
|
|
def fp8_linear_forward_patch(self: nn.Linear, x, use_scaled_mm=False, max_value=None):
    """
    Patched forward method for Linear layers with FP8 weights.

    Without ``use_scaled_mm`` the FP8 weight is dequantized (cast to the dtype of
    ``self.scale_weight`` and multiplied by it) and a regular linear is applied.
    With ``use_scaled_mm`` the matmul is delegated to ``torch._scaled_mm``.

    Args:
        self: Linear layer instance carrying a ``scale_weight`` attribute
        x (torch.Tensor): Input tensor
        use_scaled_mm (bool): Use scaled_mm for FP8 Linear layers, requires SM 8.9+ (RTX 40 series)
        max_value (float): Maximum value for FP8 quantization. If None, no quantization is applied for input tensor.

    Returns:
        torch.Tensor: Result of linear transformation
    """
    if not use_scaled_mm:
        # Dequantize path: rebuild the weight in the scale's dtype, then a plain linear.
        compute_dtype = self.scale_weight.dtype
        full_weight = self.weight.to(compute_dtype) * self.scale_weight
        return F.linear(x, full_weight, self.bias)

    input_dtype = x.dtype
    scale_dtype = self.scale_weight.dtype
    assert self.weight.dtype == torch.float8_e4m3fn, "Only FP8 E4M3FN format is supported"
    assert x.ndim == 3, "Input tensor must be 3D (batch_size, seq_len, hidden_dim)"

    if max_value is None:
        # No input quantization: unit scale, input is only cast below
        scale_x = torch.tensor(1.0, dtype=torch.float32, device=x.device)
    else:
        # Scale the input so its largest magnitude maps onto max_value, then quantize it
        scale_x = (torch.max(torch.abs(x.flatten())) / max_value).to(torch.float32)
        x, _ = quantize_tensor_to_fp8(x, scale_x, 5, 2, 1, max_value, -max_value)

    # Flatten (batch, seq) into rows for the 2D matmul and cast the input to E5M2
    batch_size, seq_len = x.shape[0], x.shape[1]
    x_2d = x.reshape(-1, x.shape[2]).to(torch.float8_e5m2)

    weight_t = self.weight.t()
    scale_weight = self.scale_weight.to(torch.float32)

    if self.bias is None:
        o = torch._scaled_mm(x_2d, weight_t, out_dtype=input_dtype, scale_a=scale_x, scale_b=scale_weight)
    else:
        # With a bias, the output dtype follows the scale's dtype
        o = torch._scaled_mm(x_2d, weight_t, out_dtype=scale_dtype, bias=self.bias, scale_a=scale_x, scale_b=scale_weight)

    return o.reshape(batch_size, seq_len, -1).to(input_dtype)
|
|
|
|
def apply_fp8_monkey_patch(model, optimized_state_dict, use_scaled_mm=False):
    """
    Apply monkey patching to a model using FP8 optimized state dict.

    Every ``nn.Linear`` whose module path has a ``<path>.scale_weight`` entry in
    the state dict gets a ``scale_weight`` buffer (placeholder value 1.0, in the
    weight's dtype) and an instance-level ``forward`` that dispatches to
    ``fp8_linear_forward_patch``.

    Args:
        model (nn.Module): Model instance to patch
        optimized_state_dict (dict): FP8 optimized state dict
        use_scaled_mm (bool): Use scaled_mm for FP8 Linear layers, requires SM 8.9+ (RTX 40 series)

    Returns:
        nn.Module: The patched model (same instance, modified in-place)
    """
    # None is forwarded to the patched forward, i.e. the input tensor is not quantized
    max_value = None

    # Module paths that own a scale tensor in the optimized state dict
    scale_suffix = ".scale_weight"
    patched_module_paths = {
        state_key.rsplit(scale_suffix, 1)[0] for state_key in optimized_state_dict.keys() if state_key.endswith(scale_suffix)
    }

    def new_forward(self, x):
        return fp8_linear_forward_patch(self, x, use_scaled_mm, max_value)

    patched_count = 0
    for name, module in model.named_modules():
        if not isinstance(module, nn.Linear) or name not in patched_module_paths:
            continue

        # Placeholder buffer so that the scale entry of the state dict has a slot to load into
        module.register_buffer("scale_weight", torch.tensor(1.0, dtype=module.weight.dtype))

        # Bind the replacement forward to this particular instance
        module.forward = new_forward.__get__(module, type(module))
        patched_count += 1

    logger.info(f"Number of monkey-patched Linear layers: {patched_count}")
    return model
|
|
|
|
| |
def example_usage():
    """
    Run a small end-to-end demonstration of the FP8 optimization.

    Builds a float16 toy model, quantizes the targeted Linear weights of its
    state dict, loads them into a monkey-patched copy of the model and prints
    both outputs, the mean absolute error and the parameter memory of each model.
    Quantization runs on CUDA when available and on the CPU otherwise.

    Returns:
        nn.Module: The original (non-optimized) test model
    """

    class TestModel(nn.Module):
        def __init__(self):
            super().__init__()
            fc1 = nn.Linear(768, 3072)
            act1 = nn.GELU()
            fc2 = nn.Linear(3072, 768)
            act2 = nn.GELU()
            fc3 = nn.Linear(768, 768)

            # Layers kept in a ModuleList so their keys look like "single_blocks.<i>.weight"
            self.single_blocks = nn.ModuleList([fc1, act1, fc2, act2, fc3])

            self.fc4 = nn.Linear(768, 128)

        def forward(self, x):
            for layer in self.single_blocks:
                x = layer(x)
            x = self.fc4(x)
            return x

    # Reference model and input
    test_model = TestModel()
    test_model.to(torch.float16)

    test_input = torch.randn(1, 768, dtype=torch.float16)

    with torch.no_grad():
        original_output = test_model(test_input)
    print("original output", original_output[0, :5])

    state_dict = test_model.state_dict()

    # Quantize on the GPU when there is one; fall back to the CPU instead of
    # failing on hosts without CUDA (the model itself runs on the CPU anyway).
    calc_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Target everything under "single_blocks" except keys containing "2"
    optimized_state_dict = optimize_state_dict_with_fp8(state_dict, calc_device, ["single_blocks"], ["2"])

    # Fresh model, patched first so the scale buffers exist before loading
    optimized_model = TestModel()
    optimized_model.to(torch.float16)
    apply_fp8_monkey_patch(optimized_model, optimized_state_dict)

    optimized_model.load_state_dict(optimized_state_dict, strict=True, assign=True)

    with torch.no_grad():
        optimized_output = optimized_model(test_input)
    print("optimized output", optimized_output[0, :5])

    error = torch.mean(torch.abs(original_output - optimized_output))
    print(f"Mean absolute error: {error.item()}")

    # Parameter memory in MiB (buffers are not counted)
    original_params = sum(p.nelement() * p.element_size() for p in test_model.parameters()) / (1024 * 1024)
    print(f"Model parameter memory: {original_params:.2f} MB")
    optimized_params = sum(p.nelement() * p.element_size() for p in optimized_model.parameters()) / (1024 * 1024)
    print(f"Optimized model parameter memory: {optimized_params:.2f} MB")

    return test_model
|
|
|
|
# Run the demonstration only when this file is executed as a script.
if __name__ == "__main__":
    example_usage()
|
|