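"""Scan a directory of BVH files for large frame-to-frame jumps and below-floor poses.

Results are written as CSV files (and optionally plots / mp4 renders) into the
directory given by ``--output-dir``.
"""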
# NOTE: the original import order is kept on purpose: names imported after the
# wildcard imports (Path, csv, plt, ...) must keep taking precedence over
# anything the wildcards may export.
import argparse
import datetime
from analysis.pymo.parsers import BVHParser
from analysis.pymo.writers import BVHWriter
from pymo.viz_tools import *
from pymo.preprocessing import *
from sklearn.pipeline import Pipeline
from matplotlib.animation import FuncAnimation
from pathlib import Path
import csv
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
# Imported explicitly so `np` does not depend on the wildcard imports above.
import numpy as np
# %matplotlib
def mp4_for_bvh_file(filename, output_dir, fps=None):
    """Render one BVH file as an mp4 named after the file's stem.

    The file is parsed, down-sampled, passed through the root transformer and
    converted to joint positions before being handed to ``render_mp4``.

    Args:
        filename: Path of the BVH file to render.
        output_dir: Directory the ``<stem>.mp4`` file is written to.
        fps: Target frame rate for down-sampling. When ``None`` the
            module-level ``args.fps`` (parsed from the command line) is used.

    Returns:
        A ``(piped_data, data_pipe)`` tuple: the transformed data (one entry,
        for the single input file) and the fitted pipeline.
    """
    if fps is None:
        # Fall back to the command-line value, as the script always did.
        fps = args.fps

    # The original built this same pipeline in both branches of an if/else on
    # the global `joint_names`; a single definition is equivalent.
    data_pipe = Pipeline([
        ('dwnsampl', DownSampler(tgt_fps=fps, keep_all=False)),
        ('root', RootTransformer('pos_rot_deltas')),
        ("pos", MocapParameterizer("position")),
    ])

    parsed_data = BVHParser().parse(filename)
    piped_data = data_pipe.fit_transform([parsed_data])
    # One file in, one transformed sequence out.
    assert len(piped_data) == 1

    mp4_path = Path(output_dir) / (Path(filename).stem + ".mp4")
    render_mp4(piped_data[0], str(mp4_path), axis_scale=3, elev=0, azim=45)
    return piped_data, data_pipe
def load_bvh_file(filename, joint_names=None, param="position", fps=None):
    """Parse one BVH file and run it through the preprocessing pipeline.

    Args:
        filename: Path of the BVH file to load.
        joint_names: Optional sequence of joint names to keep. When empty or
            ``None`` no joint selection step is added and all joints are kept.
        param: Parameterization passed to ``MocapParameterizer``; it is also
            used as the name of that pipeline step.
        fps: Target frame rate for down-sampling. When ``None`` the
            module-level ``args.fps`` (parsed from the command line) is used.

    Returns:
        A ``(piped_data, data_pipe)`` tuple: the pipeline output (one entry,
        for the single input file; the calling script treats it as a NumPy
        array) and the fitted pipeline.
    """
    if fps is None:
        # Fall back to the command-line value, as the script always did.
        fps = args.fps

    steps = [
        ('dwnsampl', DownSampler(tgt_fps=fps, keep_all=False)),
        ('root', RootTransformer('pos_rot_deltas')),
        (param, MocapParameterizer(param)),
    ]
    # Joint selection is only inserted when specific joints were requested.
    if joint_names is not None and len(joint_names) > 0:
        steps.append(('jtsel', JointSelector(joint_names, include_root=False)))
    steps.append(('np', Numpyfier()))
    data_pipe = Pipeline(steps)

    parsed_data = BVHParser().parse(filename)
    piped_data = data_pipe.fit_transform([parsed_data])
    # One file in, one transformed sequence out.
    assert len(piped_data) == 1
    return piped_data, data_pipe
def save_below_floor_tuples(below_floor_tuples, outfile_path):
    """Write the below-floor detections to a CSV file.

    Args:
        below_floor_tuples: Iterable of ``(video_name, offset)`` pairs.
        outfile_path: Destination CSV path; an existing file is overwritten.

    The file has a header row with the columns ``video_path`` and ``offset``
    followed by one row per pair.
    """
    # newline='' is what the csv module expects of its file object; without it
    # rows end in "\r\r\n" on platforms that translate "\n" to "\r\n".
    with open(outfile_path, mode='w', newline='') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=['video_path', 'offset'])
        csv_writer.writeheader()
        csv_writer.writerows(
            {"video_path": video_name, "offset": offset}
            for video_name, offset in below_floor_tuples
        )
def save_jump_tuples(jump_tuples, outfile_path, fps=None):
    """Write the detected jumps to a CSV file and print a summary of each one.

    Args:
        jump_tuples: Iterable of ``(video_name, jump_size, jump_time)``
            triples, where ``jump_time`` is a step index (it is divided by the
            frame rate to obtain seconds).
        outfile_path: Destination CSV path; an existing file is overwritten.
        fps: Frame rate used to convert ``jump_time`` to seconds. When
            ``None`` the module-level ``args.fps`` (parsed from the command
            line) is used.

    The file has a header row with the columns ``video_path``, ``jump_time``
    and ``jump_size``. For every jump a human-readable summary and a ready-made
    ``vlc`` command starting shortly before the jump are printed.
    """
    # newline='' is what the csv module expects of its file object; without it
    # rows end in "\r\r\n" on platforms that translate "\n" to "\r\n".
    with open(outfile_path, mode='w', newline='') as csv_file:
        fieldnames = ['video_path', 'jump_time', 'jump_size']
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        for video_name, jump_size, jump_time in jump_tuples:
            csv_writer.writerow({
                "video_path": video_name,
                "jump_time": jump_time,
                "jump_size": jump_size
            })
            if fps is None:
                # Resolved lazily so an empty input never touches `args`.
                fps = args.fps
            secs = jump_time / fps
            print(
                "\n\nmax jump video: {}\nmax jump time: {}\n max jump: {}".format(
                    video_name, datetime.timedelta(seconds=secs), jump_size
                ))
            # Start playback 5 seconds before the jump, but never suggest a
            # negative start time for jumps within the first 5 seconds.
            start_time = max(secs - 5, 0)
            print('vlc command:\nvlc "{}" --start-time {}'.format(
                str(video_name).replace(".bvh", ""), start_time))
def calculate_jumps(traj):
    """Return the absolute change between consecutive steps of a trajectory.

    Args:
        traj: Array-like whose first axis indexes the steps; any remaining
            axes are treated element-wise.

    Returns:
        An array with one entry fewer along the first axis, holding
        ``|traj[i + 1] - traj[i]|`` for every consecutive pair of steps.
    """
    # Accept plain sequences as well as arrays.
    traj = np.asarray(traj)
    return np.abs(traj[1:] - traj[:-1])
if __name__ == "__main__":
    # Command-line entry point: scan every *.bvh file in --dir, record the
    # largest per-step jumps and (optionally) files whose lowest point is
    # below the floor, then write the results into --output-dir.
    #
    # NOTE: `args` must stay a module-level name; the helper functions in this
    # file read `args.fps` directly.
    parser = argparse.ArgumentParser()
    # Required: without it Path(args.dir) below fails with a TypeError.
    parser.add_argument('--dir', type=str, required=True, help='path to the bhv files dir')
    parser.add_argument("--fps", type=int, default=60)
    parser.add_argument("--top-k", type=int, default=10, help="save top k biggest jumps")
    parser.add_argument("--param", type=str, default="position")
    parser.add_argument("--detect-below-floor", action="store_true")
    parser.add_argument("--floor-z", type=float, default=-0.08727)
    parser.add_argument("--ignore-first-secs", type=float, default=0)
    parser.add_argument("--plot", action="store_true", help="plot jump distributions")
    parser.add_argument("--mp4", action="store_true", help="create mp4 visualisation")
    parser.add_argument("--output-dir", default="analysis/data_cleaning")
    args = parser.parse_args()

    # The below-floor check reshapes the features into triples, so it only
    # makes sense for the position parameterization.
    if args.detect_below_floor:
        if args.param != "position":
            raise ValueError("param must be position for below floor and is {}.".format(args.param))

    output_dir = Path(args.output_dir)
    # Make sure the CSV / plot / mp4 outputs have somewhere to go.
    output_dir.mkdir(parents=True, exist_ok=True)
    jumps_output_file = (output_dir / "jumps_fps_{}_param_{}".format(args.fps, args.param)).with_suffix(".csv")
    below_floor_output_file = (output_dir / "below_floor").with_suffix(".csv")

    # Which joints to load; an empty list keeps all joints. To restrict the
    # analysis, list joint names here, e.g. ["Head"] or
    # ['Spine', 'Spine1', 'Spine2', 'Neck', 'Head', 'RightShoulder', 'RightArm', ...].
    joint_names = []

    all_jumps = []
    below_floor_tuples = []
    jump_tuples_per_step = []

    filenames = list(Path(args.dir).glob("*.bvh"))
    for i, filename in enumerate(filenames, start=1):
        # 1-based progress counter so the last file prints [N/N].
        print("[{}/{}]".format(i, len(filenames)))
        piped_data, data_pipe = load_bvh_file(filename, joint_names=joint_names, param=args.param)
        if piped_data.size == 0:
            raise ValueError("No joints found. {} ".format(joint_names))
        traj = piped_data[0]

        # jumps: absolute change of every feature between consecutive steps
        traj_jumps = calculate_jumps(traj)
        all_jumps.append(traj_jumps)
        # largest change over all features, per step
        max_per_step = traj_jumps.max(axis=-1)
        for pos, st in enumerate(max_per_step):
            # ignore jumps at the beginning
            if pos/args.fps > args.ignore_first_secs:
                jump_tuples_per_step.append((filename, st, pos))

        # below the floor
        if args.detect_below_floor:
            # Group the features into triples and take the minimum of the
            # component at index 1 over all steps and triples.
            # NOTE(review): the variable and the option are named "z" but the
            # component used is index 1 - confirm the axis convention.
            traj_per_obj = traj.reshape(traj.shape[0], -1, 3)
            min_z = traj_per_obj[:, :, 1].min()
            eps = 0.01  # tolerance below the floor level before flagging
            if min_z < (args.floor_z - eps):
                below_floor_tuples.append((filename, min_z))

        if args.mp4:
            mp4_for_bvh_file(filename=filename, output_dir=output_dir)

    # k biggest jumps
    top_k_jumps = sorted(jump_tuples_per_step, key=lambda s: s[1], reverse=True)[:args.top_k]
    save_jump_tuples(top_k_jumps, jumps_output_file)
    save_below_floor_tuples(below_floor_tuples, below_floor_output_file)

    # np.vstack raises on an empty list, so skip plotting when no file was found.
    if args.plot and all_jumps:
        all_jumps = np.vstack(all_jumps)
        # plot: one scatter column per feature, showing all its jump sizes
        for j in range(all_jumps.shape[-1]):
            joint_jumps = all_jumps[:, j]
            plt.scatter(j*np.ones_like(joint_jumps), joint_jumps, s=1)
        plt.savefig(output_dir / "joint_distances.png")
        plt.savefig(output_dir / "joint_distances.svg")