import argparse

import huggingface_hub
import k_diffusion as K
import torch

from diffusers import UNet2DConditionModel


UPSCALER_REPO = "pcuenq/k-upscaler"
def resnet_to_diffusers_checkpoint(resnet, checkpoint, *, diffusers_resnet_prefix, resnet_prefix):
    rv = {
        # norm1
        f"{diffusers_resnet_prefix}.norm1.linear.weight": checkpoint[f"{resnet_prefix}.main.0.mapper.weight"],
        f"{diffusers_resnet_prefix}.norm1.linear.bias": checkpoint[f"{resnet_prefix}.main.0.mapper.bias"],
        # conv1
        f"{diffusers_resnet_prefix}.conv1.weight": checkpoint[f"{resnet_prefix}.main.2.weight"],
        f"{diffusers_resnet_prefix}.conv1.bias": checkpoint[f"{resnet_prefix}.main.2.bias"],
        # norm2
        f"{diffusers_resnet_prefix}.norm2.linear.weight": checkpoint[f"{resnet_prefix}.main.4.mapper.weight"],
        f"{diffusers_resnet_prefix}.norm2.linear.bias": checkpoint[f"{resnet_prefix}.main.4.mapper.bias"],
        # conv2
        f"{diffusers_resnet_prefix}.conv2.weight": checkpoint[f"{resnet_prefix}.main.6.weight"],
        f"{diffusers_resnet_prefix}.conv2.bias": checkpoint[f"{resnet_prefix}.main.6.bias"],
    }

    if resnet.conv_shortcut is not None:
        rv.update(
            {
                f"{diffusers_resnet_prefix}.conv_shortcut.weight": checkpoint[f"{resnet_prefix}.skip.weight"],
            }
        )

    return rv
def self_attn_to_diffusers_checkpoint(checkpoint, *, diffusers_attention_prefix, attention_prefix):
    # The original model stores q, k and v as one fused 1x1-conv projection; split it
    # and squeeze away the trailing 1x1 spatial dims to obtain Linear-shaped weights.
    weight_q, weight_k, weight_v = checkpoint[f"{attention_prefix}.qkv_proj.weight"].chunk(3, dim=0)
    bias_q, bias_k, bias_v = checkpoint[f"{attention_prefix}.qkv_proj.bias"].chunk(3, dim=0)

    rv = {
        # norm
        f"{diffusers_attention_prefix}.norm1.linear.weight": checkpoint[f"{attention_prefix}.norm_in.mapper.weight"],
        f"{diffusers_attention_prefix}.norm1.linear.bias": checkpoint[f"{attention_prefix}.norm_in.mapper.bias"],
        # to_q
        f"{diffusers_attention_prefix}.attn1.to_q.weight": weight_q.squeeze(-1).squeeze(-1),
        f"{diffusers_attention_prefix}.attn1.to_q.bias": bias_q,
        # to_k
        f"{diffusers_attention_prefix}.attn1.to_k.weight": weight_k.squeeze(-1).squeeze(-1),
        f"{diffusers_attention_prefix}.attn1.to_k.bias": bias_k,
        # to_v
        f"{diffusers_attention_prefix}.attn1.to_v.weight": weight_v.squeeze(-1).squeeze(-1),
        f"{diffusers_attention_prefix}.attn1.to_v.bias": bias_v,
        # to_out
        f"{diffusers_attention_prefix}.attn1.to_out.0.weight": checkpoint[f"{attention_prefix}.out_proj.weight"]
        .squeeze(-1)
        .squeeze(-1),
        f"{diffusers_attention_prefix}.attn1.to_out.0.bias": checkpoint[f"{attention_prefix}.out_proj.bias"],
    }

    return rv
def cross_attn_to_diffusers_checkpoint(
    checkpoint, *, diffusers_attention_prefix, diffusers_attention_index, attention_prefix
):
    weight_k, weight_v = checkpoint[f"{attention_prefix}.kv_proj.weight"].chunk(2, dim=0)
    bias_k, bias_v = checkpoint[f"{attention_prefix}.kv_proj.bias"].chunk(2, dim=0)

    rv = {
        # norm2 (ada groupnorm)
        f"{diffusers_attention_prefix}.norm{diffusers_attention_index}.linear.weight": checkpoint[
            f"{attention_prefix}.norm_dec.mapper.weight"
        ],
        f"{diffusers_attention_prefix}.norm{diffusers_attention_index}.linear.bias": checkpoint[
            f"{attention_prefix}.norm_dec.mapper.bias"
        ],
        # layernorm on encoder_hidden_state
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.norm_cross.weight": checkpoint[
            f"{attention_prefix}.norm_enc.weight"
        ],
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.norm_cross.bias": checkpoint[
            f"{attention_prefix}.norm_enc.bias"
        ],
        # to_q
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_q.weight": checkpoint[
            f"{attention_prefix}.q_proj.weight"
        ]
        .squeeze(-1)
        .squeeze(-1),
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_q.bias": checkpoint[
            f"{attention_prefix}.q_proj.bias"
        ],
        # to_k
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_k.weight": weight_k.squeeze(-1).squeeze(-1),
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_k.bias": bias_k,
        # to_v
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_v.weight": weight_v.squeeze(-1).squeeze(-1),
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_v.bias": bias_v,
        # to_out
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_out.0.weight": checkpoint[
            f"{attention_prefix}.out_proj.weight"
        ]
        .squeeze(-1)
        .squeeze(-1),
        f"{diffusers_attention_prefix}.attn{diffusers_attention_index}.to_out.0.bias": checkpoint[
            f"{attention_prefix}.out_proj.bias"
        ],
    }

    return rv
def block_to_diffusers_checkpoint(block, checkpoint, block_idx, block_type):
    block_prefix = "inner_model.u_net.u_blocks" if block_type == "up" else "inner_model.u_net.d_blocks"
    block_prefix = f"{block_prefix}.{block_idx}"

    diffusers_checkpoint = {}

    if not hasattr(block, "attentions"):
        n = 1  # resnet only
    elif not block.attentions[0].add_self_attention:
        n = 2  # resnet -> cross-attention
    else:
        n = 3  # resnet -> self-attention -> cross-attention

    for resnet_idx, resnet in enumerate(block.resnets):
        diffusers_resnet_prefix = f"{block_type}_blocks.{block_idx}.resnets.{resnet_idx}"
        # in the original checkpoint, layers inside d_blocks are offset by one relative to u_blocks
        idx = n * resnet_idx if block_type == "up" else n * resnet_idx + 1
        resnet_prefix = f"{block_prefix}.{idx}"

        diffusers_checkpoint.update(
            resnet_to_diffusers_checkpoint(
                resnet, checkpoint, diffusers_resnet_prefix=diffusers_resnet_prefix, resnet_prefix=resnet_prefix
            )
        )

    if hasattr(block, "attentions"):
        for attention_idx, attention in enumerate(block.attentions):
            diffusers_attention_prefix = f"{block_type}_blocks.{block_idx}.attentions.{attention_idx}"

            # each group of n layers is (resnet[, self-attention], cross-attention)
            idx = n * attention_idx + 1 if block_type == "up" else n * attention_idx + 2
            self_attention_prefix = f"{block_prefix}.{idx}"

            cross_attention_index = 1 if not attention.add_self_attention else 2
            idx = (
                n * attention_idx + cross_attention_index
                if block_type == "up"
                else n * attention_idx + cross_attention_index + 1
            )
            cross_attention_prefix = f"{block_prefix}.{idx}"

            diffusers_checkpoint.update(
                cross_attn_to_diffusers_checkpoint(
                    checkpoint,
                    diffusers_attention_prefix=diffusers_attention_prefix,
                    diffusers_attention_index=2,
                    attention_prefix=cross_attention_prefix,
                )
            )

            if attention.add_self_attention is True:
                diffusers_checkpoint.update(
                    self_attn_to_diffusers_checkpoint(
                        checkpoint,
                        diffusers_attention_prefix=diffusers_attention_prefix,
                        attention_prefix=self_attention_prefix,
                    )
                )

    return diffusers_checkpoint
def unet_to_diffusers_checkpoint(model, checkpoint):
    diffusers_checkpoint = {}

    # pre-processing
    diffusers_checkpoint.update(
        {
            "conv_in.weight": checkpoint["inner_model.proj_in.weight"],
            "conv_in.bias": checkpoint["inner_model.proj_in.bias"],
        }
    )

    # timestep and class embedding
    diffusers_checkpoint.update(
        {
            "time_proj.weight": checkpoint["inner_model.timestep_embed.weight"].squeeze(-1),
            "time_embedding.linear_1.weight": checkpoint["inner_model.mapping.0.weight"],
            "time_embedding.linear_1.bias": checkpoint["inner_model.mapping.0.bias"],
            "time_embedding.linear_2.weight": checkpoint["inner_model.mapping.2.weight"],
            "time_embedding.linear_2.bias": checkpoint["inner_model.mapping.2.bias"],
            "time_embedding.cond_proj.weight": checkpoint["inner_model.mapping_cond.weight"],
        }
    )

    # down_blocks
    for down_block_idx, down_block in enumerate(model.down_blocks):
        diffusers_checkpoint.update(block_to_diffusers_checkpoint(down_block, checkpoint, down_block_idx, "down"))

    # up_blocks
    for up_block_idx, up_block in enumerate(model.up_blocks):
        diffusers_checkpoint.update(block_to_diffusers_checkpoint(up_block, checkpoint, up_block_idx, "up"))

    # post-processing
    diffusers_checkpoint.update(
        {
            "conv_out.weight": checkpoint["inner_model.proj_out.weight"],
            "conv_out.bias": checkpoint["inner_model.proj_out.bias"],
        }
    )

    return diffusers_checkpoint
def unet_model_from_original_config(original_config):
    in_channels = original_config["input_channels"] + original_config["unet_cond_dim"]
    out_channels = original_config["input_channels"] + (1 if original_config["has_variance"] else 0)

    block_out_channels = original_config["channels"]

    assert (
        len(set(original_config["depths"])) == 1
    ), "UNet2DConditionModel currently does not support blocks with different numbers of layers"
    layers_per_block = original_config["depths"][0]

    class_labels_dim = original_config["mapping_cond_dim"]
    cross_attention_dim = original_config["cross_cond_dim"]

    attn1_types = []
    attn2_types = []
    for s, c in zip(original_config["self_attn_depths"], original_config["cross_attn_depths"]):
        if s:
            a1 = "self"
            a2 = "cross" if c else None
        elif c:
            a1 = "cross"
            a2 = None
        else:
            a1 = None
            a2 = None
        attn1_types.append(a1)
        attn2_types.append(a2)

    unet = UNet2DConditionModel(
        in_channels=in_channels,
        out_channels=out_channels,
        down_block_types=("KDownBlock2D", "KCrossAttnDownBlock2D", "KCrossAttnDownBlock2D", "KCrossAttnDownBlock2D"),
        mid_block_type=None,
        up_block_types=("KCrossAttnUpBlock2D", "KCrossAttnUpBlock2D", "KCrossAttnUpBlock2D", "KUpBlock2D"),
        block_out_channels=block_out_channels,
        layers_per_block=layers_per_block,
        act_fn="gelu",
        norm_num_groups=None,
        cross_attention_dim=cross_attention_dim,
        attention_head_dim=64,
        time_cond_proj_dim=class_labels_dim,
        resnet_time_scale_shift="scale_shift",
        time_embedding_type="fourier",
        timestep_post_act="gelu",
        conv_in_kernel=1,
        conv_out_kernel=1,
    )

    return unet
def main(args):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    orig_config_path = huggingface_hub.hf_hub_download(UPSCALER_REPO, "config_laion_text_cond_latent_upscaler_2.json")
    orig_weights_path = huggingface_hub.hf_hub_download(
        UPSCALER_REPO, "laion_text_cond_latent_upscaler_2_1_00470000_slim.pth"
    )

    print(f"loading original model configuration from {orig_config_path}")
    print(f"loading original model checkpoint from {orig_weights_path}")

    print("converting to diffusers unet")
    with open(orig_config_path) as config_file:
        orig_config = K.config.load_config(config_file)["model"]
    model = unet_model_from_original_config(orig_config)

    orig_checkpoint = torch.load(orig_weights_path, map_location=device)["model_ema"]
    converted_checkpoint = unet_to_diffusers_checkpoint(model, orig_checkpoint)

    model.load_state_dict(converted_checkpoint, strict=True)

    print(f"saving converted unet model in {args.dump_path}")
    model.save_pretrained(args.dump_path)
if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument("--dump_path", default=None, type=str, required=True, help="Path to the output model.")

    args = parser.parse_args()

    main(args)
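
# Example usage (shown as a comment so the script itself stays unchanged; the output
# directory name below is only illustrative, not part of the original script):
#
#   python <this_script>.py --dump_path ./k-upscaler-unet
#
# As an optional sanity check after conversion, the saved weights can be reloaded
# with diffusers, assuming the same path as --dump_path above:
#
#   from diffusers import UNet2DConditionModel
#   unet = UNet2DConditionModel.from_pretrained("./k-upscaler-unet")
#   print(sum(p.numel() for p in unet.parameters()))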