# Copyright 2023-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from functools import wraps
import huggingface_hub
import pytest
import torch
from safetensors.torch import load_file
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import LoraConfig, PeftType, TaskType, XLoraConfig, get_peft_model
from peft.peft_model import PeftModel
from peft.tuners.xlora.layer import XLoraLayer
from peft.utils import infer_device
def flaky(num_tries: int):
"""Decorator for test functions that are flaky"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
for _ in range(num_tries):
try:
return func(*args, **kwargs)
except AssertionError as e:
print(f"Failed test {func.__name__} with error: {e}")
continue
raise AssertionError(f"Failed test {func.__name__} after {num_tries} tries")
return wrapper
return decorator
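# The tests below exercise X-LoRA (a mixture-of-LoRA-experts method) on top of the small
# facebook/opt-125m model: class-scoped fixtures save a handful of LoRA adapters to disk,
# and function-scoped fixtures wrap the base model with an XLoraConfig that mixes those
# adapters via learned per-token scalings.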
class TestXlora:
torch_device = infer_device()
model_id = "facebook/opt-125m"
num_loras = 4
@pytest.fixture(scope="class")
def lora_dir(self, tmp_path_factory):
return tmp_path_factory.mktemp("lora")
@pytest.fixture(scope="class")
def lora_embedding_dir(self, tmp_path_factory):
return tmp_path_factory.mktemp("lora_embedding")
@pytest.fixture(scope="class")
def saved_lora_adapters(self, lora_dir):
file_names = []
lora_configs = [
LoraConfig(task_type="CAUSAL_LM", target_modules=["q_proj", "v_proj"], init_lora_weights=False)
for _ in range(self.num_loras)
]
        # give one LoRA a different set of target modules
lora_configs[-1] = LoraConfig(
task_type="CAUSAL_LM", target_modules=["k_proj", "q_proj", "v_proj"], init_lora_weights=False
)
for i, lora_config in enumerate(lora_configs, start=1):
torch.manual_seed(i)
model = AutoModelForCausalLM.from_pretrained(self.model_id)
peft_model = get_peft_model(model, lora_config)
file_name = os.path.join(lora_dir, f"checkpoint-{i}")
peft_model.save_pretrained(file_name)
file_names.append(file_name)
return file_names
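    # Same as `saved_lora_adapters`, but each LoRA targets the embedding layer
    # (`embed_tokens`) rather than the attention projections.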
@pytest.fixture(scope="class")
def saved_lora_embedding_adapters(self, lora_embedding_dir):
file_names = []
for i in range(1, self.num_loras + 1):
torch.manual_seed(i)
lora_config = LoraConfig(task_type="CAUSAL_LM", init_lora_weights=False, target_modules=["embed_tokens"])
model = AutoModelForCausalLM.from_pretrained(self.model_id)
peft_model = get_peft_model(model, lora_config)
file_name = os.path.join(lora_embedding_dir, f"checkpoint-{i}")
peft_model.save_pretrained(file_name)
file_names.append(file_name)
return file_names
@pytest.fixture(scope="class")
def tokenizer(self):
tokenizer = AutoTokenizer.from_pretrained(self.model_id, trust_remote_code=True, device_map=self.torch_device)
return tokenizer
@pytest.fixture(scope="function")
def embedding_model(self, saved_lora_embedding_adapters):
model = AutoModelForCausalLM.from_pretrained(self.model_id)
model.config.use_cache = False
adapters = {str(i): file_name for i, file_name in enumerate(saved_lora_embedding_adapters)}
peft_config = XLoraConfig(
task_type=TaskType.CAUSAL_LM,
peft_type=PeftType.XLORA,
hidden_size=model.config.hidden_size,
xlora_depth=8,
adapters=adapters,
)
model = get_peft_model(model, peft_config).to(self.torch_device)
return model
@pytest.fixture(scope="function")
def model(self, saved_lora_adapters):
model = AutoModelForCausalLM.from_pretrained(self.model_id)
model.config.use_cache = False
adapters = {str(i): file_name for i, file_name in enumerate(saved_lora_adapters)}
peft_config = XLoraConfig(
task_type=TaskType.CAUSAL_LM,
peft_type=PeftType.XLORA,
hidden_size=model.config.hidden_size,
xlora_depth=8,
adapters=adapters,
)
model = get_peft_model(model, peft_config).to(self.torch_device)
return model
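    # Same as the `model` fixture, except that `layerwise_scalings=True` lets the X-LoRA
    # classifier produce a separate set of adapter scalings for each layer.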
@pytest.fixture(scope="function")
def model_layerwise(self, saved_lora_adapters):
model = AutoModelForCausalLM.from_pretrained(self.model_id)
model.config.use_cache = False
adapters = {str(i): file_name for i, file_name in enumerate(saved_lora_adapters)}
peft_config = XLoraConfig(
task_type=TaskType.CAUSAL_LM,
peft_type=PeftType.XLORA,
hidden_size=model.config.hidden_size,
xlora_depth=8,
adapters=adapters,
layerwise_scalings=True,
)
model = get_peft_model(model, peft_config).to(self.torch_device)
return model
def test_functional(self, tokenizer, model):
model.enable_scalings_logging()
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
def test_forward_hooks_are_cleaned_up(self, tokenizer, model):
        # There was an issue where forward hooks would accumulate during generation: one hook was registered per
        # forward step, and generate calls forward multiple times. Worse, only the last hook was removed afterwards,
        # so hooks piled up across generate calls.
# See https://github.com/huggingface/peft/issues/1472#issuecomment-3235817807
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
model.generate(input_ids=inputs.to(self.torch_device), max_new_tokens=10)
num_hooks_gen1 = len(model.base_model.model.model.decoder.layers[0].self_attn.k_proj._forward_pre_hooks)
model.generate(input_ids=inputs.to(self.torch_device), max_new_tokens=10)
num_hooks_gen2 = len(model.base_model.model.model.decoder.layers[0].self_attn.k_proj._forward_pre_hooks)
assert num_hooks_gen1 == num_hooks_gen2 == 0
def test_scalings_logging_methods(self, tokenizer, model):
model.enable_scalings_logging()
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
_ = model.get_latest_scalings()
        # 32 is the max number of scalings. 3 is the number of prompt tokens.
assert 32 + 3 >= len(model.get_scalings_log()) > 0
model.disable_scalings_logging()
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
assert 32 >= len(model.get_scalings_log()) > 0
bucketed = model.get_bucketed_scalings_log()
keys = bucketed.keys()
        # One bucket for each token, as we aren't using the cache
assert len(bucketed) == 32 == len(keys)
seq_len = inputs.shape[1]
for key in keys:
assert len(bucketed[key][0]) == 1
assert len(bucketed[key][1]) == 1
assert bucketed[key][0][0] == key - seq_len
model.clear_scalings_log()
assert len(model.get_scalings_log()) == 0
def test_misc_methods(self, tokenizer, model):
model.set_global_scaling_weight(1.5)
assert model.internal_xlora_classifier.config.global_scaling_weight == 1.5
assert model.get_global_scaling_weight() == 1.5
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
assert str(model) is not None
# On CI (but not locally), this test is flaky since transformers v4.45.0.
@flaky(num_tries=5)
def test_save_load_functional(self, tokenizer, model, tmp_path):
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
before_logits = outputs[: inputs.shape[1] :]
assert torch.isfinite(before_logits).all()
model.save_pretrained(save_directory=tmp_path)
del model
model = AutoModelForCausalLM.from_pretrained(self.model_id)
model.config.use_cache = False
model = PeftModel.from_pretrained(model=model, model_id=tmp_path).to(self.torch_device)
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
after_logits = outputs[: inputs.shape[1] :]
assert torch.isfinite(after_logits).all()
assert torch.equal(after_logits, before_logits)
def test_save_load_functional_pt(self, tokenizer, model, tmp_path):
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
before_logits = outputs[: inputs.shape[1] :]
assert torch.isfinite(before_logits).all()
model.save_pretrained(save_directory=tmp_path, safe_serialization=False)
del model
model = AutoModelForCausalLM.from_pretrained(self.model_id)
model.config.use_cache = False
model = PeftModel.from_pretrained(model=model, model_id=tmp_path, safe_serialization=False).to(
self.torch_device
)
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
after_logits = outputs[: inputs.shape[1] :]
assert torch.isfinite(after_logits).all()
assert torch.equal(after_logits, before_logits), (after_logits, before_logits)
def test_topk_lora(self, tokenizer, model):
model.set_topk_lora(2)
assert model.internal_xlora_classifier.config.top_k_lora == 2
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
def test_softmax_topk(self, tokenizer, model):
# Just reach in to set the config
model.internal_xlora_classifier.config.top_k_lora = 2
model.internal_xlora_classifier.config.enable_softmax = False
model.internal_xlora_classifier.config.enable_softmax_topk = True
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
def test_set_override_scaling_pass_value(self, model):
# Defaults to 0
assert model.internal_xlora_classifier.override_scaling_pass_value == 0.0
# Set it to 2 and make sure it actually is
model.set_scaling_pass_value(2)
assert model.internal_xlora_classifier.override_scaling_pass_value == 2
assert model.internal_xlora_classifier.config.scaling_pass_value == 2
# Set it to None and make sure it is 1/n
model.set_scaling_pass_value(None)
assert model.internal_xlora_classifier.override_scaling_pass_value == 1 / self.num_loras
assert model.internal_xlora_classifier.config.scaling_pass_value == 1 / self.num_loras
def test_functional_layerwise(self, tokenizer, model_layerwise):
model_layerwise.enable_scalings_logging()
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model_layerwise.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
def test_disable_adapter(self, tokenizer, model):
model.enable_scalings_logging()
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
with model.disable_adapter():
outputs_disabled = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs_disabled[: inputs.shape[1] :]).all()
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
assert not torch.equal(outputs, outputs_disabled)
def test_functional_embedding(self, tokenizer, embedding_model):
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = embedding_model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=32,
)
assert torch.isfinite(outputs[: inputs.shape[1] :]).all()
def test_xlora_loading_valid(self):
        # This test also exercises the loading-from-hub functionality!
torch.manual_seed(123)
model_id = "facebook/opt-125m"
model = AutoModelForCausalLM.from_pretrained(model_id)
model.config.use_cache = False
adapters = [
"peft-internal-testing/opt-125m-dummy-lora",
"peft-internal-testing/opt-125m-dummy-lora",
]
adapters = {str(i): file_name for i, file_name in enumerate(adapters)}
peft_config = XLoraConfig(
task_type=TaskType.CAUSAL_LM,
peft_type=PeftType.XLORA,
hidden_size=model.config.hidden_size,
adapters=adapters,
xlora_depth=8,
xlora_size=2048,
layerwise_scalings=True,
xlora_dropout_p=0.2,
)
model = get_peft_model(model, peft_config)
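        # Check that the hub adapter weights were actually loaded into the X-LoRA model by
        # comparing the first attention layer's lora_A weight against the raw checkpoint.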
downloaded = huggingface_hub.hf_hub_download(repo_id=adapters["0"], filename="adapter_model.safetensors")
sd = load_file(downloaded)
w0 = model.base_model.model.model.decoder.layers[0].self_attn.q_proj.lora_A["0"].weight
w1 = sd["base_model.model.model.decoder.layers.0.self_attn.q_proj.lora_A.weight"]
assert torch.allclose(w0, w1)
def test_scalings_storage(self, tokenizer, model):
model.enable_scalings_logging()
inputs = tokenizer.encode("Python is a", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=10,
)
latest_scalings = model.get_latest_scalings()
assert latest_scalings is not None, "get_latest_scalings() should not return None after generation"
assert isinstance(latest_scalings, torch.Tensor)
assert torch.isfinite(latest_scalings).all(), "Scalings should contain finite values"
def test_per_token_normalization_with_softmax_topk(self, tokenizer, model, monkeypatch):
model.internal_xlora_classifier.config.top_k_lora = 2
model.internal_xlora_classifier.config.enable_softmax = False
model.internal_xlora_classifier.config.enable_softmax_topk = True
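        # Wrap XLoraLayer.get_maybe_topk_scalings to capture the scalings each layer applies,
        # then verify that every per-token weight vector sums to 1 after top-k + softmax.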
captured_data = []
orig_get_maybe_topk_scalings = XLoraLayer.get_maybe_topk_scalings
def mock_get_maybe_topk_scalings(self, scalings):
result = orig_get_maybe_topk_scalings(self, scalings)
if getattr(model, "internal_xlora_scalings", None) is not None:
captured_data.append(result)
return result
monkeypatch.setattr(XLoraLayer, "get_maybe_topk_scalings", mock_get_maybe_topk_scalings)
model.enable_scalings_logging()
inputs = tokenizer.encode("Test per token normalization", add_special_tokens=False, return_tensors="pt")
outputs = model.generate(
input_ids=inputs.to(self.torch_device),
max_new_tokens=1,
)
for scaling in captured_data:
weight_sums = scaling.sum(dim=-1)
assert torch.allclose(weight_sums, torch.ones_like(weight_sums), atol=1e-5), (
"Per-token scaling weights are not normalized to sum to 1."
)