genrl / demo /t2v.py
mazpie's picture
Update demo/t2v.py
ba39165 verified
raw
history blame
5.21 kB
from pathlib import Path
import os
import sys
sys.path.append(str(Path(os.path.abspath(''))))
import torch
import numpy as np
from tools.genrl_utils import ViCLIPGlobalInstance
import time
import torchvision
from huggingface_hub import hf_hub_download
import spaces
# IMPORT HF_TOKEN
hf_token = os.environ['HF_TOKEN']
def save_videos(batch_tensors, savedir, filenames, fps=10):
# b,samples,c,t,h,w
n_samples = batch_tensors.shape[1]
for idx, vid_tensor in enumerate(batch_tensors):
video = vid_tensor.detach().cpu()
video = torch.clamp(video.float(), 0., 1.)
video = video.permute(1, 0, 2, 3, 4) # t,n,c,h,w
frame_grids = [torchvision.utils.make_grid(framesheet, nrow=int(n_samples)) for framesheet in video] #[3, 1*h, n*w]
grid = torch.stack(frame_grids, dim=0) # stack in temporal dim [t, 3, n*h, w]
grid = (grid * 255).to(torch.uint8).permute(0, 2, 3, 1)
savepath = os.path.join(savedir, f"{filenames[idx]}.mp4")
torchvision.io.write_video(savepath, grid, fps=fps, video_codec='h264', options={'crf': '10'})
class Text2Video():
def __init__(self,result_dir='./tmp/',gpu_num=1) -> None:
model_folder = str(Path(os.path.abspath('')) / 'models')
model_filename = 'genrl_stickman_500k_2.pt'
if not os.path.isfile(os.path.join(model_folder, model_filename)):
self.download_model(model_folder, model_filename)
if not os.path.isfile(os.path.join(model_folder, 'InternVideo2-stage2_1b-224p-f4.pt')):
self.download_internvideo2(model_folder)
self.agent = torch.load(os.path.join(model_folder, model_filename),map_location='cpu')
model_name = 'internvideo2'
# Get ViCLIP
viclip_global_instance = ViCLIPGlobalInstance(model_name)
if not viclip_global_instance._instantiated:
print("Instantiating InternVideo2")
viclip_global_instance.instantiate(device='cpu')
self.clip = viclip_global_instance.viclip
self.tokenizer = viclip_global_instance.viclip_tokenizer
self.result_dir = result_dir
if not os.path.exists(self.result_dir):
os.mkdir(self.result_dir)
@spaces.GPU
def get_prompt(self, prompt, duration):
torch.cuda.empty_cache()
self.agent.to('cuda')
self.clip.to('cuda')
print('start:', prompt, time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())))
start = time.time()
prompt_str = prompt.replace("/", "_slash_") if "/" in prompt else prompt
prompt_str = prompt_str.replace(" ", "_") if " " in prompt else prompt_str
labels_list = [prompt_str]
with torch.no_grad():
wm = world_model = self.agent.wm
connector = self.agent.wm.connector
decoder = world_model.heads['decoder']
n_frames = connector.n_frames
# Get text(video) embed
text_feat = []
for text in labels_list:
with torch.no_grad():
text_feat.append(self.clip.get_txt_feat(text,))
text_feat = torch.stack(text_feat, dim=0).to(self.clip.device)
video_embed = text_feat
B = video_embed.shape[0]
T = 1
# Get actions
video_embed = video_embed.repeat(1, duration, 1)
with torch.no_grad():
# Imagine
prior = wm.connector.video_imagine(video_embed, None, sample=False, reset_every_n_frames=False, denoise=True)
# Decode
prior_recon = decoder(wm.decoder_input_fn(prior))['observation'].mean + 0.5
save_videos(prior_recon.unsqueeze(0), self.result_dir, filenames=[prompt_str], fps=15)
print(f"Saved in {prompt_str}.mp4. Time used: {(time.time() - start):.2f} seconds")
# Offload GPU
self.agent.to('cpu')
self.clip.to('cpu')
return os.path.join(self.result_dir, f"{prompt_str}.mp4")
def download_model(self, model_folder, model_filename):
REPO_ID = 'mazpie/genrl_models'
filename_list = [model_filename]
if not os.path.exists(model_folder):
os.makedirs(model_folder)
for filename in filename_list:
local_file = os.path.join(model_folder, filename)
if not os.path.exists(local_file):
hf_hub_download(repo_id=REPO_ID, filename=filename, local_dir=model_folder, local_dir_use_symlinks=False)
def download_internvideo2(self, model_folder):
REPO_ID = 'OpenGVLab/InternVideo2-Stage2_1B-224p-f4'
filename_list = ['InternVideo2-stage2_1b-224p-f4.pt']
if not os.path.exists(model_folder):
os.makedirs(model_folder)
for filename in filename_list:
local_file = os.path.join(model_folder, filename)
if not os.path.exists(local_file):
hf_hub_download(repo_id=REPO_ID, filename=filename, local_dir=model_folder, local_dir_use_symlinks=False, token=hf_token)
if __name__ == '__main__':
t2v = Text2Video()
video_path = t2v.get_prompt('a black swan swims on the pond', 8)
print('done', video_path)