SchwarzXia
Support for benchmarking of diffusion models (#31)
56a3a83 unverified
import gc
import time
from diffusers import AutoPipelineForText2Image, DiffusionPipeline
import numpy as np
from PIL import Image
import torch
from transformers.trainer_utils import set_seed
import tyro
from zeus.monitor import ZeusMonitor
from utils import get_logger, CsvHandler
from metrics import load_prompts, calculate_clip_score
# default parameters
DEVICE = "cuda:0"
WEIGHT_DTYPE = torch.float16
SEED = 0
OUTPUT_FILE = "results.csv"
OUTPUT_IMAGES = "images/"
def get_pipeline(model, device=DEVICE, weight_dtype=WEIGHT_DTYPE):
try:
return AutoPipelineForText2Image.from_pretrained(
model, torch_dtype=weight_dtype, safety_checker=None
).to(device)
except ValueError:
return DiffusionPipeline.from_pretrained(
model, torch_dtype=weight_dtype, safety_checker=None
).to(device)
def gpu_warmup(pipeline):
"""Warm up the GPU by running the given pipeline for 10 secs."""
logger = get_logger()
logger.info("Warming up GPU")
generator = torch.manual_seed(2)
timeout_start = time.time()
prompts, _ = load_prompts(1, 1)
while time.time() < timeout_start + 10:
_ = pipeline(
prompts, num_images_per_prompt=10, generator=generator, output_type="numpy"
).images
logger.info("Finished warming up GPU")
def benchmark(
model: str,
benchmark_size: int = 0,
batch_size: int = 1,
result_file: str = OUTPUT_FILE,
images_path: str = OUTPUT_IMAGES,
device: str = DEVICE,
seed: int = SEED,
weight_dtype: torch.dtype = WEIGHT_DTYPE,
write_header: bool = False,
warmup: bool = False,
settings: dict = {},
) -> None:
"""Benchmarks given model with a set of parameters.
Args:
model: The name of the model to benchmark, as shown on HuggingFace.
benchmark_size: The number of prompts to benchmark on. If 0, benchmarks
the entire parti-prompts dataset.
batch_size: The size of each batch of prompts. When benchmarking, the
prompts are split into batches of this size, and prompts are fed into
the model in batches.
result_file: The path to the output csv file.
images_path: The path to the output images directory.
device: The device to run the benchmark on.
seed: The seed to use for the RNG.
weight_dtype: The weight dtype to use for the model.
write_header: Whether to write the header row to the output csv file,
recommended to be True for the first run.
warmup: Whether to warm up the GPU before running the benchmark,
recommended to be True for the first run of a model.
settings: Any additional settings to pass to the pipeline, supports
any keyword parameters accepted by the model chosen. See HuggingFace
documentation on particular models for more details.
"""
logger = get_logger()
logger.info("Running benchmark for model: " + model)
csv_handler = CsvHandler(result_file)
if write_header:
csv_handler.write_header(
[
"model",
"GPU",
"num_prompts",
"batch_size",
"clip_score",
"average_batch_latency(s)",
"throughput(image/s)",
"avg_energy(J)",
"peak_memory(GB)",
]
)
set_seed(seed)
prompts, batched_prompts = load_prompts(benchmark_size, batch_size)
logger.info("Loaded prompts")
generator = torch.manual_seed(seed)
torch.cuda.set_device(device)
monitor = ZeusMonitor(gpu_indices=[torch.cuda.current_device()])
pipeline = get_pipeline(model, device=device, weight_dtype=weight_dtype)
if warmup:
gpu_warmup(pipeline)
torch.cuda.empty_cache()
gc.collect()
torch.cuda.reset_peak_memory_stats(device=device)
monitor.begin_window("generate")
images = []
for batch in batched_prompts:
image = pipeline(
batch, generator=generator, output_type="np", **settings
).images
images.append(image)
images = np.concatenate(images)
result_monitor = monitor.end_window("generate")
peak_memory = torch.cuda.max_memory_allocated(device=device)
for saved_image, saved_prompt in zip(images[::10], prompts[::10]):
saved_image = (saved_image * 255).astype(np.uint8)
Image.fromarray(saved_image).save(images_path + saved_prompt + ".png")
clip_score = calculate_clip_score(images, prompts)
result = {
"model": model,
"GPU": torch.cuda.get_device_name(device),
"num_prompts": len(prompts),
"batch_size": batch_size,
"clip_score": clip_score,
"avg_batch_latency": result_monitor.time / (benchmark_size / batch_size),
"throughput": benchmark_size / result_monitor.time,
"avg_energy": result_monitor.total_energy / benchmark_size,
"peak_memory": peak_memory,
}
logger.info("Results for model " + model + ":")
logger.info(result)
csv_handler.write_results(result)
logger.info("Finished benchmarking for " + model)
if __name__ == "__main__":
tyro.cli(benchmark)