|
import random |
|
from dataclasses import dataclass |
|
from typing import BinaryIO, Dict, List, Optional, Union |
|
|
|
import numpy as np |
|
|
|
from .ply_util import write_ply |
|
|
|
# Names of the channels that preprocess() treats as colors.
COLORS = frozenset({"R", "G", "B", "A"})
|
|
|
|
|
def preprocess(data, channel):
    """
    Prepare the values of a single channel for export.

    :param data: the channel values (an array or scalar supporting `* 255.0`).
    :param channel: the name of the channel the values belong to.
    :return: `data` multiplied by 255 and rounded when `channel` is one of
             COLORS; otherwise `data` itself, unchanged.
    """
    is_color = channel in COLORS
    return np.round(data * 255.0) if is_color else data
|
|
|
|
|
@dataclass
class PointCloud:
    """
    An array of points sampled on a surface. Each point may have zero or more
    channel attributes.

    :param coords: an [N x 3] array of point coordinates.
    :param channels: a dict mapping names to [N] arrays of channel values.
    """

    coords: np.ndarray
    channels: Dict[str, np.ndarray]

    @classmethod
    def load(cls, f: Union[str, BinaryIO]) -> "PointCloud":
        """
        Load the point cloud from a .npz file.

        :param f: a path to the file, or a binary file object open for reading.
                  A file object passed in is left open for the caller.
        :return: an instance of cls whose coords come from the "coords" entry
                 and whose channels are every other entry of the archive.
        """
        if isinstance(f, str):
            with open(f, "rb") as reader:
                return cls.load(reader)
        # The archive is read lazily, so pull every array out while it is open
        # and let the context manager release the underlying zip handle.
        with np.load(f) as obj:
            return cls(
                coords=obj["coords"],
                channels={k: obj[k] for k in obj.files if k != "coords"},
            )

    def save(self, f: Union[str, BinaryIO]) -> None:
        """
        Save the point cloud to a .npz file.

        :param f: a path to the file, or a binary file object open for writing.
        """
        if isinstance(f, str):
            with open(f, "wb") as writer:
                self.save(writer)
        else:
            np.savez(f, coords=self.coords, **self.channels)

    def write_ply(self, raw_f: BinaryIO) -> None:
        """
        Write the point cloud with the module-level write_ply() helper.

        :param raw_f: the binary file object handed to write_ply().
        """
        # Colors are only passed along when all three of R, G and B exist.
        has_rgb = all(x in self.channels for x in "RGB")
        write_ply(
            raw_f,
            coords=self.coords,
            rgb=(np.stack([self.channels[x] for x in "RGB"], axis=1) if has_rgb else None),
        )

    def random_sample(self, num_points: int, **subsample_kwargs) -> "PointCloud":
        """
        Sample a random subset of this PointCloud.

        :param num_points: maximum number of points to sample.
        :param subsample_kwargs: arguments to self.subsample().
        :return: a reduced PointCloud, or self if num_points is not less than
                 the current number of points.
        """
        if len(self.coords) <= num_points:
            return self
        indices = np.random.choice(len(self.coords), size=(num_points,), replace=False)
        return self.subsample(indices, **subsample_kwargs)

    def farthest_point_sample(
        self, num_points: int, init_idx: Optional[int] = None, **subsample_kwargs
    ) -> "PointCloud":
        """
        Sample a subset of the point cloud that is evenly distributed in space.

        First, a random point is selected. Then each successive point is chosen
        such that it is furthest from the currently selected points.

        The time complexity of this operation is O(NM), where N is the original
        number of points and M is the reduced number. Therefore, performance
        can be improved by randomly subsampling points with random_sample()
        before running farthest_point_sample().

        :param num_points: maximum number of points to sample.
        :param init_idx: if specified, the first point to sample.
        :param subsample_kwargs: arguments to self.subsample().
        :return: a reduced PointCloud, or self if num_points is not less than
                 the current number of points.
        """
        if len(self.coords) <= num_points:
            return self
        if num_points == 0:
            # There is no slot for an initial point; return an empty subset
            # instead of failing on indices[0] below.
            return self.subsample(np.zeros([0], dtype=np.int64), **subsample_kwargs)
        init_idx = random.randrange(len(self.coords)) if init_idx is None else init_idx
        indices = np.zeros([num_points], dtype=np.int64)
        indices[0] = init_idx
        sq_norms = np.sum(self.coords**2, axis=-1)

        def compute_dists(idx: int) -> np.ndarray:
            # Squared distance from every point to point idx, expanded as
            # |a|^2 + |b|^2 - 2 a.b so that the squared norms are reused.
            return sq_norms + sq_norms[idx] - 2 * (self.coords @ self.coords[idx])

        # cur_dists[j] is the squared distance from point j to the nearest
        # point selected so far.
        cur_dists = compute_dists(init_idx)
        for i in range(1, num_points):
            idx = np.argmax(cur_dists)
            indices[i] = idx
            cur_dists = np.minimum(cur_dists, compute_dists(idx))
        return self.subsample(indices, **subsample_kwargs)

    def subsample(self, indices: np.ndarray, average_neighbors: bool = False) -> "PointCloud":
        """
        Build a new PointCloud from the points at the given indices.

        :param indices: an [M] array of indices into self.coords.
        :param average_neighbors: if False, channel values are copied from the
                                  selected points. If True, every original
                                  point is assigned to its nearest selected
                                  point and each selected point receives the
                                  mean channel values of the points assigned
                                  to it.
        :return: a PointCloud with M points.
        """
        # With nothing selected there are no neighbors to average over, and
        # the nearest-point search below cannot run against zero points.
        if not average_neighbors or len(indices) == 0:
            return PointCloud(
                coords=self.coords[indices],
                channels={k: v[indices] for k, v in self.channels.items()},
            )

        new_coords = self.coords[indices]
        neighbor_indices = PointCloud(coords=new_coords, channels={}).nearest_points(self.coords)

        # Make sure every selected point is assigned to its own slot, which the
        # nearest-point search does not guarantee for coincident points.
        neighbor_indices[indices] = np.arange(len(indices))

        num_selected = len(indices)
        new_channels = {}
        for k, v in self.channels.items():
            # Size the accumulators from the number of selected points rather
            # than from a slice of v, which is too short when M exceeds N.
            acc_shape = (num_selected,) + v.shape[1:]
            v_sum = np.zeros(acc_shape, dtype=v.dtype)
            v_count = np.zeros(acc_shape, dtype=v.dtype)
            np.add.at(v_sum, neighbor_indices, v)
            np.add.at(v_count, neighbor_indices, 1)
            unassigned = v_count == 0
            if np.any(unassigned):
                # Repeated entries in indices can leave a slot with no points;
                # use the selected point's own value there instead of 0/0.
                averaged = v_sum / np.where(unassigned, 1, v_count)
                new_channels[k] = np.where(unassigned, v[indices], averaged)
            else:
                new_channels[k] = v_sum / v_count
        return PointCloud(coords=new_coords, channels=new_channels)

    def select_channels(self, channel_names: List[str]) -> np.ndarray:
        """
        Stack the requested channels into a single array.

        :param channel_names: the names of the channels to gather, in order.
        :return: the channels, each passed through preprocess(), stacked along
                 a new last axis.
        """
        data = np.stack([preprocess(self.channels[name], name) for name in channel_names], axis=-1)
        return data

    def nearest_points(self, points: np.ndarray, batch_size: int = 16384) -> np.ndarray:
        """
        For each point in another set of points, compute the point in this
        pointcloud which is closest.

        :param points: an [N x 3] array of points.
        :param batch_size: the number of neighbor distances to compute at once.
                           Smaller values save memory, while larger values may
                           make the computation faster.
        :return: an [N] array of indices into self.coords.
        """
        if len(points) == 0:
            # np.concatenate() rejects an empty list, so answer directly with
            # the index dtype that np.argmin() would have produced.
            return np.zeros([0], dtype=np.intp)
        norms = np.sum(self.coords**2, axis=-1)
        all_indices = []
        for i in range(0, len(points), batch_size):
            batch = points[i : i + batch_size]
            # Squared distances from each batch point (rows) to each point of
            # this cloud (columns).
            dists = norms + np.sum(batch**2, axis=-1)[:, None] - 2 * (batch @ self.coords.T)
            all_indices.append(np.argmin(dists, axis=-1))
        return np.concatenate(all_indices, axis=0)

    def combine(self, other: "PointCloud") -> "PointCloud":
        """
        Concatenate this point cloud with another one.

        :param other: a PointCloud with exactly the same channel names.
        :return: a PointCloud holding the points of self followed by the
                 points of other.
        """
        assert self.channels.keys() == other.channels.keys(), "channel names must match"
        return PointCloud(
            coords=np.concatenate([self.coords, other.coords], axis=0),
            channels={
                k: np.concatenate([v, other.channels[k]], axis=0) for k, v in self.channels.items()
            },
        )
|
|