# Copyright 2023-present the HuggingFace Inc. team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import tempfile

import pytest
import torch
from torch.testing import assert_close
from transformers import AutoModelForCausalLM

from peft import get_peft_model
from peft.peft_model import PeftModel
from peft.tuners.adaption_prompt import AdaptionPromptConfig
from peft.utils import infer_device
from peft.utils.other import prepare_model_for_kbit_training
from peft.utils.save_and_load import get_peft_model_state_dict

MODELS_TO_TEST = [
    "hf-internal-testing/tiny-random-gpt2",
    "trl-internal-testing/tiny-random-LlamaForCausalLM",
    "hf-internal-testing/tiny-random-MistralForCausalLM",
]
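# Tiny, randomly initialized test checkpoints; every test below runs once per
# entry (see the class-level parametrization).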


# Each test method takes a `model_id` argument; parametrizing the whole class
# over MODELS_TO_TEST runs every test once per checkpoint.
@pytest.mark.parametrize("model_id", MODELS_TO_TEST)
class TestAdaptionPrompt:
    """
    Tests for the AdaptionPrompt model.

    Some of these tests were adapted from `test_peft_model.py` (which has been refactored since), but since we haven't
    checked in the test checkpoints for Llama into `hf-internal-testing`, we separate them for now.
    """

    transformers_class = AutoModelForCausalLM
    torch_device = infer_device()

    def test_attributes(self, model_id):
        model = self.transformers_class.from_pretrained(model_id)
        config = AdaptionPromptConfig(adapter_layers=1, adapter_len=4)
        model = get_peft_model(model, config)

        assert hasattr(model, "save_pretrained")
        assert hasattr(model, "from_pretrained")
        assert hasattr(model, "push_to_hub")

    def test_prepare_for_training(self, model_id):
        model = self.transformers_class.from_pretrained(model_id)
        config = AdaptionPromptConfig(adapter_layers=1, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config)
        model = model.to(self.torch_device)

        dummy_input = torch.LongTensor([[1, 1, 1]]).to(self.torch_device)
        dummy_output = model.get_input_embeddings()(dummy_input)

        assert not dummy_output.requires_grad

    def test_prepare_for_int8_training(self, model_id):
        model = self.transformers_class.from_pretrained(model_id)
        model = prepare_model_for_kbit_training(model)
        model = model.to(self.torch_device)

        for param in model.parameters():
            assert not param.requires_grad

        config = AdaptionPromptConfig(adapter_layers=1, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config)

        # For backward compatibility
        if hasattr(model, "enable_input_require_grads"):
            model.enable_input_require_grads()
        else:

            def make_inputs_require_grad(module, input, output):
                output.requires_grad_(True)

            model.get_input_embeddings().register_forward_hook(make_inputs_require_grad)

        dummy_input = torch.LongTensor([[1, 1, 1]]).to(self.torch_device)
        dummy_output = model.get_input_embeddings()(dummy_input)

        assert dummy_output.requires_grad

    def test_save_pretrained_regression(self, model_id):
        seed = 420
        torch.manual_seed(seed)
        model = self.transformers_class.from_pretrained(model_id)
        config = AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config)
        model = model.to(self.torch_device)

        with tempfile.TemporaryDirectory() as tmp_dirname:
            model.save_pretrained(tmp_dirname, safe_serialization=False)

            torch.manual_seed(seed)
            model_from_pretrained = self.transformers_class.from_pretrained(model_id)
            model_from_pretrained = PeftModel.from_pretrained(model_from_pretrained, tmp_dirname)

            # check if the state dicts are equal
            state_dict = get_peft_model_state_dict(model)
            state_dict_from_pretrained = get_peft_model_state_dict(model_from_pretrained)

            # check if same keys
            assert state_dict.keys() == state_dict_from_pretrained.keys()

            # Check that the number of saved parameters is 4 -- 2 layers of (tokens and gate).
            assert len(state_dict) == 4

            # check if tensors equal
            for key in state_dict.keys():
                assert torch.allclose(
                    state_dict[key].to(self.torch_device), state_dict_from_pretrained[key].to(self.torch_device)
                )

            # check if `adapter_model.bin` is present
            assert os.path.exists(os.path.join(tmp_dirname, "adapter_model.bin"))

            # check if `adapter_config.json` is present
            assert os.path.exists(os.path.join(tmp_dirname, "adapter_config.json"))

            # check if `model.safetensors` is not present
            assert not os.path.exists(os.path.join(tmp_dirname, "model.safetensors"))

            # check if `config.json` is not present
            assert not os.path.exists(os.path.join(tmp_dirname, "config.json"))

    def test_save_pretrained(self, model_id):
        seed = 420
        torch.manual_seed(seed)
        model = self.transformers_class.from_pretrained(model_id)
        config = AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config)
        model = model.to(self.torch_device)

        with tempfile.TemporaryDirectory() as tmp_dirname:
            model.save_pretrained(tmp_dirname)

            torch.manual_seed(seed)
            model_from_pretrained = self.transformers_class.from_pretrained(model_id)
            model_from_pretrained = PeftModel.from_pretrained(model_from_pretrained, tmp_dirname)

            # check if the state dicts are equal
            state_dict = get_peft_model_state_dict(model)
            state_dict_from_pretrained = get_peft_model_state_dict(model_from_pretrained)

            # check if same keys
            assert state_dict.keys() == state_dict_from_pretrained.keys()

            # Check that the number of saved parameters is 4 -- 2 layers of (tokens and gate).
            assert len(state_dict) == 4

            # check if tensors equal
            for key in state_dict.keys():
                assert torch.allclose(
                    state_dict[key].to(self.torch_device), state_dict_from_pretrained[key].to(self.torch_device)
                )

            # check if `adapter_model.safetensors` is present
            assert os.path.exists(os.path.join(tmp_dirname, "adapter_model.safetensors"))

            # check if `adapter_config.json` is present
            assert os.path.exists(os.path.join(tmp_dirname, "adapter_config.json"))

            # check if `model.safetensors` is not present
            assert not os.path.exists(os.path.join(tmp_dirname, "model.safetensors"))

            # check if `config.json` is not present
            assert not os.path.exists(os.path.join(tmp_dirname, "config.json"))

    def test_save_pretrained_selected_adapters(self, model_id):
        seed = 420
        torch.manual_seed(seed)
        model = self.transformers_class.from_pretrained(model_id)
        config = AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config)
        model = model.to(self.torch_device)

        new_adapter_config = AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        model.add_adapter("new_adapter", new_adapter_config)

        with tempfile.TemporaryDirectory() as tmp_dirname:
            model.save_pretrained(tmp_dirname)

            torch.manual_seed(seed)
            model_from_pretrained = self.transformers_class.from_pretrained(model_id)
            model_from_pretrained = PeftModel.from_pretrained(model_from_pretrained, tmp_dirname)

            model_from_pretrained.load_adapter(tmp_dirname, "new_adapter")

            # check if the state dicts are equal
            state_dict = get_peft_model_state_dict(model)
            state_dict_from_pretrained = get_peft_model_state_dict(model_from_pretrained)

            # check if same keys
            assert state_dict.keys() == state_dict_from_pretrained.keys()

            # Check that the number of saved parameters is 4 -- 2 layers of (tokens and gate).
            assert len(state_dict) == 4

            # check if tensors equal
            for key in state_dict.keys():
                assert torch.allclose(
                    state_dict[key].to(self.torch_device), state_dict_from_pretrained[key].to(self.torch_device)
                )

            # check if `adapter_model.safetensors` is present
            assert os.path.exists(os.path.join(tmp_dirname, "adapter_model.safetensors"))

            # check if `adapter_config.json` is present
            assert os.path.exists(os.path.join(tmp_dirname, "adapter_config.json"))

            # check if `model.safetensors` is not present
            assert not os.path.exists(os.path.join(tmp_dirname, "model.safetensors"))

            # check if `config.json` is not present
            assert not os.path.exists(os.path.join(tmp_dirname, "config.json"))

    def test_generate(self, model_id):
        model = self.transformers_class.from_pretrained(model_id)
        config = AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config)
        model = model.to(self.torch_device)

        input_ids = torch.LongTensor([[1, 1, 1], [2, 1, 2]]).to(self.torch_device)
        attention_mask = torch.LongTensor([[1, 1, 1], [1, 0, 1]]).to(self.torch_device)

        # check if `generate` works
        _ = model.generate(input_ids=input_ids, attention_mask=attention_mask)

        # check if `generate` works if positional arguments are passed
        _ = model.generate(input_ids, attention_mask=attention_mask)

    def test_sequence_adapter_ops(self, model_id):
        """Test a sequence of adapter operations."""
        # Test input data.
        input_ids = torch.LongTensor([[1, 1, 1], [2, 1, 2]]).to(self.torch_device)
        target_ids = torch.LongTensor([[0, 0, 0], [0, 0, 0]]).to(self.torch_device)
        attention_mask = torch.LongTensor([[1, 1, 1], [1, 0, 1]]).to(self.torch_device)

        # Create the original base model.
        original = self.transformers_class.from_pretrained(model_id)
        original = original.to(self.torch_device)
        original_before = original(input_ids=input_ids, attention_mask=attention_mask)

        # Get AdaptionPrompt model.
        adapted = get_peft_model(
            original, AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        )
        adapted = adapted.to(self.torch_device)
        default_before = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)

        # Test zero-init: the logits should be exactly the same.
        assert_close(original_before.logits, default_before.logits, rtol=0, atol=0)

        # Single fine-tuning step on the "default" adapter.
        optimizer = torch.optim.SGD(adapted.parameters(), lr=1)
        optimizer.zero_grad()
        default_before.loss.backward()
        optimizer.step()

        # Test that the output changed.
        default_after = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert not torch.allclose(default_before.logits, default_after.logits)

        with adapted.disable_adapter():
            # Test that the output is the same as the original output.
            default_disabled = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
            assert_close(original_before.logits, default_disabled.logits, rtol=0, atol=0)

        # Add new adapter 1.
        adapted.add_adapter("adapter 1", AdaptionPromptConfig(adapter_layers=2, adapter_len=8, task_type="CAUSAL_LM"))
        # Test zero-init.
        adapter_1_before = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert_close(original_before.logits, adapter_1_before.logits, rtol=0, atol=0)

        # Single fine-tuning step on adapter 1.
        optimizer = torch.optim.SGD(adapted.parameters(), lr=1)
        optimizer.zero_grad()
        adapter_1_before.loss.backward()
        optimizer.step()

        # Test that the adapter 1 output changed.
        adapter_1_after = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert not torch.allclose(adapter_1_before.logits, adapter_1_after.logits)
        assert not torch.allclose(original_before.logits, adapter_1_after.logits)
        assert not torch.allclose(default_after.logits, adapter_1_after.logits)

        with adapted.disable_adapter():
            # Test that the output is the same as the original output.
            adapter_1_disabled = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
            assert_close(original_before.logits, adapter_1_disabled.logits, rtol=0, atol=0)

        # Set adapter back to default.
        adapted.set_adapter("default")

        # Test that the output is the same as the default output after training.
        default_after_set = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert_close(default_after.logits, default_after_set.logits, rtol=0, atol=0)
        assert not torch.allclose(original_before.logits, default_after_set.logits)
        assert not torch.allclose(adapter_1_after.logits, default_after_set.logits)

    def test_add_and_set_while_disabled(self, model_id):
        """Test that adding and setting adapters while disabled works as intended."""
        # Test input data.
        input_ids = torch.LongTensor([[1, 1, 1], [2, 1, 2]]).to(self.torch_device)
        target_ids = torch.LongTensor([[0, 0, 0], [0, 0, 0]]).to(self.torch_device)
        attention_mask = torch.LongTensor([[1, 1, 1], [1, 0, 1]]).to(self.torch_device)

        # Create the original base model.
        original = self.transformers_class.from_pretrained(model_id)
        original = original.to(self.torch_device)
        original_before = original(input_ids=input_ids, attention_mask=attention_mask)

        # Get AdaptionPrompt model.
        adapted = get_peft_model(
            original, AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        )
        adapted = adapted.to(self.torch_device)

        with adapted.disable_adapter():
            adapted.add_adapter(
                "adapter 1", AdaptionPromptConfig(adapter_layers=2, adapter_len=8, task_type="CAUSAL_LM")
            )

        # Test that the output is the same as the original output.
        adapter_1_before = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert_close(original_before.logits, adapter_1_before.logits, rtol=0, atol=0)

        # Single fine-tuning step on adapter 1.
        optimizer = torch.optim.SGD(adapted.parameters(), lr=1)
        optimizer.zero_grad()
        adapter_1_before.loss.backward()
        optimizer.step()

        # Test that the adapter 1 output changed.
        adapter_1_after = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert not torch.allclose(original_before.logits, adapter_1_after.logits)

        adapted.set_adapter("default")
        with adapted.disable_adapter():
            adapted.set_adapter("adapter 1")

        # Test that adapter 1 is active again.
        adapter_1_after_set = adapted(input_ids=input_ids, attention_mask=attention_mask, labels=target_ids)
        assert_close(adapter_1_after.logits, adapter_1_after_set.logits, rtol=0, atol=0)

    def test_use_cache(self, model_id):
        """Test that generation with use_cache=True matches the output obtained with use_cache=False."""
        torch.manual_seed(0)
        input_ids = torch.LongTensor([[1, 1, 1], [2, 1, 2]]).to(self.torch_device)
        original = self.transformers_class.from_pretrained(model_id, use_cache=False)
        adapted = get_peft_model(
            original, AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        )
        adapted = adapted.to(self.torch_device)
        expected = adapted.generate(input_ids=input_ids, max_length=8)

        # Set use_cache = True and generate output again.
        adapted.base_model.config.use_cache = True
        actual = adapted.generate(input_ids=input_ids, max_length=8)
        assert_close(expected, actual, rtol=0, atol=0)

    def test_bf16_inference(self, model_id):
        """Test that AdaptionPrompt works when the base model is loaded in bfloat16."""
        if self.torch_device == "mps":
            return pytest.skip("Skipping bf16 test on MPS")

        input_ids = torch.LongTensor([[1, 1, 1], [2, 1, 2]]).to(self.torch_device)
        original = self.transformers_class.from_pretrained(model_id, torch_dtype=torch.bfloat16)
        adapted = get_peft_model(
            original, AdaptionPromptConfig(adapter_layers=2, adapter_len=4, task_type="CAUSAL_LM")
        )
        adapted = adapted.to(self.torch_device)
        adapted.generate(input_ids=input_ids)  # does not raise

    def test_disable_adapter(self, model_id):
        model = self.transformers_class.from_pretrained(model_id).to(self.torch_device)
        dummy_input = torch.LongTensor([[1, 1, 1]]).to(self.torch_device)
        output_before = model(dummy_input).logits

        config = AdaptionPromptConfig(adapter_layers=1, adapter_len=4, task_type="CAUSAL_LM")
        model = get_peft_model(model, config).to(self.torch_device)
        output_peft = model(dummy_input).logits
        # TODO currently this fails because scores are zeroed out:
        # https://github.com/huggingface/peft/blob/062d95a09eb5d1de35c0e5e23d4387daba99e2db/src/peft/tuners/adaption_prompt.py#L303
        # This is fine for users but makes it difficult to test if anything happens. In the future, we will have a
        # clean way to control initialization. Until then, this test is expected to fail.
        assert not torch.allclose(output_before, output_peft)

        with model.disable_adapter():
            output_peft_disabled = model(dummy_input).logits
        assert torch.allclose(output_before, output_peft_disabled)