Diffusers documentation

分布式推理

You are viewing main version, which requires installation from source. If you'd like regular pip install, checkout the latest stable version (v0.35.1).
Hugging Face's logo
Join the Hugging Face community

and get access to the augmented documentation experience

to get started

分布式推理

在分布式设置中,您可以使用 🤗 AcceleratePyTorch Distributed 在多个 GPU 上运行推理,这对于并行生成多个提示非常有用。

本指南将向您展示如何使用 🤗 Accelerate 和 PyTorch Distributed 进行分布式推理。

🤗 Accelerate

🤗 Accelerate 是一个旨在简化在分布式设置中训练或运行推理的库。它简化了设置分布式环境的过程,让您可以专注于您的 PyTorch 代码。

首先,创建一个 Python 文件并初始化一个 accelerate.PartialState 来创建分布式环境;您的设置会自动检测,因此您无需明确定义 rankworld_size。将 DiffusionPipeline 移动到 distributed_state.device 以为每个进程分配一个 GPU。

现在使用 split_between_processes 实用程序作为上下文管理器,自动在进程数之间分发提示。

import torch
from accelerate import PartialState
from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)
distributed_state = PartialState()
pipeline.to(distributed_state.device)

with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipeline(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

使用 --num_processes 参数指定要使用的 GPU 数量,并调用 accelerate launch 来运行脚本:

accelerate launch run_distributed.py --num_processes=2

参考这个最小示例 脚本 以在多个 GPU 上运行推理。要了解更多信息,请查看 使用 🤗 Accelerate 进行分布式推理 指南。

PyTorch Distributed

PyTorch 支持 DistributedDataParallel,它启用了数据 并行性。

首先,创建一个 Python 文件并导入 torch.distributedtorch.multiprocessing 来设置分布式进程组,并为每个 GPU 上的推理生成进程。您还应该初始化一个 DiffusionPipeline

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

from diffusers import DiffusionPipeline

sd = DiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)

您需要创建一个函数来运行推理;init_process_group 处理创建一个分布式环境,指定要使用的后端类型、当前进程的 rank 以及参与进程的数量 world_size。如果您在 2 个 GPU 上并行运行推理,那么 world_size 就是 2。

DiffusionPipeline 移动到 rank,并使用 get_rank 为每个进程分配一个 GPU,其中每个进程处理不同的提示:

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    sd.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    image = sd(prompt).images[0]
    image.save(f"./{'_'.join(prompt)}.png")

要运行分布式推理,调用 mp.spawnworld_size 定义的 GPU 数量上运行 run_inference 函数:

def main():
    world_size = 2
    mp.spawn(run_inference, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

完成推理脚本后,使用 --nproc_per_node 参数指定要使用的 GPU 数量,并调用 torchrun 来运行脚本:

torchrun run_distributed.py --nproc_per_node=2

您可以在 DiffusionPipeline 中使用 device_map 将其模型级组件分布在多个设备上。请参考 设备放置 指南了解更多信息。

模型分片

现代扩散系统,如 Flux,非常大且包含多个模型。例如,Flux.1-Dev 由两个文本编码器 - T5-XXLCLIP-L - 一个 扩散变换器,以及一个 VAE 组成。对于如此大的模型,在消费级 GPU 上运行推理可能具有挑战性。

模型分片是一种技术,当模型无法容纳在单个 GPU 上时,将模型分布在多个 GPU 上。下面的示例假设有两个 16GB GPU 可用于推理。

开始使用文本编码器计算文本嵌入。通过设置 device_map="balanced" 将文本编码器保持在两个GPU上。balanced 策略将模型均匀分布在所有可用GPU上。使用 max_memory 参数为每个GPU上的每个文本编码器分配最大内存量。

在此步骤加载文本编码器!扩散变换器和VAE在后续步骤中加载以节省内存。

from diffusers import FluxPipeline
import torch

prompt = "a photo of a dog with cat-like look"

pipeline = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    transformer=None,
    vae=None,
    device_map="balanced",
    max_memory={0: "16GB", 1: "16GB"},
    torch_dtype=torch.bfloat16
)
with torch.no_grad():
    print("Encoding prompts.")
    prompt_embeds, pooled_prompt_embeds, text_ids = pipeline.encode_prompt(
        prompt=prompt, prompt_2=None, max_sequence_length=512
    )

一旦文本嵌入计算完成,从GPU中移除它们以为扩散变换器腾出空间。

import gc 

def flush():
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.reset_peak_memory_stats()

del pipeline.text_encoder
del pipeline.text_encoder_2
del pipeline.tokenizer
del pipeline.tokenizer_2
del pipeline

flush()

接下来加载扩散变换器,它有125亿参数。这次,设置 device_map="auto" 以自动将模型分布在两个16GB GPU上。auto 策略由 Accelerate 支持,并作为 大模型推理 功能的一部分可用。它首先将模型分布在最快的设备(GPU)上,然后在需要时移动到较慢的设备如CPU和硬盘。将模型参数存储在较慢设备上的权衡是推理延迟较慢。

from diffusers import AutoModel
import torch 

transformer = AutoModel.from_pretrained(
    "black-forest-labs/FLUX.1-dev", 
    subfolder="transformer",
    device_map="auto",
    torch_dtype=torch.bfloat16
)

在任何时候,您可以尝试 print(pipeline.hf_device_map) 来查看各种模型如何在设备上分布。这对于跟踪模型的设备放置很有用。您也可以尝试 print(transformer.hf_device_map) 来查看变换器模型如何在设备上分片。

将变换器模型添加到管道中以进行去噪,但将其他模型级组件如文本编码器和VAE设置为 None,因为您还不需要它们。

pipeline = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    text_encoder=None,
    text_encoder_2=None,
    tokenizer=None,
    tokenizer_2=None,
    vae=None,
    transformer=transformer,
    torch_dtype=torch.bfloat16
)

print("Running denoising.")
height, width = 768, 1360
latents = pipeline(
   
     
prompt_embeds=prompt_embeds,
pooled_prompt_embeds=pooled_prompt_embeds,
num_inference_steps=50,
guidance_scale=3.5,
height=height,
width=width,
output_type="latent",
).images

从内存中移除管道和变换器,因为它们不再需要。

del pipeline.transformer
del pipeline

flush()

最后,使用变分自编码器(VAE)将潜在表示解码为图像。VAE通常足够小,可以在单个GPU上加载。

from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor
import torch 

vae = AutoencoderKL.from_pretrained(ckpt_id, subfolder="vae", torch_dtype=torch.bfloat16).to("cuda")
vae_scale_factor = 2 ** (len(vae.config.block_out_channels))
image_processor = VaeImageProcessor(vae_scale_factor=vae_scale_factor)

with torch.no_grad():
    print("运行解码中。")
    latents = FluxPipeline._unpack_latents(latents, height, width, vae_scale_factor)
    latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor

    image = vae.decode(latents, return_dict=False)[0]
    image = image_processor.postprocess(image, output_type="pil")
    image[0].save("split_transformer.png")

通过选择性加载和卸载在特定阶段所需的模型,并将最大模型分片到多个GPU上,可以在消费级GPU上运行大型模型的推理。

< > Update on GitHub