import argparse import inspect from . import gaussian_diffusion as gd from .respace import SpacedDiffusion, space_timesteps from .unet import SuperResModel, UNetModel, EncoderUNetModel NUM_CLASSES = 1000 def diffusion_defaults(): """ Defaults for image and classifier training. """ return dict( learn_sigma=False, diffusion_steps=1000, noise_schedule="linear", timestep_respacing="", use_kl=False, predict_xstart=False, rescale_timesteps=False, rescale_learned_sigmas=False, ) def classifier_defaults(): """ Defaults for classifier models. """ return dict( image_size=64, classifier_use_fp16=False, classifier_width=128, classifier_depth=2, classifier_attention_resolutions="32,16,8", # 16 classifier_use_scale_shift_norm=True, # False classifier_resblock_updown=True, # False classifier_pool="attention", ) def model_and_diffusion_defaults(): """ Defaults for image training. """ res = dict( image_size=64, num_channels=128, num_res_blocks=2, num_heads=4, num_heads_upsample=-1, num_head_channels=-1, attention_resolutions="16,8", channel_mult="", dropout=0.0, class_cond=False, use_checkpoint=False, use_scale_shift_norm=True, resblock_updown=False, use_fp16=False, use_new_attention_order=False, ) res.update(diffusion_defaults()) return res def classifier_and_diffusion_defaults(): res = classifier_defaults() res.update(diffusion_defaults()) return res def create_model_and_diffusion( image_size, class_cond, learn_sigma, num_channels, num_res_blocks, channel_mult, num_heads, num_head_channels, num_heads_upsample, attention_resolutions, dropout, diffusion_steps, noise_schedule, timestep_respacing, use_kl, predict_xstart, rescale_timesteps, rescale_learned_sigmas, use_checkpoint, use_scale_shift_norm, resblock_updown, use_fp16, use_new_attention_order, ): model = create_model( image_size, num_channels, num_res_blocks, channel_mult=channel_mult, learn_sigma=learn_sigma, class_cond=class_cond, use_checkpoint=use_checkpoint, attention_resolutions=attention_resolutions, num_heads=num_heads, num_head_channels=num_head_channels, num_heads_upsample=num_heads_upsample, use_scale_shift_norm=use_scale_shift_norm, dropout=dropout, resblock_updown=resblock_updown, use_fp16=use_fp16, use_new_attention_order=use_new_attention_order, ) diffusion = create_gaussian_diffusion( steps=diffusion_steps, learn_sigma=learn_sigma, noise_schedule=noise_schedule, use_kl=use_kl, predict_xstart=predict_xstart, rescale_timesteps=rescale_timesteps, rescale_learned_sigmas=rescale_learned_sigmas, timestep_respacing=timestep_respacing, ) return model, diffusion def create_model( image_size, num_channels, num_res_blocks, channel_mult="", learn_sigma=False, class_cond=False, use_checkpoint=False, attention_resolutions="16", num_heads=1, num_head_channels=-1, num_heads_upsample=-1, use_scale_shift_norm=False, dropout=0, resblock_updown=False, use_fp16=False, use_new_attention_order=False, ): if channel_mult == "": if image_size == 512: channel_mult = (0.5, 1, 1, 2, 2, 4, 4) elif image_size == 256: channel_mult = (1, 1, 2, 2, 4, 4) elif image_size == 128: channel_mult = (1, 1, 2, 3, 4) elif image_size == 64: channel_mult = (1, 2, 3, 4) else: raise ValueError(f"unsupported image size: {image_size}") else: channel_mult = tuple(int(ch_mult) for ch_mult in channel_mult.split(",")) attention_ds = [] for res in attention_resolutions.split(","): attention_ds.append(image_size // int(res)) return UNetModel( image_size=image_size, in_channels=3, model_channels=num_channels, out_channels=(3 if not learn_sigma else 6), num_res_blocks=num_res_blocks, attention_resolutions=tuple(attention_ds), dropout=dropout, channel_mult=channel_mult, num_classes=(NUM_CLASSES if class_cond else None), use_checkpoint=use_checkpoint, use_fp16=use_fp16, num_heads=num_heads, num_head_channels=num_head_channels, num_heads_upsample=num_heads_upsample, use_scale_shift_norm=use_scale_shift_norm, resblock_updown=resblock_updown, use_new_attention_order=use_new_attention_order, ) def create_classifier_and_diffusion( image_size, classifier_use_fp16, classifier_width, classifier_depth, classifier_attention_resolutions, classifier_use_scale_shift_norm, classifier_resblock_updown, classifier_pool, learn_sigma, diffusion_steps, noise_schedule, timestep_respacing, use_kl, predict_xstart, rescale_timesteps, rescale_learned_sigmas, ): classifier = create_classifier( image_size, classifier_use_fp16, classifier_width, classifier_depth, classifier_attention_resolutions, classifier_use_scale_shift_norm, classifier_resblock_updown, classifier_pool, ) diffusion = create_gaussian_diffusion( steps=diffusion_steps, learn_sigma=learn_sigma, noise_schedule=noise_schedule, use_kl=use_kl, predict_xstart=predict_xstart, rescale_timesteps=rescale_timesteps, rescale_learned_sigmas=rescale_learned_sigmas, timestep_respacing=timestep_respacing, ) return classifier, diffusion def create_classifier( image_size, classifier_use_fp16, classifier_width, classifier_depth, classifier_attention_resolutions, classifier_use_scale_shift_norm, classifier_resblock_updown, classifier_pool, ): if image_size == 512: channel_mult = (0.5, 1, 1, 2, 2, 4, 4) elif image_size == 256: channel_mult = (1, 1, 2, 2, 4, 4) elif image_size == 128: channel_mult = (1, 1, 2, 3, 4) elif image_size == 64: channel_mult = (1, 2, 3, 4) else: raise ValueError(f"unsupported image size: {image_size}") attention_ds = [] for res in classifier_attention_resolutions.split(","): attention_ds.append(image_size // int(res)) return EncoderUNetModel( image_size=image_size, in_channels=3, model_channels=classifier_width, out_channels=1000, num_res_blocks=classifier_depth, attention_resolutions=tuple(attention_ds), channel_mult=channel_mult, use_fp16=classifier_use_fp16, num_head_channels=64, use_scale_shift_norm=classifier_use_scale_shift_norm, resblock_updown=classifier_resblock_updown, pool=classifier_pool, ) def sr_model_and_diffusion_defaults(): res = model_and_diffusion_defaults() res["large_size"] = 256 res["small_size"] = 64 arg_names = inspect.getfullargspec(sr_create_model_and_diffusion)[0] for k in res.copy().keys(): if k not in arg_names: del res[k] return res def sr_create_model_and_diffusion( large_size, small_size, class_cond, learn_sigma, num_channels, num_res_blocks, num_heads, num_head_channels, num_heads_upsample, attention_resolutions, dropout, diffusion_steps, noise_schedule, timestep_respacing, use_kl, predict_xstart, rescale_timesteps, rescale_learned_sigmas, use_checkpoint, use_scale_shift_norm, resblock_updown, use_fp16, ): model = sr_create_model( large_size, small_size, num_channels, num_res_blocks, learn_sigma=learn_sigma, class_cond=class_cond, use_checkpoint=use_checkpoint, attention_resolutions=attention_resolutions, num_heads=num_heads, num_head_channels=num_head_channels, num_heads_upsample=num_heads_upsample, use_scale_shift_norm=use_scale_shift_norm, dropout=dropout, resblock_updown=resblock_updown, use_fp16=use_fp16, ) diffusion = create_gaussian_diffusion( steps=diffusion_steps, learn_sigma=learn_sigma, noise_schedule=noise_schedule, use_kl=use_kl, predict_xstart=predict_xstart, rescale_timesteps=rescale_timesteps, rescale_learned_sigmas=rescale_learned_sigmas, timestep_respacing=timestep_respacing, ) return model, diffusion def sr_create_model( large_size, small_size, num_channels, num_res_blocks, learn_sigma, class_cond, use_checkpoint, attention_resolutions, num_heads, num_head_channels, num_heads_upsample, use_scale_shift_norm, dropout, resblock_updown, use_fp16, ): _ = small_size # hack to prevent unused variable if large_size == 512: channel_mult = (1, 1, 2, 2, 4, 4) elif large_size == 256: channel_mult = (1, 1, 2, 2, 4, 4) elif large_size == 64: channel_mult = (1, 2, 3, 4) else: raise ValueError(f"unsupported large size: {large_size}") attention_ds = [] for res in attention_resolutions.split(","): attention_ds.append(large_size // int(res)) return SuperResModel( image_size=large_size, in_channels=3, model_channels=num_channels, out_channels=(3 if not learn_sigma else 6), num_res_blocks=num_res_blocks, attention_resolutions=tuple(attention_ds), dropout=dropout, channel_mult=channel_mult, num_classes=(NUM_CLASSES if class_cond else None), use_checkpoint=use_checkpoint, num_heads=num_heads, num_head_channels=num_head_channels, num_heads_upsample=num_heads_upsample, use_scale_shift_norm=use_scale_shift_norm, resblock_updown=resblock_updown, use_fp16=use_fp16, ) def create_gaussian_diffusion( *, steps=1000, learn_sigma=False, sigma_small=False, noise_schedule="linear", use_kl=False, predict_xstart=False, rescale_timesteps=False, rescale_learned_sigmas=False, timestep_respacing="", ): betas = gd.get_named_beta_schedule(noise_schedule, steps) if use_kl: loss_type = gd.LossType.RESCALED_KL elif rescale_learned_sigmas: loss_type = gd.LossType.RESCALED_MSE else: loss_type = gd.LossType.MSE if not timestep_respacing: timestep_respacing = [steps] return SpacedDiffusion( use_timesteps=space_timesteps(steps, timestep_respacing), betas=betas, model_mean_type=( gd.ModelMeanType.EPSILON if not predict_xstart else gd.ModelMeanType.START_X ), model_var_type=( ( gd.ModelVarType.FIXED_LARGE if not sigma_small else gd.ModelVarType.FIXED_SMALL ) if not learn_sigma else gd.ModelVarType.LEARNED_RANGE ), loss_type=loss_type, rescale_timesteps=rescale_timesteps, ) def add_dict_to_argparser(parser, default_dict): for k, v in default_dict.items(): v_type = type(v) if v is None: v_type = str elif isinstance(v, bool): v_type = str2bool parser.add_argument(f"--{k}", default=v, type=v_type) def args_to_dict(args, keys): return {k: getattr(args, k) for k in keys} def str2bool(v): """ https://stackoverflow.com/questions/15008758/parsing-boolean-values-with-argparse """ if isinstance(v, bool): return v if v.lower() in ("yes", "true", "t", "y", "1"): return True elif v.lower() in ("no", "false", "f", "n", "0"): return False else: raise argparse.ArgumentTypeError("boolean value expected")