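# NOTE: the text originally here ("Spaces:" / "Runtime error" / "Runtime error")
# was page-header residue from the site this script was copied from; it is not
# part of the program.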
import sys
import os

# Put the parent directory of this script's folder on sys.path.
# NOTE(review): presumably this is what lets the `training`, `models` and
# `analysis` packages imported further down resolve when the script is run
# directly -- confirm against the repository layout.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.abspath(os.path.join(THIS_DIR, os.pardir))  # os.pardir == parent directory
sys.path.append(ROOT_DIR)

import numpy as np
import scipy.linalg

# LUL
# Build a random 219x219 orthogonal matrix (the Q factor of the QR decomposition
# of a Gaussian random matrix), cast it to float32 and LU-decompose it.
# NOTE(review): w_init / np_p / np_l / np_u are not used anywhere else in this
# file. This looks like a deliberate warm-up of scipy.linalg before the heavier
# imports below (torch etc.) -- confirm before removing or moving it.
w_shape = [219, 219]
w_init = np.linalg.qr(np.random.randn(*w_shape))[0].astype(np.float32)
np_p, np_l, np_u = scipy.linalg.lu(w_init)
import argparse
import glob
import json
import os
import pickle
from pathlib import Path

import torch
import pytorch_lightning as pl
import numpy as np
import yaml
import sklearn

from training.datasets import create_dataset, create_dataloader
from models import create_model
from training.options.train_options import TrainOptions
from training.utils import get_latest_checkpoint
from analysis.visualization.generate_video_from_mats import generate_video_from_mats
from analysis.visualization.generate_video_from_expmaps import generate_video_from_expmaps
from analysis.visualization.generate_video_from_moglow_pos import generate_video_from_moglow_loc
class Struct:
    """Expose the keyword arguments it is built from as instance attributes."""

    def __init__(self, **entries):
        self.__dict__.update(entries)


def parse_seeds(spec):
    """Parse a ``--seeds`` value of the form ``mod,seq_id;mod,seq_id``.

    Args:
        spec: the raw command-line string, or ``None`` when the flag is absent.

    Returns:
        A dict mapping each modality name to its seed sequence id. ``None`` or
        an empty string yields an empty dict; empty entries (for example from
        a trailing ``;``) are skipped.

    Raises:
        ValueError: if an entry is not exactly ``mod,seq_id``.
    """
    if not spec:
        return {}
    seeds = {}
    for entry in spec.split(";"):
        if not entry:
            continue
        mod, seq = entry.split(",")
        seeds[mod] = seq
    return seeds


def read_base_filenames(path):
    """Return the non-blank lines of the text file *path*, without line endings.

    Only the line terminator is removed, so a final line that lacks a trailing
    newline is returned intact rather than losing its last character.
    """
    with open(path, "r") as handle:
        return [line.rstrip("\r\n") for line in handle if line.strip()]


def load_pickle(path):
    """Unpickle and return the object stored at *path*, closing the file afterwards.

    NOTE(review): pickle can execute arbitrary code while loading; only point
    this at scaler files you trust.
    """
    with open(path, "rb") as handle:
        return pickle.load(handle)


def main(argv=None):
    """Load the latest checkpoint of an experiment, generate output features for
    one sequence, save them as ``.npy`` and optionally render videos from them.

    Args:
        argv: command-line arguments to parse; ``None`` means ``sys.argv[1:]``.
    """
    print("Hi")
    parser = argparse.ArgumentParser(description='Generate with model')
    parser.add_argument('--data_dir', type=str)
    parser.add_argument('--seeds', type=str, help='in the format: mod,seq_id;mod,seq_id')
    parser.add_argument('--seeds_file', type=str, help='file from which to choose a random seed')
    parser.add_argument('--output_folder', type=str)
    parser.add_argument('--audio_format', type=str, default="wav")
    parser.add_argument('--experiment_name', type=str)
    parser.add_argument('--seq_id', type=str)
    parser.add_argument('--max_length', type=int, default=-1)
    # use_scalers defaults to True; passing --no-use_scalers sets it to False.
    parser.add_argument('--no-use_scalers', dest='use_scalers', action='store_false')
    # store_true flags default to False and become True when given.
    parser.add_argument('--generate_video', action='store_true')
    parser.add_argument('--generate_bvh', action='store_true')
    parser.add_argument('--generate_ground_truth', action='store_true')
    parser.add_argument('--fps', type=int, default=20)
    parser.add_argument('--seed',type=str,help='nothing')
    args = parser.parse_args(argv)

    # The experiment name is concatenated into paths below; fail with a usage
    # message instead of a TypeError when it is missing.
    if args.experiment_name is None:
        parser.error("--experiment_name is required")

    data_dir = args.data_dir
    audio_format = args.audio_format
    fps = args.fps
    output_folder = args.output_folder
    seq_id = args.seq_id
    seed = args.seed
    seeds = parse_seeds(args.seeds)

    # NOTE(review): the original indentation of this file was lost; these two
    # checks are reconstructed as independent siblings, so --seeds_file
    # overrides any seq_id chosen or passed before it -- confirm.
    if seq_id is None:
        # Pick a random sequence name from the test list stored in data_dir.
        seq_id = np.random.choice(read_base_filenames(data_dir + "/base_filenames_test.txt"))
    if args.seeds_file is not None:
        print("choosing random seed from "+args.seeds_file)
        seq_id = np.random.choice(read_base_filenames(args.seeds_file))
    print('seq_id: ',seq_id)

    # Load the hyper-parameters saved next to the latest checkpoint.
    default_save_path = "training/experiments/"+args.experiment_name
    logs_path = default_save_path
    latest_checkpoint = get_latest_checkpoint(logs_path)
    print(latest_checkpoint)
    checkpoint_dir = Path(latest_checkpoint).parent.parent.absolute()
    with open(str(checkpoint_dir)+"/hparams.yaml", "r") as handle:
        exp_opt = yaml.safe_load(handle)  # dict of saved hyper-parameters

    # Start from the defaults for this model type, then overlay the saved values
    # and force the settings used for inference.
    opt = vars(TrainOptions().parse(parse_args=["--model", exp_opt["model"]]))
    print(opt)
    opt.update(exp_opt)
    opt["batch_size"] = 1
    opt["phase"] = "inference"
    opt["tpu_cores"] = 0
    print(opt)
    opt = Struct(**opt)

    input_mods = opt.input_modalities.split(",")
    output_mods = opt.output_modalities.split(",")
    # str() because the value loaded from YAML may be an int rather than a
    # comma-separated string.
    output_time_offsets = [int(x) for x in str(opt.output_time_offsets).split(",")]
    scalers = [x+"_scaler.pkl" for x in output_mods] if args.use_scalers else []

    # Load latest trained checkpoint from experiment
    model = create_model(opt)
    model = model.load_from_checkpoint(latest_checkpoint, opt=opt)

    # Load input features (sequences must have been processed previously into features)
    features = {}
    for i, mod in enumerate(input_mods):
        if mod in seeds:
            feature = np.load(data_dir+"/"+seeds[mod]+"."+mod+".npy")
        elif seed is not None and i == 0:
            # --seed replaces the file of the first input modality only.
            feature = np.load(seed+'.npy')
        else:
            feature = np.load(data_dir+'/'+seq_id+'.'+mod+'.npy')
        if args.max_length != -1:
            feature = feature[:args.max_length]
        if model.input_fix_length_types[i] == "single":
            features["in_"+mod] = np.expand_dims(np.expand_dims(feature,1),1)
        else:
            features["in_"+mod] = np.expand_dims(feature,1)

    # Generate prediction
    if torch.cuda.is_available():
        model.cuda()
    predicted_mods = model.generate(features, ground_truth=args.generate_ground_truth)
    if len(predicted_mods) == 0:
        print("Sequence too short!")
        return

    # np.save does not create missing directories, so make sure the target exists.
    predictions_dir = output_folder+"/"+args.experiment_name+"/predicted_mods"
    os.makedirs(predictions_dir, exist_ok=True)

    for i, mod in enumerate(output_mods):
        predicted_mod = predicted_mods[i].cpu().numpy()
        if len(scalers) > 0:
            transform = load_pickle(data_dir+"/"+scalers[i])
            predicted_mod = transform.inverse_transform(predicted_mod)
        print(predicted_mod)
        predicted_features_file = predictions_dir+"/"+seq_id+"."+mod+".generated"
        np.save(predicted_features_file, predicted_mod)
        predicted_features_file += ".npy"  # np.save appended this extension
        if not args.generate_video:
            continue

        # converting trim_audio from being in frames (which is more convenient as
        # that's how we specify the output_shift in the models), to seconds
        trim_audio = output_time_offsets[i] / fps
        print("trim_audio: ",trim_audio)
        audio_file = data_dir + "/" + seq_id + "."+audio_format
        if mod == "joint_angles_scaled":
            # NOTE(review): `plot_mats` is not defined or imported anywhere in
            # this file, so this branch raises NameError as written -- supply
            # the intended value.
            generate_video_from_mats(predicted_features_file,output_folder,audio_file,trim_audio,fps,plot_mats)
        elif mod in ("expmap_scaled", "expmap_scaled_20", "expmap_cr_scaled_20"):
            pipeline_file = f'{data_dir}/motion_{mod}_data_pipe.sav'
            generate_video_from_expmaps(predicted_features_file,pipeline_file,output_folder,audio_file,trim_audio,args.generate_bvh)
        elif mod == "position_scaled":
            control_file = f'{data_dir}/{seq_id}.moglow_control_scaled.npy'
            data = np.load(predicted_features_file)[:,0,:]
            control = np.load(control_file)
            if args.use_scalers:
                transform = load_pickle(data_dir+"/moglow_control_scaled_scaler.pkl")
                control = transform.inverse_transform(control)
            # Reuse the already-parsed offsets: calling .split on
            # opt.output_time_offsets fails when YAML loaded it as an int.
            control = control[output_time_offsets[0]:]
            generate_video_from_moglow_loc(data,control,output_folder,seq_id,audio_file,fps,trim_audio)
        else:
            print("Warning: mod "+mod+" not supported")


if __name__ == '__main__':
    main()