import json
import logging
import os
import sys
import tempfile

import safetensors.torch

from diffusers.loaders.lora_base import LORA_ADAPTER_METADATA_KEY


sys.path.append("..")
from test_examples_utils import ExamplesTestsAccelerate, run_command  # noqa: E402


logging.basicConfig(level=logging.DEBUG)

logger = logging.getLogger()
stream_handler = logging.StreamHandler(sys.stdout)
logger.addHandler(stream_handler)


class DreamBoothLoRASANA(ExamplesTestsAccelerate):
    instance_data_dir = "docs/source/en/imgs"
    pretrained_model_name_or_path = "hf-internal-testing/tiny-sana-pipe"
    script_path = "examples/dreambooth/train_dreambooth_lora_sana.py"
    transformer_layer_type = "transformer_blocks.0.attn1.to_k"

    def test_dreambooth_lora_sana(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path {self.pretrained_model_name_or_path}
                --instance_data_dir {self.instance_data_dir}
                --resolution 32
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                --max_sequence_length 16
                """.split()

            test_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + test_args)

            # save_pretrained smoke test
            self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))

            # make sure the state dict has the correct naming in its parameters
            lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
            is_lora = all("lora" in k for k in lora_state_dict.keys())
            self.assertTrue(is_lora)

            # when not training the text encoder, every parameter in the state dict
            # should start with `"transformer"` in its name
            starts_with_transformer = all(key.startswith("transformer") for key in lora_state_dict.keys())
            self.assertTrue(starts_with_transformer)

    def test_dreambooth_lora_latent_caching(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path {self.pretrained_model_name_or_path}
                --instance_data_dir {self.instance_data_dir}
                --resolution 32
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --cache_latents
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                --max_sequence_length 16
                """.split()

            test_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + test_args)

            # save_pretrained smoke test
            self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))

            # make sure the state dict has the correct naming in its parameters
            lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
            is_lora = all("lora" in k for k in lora_state_dict.keys())
            self.assertTrue(is_lora)

            # when not training the text encoder, every parameter in the state dict
            # should start with `"transformer"` in its name
            starts_with_transformer = all(key.startswith("transformer") for key in lora_state_dict.keys())
            self.assertTrue(starts_with_transformer)

    def test_dreambooth_lora_layers(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path {self.pretrained_model_name_or_path}
                --instance_data_dir {self.instance_data_dir}
                --resolution 32
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --cache_latents
                --learning_rate 5.0e-04
                --scale_lr
                --lora_layers {self.transformer_layer_type}
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                --max_sequence_length 16
                """.split()

            test_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + test_args)

            # save_pretrained smoke test
            self.assertTrue(os.path.isfile(os.path.join(tmpdir, "pytorch_lora_weights.safetensors")))

            # make sure the state dict has the correct naming in its parameters
            lora_state_dict = safetensors.torch.load_file(os.path.join(tmpdir, "pytorch_lora_weights.safetensors"))
            is_lora = all("lora" in k for k in lora_state_dict.keys())
            self.assertTrue(is_lora)

            # when training is restricted with --lora_layers, only the targeted module
            # should appear in the state dict
            contains_targeted_layer = all(self.transformer_layer_type in key for key in lora_state_dict)
            self.assertTrue(contains_targeted_layer)

    def test_dreambooth_lora_sana_checkpointing_checkpoints_total_limit(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path={self.pretrained_model_name_or_path}
                --instance_data_dir={self.instance_data_dir}
                --output_dir={tmpdir}
                --resolution=32
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=6
                --checkpoints_total_limit=2
                --checkpointing_steps=2
                --max_sequence_length 16
                """.split()

            test_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + test_args)

            # checkpoint-2 should have been deleted to respect --checkpoints_total_limit=2
            self.assertEqual(
                {x for x in os.listdir(tmpdir) if "checkpoint" in x},
                {"checkpoint-4", "checkpoint-6"},
            )

    def test_dreambooth_lora_sana_checkpointing_checkpoints_total_limit_removes_multiple_checkpoints(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path={self.pretrained_model_name_or_path}
                --instance_data_dir={self.instance_data_dir}
                --output_dir={tmpdir}
                --resolution=32
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=4
                --checkpointing_steps=2
                --max_sequence_length 16
                """.split()

            test_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + test_args)

            # the first run sets no limit, so both checkpoints should survive
            self.assertEqual({x for x in os.listdir(tmpdir) if "checkpoint" in x}, {"checkpoint-2", "checkpoint-4"})

            resume_run_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path={self.pretrained_model_name_or_path}
                --instance_data_dir={self.instance_data_dir}
                --output_dir={tmpdir}
                --resolution=32
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=8
                --checkpointing_steps=2
                --resume_from_checkpoint=checkpoint-4
                --checkpoints_total_limit=2
                --max_sequence_length 16
                """.split()

            resume_run_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + resume_run_args)

            # resuming with --checkpoints_total_limit=2 should prune the older checkpoints
            self.assertEqual({x for x in os.listdir(tmpdir) if "checkpoint" in x}, {"checkpoint-6", "checkpoint-8"})

    def test_dreambooth_lora_sana_with_metadata(self):
        lora_alpha = 8
        rank = 4
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                {self.script_path}
                --pretrained_model_name_or_path={self.pretrained_model_name_or_path}
                --instance_data_dir={self.instance_data_dir}
                --output_dir={tmpdir}
                --resolution=32
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=4
                --lora_alpha={lora_alpha}
                --rank={rank}
                --checkpointing_steps=2
                --max_sequence_length 16
                """.split()

            test_args.extend(["--instance_prompt", ""])
            run_command(self._launch_args + test_args)

            state_dict_file = os.path.join(tmpdir, "pytorch_lora_weights.safetensors")
            self.assertTrue(os.path.isfile(state_dict_file))

            # the adapter config is stored as JSON in the safetensors header metadata
            with safetensors.torch.safe_open(state_dict_file, framework="pt", device="cpu") as f:
                metadata = f.metadata() or {}

            self.assertIn(LORA_ADAPTER_METADATA_KEY, metadata)
            raw = json.loads(metadata[LORA_ADAPTER_METADATA_KEY])

            # the alpha and rank passed on the CLI should round-trip through the metadata
            self.assertEqual(raw["transformer.lora_alpha"], lora_alpha)
            self.assertEqual(raw["transformer.r"], rank)
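
# A minimal usage sketch (not part of the test suite) showing how a LoRA file
# produced by these runs could be loaded back into a pipeline; the local output
# path below is hypothetical:
#
#     from diffusers import SanaPipeline
#
#     pipe = SanaPipeline.from_pretrained("hf-internal-testing/tiny-sana-pipe")
#     pipe.load_lora_weights("/path/to/output_dir", weight_name="pytorch_lora_weights.safetensors")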