Spaces:
Running on T4
Running on T4
| """ | |
| Streaming joint recovery from 263-dim motion features. | |
| Extracted from FloodDiffusion utils for standalone use (only numpy + torch). | |
| """ | |
| import numpy as np | |
| import torch | |
| def qinv(q): | |
| assert q.shape[-1] == 4, "q must be a tensor of shape (*, 4)" | |
| mask = torch.ones_like(q) | |
| mask[..., 1:] = -mask[..., 1:] | |
| return q * mask | |
| def qrot(q, v): | |
| assert q.shape[-1] == 4 | |
| assert v.shape[-1] == 3 | |
| assert q.shape[:-1] == v.shape[:-1] | |
| original_shape = list(v.shape) | |
| q = q.contiguous().view(-1, 4) | |
| v = v.contiguous().view(-1, 3) | |
| qvec = q[:, 1:] | |
| uv = torch.cross(qvec, v, dim=1) | |
| uuv = torch.cross(qvec, uv, dim=1) | |
| return (v + 2 * (q[:, :1] * uv + uuv)).view(original_shape) | |
| class StreamJointRecovery263: | |
| """ | |
| Stream version of recover_joint_positions_263 that processes one frame at a time. | |
| Maintains cumulative state for rotation angles and positions. | |
| Key insight: The batch version uses PREVIOUS frame's velocity for the current frame, | |
| so we need to delay the velocity application by one frame. | |
| Args: | |
| joints_num: Number of joints in the skeleton | |
| smoothing_alpha: EMA smoothing factor (0.0 to 1.0) | |
| - 1.0 = no smoothing (default), output follows input exactly | |
| - 0.0 = infinite smoothing, output never changes | |
| - Recommended values: 0.3-0.7 for visible smoothing | |
| - Formula: smoothed = alpha * current + (1 - alpha) * previous | |
| """ | |
| def __init__(self, joints_num: int, smoothing_alpha: float = 1.0): | |
| self.joints_num = joints_num | |
| self.smoothing_alpha = np.clip(smoothing_alpha, 0.0, 1.0) | |
| self.reset() | |
| def reset(self): | |
| """Reset the accumulated state""" | |
| self.r_rot_ang_accum = 0.0 | |
| self.r_pos_accum = np.array([0.0, 0.0, 0.0]) | |
| # Store previous frame's velocities for delayed application | |
| self.prev_rot_vel = 0.0 | |
| self.prev_linear_vel = np.array([0.0, 0.0]) | |
| # Store previous smoothed joints for EMA | |
| self.prev_smoothed_joints = None | |
| def process_frame(self, frame_data: np.ndarray) -> np.ndarray: | |
| """ | |
| Process a single frame and return joint positions for that frame. | |
| Args: | |
| frame_data: numpy array of shape (263,) for a single frame | |
| Returns: | |
| joints: numpy array of shape (joints_num, 3) representing joint positions | |
| """ | |
| # Convert to torch tensor | |
| feature_vec = torch.from_numpy(frame_data).float() | |
| # Extract current frame's velocities (will be used in NEXT frame) | |
| curr_rot_vel = feature_vec[0].item() | |
| curr_linear_vel = feature_vec[1:3].numpy() | |
| # Update accumulated rotation angle with PREVIOUS frame's velocity FIRST | |
| # This matches the batch processing: r_rot_ang[i] uses rot_vel[i-1] | |
| self.r_rot_ang_accum += self.prev_rot_vel | |
| # Calculate current rotation quaternion using updated accumulated angle | |
| r_rot_quat = torch.zeros(4) | |
| r_rot_quat[0] = np.cos(self.r_rot_ang_accum) | |
| r_rot_quat[2] = np.sin(self.r_rot_ang_accum) | |
| # Create velocity vector with Y=0 using PREVIOUS frame's velocity | |
| r_vel = np.array([self.prev_linear_vel[0], 0.0, self.prev_linear_vel[1]]) | |
| # Apply inverse rotation to velocity using CURRENT rotation | |
| r_vel_torch = torch.from_numpy(r_vel).float() | |
| r_vel_rotated = qrot(qinv(r_rot_quat).unsqueeze(0), r_vel_torch.unsqueeze(0)) | |
| r_vel_rotated = r_vel_rotated.squeeze(0).numpy() | |
| # Update accumulated position with rotated velocity | |
| self.r_pos_accum += r_vel_rotated | |
| # Get Y position from data | |
| r_pos = self.r_pos_accum.copy() | |
| r_pos[1] = feature_vec[3].item() | |
| # Extract local joint positions | |
| positions = feature_vec[4 : (self.joints_num - 1) * 3 + 4] | |
| positions = positions.view(-1, 3) | |
| # Apply inverse rotation to local joints | |
| r_rot_quat_expanded = ( | |
| qinv(r_rot_quat).unsqueeze(0).expand(positions.shape[0], 4) | |
| ) | |
| positions = qrot(r_rot_quat_expanded, positions) | |
| # Add root XZ to joints | |
| positions[:, 0] += r_pos[0] | |
| positions[:, 2] += r_pos[2] | |
| # Concatenate root and joints | |
| r_pos_torch = torch.from_numpy(r_pos).float() | |
| positions = torch.cat([r_pos_torch.unsqueeze(0), positions], dim=0) | |
| # Convert to numpy | |
| joints_np = positions.detach().cpu().numpy() | |
| # Apply EMA smoothing if enabled | |
| if self.smoothing_alpha < 1.0: | |
| if self.prev_smoothed_joints is None: | |
| # First frame, no smoothing possible | |
| self.prev_smoothed_joints = joints_np.copy() | |
| else: | |
| # EMA: smoothed = alpha * current + (1 - alpha) * previous | |
| joints_np = ( | |
| self.smoothing_alpha * joints_np | |
| + (1.0 - self.smoothing_alpha) * self.prev_smoothed_joints | |
| ) | |
| self.prev_smoothed_joints = joints_np.copy() | |
| # Store current velocities for next frame | |
| self.prev_rot_vel = curr_rot_vel | |
| self.prev_linear_vel = curr_linear_vel | |
| return joints_np | |