# Spaces: Running on Zero
""" | |
3D Point Cloud Augmentation | |
Inspired by chrischoy/SpatioTemporalSegmentation
Author: Xiaoyang Wu (xiaoyang.wu.cs@gmail.com) | |
Please cite our work if the code is helpful to you. | |
""" | |
import random | |
import numbers | |
import scipy | |
import scipy.ndimage | |
import scipy.interpolate | |
import scipy.stats | |
import numpy as np | |
import torch | |
import copy | |
from collections.abc import Sequence, Mapping | |
from pointcept.utils.registry import Registry | |
TRANSFORMS = Registry("transforms") | |
class Collect(object):
    def __init__(self, keys, offset_keys_dict=None, **kwargs):
        """Gather selected entries of a data dict into a model-input dict.

        e.g. Collect(keys=[coord], feat_keys=[coord, color])

        Args:
            keys: key or sequence of keys copied through unchanged.
            offset_keys_dict: mapping of output key -> source key whose first
                dimension becomes a length-1 offset tensor (default: offset=coord).
            **kwargs: "<name>_keys" sequences whose tensors are concatenated
                along dim 1 into an output entry "<name>".
        """
        if offset_keys_dict is None:
            offset_keys_dict = dict(offset="coord")
        # Normalize a bare string once here; the original did this lazily
        # inside __call__, mutating self.keys on every invocation.
        if isinstance(keys, str):
            keys = [keys]
        self.keys = keys
        self.offset_keys = offset_keys_dict
        self.kwargs = kwargs

    def __call__(self, data_dict):
        data = dict()
        for key in self.keys:
            data[key] = data_dict[key]
        for key, value in self.offset_keys.items():
            # offset = number of points in the referenced entry
            data[key] = torch.tensor([data_dict[value].shape[0]])
        for name, keys in self.kwargs.items():
            name = name.replace("_keys", "")
            assert isinstance(keys, Sequence)
            data[name] = torch.cat([data_dict[key].float() for key in keys], dim=1)
        return data
class Copy(object):
    """Duplicate selected entries of the data dict under new keys."""

    def __init__(self, keys_dict=None):
        # Default: snapshot coord/segment under "origin_*" names.
        self.keys_dict = (
            dict(coord="origin_coord", segment="origin_segment")
            if keys_dict is None
            else keys_dict
        )

    def __call__(self, data_dict):
        for src, dst in self.keys_dict.items():
            value = data_dict[src]
            if isinstance(value, np.ndarray):
                data_dict[dst] = value.copy()
            elif isinstance(value, torch.Tensor):
                data_dict[dst] = value.clone().detach()
            else:
                data_dict[dst] = copy.deepcopy(value)
        return data_dict
class ToTensor(object):
    """Recursively convert numpy arrays, scalars and containers to tensors."""

    def __call__(self, data):
        if isinstance(data, torch.Tensor):
            return data
        # str is itself a Sequence, so it must be short-circuited before
        # the generic Sequence branch below.
        if isinstance(data, str):
            return data
        if isinstance(data, int):
            return torch.LongTensor([data])
        if isinstance(data, float):
            return torch.FloatTensor([data])
        if isinstance(data, np.ndarray):
            if np.issubdtype(data.dtype, bool):
                return torch.from_numpy(data)
            if np.issubdtype(data.dtype, np.integer):
                return torch.from_numpy(data).long()
            if np.issubdtype(data.dtype, np.floating):
                return torch.from_numpy(data).float()
            raise TypeError(f"type {type(data)} cannot be converted to tensor.")
        if isinstance(data, Mapping):
            return {sub_key: self(item) for sub_key, item in data.items()}
        if isinstance(data, Sequence):
            return [self(item) for item in data]
        raise TypeError(f"type {type(data)} cannot be converted to tensor.")
class Add(object):
    """Insert constant key/value pairs into the data dict."""

    def __init__(self, keys_dict=None):
        self.keys_dict = {} if keys_dict is None else keys_dict

    def __call__(self, data_dict):
        # dict.update performs exactly the per-item assignment loop.
        data_dict.update(self.keys_dict)
        return data_dict
class NormalizeColor(object):
    """Rescale colors from [0, 255] into [-1, 1]."""

    def __call__(self, data_dict):
        if "color" in data_dict:
            data_dict["color"] = data_dict["color"] / 127.5 - 1
        return data_dict
class NormalizeCoord(object):
    """Center the cloud and scale it into the unit sphere (from pointnet2)."""

    def __call__(self, data_dict):
        if "coord" in data_dict:
            coord = data_dict["coord"]
            # In-place centering keeps the same array object, as before.
            coord -= np.mean(coord, axis=0)
            radius = np.max(np.sqrt(np.sum(coord**2, axis=1)))
            data_dict["coord"] = coord / radius
        return data_dict
class PositiveShift(object):
    """Translate the cloud so every axis starts at zero."""

    def __call__(self, data_dict):
        if "coord" in data_dict:
            data_dict["coord"] -= np.min(data_dict["coord"], 0)
        return data_dict
class CenterShift(object):
    """Move the x/y bounding-box center to the origin; optionally drop z to the floor."""

    def __init__(self, apply_z=True):
        # apply_z=True also shifts z so that z_min becomes 0.
        self.apply_z = apply_z

    def __call__(self, data_dict):
        if "coord" in data_dict:
            lo = data_dict["coord"].min(axis=0)
            hi = data_dict["coord"].max(axis=0)
            z_shift = lo[2] if self.apply_z else 0
            data_dict["coord"] -= [(lo[0] + hi[0]) / 2, (lo[1] + hi[1]) / 2, z_shift]
        return data_dict
class RandomShift(object):
    """Translate the cloud by one uniform random offset per axis."""

    def __init__(self, shift=((-0.2, 0.2), (-0.2, 0.2), (0, 0))):
        # shift: per-axis (low, high) ranges for the random translation.
        self.shift = shift

    def __call__(self, data_dict):
        if "coord" in data_dict:
            # One uniform draw per axis, in x/y/z order (matches original).
            offset = [np.random.uniform(low, high) for low, high in self.shift]
            data_dict["coord"] += offset
        return data_dict
class PointClip(object):
    """Clamp coordinates into an axis-aligned box.

    point_cloud_range is (x_min, y_min, z_min, x_max, y_max, z_max).
    """

    def __init__(self, point_cloud_range=(-80, -80, -3, 80, 80, 1)):
        self.point_cloud_range = point_cloud_range

    def __call__(self, data_dict):
        if "coord" in data_dict:
            lower = self.point_cloud_range[:3]
            upper = self.point_cloud_range[3:]
            data_dict["coord"] = np.clip(data_dict["coord"], a_min=lower, a_max=upper)
        return data_dict
class RandomDropout(object):
    """Randomly drop a fixed fraction of points and their per-point attributes.

    Args:
        dropout_ratio: fraction of points removed when the dropout fires.
        dropout_application_ratio: probability that dropout fires at all.
    """

    # Per-point entries that must be subsampled together, in the same order
    # the original if-chain handled them.
    POINT_KEYS = ("coord", "color", "normal", "strength", "segment", "instance")

    def __init__(self, dropout_ratio=0.2, dropout_application_ratio=0.5):
        # (fixed: previous docstring here described an unrelated upright_axis
        # parameter copied from another transform)
        self.dropout_ratio = dropout_ratio
        self.dropout_application_ratio = dropout_application_ratio

    def __call__(self, data_dict):
        if random.random() < self.dropout_application_ratio:
            n = len(data_dict["coord"])
            idx = np.random.choice(n, int(n * (1 - self.dropout_ratio)), replace=False)
            if "sampled_index" in data_dict:
                # for ScanNet data efficient, we need to make sure labeled point is sampled.
                idx = np.unique(np.append(idx, data_dict["sampled_index"]))
                mask = np.zeros_like(data_dict["segment"]).astype(bool)
                mask[data_dict["sampled_index"]] = True
                data_dict["sampled_index"] = np.where(mask[idx])[0]
            for key in self.POINT_KEYS:
                if key in data_dict.keys():
                    data_dict[key] = data_dict[key][idx]
        return data_dict
class RandomRotate(object):
    """Rotate coord/normal by a random angle (in units of pi) about one axis."""

    def __init__(self, angle=None, center=None, axis="z", always_apply=False, p=0.5):
        self.angle = [-1, 1] if angle is None else angle
        self.axis = axis
        self.always_apply = always_apply
        self.p = p if not self.always_apply else 1
        # Rotation center; None means bounding-box center of the cloud.
        self.center = center

    def __call__(self, data_dict):
        if random.random() > self.p:
            return data_dict
        theta = np.random.uniform(self.angle[0], self.angle[1]) * np.pi
        c, s = np.cos(theta), np.sin(theta)
        if self.axis == "x":
            rot_t = np.array([[1, 0, 0], [0, c, -s], [0, s, c]])
        elif self.axis == "y":
            rot_t = np.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
        elif self.axis == "z":
            rot_t = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
        else:
            raise NotImplementedError
        if "coord" in data_dict.keys():
            if self.center is None:
                lo = data_dict["coord"].min(axis=0)
                hi = data_dict["coord"].max(axis=0)
                center = (lo + hi) / 2
            else:
                center = self.center
            # Rotate about the center, then shift back.
            data_dict["coord"] -= center
            data_dict["coord"] = np.dot(data_dict["coord"], rot_t.T)
            data_dict["coord"] += center
        if "normal" in data_dict.keys():
            data_dict["normal"] = np.dot(data_dict["normal"], rot_t.T)
        return data_dict
class RandomRotateTargetAngle(object):
    """Rotate by an angle chosen from a discrete set (in units of pi)."""

    def __init__(
        self, angle=(1 / 2, 1, 3 / 2), center=None, axis="z", always_apply=False, p=0.75
    ):
        self.angle = angle
        self.axis = axis
        self.always_apply = always_apply
        self.p = p if not self.always_apply else 1
        # Rotation center; None means bounding-box center of the cloud.
        self.center = center

    def __call__(self, data_dict):
        if random.random() > self.p:
            return data_dict
        theta = np.random.choice(self.angle) * np.pi
        c, s = np.cos(theta), np.sin(theta)
        if self.axis == "x":
            rot_t = np.array([[1, 0, 0], [0, c, -s], [0, s, c]])
        elif self.axis == "y":
            rot_t = np.array([[c, 0, s], [0, 1, 0], [-s, 0, c]])
        elif self.axis == "z":
            rot_t = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]])
        else:
            raise NotImplementedError
        if "coord" in data_dict.keys():
            if self.center is None:
                lo = data_dict["coord"].min(axis=0)
                hi = data_dict["coord"].max(axis=0)
                center = (lo + hi) / 2
            else:
                center = self.center
            # Rotate about the center, then shift back.
            data_dict["coord"] -= center
            data_dict["coord"] = np.dot(data_dict["coord"], rot_t.T)
            data_dict["coord"] += center
        if "normal" in data_dict.keys():
            data_dict["normal"] = np.dot(data_dict["normal"], rot_t.T)
        return data_dict
class RandomScale(object):
    """Scale coordinates by a random factor, per-axis when anisotropic."""

    def __init__(self, scale=None, anisotropic=False):
        self.scale = [0.95, 1.05] if scale is None else scale
        self.anisotropic = anisotropic

    def __call__(self, data_dict):
        if "coord" in data_dict.keys():
            # One factor (isotropic) or three independent factors (anisotropic).
            size = 3 if self.anisotropic else 1
            factor = np.random.uniform(self.scale[0], self.scale[1], size)
            data_dict["coord"] *= factor
        return data_dict
class RandomFlip(object):
    """Independently mirror coord/normal across x and y, each with probability p."""

    def __init__(self, p=0.5):
        self.p = p

    def __call__(self, data_dict):
        # One random draw per axis, x then y (same draw order as original).
        for axis in (0, 1):
            if np.random.rand() >= self.p:
                continue
            if "coord" in data_dict.keys():
                data_dict["coord"][:, axis] = -data_dict["coord"][:, axis]
            if "normal" in data_dict.keys():
                data_dict["normal"][:, axis] = -data_dict["normal"][:, axis]
        return data_dict
class RandomJitter(object):
    """Add clipped Gaussian noise to each coordinate."""

    def __init__(self, sigma=0.01, clip=0.05):
        assert clip > 0
        self.sigma = sigma
        self.clip = clip

    def __call__(self, data_dict):
        if "coord" in data_dict.keys():
            noise = self.sigma * np.random.randn(data_dict["coord"].shape[0], 3)
            data_dict["coord"] += np.clip(noise, -self.clip, self.clip)
        return data_dict
class ClipGaussianJitter(object):
    """Jitter coordinates with multivariate Gaussian noise clipped at the 95% quantile.

    Raw noise is divided by the 1.96 quantile and clipped to [-1, 1], so the
    per-axis displacement magnitude is bounded by `scalar`.
    """

    def __init__(self, scalar=0.02, store_jitter=False):
        self.scalar = scalar
        # Bug fix: np.mean(3) evaluated to the scalar 3.0, which is not a valid
        # mean vector for np.random.multivariate_normal; the intended noise is
        # zero-mean in 3-D.
        self.mean = np.zeros(3)
        self.cov = np.identity(3)
        # 1.96 ~ two-sided 95% quantile of the standard normal.
        self.quantile = 1.96
        self.store_jitter = store_jitter

    def __call__(self, data_dict):
        if "coord" in data_dict.keys():
            jitter = np.random.multivariate_normal(
                self.mean, self.cov, data_dict["coord"].shape[0]
            )
            # Use the named quantile instead of repeating the 1.96 literal.
            jitter = self.scalar * np.clip(jitter / self.quantile, -1, 1)
            data_dict["coord"] += jitter
            if self.store_jitter:
                data_dict["jitter"] = jitter
        return data_dict
class ChromaticAutoContrast(object):
    """Randomly blend colors with an auto-contrast-stretched version of themselves."""

    def __init__(self, p=0.2, blend_factor=None):
        self.p = p
        # blend_factor=None draws a random blend weight per application.
        self.blend_factor = blend_factor

    def __call__(self, data_dict):
        if "color" in data_dict.keys() and np.random.rand() < self.p:
            lo = np.min(data_dict["color"], 0, keepdims=True)
            hi = np.max(data_dict["color"], 0, keepdims=True)
            scale = 255 / (hi - lo)
            stretched = (data_dict["color"][:, :3] - lo) * scale
            if self.blend_factor is None:
                blend = np.random.rand()
            else:
                blend = self.blend_factor
            data_dict["color"][:, :3] = (
                (1 - blend) * data_dict["color"][:, :3] + blend * stretched
            )
        return data_dict
class ChromaticTranslation(object):
    """Add one random RGB offset (up to +/- 255 * ratio per channel) to all points."""

    def __init__(self, p=0.95, ratio=0.05):
        self.p = p
        self.ratio = ratio

    def __call__(self, data_dict):
        if "color" in data_dict.keys() and np.random.rand() < self.p:
            offset = (np.random.rand(1, 3) - 0.5) * 255 * 2 * self.ratio
            data_dict["color"][:, :3] = np.clip(
                offset + data_dict["color"][:, :3], 0, 255
            )
        return data_dict
class ChromaticJitter(object):
    """Add per-point Gaussian color noise (std expressed in units of 255)."""

    def __init__(self, p=0.95, std=0.005):
        self.p = p
        self.std = std

    def __call__(self, data_dict):
        if "color" in data_dict.keys() and np.random.rand() < self.p:
            noise = np.random.randn(data_dict["color"].shape[0], 3) * self.std * 255
            data_dict["color"][:, :3] = np.clip(
                data_dict["color"][:, :3] + noise, 0, 255
            )
        return data_dict
class RandomColorGrayScale(object):
    """Convert RGB colors to (optionally 3-channel) grayscale with probability p."""

    def __init__(self, p):
        self.p = p

    @staticmethod
    def rgb_to_grayscale(color, num_output_channels=1):
        """ITU-R 601 luma conversion; returns 1 or 3 identical channels.

        Bug fix: @staticmethod was missing, so `self.rgb_to_grayscale(color, 3)`
        in __call__ bound `self` to the `color` parameter.
        """
        if color.shape[-1] < 3:
            raise TypeError(
                "Input color should have at least 3 dimensions, but found {}".format(
                    color.shape[-1]
                )
            )
        if num_output_channels not in (1, 3):
            raise ValueError("num_output_channels should be either 1 or 3")
        r, g, b = color[..., 0], color[..., 1], color[..., 2]
        gray = (0.2989 * r + 0.587 * g + 0.114 * b).astype(color.dtype)
        gray = np.expand_dims(gray, axis=-1)
        if num_output_channels == 3:
            # Broadcast view: three identical channels (read-only).
            gray = np.broadcast_to(gray, color.shape)
        return gray

    def __call__(self, data_dict):
        if np.random.rand() < self.p:
            data_dict["color"] = self.rgb_to_grayscale(data_dict["color"], 3)
        return data_dict
class RandomColorJitter(object):
    """
    Random Color Jitter for 3D point cloud (refer torchvision)

    brightness/contrast/saturation/hue are each applied independently with
    probability p, in a random order drawn per call.
    """

    def __init__(self, brightness=0, contrast=0, saturation=0, hue=0, p=0.95):
        self.brightness = self._check_input(brightness, "brightness")
        self.contrast = self._check_input(contrast, "contrast")
        self.saturation = self._check_input(saturation, "saturation")
        self.hue = self._check_input(
            hue, "hue", center=0, bound=(-0.5, 0.5), clip_first_on_zero=False
        )
        self.p = p

    @staticmethod
    def _check_input(
        value, name, center=1, bound=(0, float("inf")), clip_first_on_zero=True
    ):
        """Normalize a jitter spec into a [lo, hi] range, or None for a no-op.

        Bug fix: @staticmethod was missing, so `self._check_input(x, "n")`
        bound `self` to `value` and shifted every argument by one.
        """
        if isinstance(value, numbers.Number):
            if value < 0:
                raise ValueError(
                    "If {} is a single number, it must be non negative.".format(name)
                )
            value = [center - float(value), center + float(value)]
            if clip_first_on_zero:
                value[0] = max(value[0], 0.0)
        elif isinstance(value, (tuple, list)) and len(value) == 2:
            if not bound[0] <= value[0] <= value[1] <= bound[1]:
                raise ValueError("{} values should be between {}".format(name, bound))
        else:
            raise TypeError(
                "{} should be a single number or a list/tuple with length 2.".format(
                    name
                )
            )
        # if value is 0 or (1., 1.) for brightness/contrast/saturation
        # or (0., 0.) for hue, do nothing
        if value[0] == value[1] == center:
            value = None
        return value

    @staticmethod
    def blend(color1, color2, ratio):
        # Linear interpolation of two color arrays, clamped to 8-bit range.
        # Bug fix: @staticmethod was missing (called via self.blend in the
        # adjust_* methods below).
        ratio = float(ratio)
        bound = 255.0
        return (
            (ratio * color1 + (1.0 - ratio) * color2)
            .clip(0, bound)
            .astype(color1.dtype)
        )

    @staticmethod
    def rgb2hsv(rgb):
        # Vectorized RGB (components in [0, 1]) -> HSV conversion.
        # Bug fix: @staticmethod was missing (called via self.rgb2hsv).
        r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2]
        maxc = np.max(rgb, axis=-1)
        minc = np.min(rgb, axis=-1)
        eqc = maxc == minc
        cr = maxc - minc
        # eqc masks guard against division by zero for achromatic pixels.
        s = cr / (np.ones_like(maxc) * eqc + maxc * (1 - eqc))
        cr_divisor = np.ones_like(maxc) * eqc + cr * (1 - eqc)
        rc = (maxc - r) / cr_divisor
        gc = (maxc - g) / cr_divisor
        bc = (maxc - b) / cr_divisor
        hr = (maxc == r) * (bc - gc)
        hg = ((maxc == g) & (maxc != r)) * (2.0 + rc - bc)
        hb = ((maxc != g) & (maxc != r)) * (4.0 + gc - rc)
        h = hr + hg + hb
        h = (h / 6.0 + 1.0) % 1.0
        return np.stack((h, s, maxc), axis=-1)

    @staticmethod
    def hsv2rgb(hsv):
        # Vectorized HSV -> RGB conversion (components in [0, 1]).
        # Bug fix: @staticmethod was missing (called via self.hsv2rgb).
        h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2]
        i = np.floor(h * 6.0)
        f = (h * 6.0) - i
        i = i.astype(np.int32)
        p = np.clip((v * (1.0 - s)), 0.0, 1.0)
        q = np.clip((v * (1.0 - s * f)), 0.0, 1.0)
        t = np.clip((v * (1.0 - s * (1.0 - f))), 0.0, 1.0)
        i = i % 6
        # Select the (r, g, b) formula per hue sextant via a one-hot mask.
        mask = np.expand_dims(i, axis=-1) == np.arange(6)
        a1 = np.stack((v, q, p, p, t, v), axis=-1)
        a2 = np.stack((t, v, v, q, p, p), axis=-1)
        a3 = np.stack((p, p, t, v, v, q), axis=-1)
        a4 = np.stack((a1, a2, a3), axis=-1)
        return np.einsum("...na, ...nab -> ...nb", mask.astype(hsv.dtype), a4)

    def adjust_brightness(self, color, brightness_factor):
        if brightness_factor < 0:
            raise ValueError(
                "brightness_factor ({}) is not non-negative.".format(brightness_factor)
            )
        return self.blend(color, np.zeros_like(color), brightness_factor)

    def adjust_contrast(self, color, contrast_factor):
        if contrast_factor < 0:
            raise ValueError(
                "contrast_factor ({}) is not non-negative.".format(contrast_factor)
            )
        mean = np.mean(RandomColorGrayScale.rgb_to_grayscale(color))
        return self.blend(color, mean, contrast_factor)

    def adjust_saturation(self, color, saturation_factor):
        if saturation_factor < 0:
            raise ValueError(
                "saturation_factor ({}) is not non-negative.".format(saturation_factor)
            )
        gray = RandomColorGrayScale.rgb_to_grayscale(color)
        return self.blend(color, gray, saturation_factor)

    def adjust_hue(self, color, hue_factor):
        if not (-0.5 <= hue_factor <= 0.5):
            raise ValueError(
                "hue_factor ({}) is not in [-0.5, 0.5].".format(hue_factor)
            )
        orig_dtype = color.dtype
        hsv = self.rgb2hsv(color / 255.0)
        h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2]
        h = (h + hue_factor) % 1.0
        hsv = np.stack((h, s, v), axis=-1)
        color_hue_adj = (self.hsv2rgb(hsv) * 255.0).astype(orig_dtype)
        return color_hue_adj

    @staticmethod
    def get_params(brightness, contrast, saturation, hue):
        """Draw a random op order and one random factor per enabled op.

        Bug fix: @staticmethod was missing, so `self.get_params(...)` shifted
        every argument by one.
        """
        fn_idx = torch.randperm(4)
        b = (
            None
            if brightness is None
            else np.random.uniform(brightness[0], brightness[1])
        )
        c = None if contrast is None else np.random.uniform(contrast[0], contrast[1])
        s = (
            None
            if saturation is None
            else np.random.uniform(saturation[0], saturation[1])
        )
        h = None if hue is None else np.random.uniform(hue[0], hue[1])
        return fn_idx, b, c, s, h

    def __call__(self, data_dict):
        (
            fn_idx,
            brightness_factor,
            contrast_factor,
            saturation_factor,
            hue_factor,
        ) = self.get_params(self.brightness, self.contrast, self.saturation, self.hue)
        for fn_id in fn_idx:
            if (
                fn_id == 0
                and brightness_factor is not None
                and np.random.rand() < self.p
            ):
                data_dict["color"] = self.adjust_brightness(
                    data_dict["color"], brightness_factor
                )
            elif (
                fn_id == 1 and contrast_factor is not None and np.random.rand() < self.p
            ):
                data_dict["color"] = self.adjust_contrast(
                    data_dict["color"], contrast_factor
                )
            elif (
                fn_id == 2
                and saturation_factor is not None
                and np.random.rand() < self.p
            ):
                data_dict["color"] = self.adjust_saturation(
                    data_dict["color"], saturation_factor
                )
            elif fn_id == 3 and hue_factor is not None and np.random.rand() < self.p:
                data_dict["color"] = self.adjust_hue(data_dict["color"], hue_factor)
        return data_dict
class HueSaturationTranslation(object):
    """Randomly shift hue and rescale saturation of the RGB colors."""

    @staticmethod
    def rgb_to_hsv(rgb):
        # Translated from source of colorsys.rgb_to_hsv
        # r,g,b should be a numpy arrays with values between 0 and 255
        # rgb_to_hsv returns an array of floats between 0.0 and 1.0.
        # (declared @staticmethod: it takes no self and is invoked through
        # the class name)
        rgb = rgb.astype("float")
        hsv = np.zeros_like(rgb)
        # in case an RGBA array was passed, just copy the A channel
        hsv[..., 3:] = rgb[..., 3:]
        r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2]
        maxc = np.max(rgb[..., :3], axis=-1)
        minc = np.min(rgb[..., :3], axis=-1)
        hsv[..., 2] = maxc
        # mask guards against division by zero on achromatic pixels
        mask = maxc != minc
        hsv[mask, 1] = (maxc - minc)[mask] / maxc[mask]
        rc = np.zeros_like(r)
        gc = np.zeros_like(g)
        bc = np.zeros_like(b)
        rc[mask] = (maxc - r)[mask] / (maxc - minc)[mask]
        gc[mask] = (maxc - g)[mask] / (maxc - minc)[mask]
        bc[mask] = (maxc - b)[mask] / (maxc - minc)[mask]
        hsv[..., 0] = np.select(
            [r == maxc, g == maxc], [bc - gc, 2.0 + rc - bc], default=4.0 + gc - rc
        )
        hsv[..., 0] = (hsv[..., 0] / 6.0) % 1.0
        return hsv

    @staticmethod
    def hsv_to_rgb(hsv):
        # Translated from source of colorsys.hsv_to_rgb
        # h,s should be a numpy arrays with values between 0.0 and 1.0
        # v should be a numpy array with values between 0.0 and 255.0
        # hsv_to_rgb returns an array of uints between 0 and 255.
        # (declared @staticmethod: it takes no self and is invoked through
        # the class name)
        rgb = np.empty_like(hsv)
        rgb[..., 3:] = hsv[..., 3:]
        h, s, v = hsv[..., 0], hsv[..., 1], hsv[..., 2]
        i = (h * 6.0).astype("uint8")
        f = (h * 6.0) - i
        p = v * (1.0 - s)
        q = v * (1.0 - s * f)
        t = v * (1.0 - s * (1.0 - f))
        i = i % 6
        conditions = [s == 0.0, i == 1, i == 2, i == 3, i == 4, i == 5]
        rgb[..., 0] = np.select(conditions, [v, q, p, p, t, v], default=v)
        rgb[..., 1] = np.select(conditions, [v, v, v, q, p, p], default=t)
        rgb[..., 2] = np.select(conditions, [v, p, t, v, v, q], default=p)
        return rgb.astype("uint8")

    def __init__(self, hue_max=0.5, saturation_max=0.2):
        # Max absolute hue shift (in [0, 1] hue units) and max relative
        # saturation change.
        self.hue_max = hue_max
        self.saturation_max = saturation_max

    def __call__(self, data_dict):
        if "color" in data_dict.keys():
            # Assume color[:, :3] is rgb
            hsv = HueSaturationTranslation.rgb_to_hsv(data_dict["color"][:, :3])
            hue_val = (np.random.rand() - 0.5) * 2 * self.hue_max
            sat_ratio = 1 + (np.random.rand() - 0.5) * 2 * self.saturation_max
            hsv[..., 0] = np.remainder(hue_val + hsv[..., 0] + 1, 1)
            hsv[..., 1] = np.clip(sat_ratio * hsv[..., 1], 0, 1)
            data_dict["color"][:, :3] = np.clip(
                HueSaturationTranslation.hsv_to_rgb(hsv), 0, 255
            )
        return data_dict
class RandomColorDrop(object):
    """Scale all colors by `color_augment` (default 0: drop them) with probability p."""

    def __init__(self, p=0.2, color_augment=0.0):
        self.p = p
        self.color_augment = color_augment

    def __call__(self, data_dict):
        if "color" not in data_dict.keys():
            return data_dict
        if np.random.rand() < self.p:
            data_dict["color"] *= self.color_augment
        return data_dict

    def __repr__(self):
        return "RandomColorDrop(color_augment: {}, p: {})".format(
            self.color_augment, self.p
        )
class ElasticDistortion(object):
    """Apply elastic distortion to the coordinates at one or more scales."""

    def __init__(self, distortion_params=None):
        # Each entry is a (granularity, magnitude) pair applied in sequence.
        self.distortion_params = (
            [[0.2, 0.4], [0.8, 1.6]] if distortion_params is None else distortion_params
        )

    @staticmethod
    def elastic_distortion(coords, granularity, magnitude):
        """
        Apply elastic distortion on sparse coordinate space.
        pointcloud: numpy array of (number of points, at least 3 spatial dims)
        granularity: size of the noise grid (in same scale[m/cm] as the voxel grid)
        magnitude: noise multiplier

        Bug fix: @staticmethod was missing, so the `self.elastic_distortion(...)`
        call in __call__ passed `self` as `coords` (4 args to a 3-param function).
        """
        blurx = np.ones((3, 1, 1, 1)).astype("float32") / 3
        blury = np.ones((1, 3, 1, 1)).astype("float32") / 3
        blurz = np.ones((1, 1, 3, 1)).astype("float32") / 3
        coords_min = coords.min(0)
        # Create Gaussian noise tensor of the size given by granularity.
        noise_dim = ((coords - coords_min).max(0) // granularity).astype(int) + 3
        noise = np.random.randn(*noise_dim, 3).astype(np.float32)
        # Smoothing: three separable box blurs, applied twice.
        # (scipy.ndimage.convolve replaces the removed scipy.ndimage.filters alias)
        for _ in range(2):
            noise = scipy.ndimage.convolve(noise, blurx, mode="constant", cval=0)
            noise = scipy.ndimage.convolve(noise, blury, mode="constant", cval=0)
            noise = scipy.ndimage.convolve(noise, blurz, mode="constant", cval=0)
        # Trilinear interpolate noise filters for each spatial dimensions.
        ax = [
            np.linspace(d_min, d_max, d)
            for d_min, d_max, d in zip(
                coords_min - granularity,
                coords_min + granularity * (noise_dim - 2),
                noise_dim,
            )
        ]
        interp = scipy.interpolate.RegularGridInterpolator(
            ax, noise, bounds_error=False, fill_value=0
        )
        coords += interp(coords) * magnitude
        return coords

    def __call__(self, data_dict):
        if "coord" in data_dict.keys() and self.distortion_params is not None:
            # Applied with fixed probability 0.95 (as in the original).
            if random.random() < 0.95:
                for granularity, magnitude in self.distortion_params:
                    data_dict["coord"] = self.elastic_distortion(
                        data_dict["coord"], granularity, magnitude
                    )
        return data_dict
class GridSample(object):
    """Voxel-grid subsampling of a point cloud.

    In "train" mode one random point per occupied voxel is kept; in "test"
    mode the cloud is split into a list of complementary parts that together
    cover every point.

    Args:
        grid_size: voxel edge length (scalar or per-axis sequence).
        hash_type: "fnv" for FNV-1a hashing of voxel indices, else ravel hash.
        mode: "train" (returns one dict) or "test" (returns list of dicts).
        keys: per-point entries subsampled alongside the coordinates.
        return_inverse / return_grid_coord / return_min_coord /
        return_displacement: optional extra outputs.
        project_displacement: project the displacement onto the point normal.
    """

    def __init__(
        self,
        grid_size=0.05,
        hash_type="fnv",
        mode="train",
        keys=("coord", "color", "normal", "segment"),
        return_inverse=False,
        return_grid_coord=False,
        return_min_coord=False,
        return_displacement=False,
        project_displacement=False,
    ):
        self.grid_size = grid_size
        # With the hash functions now declared @staticmethod, this yields
        # plain functions of (arr); previously they were bound methods and
        # self.hash(grid_coord) passed `self` as `arr`.
        self.hash = self.fnv_hash_vec if hash_type == "fnv" else self.ravel_hash_vec
        assert mode in ["train", "test"]
        self.mode = mode
        self.keys = keys
        self.return_inverse = return_inverse
        self.return_grid_coord = return_grid_coord
        self.return_min_coord = return_min_coord
        self.return_displacement = return_displacement
        self.project_displacement = project_displacement

    def __call__(self, data_dict):
        assert "coord" in data_dict.keys()
        scaled_coord = data_dict["coord"] / np.array(self.grid_size)
        grid_coord = np.floor(scaled_coord).astype(int)
        min_coord = grid_coord.min(0)
        grid_coord -= min_coord
        scaled_coord -= min_coord
        min_coord = min_coord * np.array(self.grid_size)
        key = self.hash(grid_coord)
        idx_sort = np.argsort(key)
        key_sort = key[idx_sort]
        _, inverse, count = np.unique(key_sort, return_inverse=True, return_counts=True)
        if self.mode == "train":  # train mode
            # Pick one random point inside every voxel: cumsum gives the start
            # of each voxel's run in key_sort, the randint picks an offset.
            idx_select = (
                np.cumsum(np.insert(count, 0, 0)[0:-1])
                + np.random.randint(0, count.max(), count.size) % count
            )
            idx_unique = idx_sort[idx_select]
            if "sampled_index" in data_dict:
                # for ScanNet data efficient, we need to make sure labeled point is sampled.
                idx_unique = np.unique(
                    np.append(idx_unique, data_dict["sampled_index"])
                )
                mask = np.zeros_like(data_dict["segment"]).astype(bool)
                mask[data_dict["sampled_index"]] = True
                data_dict["sampled_index"] = np.where(mask[idx_unique])[0]
            if self.return_inverse:
                data_dict["inverse"] = np.zeros_like(inverse)
                data_dict["inverse"][idx_sort] = inverse
            if self.return_grid_coord:
                data_dict["grid_coord"] = grid_coord[idx_unique]
            if self.return_min_coord:
                data_dict["min_coord"] = min_coord.reshape([1, 3])
            if self.return_displacement:
                displacement = (
                    scaled_coord - grid_coord - 0.5
                )  # [0, 1] -> [-0.5, 0.5] displacement to center
                if self.project_displacement:
                    displacement = np.sum(
                        displacement * data_dict["normal"], axis=-1, keepdims=True
                    )
                data_dict["displacement"] = displacement[idx_unique]
            for key in self.keys:
                data_dict[key] = data_dict[key][idx_unique]
            return data_dict
        elif self.mode == "test":  # test mode
            data_part_list = []
            # Emit count.max() parts; part i takes the i-th point (mod size)
            # of every voxel, so each point appears in at least one part.
            for i in range(count.max()):
                idx_select = np.cumsum(np.insert(count, 0, 0)[0:-1]) + i % count
                idx_part = idx_sort[idx_select]
                data_part = dict(index=idx_part)
                if self.return_inverse:
                    # NOTE(review): writes to data_dict (not data_part); the
                    # value reaches each part via the copy loop below — confirm
                    # this asymmetry with train mode is intended.
                    data_dict["inverse"] = np.zeros_like(inverse)
                    data_dict["inverse"][idx_sort] = inverse
                if self.return_grid_coord:
                    data_part["grid_coord"] = grid_coord[idx_part]
                if self.return_min_coord:
                    data_part["min_coord"] = min_coord.reshape([1, 3])
                if self.return_displacement:
                    displacement = (
                        scaled_coord - grid_coord - 0.5
                    )  # [0, 1] -> [-0.5, 0.5] displacement to center
                    if self.project_displacement:
                        displacement = np.sum(
                            displacement * data_dict["normal"], axis=-1, keepdims=True
                        )
                    # NOTE(review): also written to data_dict, see note above.
                    data_dict["displacement"] = displacement[idx_part]
                for key in data_dict.keys():
                    if key in self.keys:
                        data_part[key] = data_dict[key][idx_part]
                    else:
                        data_part[key] = data_dict[key]
                data_part_list.append(data_part)
            return data_part_list
        else:
            raise NotImplementedError

    @staticmethod
    def ravel_hash_vec(arr):
        """
        Ravel the coordinates after subtracting the min coordinates.

        Bug fix: @staticmethod was missing, so `self.ravel_hash_vec` was a
        bound method and received `self` as `arr`.
        """
        assert arr.ndim == 2
        arr = arr.copy()
        arr -= arr.min(0)
        arr = arr.astype(np.uint64, copy=False)
        arr_max = arr.max(0).astype(np.uint64) + 1
        keys = np.zeros(arr.shape[0], dtype=np.uint64)
        # Fortran style indexing
        for j in range(arr.shape[1] - 1):
            keys += arr[:, j]
            keys *= arr_max[j + 1]
        keys += arr[:, -1]
        return keys

    @staticmethod
    def fnv_hash_vec(arr):
        """
        FNV64-1A

        Bug fix: @staticmethod was missing, so `self.fnv_hash_vec` was a
        bound method and received `self` as `arr`.
        """
        assert arr.ndim == 2
        # Floor first for negative coordinates
        arr = arr.copy()
        arr = arr.astype(np.uint64, copy=False)
        hashed_arr = np.uint64(14695981039346656037) * np.ones(
            arr.shape[0], dtype=np.uint64
        )
        for j in range(arr.shape[1]):
            hashed_arr *= np.uint64(1099511628211)
            hashed_arr = np.bitwise_xor(hashed_arr, arr[:, j])
        return hashed_arr
class SphereCrop(object):
    """Limit the number of points by cropping spherical neighborhoods.

    mode "random"/"center": keep the `point_max` points nearest to a random /
    middle seed point, subsampling every per-point entry (one dict returned).
    mode "all": produce a list of overlapping crops that together cover every
    point (test-time usage); each crop carries a distance-based "weight".
    """

    def __init__(self, point_max=80000, sample_rate=None, mode="random"):
        # sample_rate, when set, overrides point_max as a fraction of the
        # incoming point count.
        self.point_max = point_max
        self.sample_rate = sample_rate
        assert mode in ["random", "center", "all"]
        self.mode = mode

    def __call__(self, data_dict):
        point_max = (
            int(self.sample_rate * data_dict["coord"].shape[0])
            if self.sample_rate is not None
            else self.point_max
        )
        assert "coord" in data_dict.keys()
        if self.mode == "all":
            # TODO: Optimize
            if "index" not in data_dict.keys():
                data_dict["index"] = np.arange(data_dict["coord"].shape[0])
            data_part_list = []
            # coord_list, color_list, dist2_list, idx_list, offset_list = [], [], [], [], []
            if data_dict["coord"].shape[0] > point_max:
                # coord_p: per-point "already covered" priority (seeded with
                # tiny random noise to break ties); idx_uni: indices covered
                # by some crop so far.
                coord_p, idx_uni = np.random.rand(
                    data_dict["coord"].shape[0]
                ) * 1e-3, np.array([])
                while idx_uni.size != data_dict["index"].shape[0]:
                    # Seed each crop at the least-covered point.
                    init_idx = np.argmin(coord_p)
                    dist2 = np.sum(
                        np.power(data_dict["coord"] - data_dict["coord"][init_idx], 2),
                        1,
                    )
                    # The point_max nearest points form this crop.
                    idx_crop = np.argsort(dist2)[:point_max]
                    data_crop_dict = dict()
                    if "coord" in data_dict.keys():
                        data_crop_dict["coord"] = data_dict["coord"][idx_crop]
                    if "grid_coord" in data_dict.keys():
                        data_crop_dict["grid_coord"] = data_dict["grid_coord"][idx_crop]
                    if "normal" in data_dict.keys():
                        data_crop_dict["normal"] = data_dict["normal"][idx_crop]
                    if "color" in data_dict.keys():
                        data_crop_dict["color"] = data_dict["color"][idx_crop]
                    if "displacement" in data_dict.keys():
                        data_crop_dict["displacement"] = data_dict["displacement"][
                            idx_crop
                        ]
                    if "strength" in data_dict.keys():
                        data_crop_dict["strength"] = data_dict["strength"][idx_crop]
                    # weight: squared distance to the crop center (used
                    # downstream when fusing overlapping predictions).
                    data_crop_dict["weight"] = dist2[idx_crop]
                    data_crop_dict["index"] = data_dict["index"][idx_crop]
                    data_part_list.append(data_crop_dict)
                    # Raise the priority of points already covered, so later
                    # crops prefer still-uncovered regions; points near the
                    # center get the largest increment.
                    delta = np.square(
                        1 - data_crop_dict["weight"] / np.max(data_crop_dict["weight"])
                    )
                    coord_p[idx_crop] += delta
                    idx_uni = np.unique(
                        np.concatenate((idx_uni, data_crop_dict["index"]))
                    )
            else:
                # Cloud already small enough: a single part containing everything.
                data_crop_dict = data_dict.copy()
                data_crop_dict["weight"] = np.zeros(data_dict["coord"].shape[0])
                data_crop_dict["index"] = data_dict["index"]
                data_part_list.append(data_crop_dict)
            return data_part_list
        # mode is "random" or "center"
        elif data_dict["coord"].shape[0] > point_max:
            if self.mode == "random":
                center = data_dict["coord"][
                    np.random.randint(data_dict["coord"].shape[0])
                ]
            elif self.mode == "center":
                # NOTE(review): the middle point by storage order, not the
                # geometric centroid — confirm intended.
                center = data_dict["coord"][data_dict["coord"].shape[0] // 2]
            else:
                raise NotImplementedError
            # Keep the point_max nearest neighbors of the chosen center.
            idx_crop = np.argsort(np.sum(np.square(data_dict["coord"] - center), 1))[
                :point_max
            ]
            if "coord" in data_dict.keys():
                data_dict["coord"] = data_dict["coord"][idx_crop]
            if "origin_coord" in data_dict.keys():
                data_dict["origin_coord"] = data_dict["origin_coord"][idx_crop]
            if "grid_coord" in data_dict.keys():
                data_dict["grid_coord"] = data_dict["grid_coord"][idx_crop]
            if "color" in data_dict.keys():
                data_dict["color"] = data_dict["color"][idx_crop]
            if "normal" in data_dict.keys():
                data_dict["normal"] = data_dict["normal"][idx_crop]
            if "segment" in data_dict.keys():
                data_dict["segment"] = data_dict["segment"][idx_crop]
            if "instance" in data_dict.keys():
                data_dict["instance"] = data_dict["instance"][idx_crop]
            if "displacement" in data_dict.keys():
                data_dict["displacement"] = data_dict["displacement"][idx_crop]
            if "strength" in data_dict.keys():
                data_dict["strength"] = data_dict["strength"][idx_crop]
        return data_dict
class ShufflePoint(object):
    """Randomly permute the point order of every per-point array in the sample.

    A single permutation is drawn and applied to all present per-point keys so
    that coordinates, features, and labels stay aligned point-by-point.
    """

    def __call__(self, data_dict):
        assert "coord" in data_dict.keys()
        # One shared permutation keeps all per-point attributes consistent.
        perm = np.random.permutation(data_dict["coord"].shape[0])
        for key in (
            "coord",
            "grid_coord",
            "displacement",
            "color",
            "normal",
            "segment",
            "instance",
        ):
            if key in data_dict.keys():
                data_dict[key] = data_dict[key][perm]
        return data_dict
class CropBoundary(object):
    """Drop every point whose semantic label is 0 or 1.

    All per-point arrays present in the sample are filtered with the same
    boolean mask so they remain aligned after cropping.
    """

    def __call__(self, data_dict):
        assert "segment" in data_dict
        labels = data_dict["segment"].flatten()
        # Keep points that belong to neither of the two dropped classes.
        keep = np.logical_and(labels != 0, labels != 1)
        for key in ("coord", "grid_coord", "color", "normal", "segment", "instance"):
            if key in data_dict.keys():
                data_dict[key] = data_dict[key][keep]
        return data_dict
class ContrastiveViewsGenerator(object):
    """Generate two independently augmented views of a sample for contrastive learning.

    Copies of the arrays named in ``view_keys`` are pushed through the same
    transform pipeline twice; the results are written back into the sample
    under ``view1_*`` and ``view2_*`` keys.
    """

    def __init__(
        self,
        view_keys=("coord", "color", "normal", "origin_coord"),
        view_trans_cfg=None,
    ):
        self.view_keys = view_keys
        self.view_trans = Compose(view_trans_cfg)

    def __call__(self, data_dict):
        for prefix in ("view1_", "view2_"):
            # Each view starts from a fresh copy of the untouched source arrays
            # so the two augmentations are independent of each other.
            view = {key: data_dict[key].copy() for key in self.view_keys}
            view = self.view_trans(view)
            for key, value in view.items():
                data_dict[prefix + key] = value
        return data_dict
class InstanceParser(object):
    """Normalize instance labels and derive per-instance centroid / bbox targets.

    Instances whose semantic label is in ``segment_ignore_index`` are mapped to
    ``instance_ignore_index``; the remaining instance ids are re-indexed to a
    contiguous ``0..K-1`` range.  For every kept instance the transform records
    its centroid (broadcast to all of its points) and an 8-dim box target
    ``[center(3), size(3), heading(1), class(1)]``.

    Args:
        segment_ignore_index: semantic labels whose instances are ignored.
        instance_ignore_index: fill value used for ignored instances/points.
    """

    def __init__(self, segment_ignore_index=(-1, 0, 1), instance_ignore_index=-1):
        self.segment_ignore_index = segment_ignore_index
        self.instance_ignore_index = instance_ignore_index

    def __call__(self, data_dict):
        coord = data_dict["coord"]
        segment = data_dict["segment"]
        instance = data_dict["instance"]
        # np.isin replaces np.in1d, which is deprecated (removal planned in
        # NumPy 2.x); behavior is identical for 1-D label arrays.
        mask = ~np.isin(segment, self.segment_ignore_index)
        # mapping ignored instance to ignore index (mutates the input array)
        instance[~mask] = self.instance_ignore_index
        # reorder remaining instances to a contiguous 0..K-1 range
        unique, inverse = np.unique(instance[mask], return_inverse=True)
        instance_num = len(unique)
        instance[mask] = inverse
        # init instance information; ignored points keep the ignore fill value
        centroid = np.ones((coord.shape[0], 3)) * self.instance_ignore_index
        bbox = np.ones((instance_num, 8)) * self.instance_ignore_index
        vacancy = [
            index for index in self.segment_ignore_index if index >= 0
        ]  # vacate class index
        for instance_id in range(instance_num):
            mask_ = instance == instance_id
            coord_ = coord[mask_]
            bbox_min = coord_.min(0)
            bbox_max = coord_.max(0)
            bbox_centroid = coord_.mean(0)
            bbox_center = (bbox_max + bbox_min) / 2
            bbox_size = bbox_max - bbox_min
            # axis-aligned boxes: heading angle is always zero
            bbox_theta = np.zeros(1, dtype=coord_.dtype)
            bbox_class = np.array([segment[mask_][0]], dtype=coord_.dtype)
            # shift class index to fill vacant class index caused by segment ignore index
            bbox_class -= np.greater(bbox_class, vacancy).sum()
            centroid[mask_] = bbox_centroid
            bbox[instance_id] = np.concatenate(
                [bbox_center, bbox_size, bbox_theta, bbox_class]
            )  # 3 + 3 + 1 + 1 = 8
        data_dict["instance"] = instance
        data_dict["instance_centroid"] = centroid
        data_dict["bbox"] = bbox
        return data_dict
class Compose(object):
    """Chain transforms built from a config list and apply them in order.

    Each entry of ``cfg`` is handed to the ``TRANSFORMS`` registry; the
    resulting callables are applied sequentially to the sample dict.
    """

    def __init__(self, cfg=None):
        self.cfg = [] if cfg is None else cfg
        # Instantiate every transform up front via the registry.
        self.transforms = [TRANSFORMS.build(transform_cfg) for transform_cfg in self.cfg]

    def __call__(self, data_dict):
        result = data_dict
        for transform in self.transforms:
            result = transform(result)
        return result