# Copyright 2023-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import warnings
from typing import Any, List, Optional

import packaging
import torch
import transformers
from torch import nn

from peft.tuners.lora import LoraLayer
from peft.tuners.tuners_utils import check_adapters_to_merge
from peft.utils import transpose


if packaging.version.parse(transformers.__version__) >= packaging.version.parse("4.33.0"):
    from transformers.integrations import deepspeed_config
else:
    from transformers.deepspeed import deepspeed_config


class AdaLoraLayer(LoraLayer):
    # List all names of layers that may contain adapter weights
    # Note: ranknum doesn't need to be included as it is not an nn.Module
    adapter_layer_names = ("lora_A", "lora_B", "lora_E", "lora_embedding_A", "lora_embedding_B")
    # other_param_names is defined in LoraLayer

    def __init__(self, base_layer: nn.Module) -> None:
        super().__init__(base_layer)
        self.lora_E = nn.ParameterDict({})
        self.lora_A = nn.ParameterDict({})
        self.lora_B = nn.ParameterDict({})
        self.ranknum = nn.ParameterDict({})

    def update_layer(self, adapter_name, r, lora_alpha, lora_dropout, init_lora_weights):
        if r < 0:
            # note: r == 0 is allowed for AdaLora, see #1539
            raise ValueError(f"`r` should be a positive integer or 0, but the value passed is {r}")

        self.r[adapter_name] = r
        self.lora_alpha[adapter_name] = lora_alpha
        if lora_dropout > 0.0:
            lora_dropout_layer = nn.Dropout(p=lora_dropout)
        else:
            lora_dropout_layer = nn.Identity()

        self.lora_dropout[adapter_name] = lora_dropout_layer
        # Actual trainable parameters
        # Right singular vectors
        self.lora_A[adapter_name] = nn.Parameter(torch.randn(r, self.in_features))
        # Singular values
        self.lora_E[adapter_name] = nn.Parameter(torch.randn(r, 1))
        # Left singular vectors
        self.lora_B[adapter_name] = nn.Parameter(torch.randn(self.out_features, r))
        # The current rank
        self.ranknum[adapter_name] = nn.Parameter(torch.randn(1), requires_grad=False)
        self.ranknum[adapter_name].data.fill_(float(r))
        self.ranknum[adapter_name].requires_grad = False

        self.scaling[adapter_name] = lora_alpha if lora_alpha > 0 else float(r)
        if init_lora_weights:
            self.reset_lora_parameters(adapter_name)

        if hasattr(self.get_base_layer(), "qweight"):
            # QuantLinear
            self.to(self.get_base_layer().qweight.device)
        else:
            self.to(self.get_base_layer().weight.device)
        self.set_adapter(self.active_adapters)

    def reset_lora_parameters(self, adapter_name):
        if adapter_name in self.lora_A.keys():
            nn.init.normal_(self.lora_E[adapter_name], mean=0.0, std=0.02)
            nn.init.normal_(self.lora_A[adapter_name], mean=0.0, std=0.02)
            nn.init.normal_(self.lora_B[adapter_name], mean=0.0, std=0.02)


class SVDLinear(nn.Module, AdaLoraLayer):
    # SVD-based adaptation by a dense layer
    def __init__(
        self,
        base_layer: nn.Module,
        adapter_name: str,
        r: int = 0,
        lora_alpha: int = 1,
        lora_dropout: float = 0.0,
        fan_in_fan_out: bool = False,
        init_lora_weights: bool = True,
        **kwargs,
    ) -> None:
        super().__init__()
        AdaLoraLayer.__init__(self, base_layer)
        # Freezing the pre-trained weight matrix
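        # Only the SVD factors (lora_A, lora_B, lora_E) registered by `update_layer` below
        # remain trainable; the wrapped base layer weight is kept frozen.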
        self.get_base_layer().weight.requires_grad = False

        self.fan_in_fan_out = fan_in_fan_out
        self._active_adapter = adapter_name
        self.update_layer(adapter_name, r, lora_alpha, lora_dropout, init_lora_weights)

    def merge(self, safe_merge: bool = False, adapter_names: Optional[List[str]] = None) -> None:
        """
        Merge the active adapter weights into the base weights.

        Args:
            safe_merge (`bool`, *optional*):
                If True, the merge operation will be performed in a copy of the original weights and check for NaNs
                before merging the weights. This is useful if you want to check if the merge operation will produce
                NaNs. Defaults to `False`.
            adapter_names (`List[str]`, *optional*):
                The list of adapter names that should be merged. If None, all active adapters will be merged.
                Defaults to `None`.
        """
        adapter_names = check_adapters_to_merge(self, adapter_names)
        if not adapter_names:
            # no adapter to merge
            return

        for active_adapter in adapter_names:
            base_layer = self.get_base_layer()
            if active_adapter in self.lora_A.keys():
                if safe_merge:
                    # Note that safe_merge will be slower than the normal merge
                    # because of the copy operation.
                    orig_weights = base_layer.weight.data.clone()
                    orig_weights += self.get_delta_weight(active_adapter)

                    if not torch.isfinite(orig_weights).all():
                        raise ValueError(
                            f"NaNs detected in the merged weights. The adapter {active_adapter} seems to be broken"
                        )

                    base_layer.weight.data = orig_weights
                else:
                    base_layer.weight.data += self.get_delta_weight(active_adapter)
                self.merged_adapters.append(active_adapter)

    def unmerge(self) -> None:
        """
        This method unmerges all merged adapter layers from the base weights.
        """
        if not self.merged:
            warnings.warn("Already unmerged. Nothing to do.")
            return
        while len(self.merged_adapters) > 0:
            active_adapter = self.merged_adapters.pop()
            if active_adapter in self.lora_A.keys():
                self.get_base_layer().weight.data -= self.get_delta_weight(active_adapter)

    def get_delta_weight(self, adapter) -> torch.Tensor:
        return (
            transpose(self.lora_B[adapter] @ (self.lora_A[adapter] * self.lora_E[adapter]), self.fan_in_fan_out)
            * self.scaling[adapter]
            / (self.ranknum[adapter] + 1e-5)
        )

    def forward(self, x: torch.Tensor, *args: Any, **kwargs: Any) -> torch.Tensor:
        if self.disable_adapters:
            if self.merged:
                self.unmerge()
            result = self.base_layer(x, *args, **kwargs)
        elif self.merged:
            result = self.base_layer(x, *args, **kwargs)
        else:
            result = self.base_layer(x, *args, **kwargs)
            for active_adapter in self.active_adapters:
                if active_adapter not in self.lora_A.keys():
                    continue
                lora_A = self.lora_A[active_adapter]
                lora_B = self.lora_B[active_adapter]
                lora_E = self.lora_E[active_adapter]
                dropout = self.lora_dropout[active_adapter]
                scaling = self.scaling[active_adapter]
                ranknum = self.ranknum[active_adapter] + 1e-5

                x = x.to(lora_A.dtype)
                result += (dropout(x) @ (lora_A * lora_E).T @ lora_B.T) * scaling / ranknum

        return result

    def __repr__(self) -> str:
        rep = super().__repr__()
        return "adalora." + rep


class RankAllocator:
    """
    The RankAllocator for AdaLoraModel. Paper: https://openreview.net/pdf?id=lq62uWRJjiY

    Args:
        model: the model that we apply AdaLoRA to.
        peft_config ([`AdaLoraConfig`]): The configuration of the AdaLora model.
        adapter_name (`str`): The name of the adapter whose rank budget is managed.
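
    Example (a rough sketch of the training-loop wiring; in PEFT this allocator is normally
    driven by `AdaLoraModel.update_and_allocate`, and `optimizer`, `batch` and `compute_loss`
    below are placeholders, not part of this module):

        ```py
        allocator = RankAllocator(model, peft_config, adapter_name="default")
        for step in range(peft_config.total_step):
            loss = compute_loss(model, batch)
            loss.backward()
            optimizer.step()
            # gradients are still populated here, which update_ipt() relies on
            budget, rank_pattern = allocator.update_and_allocate(model, step)
            optimizer.zero_grad()
        ```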
""" def __init__(self, model, peft_config, adapter_name): self.peft_config = peft_config self.adapter_name = adapter_name self.beta1 = peft_config.beta1 self.beta2 = peft_config.beta2 assert self.beta1 > 0 and self.beta1 < 1 assert self.beta2 > 0 and self.beta2 < 1 self.reset_ipt() self._set_budget_scheduler(model) def set_total_step(self, total_step): self.peft_config.total_step = total_step def reset_ipt(self): self.ipt = {} self.exp_avg_ipt = {} self.exp_avg_unc = {} def _set_budget_scheduler(self, model): self.init_bgt = 0 self.name_set = set() for n, p in model.named_parameters(): if f"lora_A.{self.adapter_name}" in n: self.init_bgt += p.size(0) self.name_set.add(n.replace("lora_A", "%s")) self.name_set = sorted(self.name_set) # The total final rank budget self.target_bgt = self.peft_config.target_r * len(self.name_set) def budget_schedule(self, step: int): tinit = self.peft_config.tinit tfinal = self.peft_config.tfinal total_step = self.peft_config.total_step # Initial warmup if step <= tinit: budget = self.init_bgt mask_ind = False # Final fine-tuning elif step > total_step - tfinal: budget = self.target_bgt mask_ind = True else: # Budget decreasing with a cubic scheduler mul_coeff = 1 - (step - tinit) / (total_step - tfinal - tinit) budget = int((self.init_bgt - self.target_bgt) * (mul_coeff**3) + self.target_bgt) mask_ind = True if step % self.peft_config.deltaT == 0 else False return budget, mask_ind def update_ipt(self, model): # Update the sensitivity and uncertainty for every weight for n, p in model.named_parameters(): if "lora_" in n and self.adapter_name in n: if n not in self.ipt: self.ipt[n] = torch.zeros_like(p) self.exp_avg_ipt[n] = torch.zeros_like(p) self.exp_avg_unc[n] = torch.zeros_like(p) with torch.no_grad(): if deepspeed_config() is not None: import deepspeed grad = deepspeed.utils.safe_get_full_grad(p) self.ipt[n] = (p * grad).abs().detach() else: self.ipt[n] = (p * p.grad).abs().detach() # Sensitivity smoothing self.exp_avg_ipt[n] = self.beta1 * self.exp_avg_ipt[n] + (1 - self.beta1) * self.ipt[n] # Uncertainty quantification self.exp_avg_unc[n] = ( self.beta2 * self.exp_avg_unc[n] + (1 - self.beta2) * (self.ipt[n] - self.exp_avg_ipt[n]).abs() ) def _element_score(self, n): return self.exp_avg_ipt[n] * self.exp_avg_unc[n] def _combine_ipt(self, ipt_E, ipt_AB): ipt_AB = ipt_AB.sum(dim=1, keepdim=False) sum_ipt = ipt_E.view(-1) + ipt_AB.view(-1) return sum_ipt def mask_to_budget(self, model, budget): value_ipt = {} vector_ipt = {} triplet_ipt = {} # Get the importance score for A, E, B for n, p in model.named_parameters(): if f"lora_A.{self.adapter_name}" in n: entry_ipt = self._element_score(n) comb_ipt = torch.mean(entry_ipt, dim=1, keepdim=True) name_m = n.replace("lora_A", "%s") if name_m not in vector_ipt: vector_ipt[name_m] = [comb_ipt] else: vector_ipt[name_m].append(comb_ipt) if f"lora_B.{self.adapter_name}" in n: entry_ipt = self._element_score(n) comb_ipt = torch.mean(entry_ipt, dim=0, keepdim=False).view(-1, 1) name_m = n.replace("lora_B", "%s") if name_m not in vector_ipt: vector_ipt[name_m] = [comb_ipt] else: vector_ipt[name_m].append(comb_ipt) if f"lora_E.{self.adapter_name}" in n: entry_ipt = self._element_score(n) name_m = n.replace("lora_E", "%s") value_ipt[name_m] = entry_ipt all_score = [] # Calculate the score for each triplet for name_m in vector_ipt: ipt_E = value_ipt[name_m] ipt_AB = torch.cat(vector_ipt[name_m], dim=1) sum_ipt = self._combine_ipt(ipt_E, ipt_AB) name_E = name_m % "lora_E" triplet_ipt[name_E] = sum_ipt.view(-1, 1) 
            all_score.append(sum_ipt.view(-1))

        # Get the threshold by ranking ipt
        mask_threshold = torch.kthvalue(
            torch.cat(all_score),
            k=self.init_bgt - budget,
        )[0].item()

        rank_pattern = {}
        # Mask the unimportant triplets
        with torch.no_grad():
            for n, p in model.named_parameters():
                if f"lora_E.{self.adapter_name}" in n:
                    p.masked_fill_(triplet_ipt[n] <= mask_threshold, 0.0)
                    rank_pattern[n] = (~(triplet_ipt[n] <= mask_threshold)).view(-1).tolist()
        return rank_pattern

    def update_and_allocate(self, model, global_step, force_mask=False):
        # Update the importance score and allocate the budget
        if global_step < self.peft_config.total_step - self.peft_config.tfinal:
            self.update_ipt(model)
        budget, mask_ind = self.budget_schedule(global_step)
        # Allocate the budget according to importance scores
        if mask_ind or force_mask:
            rank_pattern = self.mask_to_budget(model, budget)
        else:
            rank_pattern = None
        return budget, rank_pattern

    def mask_using_rank_pattern(self, model, rank_pattern):
        # Mask the unimportant triplets
        is_adapter_name_truncated = False
        if self.adapter_name not in next(iter(rank_pattern.keys())):
            is_adapter_name_truncated = True

        with torch.no_grad():
            for n, p in model.named_parameters():
                if f"lora_E.{self.adapter_name}" in n:
                    key = n if not is_adapter_name_truncated else n.replace(f".{self.adapter_name}", "")
                    mask = torch.Tensor(rank_pattern[key]).unsqueeze(-1).to(p.device)
                    p.masked_fill_(~mask.bool(), 0.0)
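
# Note: the `rank_pattern` dict returned by `update_and_allocate` maps each `lora_E` parameter
# name to a boolean list marking which singular-value triplets survived pruning. It is typically
# persisted (e.g. on the AdaLoraConfig) and re-applied with `mask_using_rank_pattern`, for
# instance during the final fine-tuning phase or when restoring a checkpoint.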