Spaces:
Runtime error
Runtime error
import argparse | |
import datetime | |
import logging | |
import inspect | |
import math | |
import os | |
from typing import Dict, Optional, Tuple | |
from omegaconf import OmegaConf | |
import torch | |
import torch.nn.functional as F | |
import torch.utils.checkpoint | |
import diffusers | |
import transformers | |
from accelerate import Accelerator | |
from accelerate.logging import get_logger | |
from accelerate.utils import set_seed | |
from diffusers import AutoencoderKL, DDPMScheduler, DDIMScheduler | |
from diffusers.optimization import get_scheduler | |
from diffusers.utils import check_min_version | |
from diffusers.utils.import_utils import is_xformers_available | |
from tqdm.auto import tqdm | |
from transformers import CLIPTextModel, CLIPTokenizer | |
import sys | |
sys.path.append('FollowYourPose') | |
from followyourpose.models.unet import UNet3DConditionModel | |
from followyourpose.pipelines.pipeline_followyourpose import FollowYourPosePipeline | |
from followyourpose.util import save_videos_grid, ddim_inversion | |
from einops import rearrange | |
check_min_version("0.10.0.dev0") | |
logger = get_logger(__name__, log_level="INFO") | |
def collate_fn(examples): | |
"""Concat a batch of sampled image in dataloader | |
""" | |
batch = { | |
"prompt_ids": torch.cat([example["prompt_ids"] for example in examples], dim=0), | |
"images": torch.stack([example["images"] for example in examples]), | |
} | |
return batch | |
def test( | |
pretrained_model_path: str, | |
output_dir: str, | |
validation_data: Dict, | |
validation_steps: int = 100, | |
train_batch_size: int = 1, | |
gradient_accumulation_steps: int = 1, | |
gradient_checkpointing: bool = True, | |
resume_from_checkpoint: Optional[str] = None, | |
mixed_precision: Optional[str] = "fp16", | |
enable_xformers_memory_efficient_attention: bool = True, | |
seed: Optional[int] = None, | |
skeleton_path: Optional[str] = None, | |
): | |
*_, config = inspect.getargvalues(inspect.currentframe()) | |
accelerator = Accelerator( | |
gradient_accumulation_steps=gradient_accumulation_steps, | |
mixed_precision=mixed_precision, | |
) | |
# Make one log on every process with the configuration for debugging. | |
logging.basicConfig( | |
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s", | |
datefmt="%m/%d/%Y %H:%M:%S", | |
level=logging.INFO, | |
) | |
logger.info(accelerator.state, main_process_only=False) | |
if accelerator.is_local_main_process: | |
transformers.utils.logging.set_verbosity_warning() | |
diffusers.utils.logging.set_verbosity_info() | |
else: | |
transformers.utils.logging.set_verbosity_error() | |
diffusers.utils.logging.set_verbosity_error() | |
# If passed along, set the training seed now. | |
if seed is not None: | |
set_seed(seed) | |
# Handle the output folder creation | |
if accelerator.is_main_process: | |
os.makedirs(output_dir, exist_ok=True) | |
os.makedirs(f"{output_dir}/samples", exist_ok=True) | |
os.makedirs(f"{output_dir}/inv_latents", exist_ok=True) | |
OmegaConf.save(config, os.path.join(output_dir, 'config.yaml')) | |
# Load scheduler, tokenizer and models. | |
noise_scheduler = DDPMScheduler.from_pretrained(pretrained_model_path, subfolder="scheduler") | |
tokenizer = CLIPTokenizer.from_pretrained(pretrained_model_path, subfolder="tokenizer") | |
text_encoder = CLIPTextModel.from_pretrained(pretrained_model_path, subfolder="text_encoder") | |
vae = AutoencoderKL.from_pretrained(pretrained_model_path, subfolder="vae") | |
unet = UNet3DConditionModel.from_pretrained_2d(pretrained_model_path, subfolder="unet") | |
# Freeze vae and text_encoder | |
vae.requires_grad_(False) | |
text_encoder.requires_grad_(False) | |
unet.requires_grad_(False) | |
if enable_xformers_memory_efficient_attention: | |
if is_xformers_available(): | |
unet.enable_xformers_memory_efficient_attention() | |
else: | |
raise ValueError("xformers is not available. Make sure it is installed correctly") | |
if gradient_checkpointing: | |
unet.enable_gradient_checkpointing() | |
# Get the validation pipeline | |
validation_pipeline = FollowYourPosePipeline( | |
vae=vae, text_encoder=text_encoder, tokenizer=tokenizer, unet=unet, | |
scheduler=DDIMScheduler.from_pretrained(pretrained_model_path, subfolder="scheduler") | |
) | |
validation_pipeline.enable_vae_slicing() | |
ddim_inv_scheduler = DDIMScheduler.from_pretrained(pretrained_model_path, subfolder='scheduler') | |
ddim_inv_scheduler.set_timesteps(validation_data.num_inv_steps) | |
unet = accelerator.prepare(unet) | |
# For mixed precision training we cast the text_encoder and vae weights to half-precision | |
# as these models are only used for inference, keeping weights in full precision is not required. | |
weight_dtype = torch.float32 | |
if accelerator.mixed_precision == "fp16": | |
weight_dtype = torch.float16 | |
elif accelerator.mixed_precision == "bf16": | |
weight_dtype = torch.bfloat16 | |
# Move text_encode and vae to gpu and cast to weight_dtype | |
text_encoder.to(accelerator.device, dtype=weight_dtype) | |
vae.to(accelerator.device, dtype=weight_dtype) | |
# We need to recalculate our total training steps as the size of the training dataloader may have changed. | |
# We need to initialize the trackers we use, and also store our configuration. | |
# The trackers initializes automatically on the main process. | |
if accelerator.is_main_process: | |
accelerator.init_trackers("text2video-fine-tune") | |
global_step = 0 | |
first_epoch = 0 | |
# Potentially load in the weights and states from a previous save | |
load_path = None | |
if resume_from_checkpoint: | |
if resume_from_checkpoint != "latest": | |
load_path = resume_from_checkpoint | |
output_dir = os.path.abspath(os.path.join(resume_from_checkpoint, "..")) | |
accelerator.print(f"load from checkpoint {load_path}") | |
accelerator.load_state(load_path) | |
global_step = int(load_path.split("-")[-1]) | |
if accelerator.is_main_process: | |
samples = [] | |
generator = torch.Generator(device=accelerator.device) | |
generator.manual_seed(seed) | |
ddim_inv_latent = None | |
from datetime import datetime | |
now = str(datetime.now()) | |
print(now) | |
for idx, prompt in enumerate(validation_data.prompts): | |
sample = validation_pipeline(prompt, generator=generator, latents=ddim_inv_latent, | |
skeleton_path=skeleton_path, | |
**validation_data).videos | |
save_path = f"{output_dir}/inference/sample-{global_step}-{str(seed)}-{now}/{prompt}.gif" | |
save_videos_grid(sample, save_path, fps=4) | |
# samples.append(sample) | |
# samples = torch.concat(samples) | |
# save_path = f"{output_dir}/inference/sample-{global_step}-{str(seed)}-{now}.mp4" | |
# save_videos_grid(samples, save_path) | |
logger.info(f"Saved samples to {save_path}") | |
return save_path | |