Spaces:
Runtime error
Runtime error
File size: 8,059 Bytes
2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 5898e2f 2d5fdd1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import sys
import os
# Absolute path of the directory that contains this file.
THIS_DIR = os.path.split(os.path.abspath(__file__))[0]
# Parent of THIS_DIR (os.pardir is the parent-directory marker), appended to
# the module search path; presumably this is what lets the project-local
# packages be imported when the file is run directly -- confirm against the
# repository layout.
ROOT_DIR = os.path.abspath(
    os.path.join(THIS_DIR, os.pardir)
)
sys.path.append(ROOT_DIR)
import numpy as np; import scipy.linalg
# LU factorisation of a random orthogonal matrix.
# NOTE(review): none of the names defined here are referenced in the visible
# part of this file; confirm whether this block is still needed.
w_shape = [219, 219]
# Draw a standard-normal matrix of shape w_shape ...
w_init = np.random.standard_normal(size=tuple(w_shape))
# ... keep the Q factor of its QR decomposition (orthonormal columns) ...
w_init = np.linalg.qr(w_init)[0]
# ... and store it in single precision.
w_init = w_init.astype(np.float32)
# Permutation, lower-triangular and upper-triangular factors: w_init = P @ L @ U.
np_p, np_l, np_u = scipy.linalg.lu(w_init)
from training.datasets import create_dataset, create_dataloader
from models import create_model
from training.options.train_options import TrainOptions
import torch
import pytorch_lightning as pl
import numpy as np
import pickle, json, yaml
import sklearn
import argparse
import os, glob
from pathlib import Path
from analysis.visualization.generate_video_from_mats import generate_video_from_mats
from analysis.visualization.generate_video_from_expmaps import generate_video_from_expmaps
from analysis.visualization.generate_video_from_moglow_pos import generate_video_from_moglow_loc
from training.utils import get_latest_checkpoint
def read_base_filenames(path):
    """Return the non-empty lines of the text file at *path*.

    Each line is expected to hold one sequence name. Only the trailing
    newline is removed, so a final line without a newline keeps its last
    character, and blank lines are skipped instead of yielding empty names.
    """
    with open(path, "r") as handle:
        names = [line.rstrip("\n") for line in handle]
    return [name for name in names if name]


def parse_seeds(spec):
    """Parse a ``mod,seq_id;mod,seq_id`` string into a ``{mod: seq_id}`` dict.

    Returns an empty dict when *spec* is ``None``. Empty entries (e.g. from a
    trailing ``;``) are ignored. Raises ``ValueError`` when an entry does not
    consist of exactly two comma-separated fields.
    """
    seeds = {}
    if spec is None:
        return seeds
    for entry in spec.split(";"):
        if not entry:
            continue
        mod, seq = entry.split(",")
        seeds[mod] = seq
    return seeds


# Script entry point: load the latest checkpoint of an experiment, run
# model.generate on pre-computed input features, save the predicted
# modalities as .npy files and optionally render videos from them.
if __name__ == '__main__':
    print("Hi")
    parser = argparse.ArgumentParser(description='Generate with model')
    parser.add_argument('--data_dir', type=str)
    parser.add_argument('--seeds', type=str, help='in the format: mod,seq_id;mod,seq_id')
    parser.add_argument('--seeds_file', type=str, help='file from which to choose a random seed')
    parser.add_argument('--output_folder', type=str)
    parser.add_argument('--audio_format', type=str, default="wav")
    # The experiment name is used unconditionally to locate the checkpoint, so
    # fail with a clear usage error instead of a TypeError on string concatenation.
    parser.add_argument('--experiment_name', type=str, required=True)
    parser.add_argument('--seq_id', type=str)
    parser.add_argument('--max_length', type=int, default=-1)
    # store_false: use_scalers defaults to True and the flag switches it off.
    parser.add_argument('--no-use_scalers', dest='use_scalers', action='store_false')
    # store_true: the flags below default to False and are enabled when passed.
    parser.add_argument('--generate_video', action='store_true')
    parser.add_argument('--generate_bvh', action='store_true')
    parser.add_argument('--generate_ground_truth', action='store_true')
    parser.add_argument('--fps', type=int, default=20)
    parser.add_argument('--seed', type=str, help='nothing')
    args = parser.parse_args()

    data_dir = args.data_dir
    audio_format = args.audio_format
    fps = args.fps
    output_folder = args.output_folder
    seq_id = args.seq_id
    seed = args.seed
    seeds = parse_seeds(args.seeds)

    # Without an explicit --seq_id, pick a random name from the test list in data_dir.
    if seq_id is None:
        seq_id = np.random.choice(read_base_filenames(data_dir + "/base_filenames_test.txt"))
    # NOTE(review): the original indentation was lost; this check is treated as
    # independent of the one above, so --seeds_file overrides any --seq_id.
    if args.seeds_file is not None:
        print("choosing random seed from "+args.seeds_file)
        seq_id = np.random.choice(read_base_filenames(args.seeds_file))
    print('seq_id: ', seq_id)

    # Load the hparams file stored two directory levels above the latest checkpoint.
    default_save_path = "training/experiments/"+args.experiment_name
    logs_path = default_save_path
    latest_checkpoint = get_latest_checkpoint(logs_path)
    print(latest_checkpoint)
    checkpoint_dir = Path(latest_checkpoint).parent.parent.absolute()
    with open(str(checkpoint_dir)+"/hparams.yaml", "r") as hparams_file:
        exp_opt = yaml.safe_load(hparams_file.read())  # dict of saved hyper-parameters

    # Start from the default options for this model type, then overlay the saved ones.
    opt = vars(TrainOptions().parse(parse_args=["--model", exp_opt["model"]]))
    print(opt)
    opt.update(exp_opt)
    opt["batch_size"] = 1
    opt["phase"] = "inference"
    opt["tpu_cores"] = 0

    class Struct:
        """Expose the entries of a dict as attributes."""

        def __init__(self, **entries):
            self.__dict__.update(entries)

    print(opt)
    opt = Struct(**opt)

    input_mods = opt.input_modalities.split(",")
    output_mods = opt.output_modalities.split(",")
    # str() guards against YAML having parsed a single offset as an int.
    output_time_offsets = [int(x) for x in str(opt.output_time_offsets).split(",")]
    if args.use_scalers:
        scalers = [x+"_scaler.pkl" for x in output_mods]
    else:
        scalers = []

    # Load latest trained checkpoint from experiment.
    model = create_model(opt)
    # load_from_checkpoint is a classmethod; calling it through the class is
    # equivalent to the instance call and is also accepted by Lightning
    # releases that refuse the instance form.
    model = type(model).load_from_checkpoint(latest_checkpoint, opt=opt)

    # Load input features (sequences must have been processed previously into features).
    features = {}
    for i, mod in enumerate(input_mods):
        if mod in seeds:
            feature = np.load(data_dir+"/"+seeds[mod]+"."+mod+".npy")
        elif seed is not None and i == 0:
            # --seed is a path without the ".npy" suffix; it replaces the
            # first input modality only.
            feature = np.load(seed+'.npy')
        else:
            feature = np.load(data_dir+'/'+seq_id+'.'+mod+'.npy')
        if args.max_length != -1:
            feature = feature[:args.max_length]
        if model.input_fix_length_types[i] == "single":
            features["in_"+mod] = np.expand_dims(np.expand_dims(feature, 1), 1)
        else:
            features["in_"+mod] = np.expand_dims(feature, 1)

    # Generate prediction.
    if torch.cuda.is_available():
        model.cuda()
    predicted_mods = model.generate(features, ground_truth=args.generate_ground_truth)

    if len(predicted_mods) == 0:
        print("Sequence too short!")
    else:
        # np.save does not create missing directories.
        predicted_mods_dir = output_folder+"/"+args.experiment_name+"/predicted_mods"
        os.makedirs(predicted_mods_dir, exist_ok=True)
        for i, mod in enumerate(output_mods):
            predicted_mod = predicted_mods[i].cpu().numpy()
            if scalers:
                # NOTE(security): pickle.load can execute arbitrary code; only
                # point --data_dir at scaler files from a trusted source.
                with open(data_dir+"/"+scalers[i], "rb") as scaler_file:
                    transform = pickle.load(scaler_file)
                predicted_mod = transform.inverse_transform(predicted_mod)
            print(predicted_mod)
            predicted_features_file = predicted_mods_dir+"/"+seq_id+"."+mod+".generated"
            np.save(predicted_features_file, predicted_mod)
            predicted_features_file += ".npy"  # np.save appended this suffix

            if args.generate_video:
                # output_time_offsets is in frames (that is how output_shift is
                # specified in the models); convert it to seconds.
                trim_audio = output_time_offsets[i] / fps
                print("trim_audio: ", trim_audio)
                audio_file = data_dir + "/" + seq_id + "."+audio_format
                if mod == "joint_angles_scaled":
                    # NOTE(review): `plot_mats` is not defined in the visible
                    # part of this file, so this branch raises NameError as
                    # written; confirm the intended value before relying on it.
                    generate_video_from_mats(predicted_features_file, output_folder, audio_file, trim_audio, fps, plot_mats)
                elif mod == "expmap_scaled" or mod == "expmap_scaled_20" or mod == "expmap_cr_scaled_20":
                    pipeline_file = f'{data_dir}/motion_{mod}_data_pipe.sav'
                    generate_video_from_expmaps(predicted_features_file, pipeline_file, output_folder, audio_file, trim_audio, args.generate_bvh)
                elif mod == "position_scaled":
                    control_file = f'{data_dir}/{seq_id}.moglow_control_scaled.npy'
                    data = np.load(predicted_features_file)[:, 0, :]
                    control = np.load(control_file)
                    if args.use_scalers:
                        # NOTE(security): same pickle caveat as for the output scalers.
                        with open(data_dir+"/moglow_control_scaled_scaler.pkl", "rb") as scaler_file:
                            transform = pickle.load(scaler_file)
                        control = transform.inverse_transform(control)
                    # Reuse the parsed offsets: calling .split on the raw option
                    # fails when it was loaded as an int.
                    control = control[output_time_offsets[0]:]
                    generate_video_from_moglow_loc(data, control, output_folder, seq_id, audio_file, fps, trim_audio)
                else:
                    print("Warning: mod "+mod+" not supported")
|