|
from diffusers import DiffusionPipeline |
|
import torch |
|
import numpy as np |
|
import importlib.util |
|
import sys |
|
from huggingface_hub import hf_hub_download |
|
from safetensors.torch import load_file |
|
import os |
|
from .vae import AutoencoderKL |
|
from .mar import mar_base, mar_large, mar_huge |
|
|
|
|
|
class MARModel(DiffusionPipeline):
    """Pipeline wrapper that builds a MAR model plus its VAE and samples images.

    All configuration is supplied as keyword arguments to ``__call__``; the
    constructor takes no arguments.
    """

    def __init__(self):
        super().__init__()

    @torch.no_grad()
    def __call__(self, *args, **kwargs):
        """Download the model and VAE weights, sample tokens, and decode them.

        Positional arguments are accepted for interface compatibility but are
        not used.

        Keyword Args:
            model_type: One of ``"mar_base"``, ``"mar_large"``, ``"mar_huge"``.
                Defaults to ``"mar_base"``.
            buffer_size: Forwarded to the model constructor. Defaults to 64.
            diffloss_d: Forwarded to the model constructor. Defaults to 3.
            diffloss_w: Forwarded to the model constructor. Defaults to 1024.
            num_sampling_steps: Forwarded to the model constructor after being
                converted to ``str``. Defaults to 100.
            repo_id: Hugging Face Hub repository holding both checkpoints.
                Defaults to ``"jadechoghari/mar"``.
            model_filename: Model checkpoint file inside ``repo_id``.
                Defaults to ``"checkpoint-last.pth"``.
            vae_filename: VAE checkpoint file inside ``repo_id``.
                Defaults to ``"kl16.ckpt"``.
            seed: Seed applied to both ``torch`` and ``numpy``. Defaults to 0.
            num_ar_steps: Passed to ``sample_tokens`` as ``num_iter``.
                Defaults to 64.
            cfg_scale: Passed to ``sample_tokens`` as ``cfg``. Defaults to 4.
            cfg_schedule: Passed to ``sample_tokens``. Defaults to
                ``"constant"``.
            temperature: Passed to ``sample_tokens``. Defaults to 1.0.
            class_labels: Sequence of integer labels, one per generated sample
                (presumably ImageNet class indices -- TODO confirm). Defaults
                to ``[207, 360, 388, 113, 355, 980, 323, 979]``.

        Returns:
            The output of ``vae.decode`` for the sampled tokens (presumably a
            batch of image tensors, one per class label -- TODO confirm).

        Raises:
            ValueError: If ``model_type`` is not one of the supported names.
        """
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        buffer_size = kwargs.get("buffer_size", 64)
        diffloss_d = kwargs.get("diffloss_d", 3)
        diffloss_w = kwargs.get("diffloss_w", 1024)
        num_sampling_steps = kwargs.get("num_sampling_steps", 100)
        model_type = kwargs.get("model_type", "mar_base")
        repo_id = kwargs.get("repo_id", "jadechoghari/mar")

        # Resolve the constructor up front so an unknown model type fails
        # immediately instead of after the checkpoint has been downloaded.
        model_builders = {
            "mar_base": mar_base,
            "mar_large": mar_large,
            "mar_huge": mar_huge,
        }
        try:
            build_model = model_builders[model_type]
        except KeyError:
            raise ValueError(
                f"Unknown model_type {model_type!r}; "
                f"expected one of {sorted(model_builders)}"
            ) from None

        self.model = build_model(
            buffer_size=buffer_size,
            diffloss_d=diffloss_d,
            diffloss_w=diffloss_w,
            # The model constructors are given the step count as a string.
            num_sampling_steps=str(num_sampling_steps),
        ).to(device)

        model_checkpoint_path = hf_hub_download(
            repo_id=repo_id,
            filename=kwargs.get("model_filename", "checkpoint-last.pth"),
        )

        # NOTE(review): torch.load unpickles the file, which can execute
        # arbitrary code. Only point ``repo_id`` at a trusted repository.
        checkpoint = torch.load(model_checkpoint_path, map_location=device)
        # Only the "model_ema" entry of the checkpoint is loaded.
        state_dict = checkpoint["model_ema"]

        # strict=False: mismatched keys between checkpoint and model are
        # tolerated rather than raising.
        self.model.load_state_dict(state_dict, strict=False)
        self.model.eval()

        vae_checkpoint_path = hf_hub_download(
            repo_id=repo_id,
            filename=kwargs.get("vae_filename", "kl16.ckpt"),
        )
        vae = AutoencoderKL(
            embed_dim=16,
            ch_mult=(1, 1, 2, 2, 4),
            ckpt_path=vae_checkpoint_path,
        )
        vae = vae.to(device).eval()

        seed = kwargs.get("seed", 0)
        torch.manual_seed(seed)
        np.random.seed(seed)

        num_ar_steps = kwargs.get("num_ar_steps", 64)
        cfg_scale = kwargs.get("cfg_scale", 4)
        cfg_schedule = kwargs.get("cfg_schedule", "constant")
        temperature = kwargs.get("temperature", 1.0)
        class_labels = kwargs.get(
            "class_labels", [207, 360, 388, 113, 355, 980, 323, 979]
        )
        # Build the integer label tensor directly on the target device, once,
        # without a detour through float32.
        class_labels = torch.tensor(class_labels, dtype=torch.long, device=device)

        # Mixed precision is only enabled on CUDA; on the CPU fallback the
        # context manager is a no-op.
        with torch.autocast(device_type=device.type, enabled=device.type == "cuda"):
            sampled_tokens = self.model.sample_tokens(
                bsz=len(class_labels),
                num_iter=num_ar_steps,
                cfg=cfg_scale,
                cfg_schedule=cfg_schedule,
                labels=class_labels,
                temperature=temperature,
                progress=True,
            )
            # 0.2325 rescales the sampled tokens before decoding (presumably
            # the VAE latent scale factor -- TODO confirm).
            sampled_images = vae.decode(sampled_tokens / 0.2325)

        return sampled_images
|
|
|
|