yjhuangcd
First commit
9965bf6
import argparse
import inspect
import torch.nn.functional as F
from music_rule_guidance import music_rules
from . import gaussian_diffusion as gd
from .respace import SpacedDiffusion, space_timesteps
from .unet import SuperResModel, UNetModel, EncoderUNetModel
NUM_CLASSES = 3 # number of datasets
def diffusion_defaults():
"""
Defaults for image and classifier training.
"""
return dict(
learn_sigma=False,
diffusion_steps=1000,
noise_schedule="linear",
timestep_respacing="",
use_kl=False,
predict_xstart=False,
rescale_timesteps=False,
rescale_learned_sigmas=False,
)
def classifier_defaults():
"""
Defaults for classifier models.
"""
return dict(
image_size=64,
in_channels=3,
classifier_use_fp16=False,
classifier_width=128,
classifier_depth=2,
classifier_attention_resolutions="32,16,8", # 16
classifier_use_scale_shift_norm=True, # False
classifier_resblock_updown=True, # False
classifier_pool="attention",
num_classes=3,
chord=False,
)
def model_and_diffusion_image_defaults():
"""
Defaults for image training.
"""
res = dict(
image_size=64,
in_channels=3,
num_channels=128,
num_res_blocks=2,
num_heads=4,
num_heads_upsample=-1,
num_head_channels=-1,
attention_resolutions="32,16,8",
channel_mult="",
dropout=0.0,
class_cond=False,
use_checkpoint=False,
use_scale_shift_norm=True,
resblock_updown=False,
use_fp16=False,
use_new_attention_order=False,
)
res.update(diffusion_defaults())
return res
def model_and_diffusion_defaults():
"""
Defaults for piano roll training.
"""
res = dict(
image_size=128,
in_channels=1,
num_channels=128,
num_res_blocks=2,
num_heads=4,
num_heads_upsample=-1,
num_head_channels=-1,
attention_resolutions="32,16,8",
channel_mult="",
dropout=0.0,
class_cond=False,
use_checkpoint=False,
use_scale_shift_norm=True,
resblock_updown=False,
use_fp16=False,
use_new_attention_order=False,
)
res.update(diffusion_defaults())
return res
def classifier_and_diffusion_defaults():
res = classifier_defaults()
res.update(diffusion_defaults())
return res
def create_diffusion(
learn_sigma,
diffusion_steps,
noise_schedule,
timestep_respacing,
use_kl,
predict_xstart,
rescale_timesteps,
rescale_learned_sigmas,
):
diffusion = create_gaussian_diffusion(
steps=diffusion_steps,
learn_sigma=learn_sigma,
noise_schedule=noise_schedule,
use_kl=use_kl,
predict_xstart=predict_xstart,
rescale_timesteps=rescale_timesteps,
rescale_learned_sigmas=rescale_learned_sigmas,
timestep_respacing=timestep_respacing,
)
return diffusion
def create_model_and_diffusion(
image_size,
in_channels,
class_cond,
learn_sigma,
num_channels,
num_res_blocks,
channel_mult,
num_heads,
num_head_channels,
num_heads_upsample,
attention_resolutions,
dropout,
diffusion_steps,
noise_schedule,
timestep_respacing,
use_kl,
predict_xstart,
rescale_timesteps,
rescale_learned_sigmas,
use_checkpoint,
use_scale_shift_norm,
resblock_updown,
use_fp16,
use_new_attention_order,
):
model = create_model(
image_size,
num_channels,
num_res_blocks,
in_channels,
channel_mult=channel_mult,
learn_sigma=learn_sigma,
class_cond=class_cond,
use_checkpoint=use_checkpoint,
attention_resolutions=attention_resolutions,
num_heads=num_heads,
num_head_channels=num_head_channels,
num_heads_upsample=num_heads_upsample,
use_scale_shift_norm=use_scale_shift_norm,
dropout=dropout,
resblock_updown=resblock_updown,
use_fp16=use_fp16,
use_new_attention_order=use_new_attention_order,
)
diffusion = create_gaussian_diffusion(
steps=diffusion_steps,
learn_sigma=learn_sigma,
noise_schedule=noise_schedule,
use_kl=use_kl,
predict_xstart=predict_xstart,
rescale_timesteps=rescale_timesteps,
rescale_learned_sigmas=rescale_learned_sigmas,
timestep_respacing=timestep_respacing,
)
return model, diffusion
def create_model(
image_size,
num_channels,
num_res_blocks,
in_channels=3,
channel_mult="",
learn_sigma=False,
class_cond=False,
use_checkpoint=False,
attention_resolutions="16",
num_heads=4,
num_head_channels=-1,
num_heads_upsample=-1,
use_scale_shift_norm=False,
dropout=0,
resblock_updown=False,
use_fp16=False,
use_new_attention_order=False,
):
image_size = image_size[-1] # if H != W, use W as image_size
if channel_mult == "":
if image_size == 512:
channel_mult = (0.5, 1, 1, 2, 2, 4, 4)
elif image_size == 256:
channel_mult = (1, 1, 2, 2, 4, 4)
elif image_size == 128:
channel_mult = (1, 1, 2, 3, 4)
elif image_size == 64:
channel_mult = (1, 2, 3, 4)
elif image_size == 32:
channel_mult = (1, 2, 2, 2)
elif image_size == 16:
channel_mult = (1, 2, 2)
else:
raise ValueError(f"unsupported image size: {image_size}")
else:
channel_mult = tuple(int(ch_mult) for ch_mult in channel_mult.split(","))
attention_ds = []
for res in attention_resolutions.split(","):
attention_ds.append(image_size // int(res))
return UNetModel(
image_size=image_size,
in_channels=in_channels,
model_channels=num_channels,
out_channels=(in_channels if not learn_sigma else 2*in_channels),
num_res_blocks=num_res_blocks,
attention_resolutions=tuple(attention_ds),
dropout=dropout,
channel_mult=channel_mult,
num_classes=(NUM_CLASSES if class_cond else None),
use_checkpoint=use_checkpoint,
use_fp16=use_fp16,
num_heads=num_heads,
num_head_channels=num_head_channels,
num_heads_upsample=num_heads_upsample,
use_scale_shift_norm=use_scale_shift_norm,
resblock_updown=resblock_updown,
use_new_attention_order=use_new_attention_order,
)
def create_classifier_and_diffusion(
image_size,
in_channels,
classifier_use_fp16,
classifier_width,
classifier_depth,
classifier_attention_resolutions,
classifier_use_scale_shift_norm,
classifier_resblock_updown,
classifier_pool,
learn_sigma,
diffusion_steps,
noise_schedule,
timestep_respacing,
use_kl,
predict_xstart,
rescale_timesteps,
rescale_learned_sigmas,
num_classes,
chord,
):
classifier = create_classifier(
image_size,
in_channels,
classifier_use_fp16,
classifier_width,
classifier_depth,
classifier_attention_resolutions,
classifier_use_scale_shift_norm,
classifier_resblock_updown,
classifier_pool,
num_classes,
chord,
)
diffusion = create_gaussian_diffusion(
steps=diffusion_steps,
learn_sigma=learn_sigma,
noise_schedule=noise_schedule,
use_kl=use_kl,
predict_xstart=predict_xstart,
rescale_timesteps=rescale_timesteps,
rescale_learned_sigmas=rescale_learned_sigmas,
timestep_respacing=timestep_respacing,
)
return classifier, diffusion
def create_classifier(
image_size,
in_channels,
classifier_use_fp16,
classifier_width,
classifier_depth,
classifier_attention_resolutions,
classifier_use_scale_shift_norm,
classifier_resblock_updown,
classifier_pool,
num_classes,
chord,
):
image_size = image_size[-1] # if H != W, use W as image_size
if image_size == 512:
channel_mult = (0.5, 1, 1, 2, 2, 4, 4)
elif image_size == 256:
channel_mult = (1, 1, 2, 2, 4, 4)
elif image_size == 128:
channel_mult = (1, 1, 2, 3, 4)
elif image_size == 64:
channel_mult = (1, 2, 3, 4)
elif image_size == 16: # debug data load in
channel_mult = (1, 2, 2)
else:
raise ValueError(f"unsupported image size: {image_size}")
attention_ds = []
for res in classifier_attention_resolutions.split(","):
attention_ds.append(image_size // int(res))
return EncoderUNetModel(
image_size=image_size,
in_channels=in_channels,
model_channels=classifier_width,
out_channels=num_classes,
num_res_blocks=classifier_depth,
attention_resolutions=tuple(attention_ds),
channel_mult=channel_mult,
use_fp16=classifier_use_fp16,
num_head_channels=64,
use_scale_shift_norm=classifier_use_scale_shift_norm,
resblock_updown=classifier_resblock_updown,
pool=classifier_pool,
chord=chord,
)
def sr_model_and_diffusion_defaults():
res = model_and_diffusion_defaults()
res["large_size"] = 256
res["small_size"] = 64
arg_names = inspect.getfullargspec(sr_create_model_and_diffusion)[0]
for k in res.copy().keys():
if k not in arg_names:
del res[k]
return res
def sr_create_model_and_diffusion(
large_size,
small_size,
class_cond,
learn_sigma,
num_channels,
num_res_blocks,
num_heads,
num_head_channels,
num_heads_upsample,
attention_resolutions,
dropout,
diffusion_steps,
noise_schedule,
timestep_respacing,
use_kl,
predict_xstart,
rescale_timesteps,
rescale_learned_sigmas,
use_checkpoint,
use_scale_shift_norm,
resblock_updown,
use_fp16,
):
model = sr_create_model(
large_size,
small_size,
num_channels,
num_res_blocks,
learn_sigma=learn_sigma,
class_cond=class_cond,
use_checkpoint=use_checkpoint,
attention_resolutions=attention_resolutions,
num_heads=num_heads,
num_head_channels=num_head_channels,
num_heads_upsample=num_heads_upsample,
use_scale_shift_norm=use_scale_shift_norm,
dropout=dropout,
resblock_updown=resblock_updown,
use_fp16=use_fp16,
)
diffusion = create_gaussian_diffusion(
steps=diffusion_steps,
learn_sigma=learn_sigma,
noise_schedule=noise_schedule,
use_kl=use_kl,
predict_xstart=predict_xstart,
rescale_timesteps=rescale_timesteps,
rescale_learned_sigmas=rescale_learned_sigmas,
timestep_respacing=timestep_respacing,
)
return model, diffusion
def sr_create_model(
large_size,
small_size,
num_channels,
num_res_blocks,
learn_sigma,
class_cond,
use_checkpoint,
attention_resolutions,
num_heads,
num_head_channels,
num_heads_upsample,
use_scale_shift_norm,
dropout,
resblock_updown,
use_fp16,
):
_ = small_size # hack to prevent unused variable
if large_size == 512:
channel_mult = (1, 1, 2, 2, 4, 4)
elif large_size == 256:
channel_mult = (1, 1, 2, 2, 4, 4)
elif large_size == 64:
channel_mult = (1, 2, 3, 4)
else:
raise ValueError(f"unsupported large size: {large_size}")
attention_ds = []
for res in attention_resolutions.split(","):
attention_ds.append(large_size // int(res))
return SuperResModel(
image_size=large_size,
in_channels=3,
model_channels=num_channels,
out_channels=(3 if not learn_sigma else 6),
num_res_blocks=num_res_blocks,
attention_resolutions=tuple(attention_ds),
dropout=dropout,
channel_mult=channel_mult,
num_classes=(NUM_CLASSES if class_cond else None),
use_checkpoint=use_checkpoint,
num_heads=num_heads,
num_head_channels=num_head_channels,
num_heads_upsample=num_heads_upsample,
use_scale_shift_norm=use_scale_shift_norm,
resblock_updown=resblock_updown,
use_fp16=use_fp16,
)
def create_gaussian_diffusion(
*,
steps=1000,
learn_sigma=False,
sigma_small=False,
noise_schedule="linear",
use_kl=False,
predict_xstart=False,
rescale_timesteps=False,
rescale_learned_sigmas=False,
timestep_respacing="",
):
betas = gd.get_named_beta_schedule(noise_schedule, steps)
if use_kl:
loss_type = gd.LossType.RESCALED_KL
elif rescale_learned_sigmas:
loss_type = gd.LossType.RESCALED_MSE
else:
loss_type = gd.LossType.MSE
if not timestep_respacing:
timestep_respacing = [steps]
return SpacedDiffusion(
use_timesteps=space_timesteps(steps, timestep_respacing),
betas=betas,
model_mean_type=(
gd.ModelMeanType.EPSILON if not predict_xstart else gd.ModelMeanType.START_X
),
model_var_type=(
(
gd.ModelVarType.FIXED_LARGE
if not sigma_small
else gd.ModelVarType.FIXED_SMALL
)
if not learn_sigma
else gd.ModelVarType.LEARNED_RANGE
),
loss_type=loss_type,
rescale_timesteps=rescale_timesteps,
)
def add_dict_to_argparser(parser, default_dict):
for k, v in default_dict.items():
v_type = type(v)
if v is None:
v_type = str
elif isinstance(v, bool):
v_type = str2bool
if k == 'image_size':
parser.add_argument(f"--{k}", nargs='+', default=v, type=v_type)
else:
parser.add_argument(f"--{k}", default=v, type=v_type)
def args_to_dict(args, keys):
return {k: getattr(args, k) for k in keys}
def str2bool(v):
"""
https://stackoverflow.com/questions/15008758/parsing-boolean-values-with-argparse
"""
if isinstance(v, bool):
return v
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("boolean value expected")