Spaces:
Running
on
Zero
Running
on
Zero
# coding=utf-8
# Copyright 2024 HuggingFace Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging | |
import os | |
import sys | |
import tempfile | |
import safetensors | |
sys.path.append("..") | |
from test_examples_utils import ExamplesTestsAccelerate, run_command # noqa: E402 | |
from diffusers import DiffusionPipeline # noqa: E402 | |
# Module-level logging setup. ``logging.getLogger()`` with no name returns the
# root logger; an extra handler writing to ``sys.stdout`` is attached to it
# after ``basicConfig`` has been asked for DEBUG-level configuration.
logger = logging.getLogger()
logging.basicConfig(level=logging.DEBUG)

stream_handler = logging.StreamHandler(stream=sys.stdout)
logger.addHandler(stream_handler)
class DreamBoothLoRA(ExamplesTestsAccelerate):
    """Smoke tests that launch ``examples/dreambooth/train_dreambooth_lora.py``.

    Every test builds a command line, runs it through ``run_command`` with
    ``self._launch_args`` prepended (both come from outside this class), lets the
    script write into a temporary directory, and then inspects what was left there.
    """

    # File name under which the tests expect the trained LoRA weights to be saved.
    _LORA_WEIGHTS_NAME = "pytorch_lora_weights.safetensors"

    def _load_lora_state_dict(self, output_dir):
        """Assert that the LoRA weights file exists in ``output_dir`` and load it.

        Args:
            output_dir: Directory the training script was told to write to.

        Returns:
            The mapping returned by ``safetensors.torch.load_file`` for the weights file.
        """
        # Import the submodule explicitly: a bare ``import safetensors`` only exposes
        # ``safetensors.torch`` if some other module happened to import it first.
        from safetensors.torch import load_file

        weights_path = os.path.join(output_dir, self._LORA_WEIGHTS_NAME)
        # save_pretrained smoke test
        self.assertTrue(os.path.isfile(weights_path), f"missing LoRA weights file: {weights_path}")
        return load_file(weights_path)

    def _assert_all_keys_are_lora(self, state_dict):
        """Fail, listing the offending names, unless every key contains ``"lora"``."""
        non_lora_keys = [key for key in state_dict if "lora" not in key]
        self.assertEqual(non_lora_keys, [], "state dict contains keys without 'lora' in their name")

    def _assert_keys_start_with(self, state_dict, prefixes):
        """Fail, listing the offending names, unless every key starts with one of ``prefixes``.

        Args:
            state_dict: Mapping whose keys are parameter names.
            prefixes: A string or tuple of strings, as accepted by ``str.startswith``.
        """
        unexpected_keys = [key for key in state_dict if not key.startswith(prefixes)]
        self.assertEqual(unexpected_keys, [], f"state dict contains keys not starting with {prefixes!r}")

    @staticmethod
    def _checkpoint_entries(output_dir):
        """Return the set of entries in ``output_dir`` whose name contains ``"checkpoint"``."""
        return {entry for entry in os.listdir(output_dir) if "checkpoint" in entry}

    def test_dreambooth_lora(self):
        """Run two training steps without the text encoder and check the saved key names."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-stable-diffusion-pipe
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                """.split()

            run_command(self._launch_args + test_args)

            lora_state_dict = self._load_lora_state_dict(tmpdir)

            # make sure the state_dict has the correct naming in the parameters.
            self._assert_all_keys_are_lora(lora_state_dict)

            # when not training the text encoder, all the parameters in the state dict
            # should start with `"unet"` in their names.
            self._assert_keys_start_with(lora_state_dict, "unet")

    def test_dreambooth_lora_with_text_encoder(self):
        """Run with ``--train_text_encoder`` and check that text-encoder keys are saved."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-stable-diffusion-pipe
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --train_text_encoder
                --output_dir {tmpdir}
                """.split()

            run_command(self._launch_args + test_args)

            lora_state_dict = self._load_lora_state_dict(tmpdir)

            # check `text_encoder` is present at all.
            self.assertTrue(
                any(key.startswith("text_encoder") for key in lora_state_dict),
                "no key starting with 'text_encoder' found in the saved state dict",
            )

            # the names of the keys of the state dict should either start with `unet`
            # or `text_encoder`.
            self._assert_keys_start_with(lora_state_dict, ("unet", "text_encoder"))

    def test_dreambooth_lora_checkpointing_checkpoints_total_limit(self):
        """Six steps, checkpointing every 2 with a limit of 2: expect checkpoints 4 and 6 to remain."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora.py
                --pretrained_model_name_or_path=hf-internal-testing/tiny-stable-diffusion-pipe
                --instance_data_dir=docs/source/en/imgs
                --output_dir={tmpdir}
                --instance_prompt=prompt
                --resolution=64
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=6
                --checkpoints_total_limit=2
                --checkpointing_steps=2
                """.split()

            run_command(self._launch_args + test_args)

            self.assertEqual(
                self._checkpoint_entries(tmpdir),
                {"checkpoint-4", "checkpoint-6"},
            )

    def test_dreambooth_lora_checkpointing_checkpoints_total_limit_removes_multiple_checkpoints(self):
        """Resume an unlimited run with a limit of 2 and expect only checkpoints 6 and 8 to remain."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # First run: no total limit, so both checkpoints are expected to be kept.
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora.py
                --pretrained_model_name_or_path=hf-internal-testing/tiny-stable-diffusion-pipe
                --instance_data_dir=docs/source/en/imgs
                --output_dir={tmpdir}
                --instance_prompt=prompt
                --resolution=64
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=4
                --checkpointing_steps=2
                """.split()

            run_command(self._launch_args + test_args)

            self.assertEqual(self._checkpoint_entries(tmpdir), {"checkpoint-2", "checkpoint-4"})

            # Second run: resume from checkpoint-4 into the same directory, now with a limit.
            resume_run_args = f"""
                examples/dreambooth/train_dreambooth_lora.py
                --pretrained_model_name_or_path=hf-internal-testing/tiny-stable-diffusion-pipe
                --instance_data_dir=docs/source/en/imgs
                --output_dir={tmpdir}
                --instance_prompt=prompt
                --resolution=64
                --train_batch_size=1
                --gradient_accumulation_steps=1
                --max_train_steps=8
                --checkpointing_steps=2
                --resume_from_checkpoint=checkpoint-4
                --checkpoints_total_limit=2
                """.split()

            run_command(self._launch_args + resume_run_args)

            self.assertEqual(self._checkpoint_entries(tmpdir), {"checkpoint-6", "checkpoint-8"})

    def test_dreambooth_lora_if_model(self):
        """Run against the tiny IF pipeline with pre-computed text embeddings and check key names."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-if-pipe
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                --pre_compute_text_embeddings
                --tokenizer_max_length=77
                --text_encoder_use_attention_mask
                """.split()

            run_command(self._launch_args + test_args)

            lora_state_dict = self._load_lora_state_dict(tmpdir)

            # make sure the state_dict has the correct naming in the parameters.
            self._assert_all_keys_are_lora(lora_state_dict)

            # when not training the text encoder, all the parameters in the state dict
            # should start with `"unet"` in their names.
            self._assert_keys_start_with(lora_state_dict, "unet")
class DreamBoothLoRASDXL(ExamplesTestsAccelerate):
    """Smoke tests that launch ``examples/dreambooth/train_dreambooth_lora_sdxl.py``.

    Every test builds a command line, runs it through ``run_command`` with
    ``self._launch_args`` prepended (both come from outside this class), lets the
    script write into a temporary directory, and then inspects what was left there.
    """

    # File name under which the tests expect the trained LoRA weights to be saved.
    _LORA_WEIGHTS_NAME = "pytorch_lora_weights.safetensors"

    def _load_lora_state_dict(self, output_dir):
        """Assert that the LoRA weights file exists in ``output_dir`` and load it.

        Args:
            output_dir: Directory the training script was told to write to.

        Returns:
            The mapping returned by ``safetensors.torch.load_file`` for the weights file.
        """
        # Import the submodule explicitly: a bare ``import safetensors`` only exposes
        # ``safetensors.torch`` if some other module happened to import it first.
        from safetensors.torch import load_file

        weights_path = os.path.join(output_dir, self._LORA_WEIGHTS_NAME)
        # save_pretrained smoke test
        self.assertTrue(os.path.isfile(weights_path), f"missing LoRA weights file: {weights_path}")
        return load_file(weights_path)

    def _assert_all_keys_are_lora(self, state_dict):
        """Fail, listing the offending names, unless every key contains ``"lora"``."""
        non_lora_keys = [key for key in state_dict if "lora" not in key]
        self.assertEqual(non_lora_keys, [], "state dict contains keys without 'lora' in their name")

    def _assert_keys_start_with(self, state_dict, prefixes):
        """Fail, listing the offending names, unless every key starts with one of ``prefixes``.

        Args:
            state_dict: Mapping whose keys are parameter names.
            prefixes: A string or tuple of strings, as accepted by ``str.startswith``.
        """
        unexpected_keys = [key for key in state_dict if not key.startswith(prefixes)]
        self.assertEqual(unexpected_keys, [], f"state dict contains keys not starting with {prefixes!r}")

    @staticmethod
    def _checkpoint_entries(output_dir):
        """Return the set of entries in ``output_dir`` whose name contains ``"checkpoint"``."""
        return {entry for entry in os.listdir(output_dir) if "checkpoint" in entry}

    def test_dreambooth_lora_sdxl(self):
        """Run two training steps without the text encoders and check the saved key names."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora_sdxl.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-stable-diffusion-xl-pipe
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                """.split()

            run_command(self._launch_args + test_args)

            lora_state_dict = self._load_lora_state_dict(tmpdir)

            # make sure the state_dict has the correct naming in the parameters.
            self._assert_all_keys_are_lora(lora_state_dict)

            # when not training the text encoder, all the parameters in the state dict
            # should start with `"unet"` in their names.
            self._assert_keys_start_with(lora_state_dict, "unet")

    def test_dreambooth_lora_sdxl_with_text_encoder(self):
        """Run with ``--train_text_encoder`` and check the saved key names."""
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora_sdxl.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-stable-diffusion-xl-pipe
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                --train_text_encoder
                """.split()

            run_command(self._launch_args + test_args)

            lora_state_dict = self._load_lora_state_dict(tmpdir)

            # make sure the state_dict has the correct naming in the parameters.
            self._assert_all_keys_are_lora(lora_state_dict)

            # when training the text encoder, all the parameters in the state dict should
            # start with `"unet"` or `"text_encoder"` or `"text_encoder_2"` in their names.
            # (`"text_encoder_2"` is already matched by the `"text_encoder"` prefix; it is
            # listed only to spell out the intent.)
            self._assert_keys_start_with(lora_state_dict, ("unet", "text_encoder", "text_encoder_2"))

    def test_dreambooth_lora_sdxl_custom_captions(self):
        """Run with a captioned dataset instead of an instance image directory.

        No artifacts are inspected; the test relies on ``run_command`` to surface a
        failed run (presumably by raising -- confirm in ``test_examples_utils``).
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora_sdxl.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-stable-diffusion-xl-pipe
                --dataset_name hf-internal-testing/dummy_image_text_data
                --caption_column text
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                """.split()

            run_command(self._launch_args + test_args)

    def test_dreambooth_lora_sdxl_text_encoder_custom_captions(self):
        """Run with a captioned dataset and ``--train_text_encoder``.

        No artifacts are inspected; the test relies on ``run_command`` to surface a
        failed run (presumably by raising -- confirm in ``test_examples_utils``).
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora_sdxl.py
                --pretrained_model_name_or_path hf-internal-testing/tiny-stable-diffusion-xl-pipe
                --dataset_name hf-internal-testing/dummy_image_text_data
                --caption_column text
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                --train_text_encoder
                """.split()

            run_command(self._launch_args + test_args)

    def test_dreambooth_lora_sdxl_checkpointing_checkpoints_total_limit(self):
        """Six steps, checkpointing every 2 with a limit of 2: expect checkpoints 4 and 6 to remain."""
        pipeline_path = "hf-internal-testing/tiny-stable-diffusion-xl-pipe"

        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora_sdxl.py
                --pretrained_model_name_or_path {pipeline_path}
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 6
                --checkpointing_steps=2
                --checkpoints_total_limit=2
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                """.split()

            run_command(self._launch_args + test_args)

            # Load the trained weights into a fresh pipeline and run one inference step.
            pipe = DiffusionPipeline.from_pretrained(pipeline_path)
            pipe.load_lora_weights(tmpdir)
            pipe("a prompt", num_inference_steps=1)

            # check checkpoint directories exist
            # checkpoint-2 should have been deleted
            self.assertEqual(self._checkpoint_entries(tmpdir), {"checkpoint-4", "checkpoint-6"})

    def test_dreambooth_lora_sdxl_text_encoder_checkpointing_checkpoints_total_limit(self):
        """Seven steps with ``--train_text_encoder``: expect checkpoints 4 and 6 to remain."""
        pipeline_path = "hf-internal-testing/tiny-stable-diffusion-xl-pipe"

        with tempfile.TemporaryDirectory() as tmpdir:
            test_args = f"""
                examples/dreambooth/train_dreambooth_lora_sdxl.py
                --pretrained_model_name_or_path {pipeline_path}
                --instance_data_dir docs/source/en/imgs
                --instance_prompt photo
                --resolution 64
                --train_batch_size 1
                --gradient_accumulation_steps 1
                --max_train_steps 7
                --checkpointing_steps=2
                --checkpoints_total_limit=2
                --train_text_encoder
                --learning_rate 5.0e-04
                --scale_lr
                --lr_scheduler constant
                --lr_warmup_steps 0
                --output_dir {tmpdir}
                """.split()

            run_command(self._launch_args + test_args)

            # Load the trained weights into a fresh pipeline and run two inference steps.
            pipe = DiffusionPipeline.from_pretrained(pipeline_path)
            pipe.load_lora_weights(tmpdir)
            pipe("a prompt", num_inference_steps=2)

            # check checkpoint directories exist
            self.assertEqual(
                self._checkpoint_entries(tmpdir),
                # checkpoint-2 should have been deleted
                {"checkpoint-4", "checkpoint-6"},
            )