# analysis/data_cleaner.py
import argparse
import csv
import datetime
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from mpl_toolkits.mplot3d import Axes3D  # needed for 3D plots with older matplotlib
from sklearn.pipeline import Pipeline

# pymo lives in analysis/pymo in this repo; the wildcard imports provide the
# transformers (DownSampler, RootTransformer, MocapParameterizer, JointSelector,
# Numpyfier) and render_mp4 used below.
from analysis.pymo.parsers import BVHParser
from analysis.pymo.writers import BVHWriter
from analysis.pymo.viz_tools import *
from analysis.pymo.preprocessing import *

# %matplotlib
def mp4_for_bvh_file(filename, output_dir):
    # NOTE: relies on the global ``args`` (defined under __main__) for the target fps.
    data_pipe = Pipeline([
        ('dwnsampl', DownSampler(tgt_fps=args.fps, keep_all=False)),
        ('root', RootTransformer('pos_rot_deltas')),
        ('pos', MocapParameterizer('position')),
    ])
    parser = BVHParser()
    parsed_data = parser.parse(filename)
    piped_data = data_pipe.fit_transform([parsed_data])
    assert len(piped_data) == 1
    render_mp4(piped_data[0], str(output_dir / "{}.mp4".format(filename.stem)),
               axis_scale=3, elev=0, azim=45)
    return piped_data, data_pipe
def load_bvh_file(filename, joint_names=(), param="position"):
    # NOTE: relies on the global ``args`` (defined under __main__) for the target fps.
    if len(joint_names) > 0:
        data_pipe = Pipeline([
            ('dwnsampl', DownSampler(tgt_fps=args.fps, keep_all=False)),
            ('root', RootTransformer('pos_rot_deltas')),
            (param, MocapParameterizer(param)),
            ('jtsel', JointSelector(joint_names, include_root=False)),
            ('np', Numpyfier())
        ])
    else:
        data_pipe = Pipeline([
            ('dwnsampl', DownSampler(tgt_fps=args.fps, keep_all=False)),
            ('root', RootTransformer('pos_rot_deltas')),
            (param, MocapParameterizer(param)),
            ('np', Numpyfier())
        ])
    parser = BVHParser()
    parsed_data = parser.parse(filename)
    piped_data = data_pipe.fit_transform([parsed_data])
    assert len(piped_data) == 1
    return piped_data, data_pipe
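

# A minimal sketch (not called anywhere in this script) of the reverse direction:
# the fitted pipeline can be inverted and the result written back to disk with the
# imported BVHWriter, assuming every transformer in the pipeline implements
# ``inverse_transform`` (true for common PyMo forks). The helper name
# ``write_bvh_file`` is hypothetical.
def write_bvh_file(piped_data, data_pipe, out_path):
    # Undo the pipeline steps in reverse order to recover MocapData objects.
    bvh_datas = data_pipe.inverse_transform(piped_data)
    writer = BVHWriter()
    with open(out_path, 'w') as f:
        writer.write(bvh_datas[0], f)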
def save_below_floor_tuples(below_floor_tuples, outfile_path):
    with open(outfile_path, mode='w', newline='') as csv_file:
        fieldnames = ['video_path', 'offset']
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        for (video_name, offset) in below_floor_tuples:
            csv_writer.writerow({
                "video_path": video_name,
                "offset": offset,
            })
def save_jump_tuples(jump_tuples, outfile_path):
    with open(outfile_path, mode='w', newline='') as csv_file:
        fieldnames = ['video_path', 'jump_time', 'jump_size']
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        for (video_name, jump_size, jump_time) in jump_tuples:
            csv_writer.writerow({
                "video_path": video_name,
                "jump_time": jump_time,
                "jump_size": jump_size,
            })
            secs = jump_time / args.fps
            print(
                "\n\njump video: {}\njump time: {}\njump size: {}".format(
                    video_name, datetime.timedelta(seconds=secs), jump_size
                ))
            print('vlc command:\nvlc "{}" --start-time {}'.format(str(video_name).replace(".bvh", ""), secs - 5))
def calculate_jumps(traj):
    # Frame-to-frame absolute differences of every feature in the trajectory.
    return np.abs(traj[1:] - traj[:-1])
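
# Illustrative only: on a toy two-feature trajectory the jumps are the absolute
# per-frame deltas, e.g.
#   calculate_jumps(np.array([[0., 0.], [1., 3.], [1., 1.]]))
#   -> array([[1., 3.], [0., 2.]])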
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--dir', type=str, help='path to the bvh files dir')
    parser.add_argument("--fps", type=int, default=60)
    parser.add_argument("--top-k", type=int, default=10, help="save top k biggest jumps")
    parser.add_argument("--param", type=str, default="position")
    parser.add_argument("--detect-below-floor", action="store_true")
    parser.add_argument("--floor-z", type=float, default=-0.08727)
    parser.add_argument("--ignore-first-secs", type=float, default=0)
    parser.add_argument("--plot", action="store_true", help="plot jump distributions")
    parser.add_argument("--mp4", action="store_true", help="create mp4 visualisation")
    parser.add_argument("--output-dir", default="analysis/data_cleaning")
    args = parser.parse_args()

    if args.detect_below_floor and args.param != "position":
        raise ValueError("param must be 'position' for below-floor detection, got {}.".format(args.param))

    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)  # make sure the results directory exists
    jumps_output_file = (output_dir / "jumps_fps_{}_param_{}".format(args.fps, args.param)).with_suffix(".csv")
    below_floor_output_file = (output_dir / "below_floor").with_suffix(".csv")
    # which joints to load; an empty list keeps every joint
    # joint_names = ['Spine', 'Spine1', 'Spine2', 'Neck', 'Head', 'RightShoulder', 'RightArm', 'RightForeArm', 'RightHand',
    #                'LeftShoulder', 'LeftArm', 'LeftForeArm', 'LeftHand', 'RightUpLeg', 'RightLeg', 'RightFoot',
    #                'RightToeBase', 'LeftUpLeg', 'LeftLeg', 'LeftFoot', 'LeftToeBase']
    # joint_names = ["Head"]
    joint_names = []

    all_jumps = []
    below_floor_tuples = []
    jump_tuples_per_step = []
    filenames = list(Path(args.dir).glob("*.bvh"))
    for i, filename in enumerate(filenames):
        print("[{}/{}]".format(i, len(filenames)))
        piped_data, data_pipe = load_bvh_file(filename, joint_names=joint_names, param=args.param)
        if piped_data.size == 0:
            raise ValueError("No joints found. {}".format(joint_names))
        traj = piped_data[0]

        # jumps: per-frame movement of every feature
        traj_jumps = calculate_jumps(traj)
        all_jumps.append(traj_jumps)
        max_per_step = traj_jumps.max(axis=-1)
        for pos, st in enumerate(max_per_step):
            # ignore jumps at the beginning of the clip
            if pos / args.fps > args.ignore_first_secs:
                jump_tuples_per_step.append((filename, st, pos))

        # below the floor
        if args.detect_below_floor:
            # reshape to (frames, joints, 3); channel 1 is treated as the vertical axis
            traj_per_obj = traj.reshape(traj.shape[0], -1, 3)
            min_z = traj_per_obj[:, :, 1].min()
            eps = 0.01
            if min_z < (args.floor_z - eps):
                below_floor_tuples.append((filename, min_z))

        if args.mp4:
            mp4_for_bvh_file(filename=filename, output_dir=output_dir)
    # save the k biggest jumps and the clips that dip below the floor
    top_k_jumps = sorted(jump_tuples_per_step, key=lambda s: s[1], reverse=True)[:args.top_k]
    save_jump_tuples(top_k_jumps, jumps_output_file)
    save_below_floor_tuples(below_floor_tuples, below_floor_output_file)

    if args.plot:
        all_jumps = np.vstack(all_jumps)
        # scatter the jump sizes of each feature column against its feature index
        for j in range(all_jumps.shape[-1]):
            joint_jumps = all_jumps[:, j]
            plt.scatter(j * np.ones_like(joint_jumps), joint_jumps, s=1)
        plt.savefig(output_dir / "joint_distances.png")
        plt.savefig(output_dir / "joint_distances.svg")
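
# Example invocation (the bvh directory path is a placeholder), run from the
# repository root so the ``analysis.pymo`` imports resolve:
#   python -m analysis.data_cleaner --dir <path/to/bvh_dir> --fps 60 --top-k 10 \
#       --detect-below-floor --plot --output-dir analysis/data_cleaning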