# Copyright 2024 Bria AI and The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import tempfile
import unittest

import numpy as np
import torch
from huggingface_hub import hf_hub_download
from transformers import T5EncoderModel, T5TokenizerFast

from diffusers import (
    AutoencoderKL,
    BriaTransformer2DModel,
    FlowMatchEulerDiscreteScheduler,
)
from diffusers.pipelines.bria import BriaPipeline
from tests.pipelines.test_pipelines_common import PipelineTesterMixin, to_np
from ...testing_utils import (
backend_empty_cache,
enable_full_determinism,
numpy_cosine_similarity_distance,
require_torch_accelerator,
slow,
torch_device,
)

enable_full_determinism()


class BriaPipelineFastTests(PipelineTesterMixin, unittest.TestCase):
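    """Fast smoke tests for BriaPipeline, built on tiny dummy components so they can run on CPU."""
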
pipeline_class = BriaPipeline
params = frozenset(["prompt", "height", "width", "guidance_scale", "prompt_embeds"])
batch_params = frozenset(["prompt"])
    # there is no xformers attention processor for the Flux-style transformer used here
    test_xformers_attention = False
test_layerwise_casting = True
test_group_offloading = True

    def get_dummy_components(self):
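        """Build a minimal transformer/VAE/text-encoder stack that is just large enough to exercise the pipeline."""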
torch.manual_seed(0)
transformer = BriaTransformer2DModel(
patch_size=1,
in_channels=16,
num_layers=1,
num_single_layers=1,
attention_head_dim=8,
num_attention_heads=2,
joint_attention_dim=32,
pooled_projection_dim=None,
axes_dims_rope=[0, 4, 4],
)
torch.manual_seed(0)
vae = AutoencoderKL(
act_fn="silu",
block_out_channels=(32,),
in_channels=3,
out_channels=3,
down_block_types=["DownEncoderBlock2D"],
up_block_types=["UpDecoderBlock2D"],
latent_channels=4,
sample_size=32,
shift_factor=0,
scaling_factor=0.13025,
use_post_quant_conv=True,
use_quant_conv=True,
force_upcast=False,
)
scheduler = FlowMatchEulerDiscreteScheduler()
torch.manual_seed(0)
text_encoder = T5EncoderModel.from_pretrained("hf-internal-testing/tiny-random-t5")
tokenizer = T5TokenizerFast.from_pretrained("hf-internal-testing/tiny-random-t5")
components = {
"scheduler": scheduler,
"text_encoder": text_encoder,
"tokenizer": tokenizer,
"transformer": transformer,
"vae": vae,
"image_encoder": None,
"feature_extractor": None,
}
return components

    def get_dummy_inputs(self, device, seed=0):
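        """Return small, deterministic call arguments; MPS needs a globally seeded generator."""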
if str(device).startswith("mps"):
generator = torch.manual_seed(seed)
else:
generator = torch.Generator(device="cpu").manual_seed(seed)
inputs = {
"prompt": "A painting of a squirrel eating a burger",
"negative_prompt": "bad, ugly",
"generator": generator,
"num_inference_steps": 2,
"guidance_scale": 5.0,
"height": 16,
"width": 16,
"max_sequence_length": 48,
"output_type": "np",
}
return inputs

    def test_encode_prompt_works_in_isolation(self):
        # Intentionally a no-op: this overrides the PipelineTesterMixin check, which does not apply here.
        pass

    def test_bria_different_prompts(self):
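        """Two different prompts should produce measurably different images."""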
pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device)
inputs = self.get_dummy_inputs(torch_device)
output_same_prompt = pipe(**inputs).images[0]
inputs = self.get_dummy_inputs(torch_device)
inputs["prompt"] = "a different prompt"
output_different_prompts = pipe(**inputs).images[0]
max_diff = np.abs(output_same_prompt - output_different_prompts).max()
assert max_diff > 1e-6

    def test_image_output_shape(self):
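        """Output sizes are the requested height/width rounded down to a multiple of ``vae_scale_factor * 2``."""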
pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device)
inputs = self.get_dummy_inputs(torch_device)
height_width_pairs = [(32, 32), (72, 57)]
for height, width in height_width_pairs:
expected_height = height - height % (pipe.vae_scale_factor * 2)
expected_width = width - width % (pipe.vae_scale_factor * 2)
inputs.update({"height": height, "width": width})
image = pipe(**inputs).images[0]
output_height, output_width, _ = image.shape
assert (output_height, output_width) == (expected_height, expected_width)

    @unittest.skipIf(torch_device not in ["cuda", "xpu"], reason="float16 requires CUDA or XPU")
@require_torch_accelerator
def test_save_load_float16(self, expected_max_diff=1e-2):
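        """Saving and reloading an fp16 pipeline should preserve component dtypes (VAE excluded) and outputs."""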
components = self.get_dummy_components()
for name, module in components.items():
if hasattr(module, "half"):
components[name] = module.to(torch_device).half()
pipe = self.pipeline_class(**components)
for component in pipe.components.values():
if hasattr(component, "set_default_attn_processor"):
component.set_default_attn_processor()
pipe.to(torch_device)
pipe.set_progress_bar_config(disable=None)
inputs = self.get_dummy_inputs(torch_device)
output = pipe(**inputs)[0]
with tempfile.TemporaryDirectory() as tmpdir:
pipe.save_pretrained(tmpdir)
pipe_loaded = self.pipeline_class.from_pretrained(tmpdir, torch_dtype=torch.float16)
for component in pipe_loaded.components.values():
if hasattr(component, "set_default_attn_processor"):
component.set_default_attn_processor()
pipe_loaded.to(torch_device)
pipe_loaded.set_progress_bar_config(disable=None)
for name, component in pipe_loaded.components.items():
if name == "vae":
continue
if hasattr(component, "dtype"):
self.assertTrue(
component.dtype == torch.float16,
f"`{name}.dtype` switched from `float16` to {component.dtype} after loading.",
)
inputs = self.get_dummy_inputs(torch_device)
output_loaded = pipe_loaded(**inputs)[0]
max_diff = np.abs(to_np(output) - to_np(output_loaded)).max()
self.assertLess(
max_diff, expected_max_diff, "The output of the fp16 pipeline changed after saving and loading."
)

    def test_bria_image_output_shape(self):
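        """Same shape contract as ``test_image_output_shape``, exercised on additional square resolutions."""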
pipe = self.pipeline_class(**self.get_dummy_components()).to(torch_device)
inputs = self.get_dummy_inputs(torch_device)
height_width_pairs = [(16, 16), (32, 32), (64, 64)]
for height, width in height_width_pairs:
expected_height = height - height % (pipe.vae_scale_factor * 2)
expected_width = width - width % (pipe.vae_scale_factor * 2)
inputs.update({"height": height, "width": width})
image = pipe(**inputs).images[0]
output_height, output_width, _ = image.shape
assert (output_height, output_width) == (expected_height, expected_width)

    def test_to_dtype(self):
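        """All dtype-bearing components should default to float32."""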
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
pipe.set_progress_bar_config(disable=None)
model_dtypes = [component.dtype for component in components.values() if hasattr(component, "dtype")]
        self.assertTrue(all(dtype == torch.float32 for dtype in model_dtypes))

    def test_torch_dtype_dict(self):
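        """A ``torch_dtype`` dict assigns per-component dtypes, with ``"default"`` as the fallback."""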
components = self.get_dummy_components()
pipe = self.pipeline_class(**components)
with tempfile.TemporaryDirectory() as tmpdirname:
pipe.save_pretrained(tmpdirname)
torch_dtype_dict = {"transformer": torch.bfloat16, "default": torch.float16}
loaded_pipe = self.pipeline_class.from_pretrained(tmpdirname, torch_dtype=torch_dtype_dict)
self.assertEqual(loaded_pipe.transformer.dtype, torch.bfloat16)
self.assertEqual(loaded_pipe.text_encoder.dtype, torch.float16)
self.assertEqual(loaded_pipe.vae.dtype, torch.float16)
with tempfile.TemporaryDirectory() as tmpdirname:
pipe.save_pretrained(tmpdirname)
torch_dtype_dict = {"default": torch.float16}
loaded_pipe = self.pipeline_class.from_pretrained(tmpdirname, torch_dtype=torch_dtype_dict)
self.assertEqual(loaded_pipe.transformer.dtype, torch.float16)
self.assertEqual(loaded_pipe.text_encoder.dtype, torch.float16)
self.assertEqual(loaded_pipe.vae.dtype, torch.float16)


@slow
@require_torch_accelerator
class BriaPipelineSlowTests(unittest.TestCase):
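    """Slow integration tests that run the real briaai/BRIA-3.2 checkpoint on an accelerator."""
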
pipeline_class = BriaPipeline
repo_id = "briaai/BRIA-3.2"

    def setUp(self):
super().setUp()
gc.collect()
backend_empty_cache(torch_device)

    def tearDown(self):
super().tearDown()
gc.collect()
backend_empty_cache(torch_device)

    def get_inputs(self, device, seed=0):
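        """Build inputs around precomputed prompt embeddings from the ``diffusers/test-slices`` dataset, so no text encoder is needed."""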
generator = torch.Generator(device="cpu").manual_seed(seed)
prompt_embeds = torch.load(
hf_hub_download(repo_id="diffusers/test-slices", repo_type="dataset", filename="flux/prompt_embeds.pt")
).to(torch_device)
return {
"prompt_embeds": prompt_embeds,
"num_inference_steps": 2,
"guidance_scale": 0.0,
"max_sequence_length": 256,
"output_type": "np",
"generator": generator,
}

    def test_bria_inference_bf16(self):
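        """bf16 inference should reproduce a stored reference slice within a small cosine distance."""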
pipe = self.pipeline_class.from_pretrained(
self.repo_id, torch_dtype=torch.bfloat16, text_encoder=None, tokenizer=None
)
pipe.to(torch_device)
inputs = self.get_inputs(torch_device)
image = pipe(**inputs).images[0]
image_slice = image[0, :10, :10].flatten()
expected_slice = np.array(
[
0.59729785,
0.6153719,
0.595112,
0.5884763,
0.59366125,
0.5795311,
0.58325,
0.58449626,
0.57737637,
0.58432233,
0.5867875,
0.57824117,
0.5819089,
0.5830988,
0.57730293,
0.57647324,
0.5769151,
0.57312685,
0.57926565,
0.5823928,
0.57783926,
0.57162863,
0.575649,
0.5745547,
0.5740556,
0.5799735,
0.57799566,
0.5715559,
0.5771242,
0.5773058,
],
dtype=np.float32,
)
        cosine_distance = numpy_cosine_similarity_distance(expected_slice, image_slice)
        self.assertLess(cosine_distance, 1e-4, f"Image slice differs from the expected slice: {cosine_distance:.4f}")