Spaces:
Runtime error
Runtime error
| import numpy as np | |
| # import librosa | |
| from pathlib import Path | |
| import json | |
| import os.path | |
| import sys | |
| import argparse | |
| import pickle | |
| import torch | |
| THIS_DIR = os.path.dirname(os.path.abspath(__file__)) | |
| ROOT_DIR = os.path.abspath(os.path.join(THIS_DIR, os.pardir)) | |
| DATA_DIR = os.path.join(ROOT_DIR, 'data') | |
| sys.path.append(ROOT_DIR) | |
| from utils import distribute_tasks | |
| from analysis.pymo.parsers import BVHParser | |
| from analysis.pymo.data import Joint, MocapData | |
| from analysis.pymo.preprocessing import * | |
| from sklearn.pipeline import Pipeline | |
| import joblib as jl | |
| parser = argparse.ArgumentParser(description="Preprocess motion data") | |
| parser.add_argument("data_path", type=str, help="Directory contining Beat Saber level folders") | |
| parser.add_argument("--param", type=str, default="expmap", help="expmap, position") | |
| parser.add_argument("--replace_existing", action="store_true") | |
| parser.add_argument("--do_mirror", action="store_true", help="whether to augment the data with mirrored motion") | |
| parser.add_argument("--fps", type=int, default=60) | |
| args = parser.parse_args() | |
| # makes arugments into global variables of the same name, used later in the code | |
| globals().update(vars(args)) | |
| data_path = Path(data_path) | |
| ## distributing tasks accross nodes ## | |
| from mpi4py import MPI | |
| comm = MPI.COMM_WORLD | |
| rank = comm.Get_rank() | |
| size = comm.Get_size() | |
| print(rank) | |
| p = BVHParser() | |
| if do_mirror: | |
| data_pipe = Pipeline([ | |
| ('dwnsampl', DownSampler(tgt_fps=fps, keep_all=False)), | |
| ('mir', Mirror(axis='X', append=True)), | |
| ('root', RootTransformer('pos_rot_deltas')), | |
| # ('jtsel', JointSelector(['Spine', 'Spine1', 'Spine2', 'Neck', 'Head', 'RightShoulder', 'RightArm', 'RightForeArm', 'RightHand', 'LeftShoulder', 'LeftArm', 'LeftForeArm', 'LeftHand', 'RightUpLeg', 'RightLeg', 'RightFoot', 'RightToeBase', 'LeftUpLeg', 'LeftLeg', 'LeftFoot', 'LeftToeBase'], include_root=True)), | |
| ('jtsel', JointSelector(['Spine', 'Spine1', 'Neck', 'Head', 'RightShoulder', 'RightArm', 'RightForeArm', 'RightHand', 'LeftShoulder', 'LeftArm', 'LeftForeArm', 'LeftHand', 'RightUpLeg', 'RightLeg', 'RightFoot', 'RightToeBase', 'LeftUpLeg', 'LeftLeg', 'LeftFoot', 'LeftToeBase'], include_root=True)), | |
| (param, MocapParameterizer(param)), | |
| ('cnst', ConstantsRemover(only_cols=["Hips_Xposition", "Hips_Zposition"])), | |
| ('np', Numpyfier()) | |
| ]) | |
| else: | |
| data_pipe = Pipeline([ | |
| ('dwnsampl', DownSampler(tgt_fps=fps, keep_all=False)), | |
| ('root', RootTransformer('pos_rot_deltas')), | |
| # ('mir', Mirror(axis='X', append=True)), | |
| # ('jtsel', JointSelector(['Spine', 'Spine1', 'Spine2', 'Neck', 'Head', 'RightShoulder', 'RightArm', 'RightForeArm', 'RightHand', 'LeftShoulder', 'LeftArm', 'LeftForeArm', 'LeftHand', 'RightUpLeg', 'RightLeg', 'RightFoot', 'RightToeBase', 'LeftUpLeg', 'LeftLeg', 'LeftFoot', 'LeftToeBase'], include_root=True)), | |
| ('jtsel', JointSelector(['Spine', 'Spine1', 'Neck', 'Head', 'RightShoulder', 'RightArm', 'RightForeArm', 'RightHand', 'LeftShoulder', 'LeftArm', 'LeftForeArm', 'LeftHand', 'RightUpLeg', 'RightLeg', 'RightFoot', 'RightToeBase', 'LeftUpLeg', 'LeftLeg', 'LeftFoot', 'LeftToeBase'], include_root=True)), | |
| (param, MocapParameterizer(param)), | |
| ('cnst', ConstantsRemover(only_cols=["Hips_Xposition", "Hips_Zposition"])), | |
| ('np', Numpyfier()) | |
| ]) | |
| def extract_joint_angles(files): | |
| if len(files)>0: | |
| data_all = list() | |
| for f in files: | |
| data_all.append(p.parse(f)) | |
| out_data = data_pipe.fit_transform(data_all) | |
| if do_mirror: | |
| # NOTE: the datapipe will append the mirrored files to the end | |
| assert len(out_data) == 2*len(files) | |
| else: | |
| assert len(out_data) == len(files) | |
| if rank == 0: | |
| jl.dump(data_pipe, os.path.join(data_path, 'motion_'+param+'_data_pipe.sav')) | |
| fi=0 | |
| if do_mirror: | |
| for f in files: | |
| features_file = f + "_"+param+".npy" | |
| if replace_existing or not os.path.isfile(features_file): | |
| np.save(features_file, out_data[fi]) | |
| features_file_mirror = f[:-4]+"_mirrored" + ".bvh_"+param+".npy" | |
| if replace_existing or not os.path.isfile(features_file_mirror): | |
| np.save(features_file_mirror, out_data[len(files)+fi]) | |
| fi=fi+1 | |
| else: | |
| for f in files: | |
| features_file = f + "_"+param+".npy" | |
| if replace_existing or not os.path.isfile(features_file): | |
| np.save(features_file, out_data[fi]) | |
| fi=fi+1 | |
| candidate_motion_files = sorted(data_path.glob('**/*.bvh'), key=lambda path: path.parent.__str__()) | |
| #candidate_motion_files = candidate_motion_files[:32] | |
| tasks = distribute_tasks(candidate_motion_files,rank,size) | |
| files = [path.__str__() for i, path in enumerate(candidate_motion_files) if i in tasks] | |
| extract_joint_angles(files) | |