Spaces:
Runtime error
Runtime error
import random | |
from dataclasses import dataclass | |
from typing import BinaryIO, Dict, List, Optional, Union | |
import numpy as np | |
from .ply_util import write_ply | |
COLORS = frozenset(["R", "G", "B", "A"]) | |
def preprocess(data, channel): | |
if channel in COLORS: | |
return np.round(data * 255.0) | |
return data | |
class PointCloud: | |
""" | |
An array of points sampled on a surface. Each point may have zero or more | |
channel attributes. | |
:param coords: an [N x 3] array of point coordinates. | |
:param channels: a dict mapping names to [N] arrays of channel values. | |
""" | |
coords: np.ndarray | |
channels: Dict[str, np.ndarray] | |
def load(cls, f: Union[str, BinaryIO]) -> "PointCloud": | |
""" | |
Load the point cloud from a .npz file. | |
""" | |
if isinstance(f, str): | |
with open(f, "rb") as reader: | |
return cls.load(reader) | |
else: | |
obj = np.load(f) | |
keys = list(obj.keys()) | |
return PointCloud( | |
coords=obj["coords"], | |
channels={k: obj[k] for k in keys if k != "coords"}, | |
) | |
def save(self, f: Union[str, BinaryIO]): | |
""" | |
Save the point cloud to a .npz file. | |
""" | |
if isinstance(f, str): | |
with open(f, "wb") as writer: | |
self.save(writer) | |
else: | |
np.savez(f, coords=self.coords, **self.channels) | |
def write_ply(self, raw_f: BinaryIO): | |
write_ply( | |
raw_f, | |
coords=self.coords, | |
rgb=( | |
np.stack([self.channels[x] for x in "RGB"], axis=1) | |
if all(x in self.channels for x in "RGB") | |
else None | |
), | |
) | |
def random_sample(self, num_points: int, **subsample_kwargs) -> "PointCloud": | |
""" | |
Sample a random subset of this PointCloud. | |
:param num_points: maximum number of points to sample. | |
:param subsample_kwargs: arguments to self.subsample(). | |
:return: a reduced PointCloud, or self if num_points is not less than | |
the current number of points. | |
""" | |
if len(self.coords) <= num_points: | |
return self | |
indices = np.random.choice(len(self.coords), size=(num_points,), replace=False) | |
return self.subsample(indices, **subsample_kwargs) | |
def farthest_point_sample( | |
self, num_points: int, init_idx: Optional[int] = None, **subsample_kwargs | |
) -> "PointCloud": | |
""" | |
Sample a subset of the point cloud that is evenly distributed in space. | |
First, a random point is selected. Then each successive point is chosen | |
such that it is furthest from the currently selected points. | |
The time complexity of this operation is O(NM), where N is the original | |
number of points and M is the reduced number. Therefore, performance | |
can be improved by randomly subsampling points with random_sample() | |
before running farthest_point_sample(). | |
:param num_points: maximum number of points to sample. | |
:param init_idx: if specified, the first point to sample. | |
:param subsample_kwargs: arguments to self.subsample(). | |
:return: a reduced PointCloud, or self if num_points is not less than | |
the current number of points. | |
""" | |
if len(self.coords) <= num_points: | |
return self | |
init_idx = random.randrange(len(self.coords)) if init_idx is None else init_idx | |
indices = np.zeros([num_points], dtype=np.int64) | |
indices[0] = init_idx | |
sq_norms = np.sum(self.coords**2, axis=-1) | |
def compute_dists(idx: int): | |
# Utilize equality: ||A-B||^2 = ||A||^2 + ||B||^2 - 2*(A @ B). | |
return sq_norms + sq_norms[idx] - 2 * (self.coords @ self.coords[idx]) | |
cur_dists = compute_dists(init_idx) | |
for i in range(1, num_points): | |
idx = np.argmax(cur_dists) | |
indices[i] = idx | |
cur_dists = np.minimum(cur_dists, compute_dists(idx)) | |
return self.subsample(indices, **subsample_kwargs) | |
def subsample(self, indices: np.ndarray, average_neighbors: bool = False) -> "PointCloud": | |
if not average_neighbors: | |
return PointCloud( | |
coords=self.coords[indices], | |
channels={k: v[indices] for k, v in self.channels.items()}, | |
) | |
new_coords = self.coords[indices] | |
neighbor_indices = PointCloud(coords=new_coords, channels={}).nearest_points(self.coords) | |
# Make sure every point points to itself, which might not | |
# be the case if points are duplicated or there is rounding | |
# error. | |
neighbor_indices[indices] = np.arange(len(indices)) | |
new_channels = {} | |
for k, v in self.channels.items(): | |
v_sum = np.zeros_like(v[: len(indices)]) | |
v_count = np.zeros_like(v[: len(indices)]) | |
np.add.at(v_sum, neighbor_indices, v) | |
np.add.at(v_count, neighbor_indices, 1) | |
new_channels[k] = v_sum / v_count | |
return PointCloud(coords=new_coords, channels=new_channels) | |
def select_channels(self, channel_names: List[str]) -> np.ndarray: | |
data = np.stack([preprocess(self.channels[name], name) for name in channel_names], axis=-1) | |
return data | |
def nearest_points(self, points: np.ndarray, batch_size: int = 16384) -> np.ndarray: | |
""" | |
For each point in another set of points, compute the point in this | |
pointcloud which is closest. | |
:param points: an [N x 3] array of points. | |
:param batch_size: the number of neighbor distances to compute at once. | |
Smaller values save memory, while larger values may | |
make the computation faster. | |
:return: an [N] array of indices into self.coords. | |
""" | |
norms = np.sum(self.coords**2, axis=-1) | |
all_indices = [] | |
for i in range(0, len(points), batch_size): | |
batch = points[i : i + batch_size] | |
dists = norms + np.sum(batch**2, axis=-1)[:, None] - 2 * (batch @ self.coords.T) | |
all_indices.append(np.argmin(dists, axis=-1)) | |
return np.concatenate(all_indices, axis=0) | |
def combine(self, other: "PointCloud") -> "PointCloud": | |
assert self.channels.keys() == other.channels.keys() | |
return PointCloud( | |
coords=np.concatenate([self.coords, other.coords], axis=0), | |
channels={ | |
k: np.concatenate([v, other.channels[k]], axis=0) for k, v in self.channels.items() | |
}, | |
) |