|
"""Implementation of the paper: |
|
|
|
LLaMA-Adapter: Efficient Fine-tuning of Language Models with Zero-init Attention |
|
https://arxiv.org/abs/2303.16199 |
|
|
|
Port for Lit-GPT |
|
""" |
|
from dataclasses import dataclass |
|
from typing import Any, Dict, List, Optional, Tuple, Union |
|
|
|
import torch |
|
import torch.nn as nn |
|
from typing_extensions import Self |
|
|
|
from lit_gpt.config import Config as BaseConfig |
|
from lit_gpt.model import GPT as BaseModel |
|
from lit_gpt.model import Block as BaseBlock |
|
from lit_gpt.model import CausalSelfAttention as BaseCausalSelfAttention |
|
|
|
|
|
@dataclass |
|
class Config(BaseConfig): |
|
adapter_prompt_length: int = 10 |
|
adapter_start_layer: int = 2 |
|
|
|
|
|
class GPT(BaseModel): |
|
"""The implementation is identical to `lit_gpt.model.GPT` with the exception that |
|
the `Block` saves the layer index and passes it down to the attention layer.""" |
|
|
|
def __init__(self, config: Config) -> None: |
|
nn.Module.__init__(self) |
|
assert config.padded_vocab_size is not None |
|
self.config = config |
|
|
|
self.lm_head = nn.Linear(config.n_embd, config.padded_vocab_size, bias=config.lm_head_bias) |
|
self.transformer = nn.ModuleDict( |
|
dict( |
|
wte=nn.Embedding(config.padded_vocab_size, config.n_embd), |
|
h=nn.ModuleList(Block(config, i) for i in range(config.n_layer)), |
|
ln_f=config.norm_class(config.n_embd, eps=config.norm_eps), |
|
) |
|
) |
|
self.max_seq_length = self.config.block_size |
|
self.mask_cache: Optional[torch.Tensor] = None |
|
|
|
def forward( |
|
self, idx: torch.Tensor, input_pos: Optional[torch.Tensor] = None, lm_head_chunk_size: int = 0 |
|
) -> Union[torch.Tensor, List[torch.Tensor]]: |
|
T = idx.size(1) |
|
if self.max_seq_length < T: |
|
raise ValueError(f"Cannot forward sequence of length {T}, max seq length is only {self.max_seq_length}.") |
|
|
|
if input_pos is not None: |
|
cos = self.cos.index_select(0, input_pos) |
|
sin = self.sin.index_select(0, input_pos) |
|
if self.mask_cache is None: |
|
raise TypeError("You need to call `gpt.set_kv_cache()`") |
|
mask = self.mask_cache.index_select(2, input_pos) |
|
else: |
|
cos = self.cos[:T] |
|
sin = self.sin[:T] |
|
mask = None |
|
|
|
x = self.transformer.wte(idx) |
|
for block in self.transformer.h: |
|
x = block(x, cos, sin, mask, input_pos) |
|
x = self.transformer.ln_f(x) |
|
if lm_head_chunk_size > 0: |
|
|
|
return [self.lm_head(x_i) for x_i in x.split(lm_head_chunk_size, dim=1)] |
|
return self.lm_head(x) |
|
|
|
@classmethod |
|
def from_name(cls, name: str, **kwargs: Any) -> Self: |
|
return cls(Config.from_name(name, **kwargs)) |
|
|
|
def _init_weights(self, module: nn.Module) -> None: |
|
"""Meant to be used with `gpt.apply(gpt._init_weights)`. Unused method left for completeness.""" |
|
super()._init_weights(module) |
|
if isinstance(module, CausalSelfAttention): |
|
module.reset_parameters() |
|
|
|
|
|
class Block(BaseBlock): |
|
"""The implementation is identical to `lit_gpt.model.Block` with the exception that |
|
we replace the attention layer where adaption is implemented.""" |
|
|
|
def __init__(self, config: Config, block_idx: int) -> None: |
|
|
|
nn.Module.__init__(self) |
|
self.norm_1 = config.norm_class(config.n_embd, eps=config.norm_eps) |
|
self.attn = CausalSelfAttention(config, block_idx) |
|
if not config.shared_attention_norm: |
|
self.norm_2 = config.norm_class(config.n_embd, eps=config.norm_eps) |
|
self.mlp = config.mlp_class(config) |
|
|
|
self.config = config |
|
|
|
|
|
class CausalSelfAttention(BaseCausalSelfAttention): |
|
"""A modification of `lit_gpt.model.CausalSelfAttention` that adds the attention |
|
over the adaption prompt.""" |
|
|
|
def __init__(self, config: Config, block_idx: int) -> None: |
|
super().__init__(config) |
|
if block_idx >= config.adapter_start_layer: |
|
|
|
self.adapter_wte = nn.Embedding(config.adapter_prompt_length, config.n_embd) |
|
|
|
self.gating_factor = torch.nn.Parameter(torch.zeros(1, 1, config.n_head, 1)) |
|
|
|
self.adapter_kv_cache: Optional[Tuple[torch.Tensor, torch.Tensor]] = None |
|
self.block_idx = block_idx |
|
|
|
def scaled_dot_product_attention( |
|
self, q: torch.Tensor, k: torch.Tensor, v: torch.Tensor, mask: Optional[torch.Tensor] = None |
|
) -> torch.Tensor: |
|
y = super().scaled_dot_product_attention(q, k, v, mask) |
|
if self.block_idx < self.config.adapter_start_layer: |
|
return y |
|
|
|
aT = self.config.adapter_prompt_length |
|
if self.adapter_kv_cache is not None: |
|
|
|
|
|
ak, av = self.adapter_kv_cache |
|
else: |
|
prefix = self.adapter_wte.weight.reshape(1, aT, self.config.n_embd) |
|
aqkv = self.attn(prefix) |
|
q_per_kv = self.config.n_head // self.config.n_query_groups |
|
aqkv = aqkv.view(1, aT, self.config.n_query_groups, q_per_kv + 2, self.config.head_size) |
|
aqkv = aqkv.permute(0, 2, 3, 1, 4) |
|
_, ak, av = aqkv.split((q_per_kv, 1, 1), dim=2) |
|
if self.config.n_query_groups != 1: |
|
|
|
ak = ak.repeat_interleave(q_per_kv, dim=2) |
|
av = av.repeat_interleave(q_per_kv, dim=2) |
|
ak = ak.view(1, -1, aT, self.config.head_size) |
|
av = av.view(1, -1, aT, self.config.head_size) |
|
self.adapter_kv_cache = (ak, av) |
|
|
|
T = q.size(2) |
|
amask = torch.ones(T, aT, dtype=torch.bool, device=q.device) |
|
ay = super().scaled_dot_product_attention(q, ak, av, amask) |
|
return y + self.gating_factor * ay |
|
|
|
def reset_parameters(self) -> None: |
|
torch.nn.init.zeros_(self.gating_factor) |
|
|
|
def _load_from_state_dict(self, state_dict: Dict, prefix: str, *args: Any, **kwargs: Any) -> None: |
|
"""For compatibility with older checkpoints.""" |
|
if (key := prefix + "gating_factor") in state_dict and state_dict[key].size(1) == self.config.n_head: |
|
state_dict[key] = state_dict[key].permute(0, 2, 1, 3) |
|
super()._load_from_state_dict(state_dict, prefix, *args, **kwargs) |
|
|
|
|
|
def mark_only_adapter_as_trainable(model: GPT) -> None: |
|
"""Sets `requires_grad=False` for all non-adapter weights.""" |
|
for name, param in model.named_parameters(): |
|
param.requires_grad = adapter_filter(name, param) |
|
|
|
|
|
def adapter_filter(key: str, value: Any) -> bool: |
|
return "adapter_wte" in key or "gating_factor" in key |
|
|