""" Preprocessing Tranformers Based on sci-kit's API By Omid Alemi Created on June 12, 2017 Modified by Simon Alexanderson, 2020-06-24 """ import copy import numpy as np import pandas as pd import scipy.ndimage.filters as filters from scipy.spatial.transform import Rotation as R from sklearn.base import BaseEstimator, TransformerMixin from pymo.Pivots import Pivots from pymo.Quaternions import Quaternions # import transforms3d as t3d class MocapParameterizer(BaseEstimator, TransformerMixin): def __init__(self, param_type="euler"): """ param_type = {'euler', 'quat', 'expmap', 'position', 'expmap2pos'} """ self.param_type = param_type def fit(self, X, y=None): return self def transform(self, X, y=None): print("MocapParameterizer: " + self.param_type) if self.param_type == "euler": return X elif self.param_type == "expmap": return self._to_expmap(X) elif self.param_type == "quat": return X elif self.param_type == "position": return self._to_pos(X) elif self.param_type == "expmap2pos": return self._expmap_to_pos(X) else: raise "param types: euler, quat, expmap, position, expmap2pos" # return X def inverse_transform(self, X, copy=None): if self.param_type == "euler": return X elif self.param_type == "expmap": return self._expmap_to_euler(X) elif self.param_type == "quat": raise "quat2euler is not supported" elif self.param_type == "position": raise "positions 2 eulers is not supported" return X else: raise "param types: euler, quat, expmap, position" def fix_rotvec(self, rots): """fix problems with discontinuous rotation vectors""" new_rots = rots.copy() # Compute angles and alternative rotation angles angs = np.linalg.norm(rots, axis=1) alt_angs = 2 * np.pi - angs # find discontinuities by checking if the alternative representation is closer d_angs = np.diff(angs, axis=0) d_angs2 = alt_angs[1:] - angs[:-1] swps = np.where(np.abs(d_angs2) < np.abs(d_angs))[0] # reshape into intervals where we should flip rotation axis isodd = swps.shape[0] % 2 == 1 if isodd: swps = swps[:-1] intv = 1 + swps.reshape((swps.shape[0] // 2, 2)) # flip rotations in selected intervals for ii in range(intv.shape[0]): new_ax = -rots[intv[ii, 0] : intv[ii, 1], :] / np.tile(angs[intv[ii, 0] : intv[ii, 1], None], (1, 3)) new_angs = alt_angs[intv[ii, 0] : intv[ii, 1]] new_rots[intv[ii, 0] : intv[ii, 1], :] = new_ax * np.tile(new_angs[:, None], (1, 3)) return new_rots def _to_pos(self, X): """Converts joints rotations in Euler angles to joint positions""" Q = [] for track in X: channels = [] titles = [] euler_df = track.values # Create a new DataFrame to store the exponential map rep pos_df = pd.DataFrame(index=euler_df.index) # List the columns that contain rotation channels rot_cols = [c for c in euler_df.columns if ("rotation" in c)] # List the columns that contain position channels pos_cols = [c for c in euler_df.columns if ("position" in c)] # List the joints that are not end sites, i.e., have channels joints = (joint for joint in track.skeleton) tree_data = {} for joint in track.traverse(): parent = track.skeleton[joint]["parent"] rot_order = track.skeleton[joint]["order"] # Get the rotation columns that belong to this joint rc = euler_df[[c for c in rot_cols if joint in c]] # Get the position columns that belong to this joint pc = euler_df[[c for c in pos_cols if joint in c]] # Make sure the columns are organized in xyz order if rc.shape[1] < 3: euler_values = [[0, 0, 0] for f in rc.iterrows()] rot_order = "XYZ" else: euler_values = [ [ f[1]["%s_%srotation" % (joint, rot_order[0])], f[1]["%s_%srotation" % (joint, rot_order[1])], f[1]["%s_%srotation" % (joint, rot_order[2])], ] for f in rc.iterrows() ] if pc.shape[1] < 3: pos_values = [[0, 0, 0] for f in pc.iterrows()] else: pos_values = [ [f[1]["%s_Xposition" % joint], f[1]["%s_Yposition" % joint], f[1]["%s_Zposition" % joint]] for f in pc.iterrows() ] # Convert the eulers to rotation matrices rotmats = R.from_euler(rot_order, euler_values, degrees=True).inv() tree_data[joint] = [[], []] # to store the rotation matrix # to store the calculated position if track.root_name == joint: tree_data[joint][0] = rotmats tree_data[joint][1] = pos_values else: # for every frame i, multiply this joint's rotmat to the rotmat of its parent tree_data[joint][0] = rotmats * tree_data[parent][0] # add the position channel to the offset and store it in k, for every frame i k = pos_values + np.asarray(track.skeleton[joint]["offsets"]) # multiply k to the rotmat of the parent for every frame i q = tree_data[parent][0].inv().apply(k) # add q to the position of the parent, for every frame i tree_data[joint][1] = tree_data[parent][1] + q # Create the corresponding columns in the new DataFrame pos_df["%s_Xposition" % joint] = pd.Series(data=[e[0] for e in tree_data[joint][1]], index=pos_df.index) pos_df["%s_Yposition" % joint] = pd.Series(data=[e[1] for e in tree_data[joint][1]], index=pos_df.index) pos_df["%s_Zposition" % joint] = pd.Series(data=[e[2] for e in tree_data[joint][1]], index=pos_df.index) new_track = track.clone() new_track.values = pos_df Q.append(new_track) return Q def _to_expmap(self, X): """Converts Euler angles to Exponential Maps""" Q = [] for track in X: channels = [] titles = [] euler_df = track.values # Create a new DataFrame to store the exponential map rep exp_df = euler_df.copy() # List the columns that contain rotation channels rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] # List the joints that are not end sites, i.e., have channels joints = (joint for joint in track.skeleton if "Nub" not in joint) for joint in joints: r = euler_df[[c for c in rots if joint in c]] # Get the columns that belong to this joint rot_order = track.skeleton[joint]["order"] r1_col = "%s_%srotation" % (joint, rot_order[0]) r2_col = "%s_%srotation" % (joint, rot_order[1]) r3_col = "%s_%srotation" % (joint, rot_order[2]) exp_df.drop([r1_col, r2_col, r3_col], axis=1, inplace=True) euler = [[f[1][r1_col], f[1][r2_col], f[1][r3_col]] for f in r.iterrows()] exps = np.array(self.fix_rotvec(R.from_euler(rot_order.lower(), euler, degrees=True).as_rotvec())) # Create the corresponding columns in the new DataFrame exp_df.insert( loc=0, column="%s_gamma" % joint, value=pd.Series(data=[e[2] for e in exps], index=exp_df.index) ) exp_df.insert( loc=0, column="%s_beta" % joint, value=pd.Series(data=[e[1] for e in exps], index=exp_df.index) ) exp_df.insert( loc=0, column="%s_alpha" % joint, value=pd.Series(data=[e[0] for e in exps], index=exp_df.index) ) # print(exp_df.columns) new_track = track.clone() new_track.values = exp_df Q.append(new_track) return Q def _expmap_to_euler(self, X): Q = [] for track in X: channels = [] titles = [] exp_df = track.values # Create a new DataFrame to store the exponential map rep euler_df = exp_df.copy() # List the columns that contain rotation channels exp_params = [ c for c in exp_df.columns if (any(p in c for p in ["alpha", "beta", "gamma"]) and "Nub" not in c) ] # List the joints that are not end sites, i.e., have channels joints = (joint for joint in track.skeleton if "Nub" not in joint) for joint in joints: r = exp_df[[c for c in exp_params if joint in c]] # Get the columns that belong to this joint euler_df.drop(["%s_alpha" % joint, "%s_beta" % joint, "%s_gamma" % joint], axis=1, inplace=True) expmap = [ [f[1]["%s_alpha" % joint], f[1]["%s_beta" % joint], f[1]["%s_gamma" % joint]] for f in r.iterrows() ] # Make sure the columsn are organized in xyz order rot_order = track.skeleton[joint]["order"] euler_rots = np.array(R.from_rotvec(expmap).as_euler(rot_order.lower(), degrees=True)) # Create the corresponding columns in the new DataFrame euler_df["%s_%srotation" % (joint, rot_order[0])] = pd.Series( data=[e[0] for e in euler_rots], index=euler_df.index ) euler_df["%s_%srotation" % (joint, rot_order[1])] = pd.Series( data=[e[1] for e in euler_rots], index=euler_df.index ) euler_df["%s_%srotation" % (joint, rot_order[2])] = pd.Series( data=[e[2] for e in euler_rots], index=euler_df.index ) new_track = track.clone() new_track.values = euler_df Q.append(new_track) return Q class Mirror(BaseEstimator, TransformerMixin): def __init__(self, axis="X", append=True): """ Mirrors the data """ self.axis = axis self.append = append def fit(self, X, y=None): return self def transform(self, X, y=None): print("Mirror: " + self.axis) Q = [] if self.append: for track in X: Q.append(track) for track in X: channels = [] titles = [] if self.axis == "X": signs = np.array([1, -1, -1]) if self.axis == "Y": signs = np.array([-1, 1, -1]) if self.axis == "Z": signs = np.array([-1, -1, 1]) euler_df = track.values # Create a new DataFrame to store the exponential map rep new_df = pd.DataFrame(index=euler_df.index) # Copy the root positions into the new DataFrame rxp = "%s_Xposition" % track.root_name ryp = "%s_Yposition" % track.root_name rzp = "%s_Zposition" % track.root_name new_df[rxp] = pd.Series(data=-signs[0] * euler_df[rxp], index=new_df.index) new_df[ryp] = pd.Series(data=-signs[1] * euler_df[ryp], index=new_df.index) new_df[rzp] = pd.Series(data=-signs[2] * euler_df[rzp], index=new_df.index) # List the columns that contain rotation channels rots = [c for c in euler_df.columns if ("rotation" in c and "Nub" not in c)] # lft_rots = [c for c in euler_df.columns if ('Left' in c and 'rotation' in c and 'Nub' not in c)] # rgt_rots = [c for c in euler_df.columns if ('Right' in c and 'rotation' in c and 'Nub' not in c)] lft_joints = (joint for joint in track.skeleton if "Left" in joint and "Nub" not in joint) rgt_joints = (joint for joint in track.skeleton if "Right" in joint and "Nub" not in joint) new_track = track.clone() for lft_joint in lft_joints: rgt_joint = lft_joint.replace("Left", "Right") # Create the corresponding columns in the new DataFrame new_df["%s_Xrotation" % lft_joint] = pd.Series( data=signs[0] * track.values["%s_Xrotation" % rgt_joint], index=new_df.index ) new_df["%s_Yrotation" % lft_joint] = pd.Series( data=signs[1] * track.values["%s_Yrotation" % rgt_joint], index=new_df.index ) new_df["%s_Zrotation" % lft_joint] = pd.Series( data=signs[2] * track.values["%s_Zrotation" % rgt_joint], index=new_df.index ) new_df["%s_Xrotation" % rgt_joint] = pd.Series( data=signs[0] * track.values["%s_Xrotation" % lft_joint], index=new_df.index ) new_df["%s_Yrotation" % rgt_joint] = pd.Series( data=signs[1] * track.values["%s_Yrotation" % lft_joint], index=new_df.index ) new_df["%s_Zrotation" % rgt_joint] = pd.Series( data=signs[2] * track.values["%s_Zrotation" % lft_joint], index=new_df.index ) # List the joints that are not left or right, i.e. are on the trunk joints = ( joint for joint in track.skeleton if "Nub" not in joint and "Left" not in joint and "Right" not in joint ) for joint in joints: # Create the corresponding columns in the new DataFrame new_df["%s_Xrotation" % joint] = pd.Series( data=signs[0] * track.values["%s_Xrotation" % joint], index=new_df.index ) new_df["%s_Yrotation" % joint] = pd.Series( data=signs[1] * track.values["%s_Yrotation" % joint], index=new_df.index ) new_df["%s_Zrotation" % joint] = pd.Series( data=signs[2] * track.values["%s_Zrotation" % joint], index=new_df.index ) new_track.values = new_df Q.append(new_track) return Q def inverse_transform(self, X, copy=None, start_pos=None): return X class JointSelector(BaseEstimator, TransformerMixin): """ Allows for filtering the mocap data to include only the selected joints """ def __init__(self, joints, include_root=False): self.joints = joints self.include_root = include_root def fit(self, X, y=None): selected_joints = [] selected_channels = [] if self.include_root: selected_joints.append(X[0].root_name) selected_joints.extend(self.joints) for joint_name in selected_joints: selected_channels.extend([o for o in X[0].values.columns if (joint_name + "_") in o and "Nub" not in o]) self.selected_joints = selected_joints self.selected_channels = selected_channels self.not_selected = X[0].values.columns.difference(selected_channels) self.not_selected_values = {c: X[0].values[c].values[0] for c in self.not_selected} self.orig_skeleton = X[0].skeleton return self def transform(self, X, y=None): print("JointSelector") Q = [] for track in X: t2 = track.clone() for key in track.skeleton.keys(): if key not in self.selected_joints: t2.skeleton.pop(key) t2.values = track.values[self.selected_channels] Q.append(t2) return Q def inverse_transform(self, X, copy=None): Q = [] for track in X: t2 = track.clone() t2.skeleton = self.orig_skeleton for d in self.not_selected: t2.values[d] = self.not_selected_values[d] Q.append(t2) return Q class Numpyfier(BaseEstimator, TransformerMixin): """ Just converts the values in a MocapData object into a numpy array Useful for the final stage of a pipeline before training """ def __init__(self): pass def fit(self, X, y=None): self.org_mocap_ = X[0].clone() self.org_mocap_.values.drop(self.org_mocap_.values.index, inplace=True) return self def transform(self, X, y=None): print("Numpyfier") Q = [] for track in X: Q.append(track.values.values) # print("Numpyfier:" + str(track.values.columns)) return np.array(Q) def inverse_transform(self, X, copy=None): Q = [] for track in X: new_mocap = self.org_mocap_.clone() time_index = pd.to_timedelta([f for f in range(track.shape[0])], unit="s") new_df = pd.DataFrame(data=track, index=time_index, columns=self.org_mocap_.values.columns) new_mocap.values = new_df Q.append(new_mocap) return Q class Slicer(BaseEstimator, TransformerMixin): """ Slice the data into intervals of equal size """ def __init__(self, window_size, overlap=0.5): self.window_size = window_size self.overlap = overlap pass def fit(self, X, y=None): self.org_mocap_ = X[0].clone() self.org_mocap_.values.drop(self.org_mocap_.values.index, inplace=True) return self def transform(self, X, y=None): print("Slicer") Q = [] for track in X: vals = track.values.values nframes = vals.shape[0] overlap_frames = (int)(self.overlap * self.window_size) n_sequences = (nframes - overlap_frames) // (self.window_size - overlap_frames) if n_sequences > 0: y = np.zeros((n_sequences, self.window_size, vals.shape[1])) # extract sequences from the input data for i in range(0, n_sequences): frameIdx = (self.window_size - overlap_frames) * i Q.append(vals[frameIdx : frameIdx + self.window_size, :]) return np.array(Q) def inverse_transform(self, X, copy=None): Q = [] for track in X: new_mocap = self.org_mocap_.clone() time_index = pd.to_timedelta([f for f in range(track.shape[0])], unit="s") new_df = pd.DataFrame(data=track, index=time_index, columns=self.org_mocap_.values.columns) new_mocap.values = new_df Q.append(new_mocap) return Q class RootTransformer(BaseEstimator, TransformerMixin): def __init__(self, method, position_smoothing=0, rotation_smoothing=0): """ Accepted methods: abdolute_translation_deltas pos_rot_deltas """ self.method = method self.position_smoothing = position_smoothing self.rotation_smoothing = rotation_smoothing def fit(self, X, y=None): return self def transform(self, X, y=None): print("RootTransformer") Q = [] for track in X: if self.method == "abdolute_translation_deltas": new_df = track.values.copy() xpcol = "%s_Xposition" % track.root_name ypcol = "%s_Yposition" % track.root_name zpcol = "%s_Zposition" % track.root_name dxpcol = "%s_dXposition" % track.root_name dzpcol = "%s_dZposition" % track.root_name x = track.values[xpcol].copy() z = track.values[zpcol].copy() if self.position_smoothing > 0: x_sm = filters.gaussian_filter1d(x, self.position_smoothing, axis=0, mode="nearest") z_sm = filters.gaussian_filter1d(z, self.position_smoothing, axis=0, mode="nearest") dx = pd.Series(data=x_sm, index=new_df.index).diff() dz = pd.Series(data=z_sm, index=new_df.index).diff() new_df[xpcol] = x - x_sm new_df[zpcol] = z - z_sm else: dx = x.diff() dz = z.diff() new_df.drop([xpcol, zpcol], axis=1, inplace=True) dx[0] = dx[1] dz[0] = dz[1] new_df[dxpcol] = dx new_df[dzpcol] = dz new_track = track.clone() new_track.values = new_df # end of abdolute_translation_deltas elif self.method == "pos_rot_deltas": new_track = track.clone() # Absolute columns xp_col = "%s_Xposition" % track.root_name yp_col = "%s_Yposition" % track.root_name zp_col = "%s_Zposition" % track.root_name # rot_order = track.skeleton[track.root_name]['order'] # %(joint, rot_order[0]) rot_order = track.skeleton[track.root_name]["order"] r1_col = "%s_%srotation" % (track.root_name, rot_order[0]) r2_col = "%s_%srotation" % (track.root_name, rot_order[1]) r3_col = "%s_%srotation" % (track.root_name, rot_order[2]) # Delta columns dxp_col = "%s_dXposition" % track.root_name dzp_col = "%s_dZposition" % track.root_name dxr_col = "%s_dXrotation" % track.root_name dyr_col = "%s_dYrotation" % track.root_name dzr_col = "%s_dZrotation" % track.root_name positions = np.transpose(np.array([track.values[xp_col], track.values[yp_col], track.values[zp_col]])) rotations = ( np.pi / 180.0 * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) ) """ Get Trajectory and smooth it""" trajectory_filterwidth = self.position_smoothing reference = positions.copy() * np.array([1, 0, 1]) if trajectory_filterwidth > 0: reference = filters.gaussian_filter1d(reference, trajectory_filterwidth, axis=0, mode="nearest") """ Get Root Velocity """ velocity = np.diff(reference, axis=0) velocity = np.vstack((velocity[0, :], velocity)) """ Remove Root Translation """ positions = positions - reference """ Get Forward Direction along the x-z plane, assuming character is facig z-forward """ # forward = [Rotation(f, 'euler', from_deg=True, order=rot_order).rotmat[:,2] for f in rotations] # get the z-axis of the rotation matrix, assuming character is facig z-forward # print("order:" + rot_order.lower()) quats = Quaternions.from_euler(rotations, order=rot_order.lower(), world=False) forward = quats * np.array([[0, 0, 1]]) forward[:, 1] = 0 """ Smooth Forward Direction """ direction_filterwidth = self.rotation_smoothing if direction_filterwidth > 0: forward = filters.gaussian_filter1d(forward, direction_filterwidth, axis=0, mode="nearest") forward = forward / np.sqrt((forward**2).sum(axis=-1))[..., np.newaxis] """ Remove Y Rotation """ target = np.array([[0, 0, 1]]).repeat(len(forward), axis=0) rotation = Quaternions.between(target, forward)[:, np.newaxis] positions = (-rotation[:, 0]) * positions new_rotations = (-rotation[:, 0]) * quats """ Get Root Rotation """ # print(rotation[:,0]) velocity = (-rotation[:, 0]) * velocity rvelocity = Pivots.from_quaternions(rotation[1:] * -rotation[:-1]).ps rvelocity = np.vstack((rvelocity[0], rvelocity)) # eulers = np.array([t3d.euler.quat2euler(q, axes=('s'+rot_order.lower()[::-1]))[::-1] for q in new_rotations])*180.0/np.pi # we need to put scalar last, and swap rotation order. eulers = R.from_quat(np.array(new_rotations)[:, [1, 2, 3, 0]]).as_euler( rot_order.lower()[::-1], degrees=True )[:, ::-1] new_df = track.values.copy() root_pos_x = pd.Series(data=positions[:, 0], index=new_df.index) root_pos_y = pd.Series(data=positions[:, 1], index=new_df.index) root_pos_z = pd.Series(data=positions[:, 2], index=new_df.index) root_pos_x_diff = pd.Series(data=velocity[:, 0], index=new_df.index) root_pos_z_diff = pd.Series(data=velocity[:, 2], index=new_df.index) root_rot_1 = pd.Series(data=eulers[:, 0], index=new_df.index) root_rot_2 = pd.Series(data=eulers[:, 1], index=new_df.index) root_rot_3 = pd.Series(data=eulers[:, 2], index=new_df.index) root_rot_y_diff = pd.Series(data=rvelocity[:, 0], index=new_df.index) # new_df.drop([xr_col, yr_col, zr_col, xp_col, zp_col], axis=1, inplace=True) new_df[xp_col] = root_pos_x new_df[yp_col] = root_pos_y new_df[zp_col] = root_pos_z new_df[dxp_col] = root_pos_x_diff new_df[dzp_col] = root_pos_z_diff new_df[r1_col] = root_rot_1 new_df[r2_col] = root_rot_2 new_df[r3_col] = root_rot_3 # new_df[dxr_col] = root_rot_x_diff new_df[dyr_col] = root_rot_y_diff # new_df[dzr_col] = root_rot_z_diff new_track.values = new_df elif self.method == "hip_centric": new_track = track.clone() # Absolute columns xp_col = "%s_Xposition" % track.root_name yp_col = "%s_Yposition" % track.root_name zp_col = "%s_Zposition" % track.root_name xr_col = "%s_Xrotation" % track.root_name yr_col = "%s_Yrotation" % track.root_name zr_col = "%s_Zrotation" % track.root_name new_df = track.values.copy() all_zeros = np.zeros(track.values[xp_col].values.shape) new_df[xp_col] = pd.Series(data=all_zeros, index=new_df.index) new_df[yp_col] = pd.Series(data=all_zeros, index=new_df.index) new_df[zp_col] = pd.Series(data=all_zeros, index=new_df.index) new_df[xr_col] = pd.Series(data=all_zeros, index=new_df.index) new_df[yr_col] = pd.Series(data=all_zeros, index=new_df.index) new_df[zr_col] = pd.Series(data=all_zeros, index=new_df.index) new_track.values = new_df # print(new_track.values.columns) Q.append(new_track) return Q def inverse_transform(self, X, copy=None, start_pos=None): Q = [] # TODO: simplify this implementation startx = 0 startz = 0 if start_pos is not None: startx, startz = start_pos for track in X: new_track = track.clone() if self.method == "abdolute_translation_deltas": new_df = new_track.values xpcol = "%s_Xposition" % track.root_name ypcol = "%s_Yposition" % track.root_name zpcol = "%s_Zposition" % track.root_name dxpcol = "%s_dXposition" % track.root_name dzpcol = "%s_dZposition" % track.root_name dx = track.values[dxpcol].values dz = track.values[dzpcol].values recx = [startx] recz = [startz] for i in range(dx.shape[0] - 1): recx.append(recx[i] + dx[i + 1]) recz.append(recz[i] + dz[i + 1]) # recx = [recx[i]+dx[i+1] for i in range(dx.shape[0]-1)] # recz = [recz[i]+dz[i+1] for i in range(dz.shape[0]-1)] # recx = dx[:-1] + dx[1:] # recz = dz[:-1] + dz[1:] if self.position_smoothing > 0: new_df[xpcol] = pd.Series(data=new_df[xpcol] + recx, index=new_df.index) new_df[zpcol] = pd.Series(data=new_df[zpcol] + recz, index=new_df.index) else: new_df[xpcol] = pd.Series(data=recx, index=new_df.index) new_df[zpcol] = pd.Series(data=recz, index=new_df.index) new_df.drop([dxpcol, dzpcol], axis=1, inplace=True) new_track.values = new_df # end of abdolute_translation_deltas elif self.method == "pos_rot_deltas": # Absolute columns rot_order = track.skeleton[track.root_name]["order"] xp_col = "%s_Xposition" % track.root_name yp_col = "%s_Yposition" % track.root_name zp_col = "%s_Zposition" % track.root_name xr_col = "%s_Xrotation" % track.root_name yr_col = "%s_Yrotation" % track.root_name zr_col = "%s_Zrotation" % track.root_name r1_col = "%s_%srotation" % (track.root_name, rot_order[0]) r2_col = "%s_%srotation" % (track.root_name, rot_order[1]) r3_col = "%s_%srotation" % (track.root_name, rot_order[2]) # Delta columns dxp_col = "%s_dXposition" % track.root_name dzp_col = "%s_dZposition" % track.root_name dyr_col = "%s_dYrotation" % track.root_name positions = np.transpose(np.array([track.values[xp_col], track.values[yp_col], track.values[zp_col]])) rotations = ( np.pi / 180.0 * np.transpose(np.array([track.values[r1_col], track.values[r2_col], track.values[r3_col]])) ) quats = Quaternions.from_euler(rotations, order=rot_order.lower(), world=False) new_df = track.values.copy() dx = track.values[dxp_col].values dz = track.values[dzp_col].values dry = track.values[dyr_col].values # rec_p = np.array([startx, 0, startz])+positions[0,:] rec_ry = Quaternions.id(quats.shape[0]) rec_xp = [0] rec_zp = [0] # rec_r = Quaternions.id(quats.shape[0]) for i in range(dx.shape[0] - 1): # print(dry[i]) q_y = Quaternions.from_angle_axis(np.array(dry[i + 1]), np.array([0, 1, 0])) rec_ry[i + 1] = q_y * rec_ry[i] # print("dx: + " + str(dx[i+1])) dp = rec_ry[i + 1] * np.array([dx[i + 1], 0, dz[i + 1]]) rec_xp.append(rec_xp[i] + dp[0, 0]) rec_zp.append(rec_zp[i] + dp[0, 2]) rec_r = rec_ry * quats pp = rec_ry * positions rec_xp = rec_xp + pp[:, 0] rec_zp = rec_zp + pp[:, 2] # eulers = np.array([t3d.euler.quat2euler(q, axes=('s'+rot_order.lower()[::-1]))[::-1] for q in rec_r])*180.0/np.pi # we need to put scalar last, and swap rotation order. eulers = R.from_quat(np.array(rec_r)[:, [1, 2, 3, 0]]).as_euler(rot_order.lower()[::-1], degrees=True)[ :, ::-1 ] new_df = track.values.copy() root_rot_1 = pd.Series(data=eulers[:, 0], index=new_df.index) root_rot_2 = pd.Series(data=eulers[:, 1], index=new_df.index) root_rot_3 = pd.Series(data=eulers[:, 2], index=new_df.index) new_df[xp_col] = pd.Series(data=rec_xp, index=new_df.index) new_df[zp_col] = pd.Series(data=rec_zp, index=new_df.index) new_df[r1_col] = pd.Series(data=root_rot_1, index=new_df.index) new_df[r2_col] = pd.Series(data=root_rot_2, index=new_df.index) new_df[r3_col] = pd.Series(data=root_rot_3, index=new_df.index) new_df.drop([dyr_col, dxp_col, dzp_col], axis=1, inplace=True) new_track.values = new_df # print(new_track.values.columns) Q.append(new_track) return Q class RootCentricPositionNormalizer(BaseEstimator, TransformerMixin): def __init__(self): pass def fit(self, X, y=None): return self def transform(self, X, y=None): Q = [] for track in X: new_track = track.clone() rxp = "%s_Xposition" % track.root_name ryp = "%s_Yposition" % track.root_name rzp = "%s_Zposition" % track.root_name projected_root_pos = track.values[[rxp, ryp, rzp]] projected_root_pos.loc[:, ryp] = 0 # we want the root's projection on the floor plane as the ref new_df = pd.DataFrame(index=track.values.index) all_but_root = [joint for joint in track.skeleton if track.root_name not in joint] # all_but_root = [joint for joint in track.skeleton] for joint in all_but_root: new_df["%s_Xposition" % joint] = pd.Series( data=track.values["%s_Xposition" % joint] - projected_root_pos[rxp], index=new_df.index ) new_df["%s_Yposition" % joint] = pd.Series( data=track.values["%s_Yposition" % joint] - projected_root_pos[ryp], index=new_df.index ) new_df["%s_Zposition" % joint] = pd.Series( data=track.values["%s_Zposition" % joint] - projected_root_pos[rzp], index=new_df.index ) # keep the root as it is now new_df[rxp] = track.values[rxp] new_df[ryp] = track.values[ryp] new_df[rzp] = track.values[rzp] new_track.values = new_df Q.append(new_track) return Q def inverse_transform(self, X, copy=None): Q = [] for track in X: new_track = track.clone() rxp = "%s_Xposition" % track.root_name ryp = "%s_Yposition" % track.root_name rzp = "%s_Zposition" % track.root_name projected_root_pos = track.values[[rxp, ryp, rzp]] projected_root_pos.loc[:, ryp] = 0 # we want the root's projection on the floor plane as the ref new_df = pd.DataFrame(index=track.values.index) for joint in track.skeleton: new_df["%s_Xposition" % joint] = pd.Series( data=track.values["%s_Xposition" % joint] + projected_root_pos[rxp], index=new_df.index ) new_df["%s_Yposition" % joint] = pd.Series( data=track.values["%s_Yposition" % joint] + projected_root_pos[ryp], index=new_df.index ) new_df["%s_Zposition" % joint] = pd.Series( data=track.values["%s_Zposition" % joint] + projected_root_pos[rzp], index=new_df.index ) new_track.values = new_df Q.append(new_track) return Q class Flattener(BaseEstimator, TransformerMixin): def __init__(self): pass def fit(self, X, y=None): return self def transform(self, X, y=None): return np.concatenate(X, axis=0) class ConstantsRemover(BaseEstimator, TransformerMixin): """ For now it just looks at the first track """ def __init__(self, eps=1e-6): self.eps = eps def fit(self, X, y=None): stds = X[0].values.std() cols = X[0].values.columns.values self.const_dims_ = [c for c in cols if (stds[c] < self.eps).any()] self.const_values_ = {c: X[0].values[c].values[0] for c in cols if (stds[c] < self.eps).any()} return self def transform(self, X, y=None): Q = [] for track in X: t2 = track.clone() # for key in t2.skeleton.keys(): # if key in self.ConstDims_: # t2.skeleton.pop(key) # print(track.values.columns.difference(self.const_dims_)) t2.values.drop(self.const_dims_, axis=1, inplace=True) # t2.values = track.values[track.values.columns.difference(self.const_dims_)] Q.append(t2) return Q def inverse_transform(self, X, copy=None): Q = [] for track in X: t2 = track.clone() for d in self.const_dims_: t2.values[d] = self.const_values_[d] # t2.values.assign(d=pd.Series(data=self.const_values_[d], index = t2.values.index)) Q.append(t2) return Q class ListStandardScaler(BaseEstimator, TransformerMixin): def __init__(self, is_DataFrame=False): self.is_DataFrame = is_DataFrame def fit(self, X, y=None): if self.is_DataFrame: X_train_flat = np.concatenate([m.values for m in X], axis=0) else: X_train_flat = np.concatenate([m for m in X], axis=0) self.data_mean_ = np.mean(X_train_flat, axis=0) self.data_std_ = np.std(X_train_flat, axis=0) return self def transform(self, X, y=None): Q = [] for track in X: if self.is_DataFrame: normalized_track = track.copy() normalized_track.values = (track.values - self.data_mean_) / self.data_std_ else: normalized_track = (track - self.data_mean_) / self.data_std_ Q.append(normalized_track) if self.is_DataFrame: return Q else: return np.array(Q) def inverse_transform(self, X, copy=None): Q = [] for track in X: if self.is_DataFrame: unnormalized_track = track.copy() unnormalized_track.values = (track.values * self.data_std_) + self.data_mean_ else: unnormalized_track = (track * self.data_std_) + self.data_mean_ Q.append(unnormalized_track) if self.is_DataFrame: return Q else: return np.array(Q) class ListMinMaxScaler(BaseEstimator, TransformerMixin): def __init__(self, is_DataFrame=False): self.is_DataFrame = is_DataFrame def fit(self, X, y=None): if self.is_DataFrame: X_train_flat = np.concatenate([m.values for m in X], axis=0) else: X_train_flat = np.concatenate([m for m in X], axis=0) self.data_max_ = np.max(X_train_flat, axis=0) self.data_min_ = np.min(X_train_flat, axis=0) return self def transform(self, X, y=None): Q = [] for track in X: if self.is_DataFrame: normalized_track = track.copy() normalized_track.values = (track.values - self.data_min_) / (self.data_max_ - self.data_min_) else: normalized_track = (track - self.data_min_) / (self.data_max_ - self.data_min_) Q.append(normalized_track) if self.is_DataFrame: return Q else: return np.array(Q) def inverse_transform(self, X, copy=None): Q = [] for track in X: if self.is_DataFrame: unnormalized_track = track.copy() unnormalized_track.values = (track.values * (self.data_max_ - self.data_min_)) + self.data_min_ else: unnormalized_track = (track * (self.data_max_ - self.data_min_)) + self.data_min_ Q.append(unnormalized_track) if self.is_DataFrame: return Q else: return np.array(Q) class DownSampler(BaseEstimator, TransformerMixin): def __init__(self, tgt_fps, keep_all=False): self.tgt_fps = tgt_fps self.keep_all = keep_all def fit(self, X, y=None): return self def transform(self, X, y=None): Q = [] for track in X: orig_fps = round(1.0 / track.framerate) rate = orig_fps // self.tgt_fps if orig_fps % self.tgt_fps != 0: print( "error orig_fps (" + str(orig_fps) + ") is not dividable with tgt_fps (" + str(self.tgt_fps) + ")" ) else: print("downsampling with rate: " + str(rate)) for ii in range(0, rate): new_track = track.clone() new_track.values = track.values[ii:-1:rate].copy() new_track.framerate = 1.0 / self.tgt_fps Q.append(new_track) if not self.keep_all: break return Q def inverse_transform(self, X, copy=None): return X class ReverseTime(BaseEstimator, TransformerMixin): def __init__(self, append=True): self.append = append def fit(self, X, y=None): return self def transform(self, X, y=None): Q = [] if self.append: for track in X: Q.append(track) for track in X: new_track = track.clone() new_track.values = track.values[-1::-1] Q.append(new_track) return Q def inverse_transform(self, X, copy=None): return X # TODO: JointsSelector (x) # TODO: SegmentMaker # TODO: DynamicFeaturesAdder # TODO: ShapeFeaturesAdder # TODO: DataFrameNumpier (x) class TemplateTransform(BaseEstimator, TransformerMixin): def __init__(self): pass def fit(self, X, y=None): return self def transform(self, X, y=None): return X