| from __future__ import annotations |
|
|
| import json |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
|
|
|
|
class MotokoFeatureExtractor:
    """Normalize and stack haptic modalities into a single model tensor.

    Each enabled modality is z-score normalized per channel, padded or
    trimmed to ``max_length`` timesteps, and concatenated along the channel
    axis. An attention mask marks which timesteps carry real (non-padding)
    data, derived from the true sequence lengths.
    """

    def __init__(self, config: dict[str, Any]) -> None:
        """Initialize from a configuration mapping.

        Args:
            config: Mapping with optional keys ``max_length`` (int, default
                2048), ``padding_value`` (float, default 0.0),
                ``normalization.eps`` (float, default 1e-6), and
                ``modalities`` (mapping of name -> spec dict with
                ``enabled`` (bool) and ``channels`` (int)).
        """
        self.config = config
        self.max_length = int(config.get("max_length", 2048))
        self.padding_value = float(config.get("padding_value", 0.0))
        # eps keeps the normalization denominator away from zero for
        # constant (zero-variance) channels.
        self.eps = float(config.get("normalization", {}).get("eps", 1e-6))
        self.modalities = config.get("modalities", {})

    @classmethod
    def from_config(cls, path: str | Path) -> "MotokoFeatureExtractor":
        """Build an extractor from a JSON configuration file at *path*."""
        with Path(path).open("r", encoding="utf-8") as handle:
            return cls(json.load(handle))

    def _normalize(self, values: np.ndarray) -> np.ndarray:
        """Z-score normalize per channel, safely handling empty input.

        ``eps`` floors the standard deviation so constant channels map to
        zero instead of dividing by zero.
        """
        if values.shape[0] == 0:
            # mean/std over an empty axis emits RuntimeWarnings and NaNs;
            # an empty array is already "normalized".
            return values
        mean = values.mean(axis=0, keepdims=True)
        std = values.std(axis=0, keepdims=True)
        return (values - mean) / np.maximum(std, self.eps)

    def _pad_or_trim(self, values: np.ndarray) -> np.ndarray:
        """Return *values* with exactly ``max_length`` rows.

        Longer inputs are truncated; shorter inputs are padded at the end
        with ``padding_value``.
        """
        if values.shape[0] >= self.max_length:
            return values[: self.max_length]

        pad_rows = self.max_length - values.shape[0]
        pad = np.full((pad_rows, values.shape[1]), self.padding_value, dtype=values.dtype)
        return np.concatenate([values, pad], axis=0)

    def __call__(self, sample: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
        """Extract a stacked feature tensor and attention mask from *sample*.

        Args:
            sample: Mapping from modality name to a float array of shape
                ``[timesteps, channels]``. Enabled modalities missing from
                *sample* are treated as empty (all padding).

        Returns:
            Dict with ``input_values`` (float32, shape
            ``[max_length, total_channels]``) and ``attention_mask``
            (int64, shape ``[max_length]``; 1 where at least one modality
            has real data at that timestep).

        Raises:
            ValueError: If an enabled modality has the wrong shape, or if
                no enabled modalities exist.
        """
        features: list[np.ndarray] = []
        valid_length = 0  # longest real (unpadded) sequence across modalities

        for name, spec in self.modalities.items():
            if not spec.get("enabled", False):
                continue

            channels = int(spec["channels"])
            values = np.asarray(sample.get(name, np.zeros((0, channels), dtype=np.float32)))

            if values.ndim != 2 or values.shape[1] != channels:
                raise ValueError(
                    f"Expected modality '{name}' to have shape [timesteps, {channels}], "
                    f"got {values.shape}."
                )

            valid_length = max(valid_length, min(values.shape[0], self.max_length))
            normalized = self._normalize(values.astype(np.float32))
            features.append(self._pad_or_trim(normalized))

        if not features:
            raise ValueError("No enabled modalities were provided.")

        stacked = np.concatenate(features, axis=1)
        # Derive the mask from true sequence lengths rather than value
        # magnitudes: a timestep whose normalized features are all exactly
        # zero (e.g. a constant input channel) is still real data, and a
        # non-zero padding_value must not be mistaken for real data.
        attention_mask = (np.arange(self.max_length) < valid_length).astype(np.int64)

        return {
            "input_values": stacked,
            "attention_mask": attention_mask,
        }
|
|