import os from typing import List, Dict, Union from tqdm import tqdm import torch import safetensors from huggingface_hub import hf_hub_download from transformers import AutoTokenizer, CLIPTextModelWithProjection from diffusers import ( StableDiffusionXLPipeline, UNet2DConditionModel, EulerDiscreteScheduler, ) from diffusers.loaders import LoraLoaderMixin SDXL_REPO = "stabilityai/stable-diffusion-xl-base-1.0" JSDXL_REPO = "stabilityai/japanese-stable-diffusion-xl" L_REPO = "ByteDance/SDXL-Lightning" MERGED_FILE = "evosdxl_jp_v1.safetensors" def load_state_dict(checkpoint_file: Union[str, os.PathLike], device: str = "cpu"): file_extension = os.path.basename(checkpoint_file).split(".")[-1] if file_extension == "safetensors": return safetensors.torch.load_file(checkpoint_file, device=device) else: return torch.load(checkpoint_file, map_location=device) def load_from_pretrained( repo_id, filename="diffusion_pytorch_model.fp16.safetensors", subfolder="unet", device="cuda", ) -> Dict[str, torch.Tensor]: return load_state_dict( hf_hub_download( repo_id=repo_id, filename=filename, subfolder=subfolder, ), device=device, ) def reshape_weight_task_tensors(task_tensors, weights): """ Reshapes `weights` to match the shape of `task_tensors` by unsqeezing in the remaining dimenions. Args: task_tensors (`torch.Tensor`): The tensors that will be used to reshape `weights`. weights (`torch.Tensor`): The tensor to be reshaped. Returns: `torch.Tensor`: The reshaped tensor. """ new_shape = weights.shape + (1,) * (task_tensors.dim() - weights.dim()) weights = weights.view(new_shape) return weights def linear(task_tensors: List[torch.Tensor], weights: torch.Tensor) -> torch.Tensor: """ Merge the task tensors using `linear`. Args: task_tensors(`List[torch.Tensor]`):The task tensors to merge. weights (`torch.Tensor`):The weights of the task tensors. Returns: `torch.Tensor`: The merged tensor. """ task_tensors = torch.stack(task_tensors, dim=0) # weighted task tensors weights = reshape_weight_task_tensors(task_tensors, weights) weighted_task_tensors = task_tensors * weights mixed_task_tensors = weighted_task_tensors.sum(dim=0) return mixed_task_tensors def merge_models( task_tensors, weights, ): keys = list(task_tensors[0].keys()) weights = torch.tensor(weights, device=task_tensors[0][keys[0]].device) state_dict = {} for key in tqdm(keys, desc="Merging"): w_list = [] for i, sd in enumerate(task_tensors): w = sd.pop(key) w_list.append(w) new_w = linear(task_tensors=w_list, weights=weights) state_dict[key] = new_w return state_dict def split_conv_attn(weights): attn_tensors = {} conv_tensors = {} for key in list(weights.keys()): if any(k in key for k in ["to_k", "to_q", "to_v", "to_out.0"]): attn_tensors[key] = weights.pop(key) else: conv_tensors[key] = weights.pop(key) return {"conv": conv_tensors, "attn": attn_tensors} def merge_evosdxl_jp(device="cpu") -> StableDiffusionXLPipeline: sdxl_weights = split_conv_attn(load_from_pretrained(SDXL_REPO, device=device)) dpo_weights = split_conv_attn( load_from_pretrained( "mhdang/dpo-sdxl-text2image-v1", "diffusion_pytorch_model.safetensors", device=device, ) ) jn_weights = split_conv_attn( load_from_pretrained("RunDiffusion/Juggernaut-XL-v9", device=device) ) jsdxl_weights = split_conv_attn(load_from_pretrained(JSDXL_REPO, device=device)) tensors = [sdxl_weights, dpo_weights, jn_weights, jsdxl_weights] new_conv = merge_models( [sd["conv"] for sd in tensors], [ 0.15928833971605916, 0.1032449268871776, 0.6503217149752791, 0.08714501842148402, ], ) new_attn = merge_models( [sd["attn"] for sd in tensors], [ 0.1877279276437178, 0.20014114603909822, 0.3922685507065275, 0.2198623756106564, ], ) del sdxl_weights, dpo_weights, jn_weights, jsdxl_weights torch.cuda.empty_cache() unet_config = UNet2DConditionModel.load_config(SDXL_REPO, subfolder="unet") unet = UNet2DConditionModel.from_config(unet_config).to(device=device) unet.load_state_dict({**new_conv, **new_attn}) state_dict, network_alphas = LoraLoaderMixin.lora_state_dict( L_REPO, weight_name="sdxl_lightning_4step_lora.safetensors" ) LoraLoaderMixin.load_lora_into_unet(state_dict, network_alphas, unet) unet.fuse_lora(lora_scale=3.224682864579401) new_weights = split_conv_attn(unet.state_dict()) l_weights = split_conv_attn( load_from_pretrained( L_REPO, "sdxl_lightning_4step_unet.safetensors", subfolder=None, device=device, ) ) jnl_weights = split_conv_attn( load_from_pretrained( "RunDiffusion/Juggernaut-XL-Lightning", "diffusion_pytorch_model.bin", device=device, ) ) tensors = [l_weights, jnl_weights, new_weights] new_conv = merge_models( [sd["conv"] for sd in tensors], [0.47222002022088533, 0.48419531030361584, 0.04358466947549889], ) new_attn = merge_models( [sd["attn"] for sd in tensors], [0.023119324530758375, 0.04924981616469831, 0.9276308593045434], ) new_weights = {**new_conv, **new_attn} safetensors.torch.save_file(new_weights, MERGED_FILE) def load_evosdxl_jp(device="cuda"): unet_config = UNet2DConditionModel.load_config(SDXL_REPO, subfolder="unet") unet = UNet2DConditionModel.from_config(unet_config).to(device=device) unet.load_state_dict(safetensors.torch.load_file(MERGED_FILE)) text_encoder = CLIPTextModelWithProjection.from_pretrained( JSDXL_REPO, subfolder="text_encoder", torch_dtype=torch.float16, variant="fp16" ) tokenizer = AutoTokenizer.from_pretrained( JSDXL_REPO, subfolder="tokenizer", use_fast=False ) pipe = StableDiffusionXLPipeline.from_pretrained( SDXL_REPO, unet=unet, text_encoder=text_encoder, tokenizer=tokenizer, torch_dtype=torch.float16, variant="fp16", ) # Ensure sampler uses "trailing" timesteps. pipe.scheduler = EulerDiscreteScheduler.from_config( pipe.scheduler.config, timestep_spacing="trailing" ) pipe = pipe.to(device, dtype=torch.float16) return pipe if __name__ == "__main__": if not os.path.exists(MERGED_FILE): merge_evosdxl_jp() pipe: StableDiffusionXLPipeline = load_evosdxl_jp() images = pipe("犬", num_inference_steps=4, guidance_scale=0).images images[0].save("out.png")