|
|
|
|
|
import torch |
|
import numpy as np |
|
from pykalman import KalmanFilter |
|
PI = np.pi |
|
|
|
device = "cuda" |
|
def get_rotation_matrix(pitch_, yaw_, roll_): |
|
""" the input is in degree |
|
""" |
|
|
|
pitch = pitch_ / 180 * PI |
|
yaw = yaw_ / 180 * PI |
|
roll = roll_ / 180 * PI |
|
|
|
device = pitch.device |
|
|
|
if pitch.ndim == 1: |
|
pitch = pitch.unsqueeze(1) |
|
if yaw.ndim == 1: |
|
yaw = yaw.unsqueeze(1) |
|
if roll.ndim == 1: |
|
roll = roll.unsqueeze(1) |
|
|
|
|
|
bs = pitch.shape[0] |
|
ones = torch.ones([bs, 1]).to(device) |
|
zeros = torch.zeros([bs, 1]).to(device) |
|
x, y, z = pitch, yaw, roll |
|
|
|
rot_x = torch.cat([ |
|
ones, zeros, zeros, |
|
zeros, torch.cos(x), -torch.sin(x), |
|
zeros, torch.sin(x), torch.cos(x) |
|
], dim=1).reshape([bs, 3, 3]) |
|
|
|
rot_y = torch.cat([ |
|
torch.cos(y), zeros, torch.sin(y), |
|
zeros, ones, zeros, |
|
-torch.sin(y), zeros, torch.cos(y) |
|
], dim=1).reshape([bs, 3, 3]) |
|
|
|
rot_z = torch.cat([ |
|
torch.cos(z), -torch.sin(z), zeros, |
|
torch.sin(z), torch.cos(z), zeros, |
|
zeros, zeros, ones |
|
], dim=1).reshape([bs, 3, 3]) |
|
|
|
rot = rot_z @ rot_y @ rot_x |
|
return rot.permute(0, 2, 1) |
|
|
|
def smooth(x_d_lst, shape, device, observation_variance=3e-7, process_variance=1e-5): |
|
x_d_lst_reshape = [x.reshape(-1) for x in x_d_lst] |
|
x_d_stacked = np.vstack(x_d_lst_reshape) |
|
kf = KalmanFilter( |
|
initial_state_mean=x_d_stacked[0], |
|
n_dim_obs=x_d_stacked.shape[1], |
|
transition_covariance=process_variance * np.eye(x_d_stacked.shape[1]), |
|
observation_covariance=observation_variance * np.eye(x_d_stacked.shape[1]) |
|
) |
|
smoothed_state_means, _ = kf.smooth(x_d_stacked) |
|
x_d_lst_smooth = [torch.tensor(state_mean.reshape(shape[-2:]), dtype=torch.float32, device=device) for state_mean in smoothed_state_means] |
|
return x_d_lst_smooth |
|
|
|
class ExponentialMovingAverageFilter: |
|
def __init__(self, alpha=0.6): |
|
self.alpha = alpha |
|
self.smoothed_value = None |
|
|
|
def update(self, new_value): |
|
if self.smoothed_value is None: |
|
self.smoothed_value = new_value |
|
else: |
|
self.smoothed_value = self.alpha * new_value + (1 - self.alpha) * self.smoothed_value |
|
return self.smoothed_value |
|
|
|
class MovingAverageFilter: |
|
def __init__(self, window_size): |
|
self.window_size = window_size |
|
self.buffer = np.zeros((window_size, 7)) |
|
self.index = 0 |
|
self.full = False |
|
|
|
def update(self, new_value): |
|
|
|
self.buffer[self.index] = new_value |
|
self.index = (self.index + 1) % self.window_size |
|
|
|
|
|
if not self.full and self.index == 0: |
|
self.full = True |
|
|
|
|
|
return np.mean(self.buffer[:self.window_size if self.full else self.index], axis=0) |
|
|
|
class MedianFilter: |
|
def __init__(self, window_size): |
|
self.window_size = window_size |
|
self.buffer = np.zeros((window_size, 7)) |
|
self.index = 0 |
|
self.full = False |
|
|
|
def update(self, new_value): |
|
|
|
self.buffer[self.index] = new_value |
|
self.index = (self.index + 1) % self.window_size |
|
|
|
|
|
if not self.full and self.index == 0: |
|
self.full = True |
|
|
|
|
|
return np.median(self.buffer[:self.window_size if self.full else self.index], axis=0) |
|
|
|
def smooth_(ori_data, method="median"): |
|
|
|
data_array = [] |
|
for frame_idx in range(ori_data["n_frames"]): |
|
data_array.append( |
|
np.concatenate(( |
|
ori_data['motion'][frame_idx]["scale"].flatten(), |
|
ori_data['motion'][frame_idx]["t"].flatten(), |
|
ori_data['motion'][frame_idx]["pitch"].flatten(), |
|
ori_data['motion'][frame_idx]["yaw"].flatten(), |
|
ori_data['motion'][frame_idx]["roll"].flatten(), |
|
)) |
|
) |
|
data_array = np.array(data_array).astype(np.float32) |
|
|
|
|
|
|
|
if method == "median": |
|
window_size = 3 |
|
ma_filter = MedianFilter(window_size) |
|
elif method == "ema": |
|
ma_filter = ExponentialMovingAverageFilter(alpha=0.01) |
|
else: |
|
window_size = 10 |
|
ma_filter = MovingAverageFilter(window_size) |
|
smoothed_data = [] |
|
for value in data_array: |
|
smoothed_value = ma_filter.update(value) |
|
smoothed_data.append(smoothed_value) |
|
smoothed_data = np.array(smoothed_data).astype(np.float32) |
|
|
|
|
|
|
|
motion_list = [] |
|
for idx in range(smoothed_data.shape[0]): |
|
exp = ori_data["motion"][idx]["exp"] |
|
scale = smoothed_data[idx][0:1].reshape(1, 1) |
|
|
|
t = smoothed_data[idx][1:4].reshape(1, 3).astype(np.float32) |
|
pitch = smoothed_data[idx][4:5].reshape(1, 1).astype(np.float32) |
|
yaw = smoothed_data[idx][5:6].reshape(1, 1).astype(np.float32) |
|
roll = smoothed_data[idx][6:7].reshape(1, 1).astype(np.float32) |
|
R = get_rotation_matrix(torch.FloatTensor(pitch), torch.FloatTensor(yaw), torch.FloatTensor(roll)) |
|
R = R.reshape(1, 3, 3).cpu().numpy().astype(np.float32) |
|
|
|
motion_list.append({"exp": exp, "scale": scale, "t": t, "pitch": pitch, "yaw": yaw, "roll": roll, "R": R}) |
|
|
|
tgt_motion = {'n_frames': smoothed_data.shape[0], 'output_fps': 25, 'motion': motion_list} |
|
return tgt_motion |
|
|