# Open-Sora-Plan-v1.1.0 / examples/rec_imvi_vae.py
import math
import argparse
from typing import Optional
import cv2
import numpy as np
import numpy.typing as npt
import torch
from PIL import Image
from decord import VideoReader, cpu
from torchvision.transforms import Lambda, Compose
import sys
sys.path.append(".")
from opensora.models.ae import getae_wrapper
from opensora.dataset.transform import CenterCropVideo, resize
from opensora.models.ae.videobase import CausalVAEModel
def array_to_video(image_array: npt.NDArray, fps: float = 30.0, output_file: str = 'output_video.mp4') -> None:
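    """Write a sequence of (H, W, C) uint8 RGB frames to an MP4 file with OpenCV."""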
    height, width, channels = image_array[0].shape
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    video_writer = cv2.VideoWriter(output_file, fourcc, float(fps), (width, height))

    for image in image_array:
        image_bgr = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # OpenCV expects BGR frames
        video_writer.write(image_bgr)

    video_writer.release()

def custom_to_video(x: torch.Tensor, fps: float = 2.0, output_file: str = 'output_video.mp4') -> None:
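    """Save a (T, C, H, W) float tensor with values in [-1, 1] as an MP4 video."""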
    x = x.detach().cpu()
    x = torch.clamp(x, -1, 1)
    x = (x + 1) / 2  # [-1, 1] -> [0, 1]
    x = x.permute(0, 2, 3, 1).numpy()  # (T, C, H, W) -> (T, H, W, C)
    x = (255 * x).astype(np.uint8)
    array_to_video(x, fps=fps, output_file=output_file)

def read_video(video_path: str, num_frames: int, sample_rate: int) -> torch.Tensor:
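    """Decode frames from a video file with Decord and return a (C, T, H, W) uint8 tensor."""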
    decord_vr = VideoReader(video_path, ctx=cpu(0))
    total_frames = len(decord_vr)
    sample_frames_len = sample_rate * num_frames

    if total_frames > sample_frames_len:
        s = 0  # fixed start; the original random start (random.randint) was disabled upstream
        e = s + sample_frames_len
    else:
        s = 0
        e = total_frames
        num_frames = int(total_frames / sample_frames_len * num_frames)
        print(f'sample_frames_len {sample_frames_len}, can only sample {num_frames * sample_rate} frames', video_path,
              total_frames)

    frame_id_list = np.linspace(s, e - 1, num_frames, dtype=int)
    video_data = decord_vr.get_batch(frame_id_list).asnumpy()
    video_data = torch.from_numpy(video_data)
    video_data = video_data.permute(3, 0, 1, 2)  # (T, H, W, C) -> (C, T, H, W)
    return video_data

class ResizeVideo:
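    """Resize a (C, T, H, W) clip so that its shorter spatial side equals `size`, preserving aspect ratio."""
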
    def __init__(
        self,
        size,
        interpolation_mode="bilinear",
    ):
        self.size = size
        self.interpolation_mode = interpolation_mode

    def __call__(self, clip):
        _, _, h, w = clip.shape
        if w < h:
            new_h = int(math.floor((float(h) / w) * self.size))
            new_w = self.size
        else:
            new_h = self.size
            new_w = int(math.floor((float(w) / h) * self.size))
        return torch.nn.functional.interpolate(
            clip, size=(new_h, new_w), mode=self.interpolation_mode, align_corners=False, antialias=True
        )

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}(size={self.size}, interpolation_mode={self.interpolation_mode})"

def preprocess(video_data: torch.Tensor, short_size: int = 128, crop_size: Optional[int] = None) -> torch.Tensor:
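    """Scale a uint8 (C, T, H, W) video to [-1, 1], resize its short side, optionally center-crop, and add a batch dim."""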
    transform = Compose(
        [
            Lambda(lambda x: ((x / 255.0) * 2 - 1)),  # uint8 [0, 255] -> float [-1, 1]
            ResizeVideo(size=short_size),
            CenterCropVideo(crop_size) if crop_size is not None else Lambda(lambda x: x),
        ]
    )
    video_outputs = transform(video_data)
    video_outputs = torch.unsqueeze(video_outputs, 0)  # (C, T, H, W) -> (1, C, T, H, W)
    return video_outputs

def main(args: argparse.Namespace):
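    """Load the causal VAE, reconstruct the input video through encode/decode in fp16, and save the result."""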
    device = args.device
    kwarg = {}
    vae = getae_wrapper(args.ae)(args.ae_path, **kwarg).to(device)
    if args.enable_tiling:
        vae.vae.enable_tiling()
        vae.vae.tile_overlap_factor = args.tile_overlap_factor
    vae.eval()
    vae = vae.half()

    with torch.no_grad():
        x_vae = preprocess(read_video(args.video_path, args.num_frames, args.sample_rate), args.resolution,
                           args.crop_size)
        x_vae = x_vae.to(device, dtype=torch.float16)  # (b, c, t, h, w)
        latents = vae.encode(x_vae)
        latents = latents.to(torch.float16)
        video_recon = vae.decode(latents)  # (b, t, c, h, w)
    if video_recon.shape[1] == 1:  # a single reconstructed frame: save it as a still image
        x = video_recon[0, 0, :, :, :]
        x = x.squeeze()
        x = x.detach().cpu().numpy()
        x = np.clip(x, -1, 1)
        x = (x + 1) / 2
        x = (255 * x).astype(np.uint8)
        x = x.transpose(1, 2, 0)  # (C, H, W) -> (H, W, C)
        image = Image.fromarray(x)
        image.save(args.rec_path.replace('mp4', 'jpg'))
    else:
        custom_to_video(video_recon[0], fps=args.fps, output_file=args.rec_path)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--video_path', type=str, default='')
    parser.add_argument('--rec_path', type=str, default='')
    parser.add_argument('--ae', type=str, default='')
    parser.add_argument('--ae_path', type=str, default='')
    parser.add_argument('--model_path', type=str, default='results/pretrained')
    parser.add_argument('--fps', type=int, default=30)
    parser.add_argument('--resolution', type=int, default=336)
    parser.add_argument('--crop_size', type=int, default=None)
    parser.add_argument('--num_frames', type=int, default=100)
    parser.add_argument('--sample_rate', type=int, default=1)
    parser.add_argument('--device', type=str, default="cuda")
    parser.add_argument('--tile_overlap_factor', type=float, default=0.25)
    parser.add_argument('--enable_tiling', action='store_true')
    parser.add_argument('--enable_time_chunk', action='store_true')
    args = parser.parse_args()
    main(args)
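
# Example invocation (a sketch; the checkpoint paths and the `CausalVAEModel_4x8x8`
# wrapper name are assumptions based on the Open-Sora-Plan release layout, so adjust
# them to your setup):
#   python examples/rec_imvi_vae.py \
#       --ae CausalVAEModel_4x8x8 \
#       --ae_path /path/to/vae/checkpoint \
#       --video_path input.mp4 --rec_path rec.mp4 \
#       --num_frames 65 --resolution 336 --sample_rate 1 --fps 30 --enable_tiling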