# ICON / lib / common / seg3d_lossless.py
# Yuliang's picture
# fewer cloth_loop and use GPU-MC
# 9bb001d
# -*- coding: utf-8 -*-
# Max-Planck-Gesellschaft zur Förderung der Wissenschaften e.V. (MPG) is
# holder of all proprietary rights on this computer program.
# You can only use this computer program if you have closed
# a license agreement with MPG or you get the right to use the computer
# program from someone who is authorized to grant you that right.
# Any use of the computer program without a valid license is prohibited and
# liable to prosecution.
#
# Copyright©2019 Max-Planck-Gesellschaft zur Förderung
# der Wissenschaften e.V. (MPG). acting on behalf of its Max Planck Institute
# for Intelligent Systems. All rights reserved.
#
# Contact: ps-license@tuebingen.mpg.de
from .seg3d_utils import (
create_grid3D,
plot_mask3D,
SmoothConv3D,
)
import torch
import torch.nn as nn
import numpy as np
import torch.nn.functional as F
import mcubes
from kaolin.ops.conversions import voxelgrids_to_trianglemeshes
import logging
logging.getLogger("lightning").setLevel(logging.ERROR)
class Seg3dLossless(nn.Module):
    """Coarse-to-fine evaluation of an occupancy field on a dense 3D grid.

    ``query_func`` is evaluated on every voxel of the coarsest grid in
    ``resolutions``.  For each finer grid the previous volume is upsampled
    trilinearly and ``query_func`` is only called again for voxels that are
    flagged as lying close to the ``balance_value`` iso-surface.

    Only a batch size of 1 and a single channel are supported; both are
    asserted in ``__init__``.
    """

    def __init__(self,
                 query_func,
                 b_min,
                 b_max,
                 resolutions,
                 channels=1,
                 balance_value=0.5,
                 align_corners=False,
                 visualize=False,
                 debug=False,
                 use_cuda_impl=False,
                 faster=False,
                 use_shadow=False,
                 **kwargs):
        """Store the configuration and pre-allocate the helper buffers.

        Args:
            query_func: callable invoked as ``query_func(**kwargs, points=p)``;
                must return a ``[bz, C, N]`` tensor or a list/tuple of tensors
                that stacks to that shape (checked in ``batch_eval``).
            b_min: lower corner of the bounding box, nested list ``[bz, 3]``.
            b_max: upper corner of the bounding box, nested list ``[bz, 3]``.
            resolutions: grid sizes from coarse to fine, either a list of
                ints (cubic grids) or a list of ``(W, H, D)`` triples.
            channels: number of occupancy channels, must be 1.
            balance_value: iso-level that separates inside from outside.
            align_corners: same with how you process gt.
                (grid_sample / interpolate)
            visualize: plot the volume after every stage via ``plot``.
            debug: additionally plot conflict voxels in ``_forward``.
            use_cuda_impl: stored on the instance; not read by this class.
            faster: dispatch ``forward`` to ``_forward_faster``.
            use_shadow: enable the shadow shortcut on the last level of
                ``_forward``.
            **kwargs: accepted for call-site compatibility and ignored.
        """
        super().__init__()
        self.query_func = query_func
        self.register_buffer(
            'b_min',
            torch.tensor(b_min).float().unsqueeze(1))  # [bz, 1, 3]
        self.register_buffer(
            'b_max',
            torch.tensor(b_max).float().unsqueeze(1))  # [bz, 1, 3]

        # A flat list of ints describes cubic grids: expand to (res, res, res).
        if isinstance(resolutions[0], (int, np.integer)):
            resolutions = torch.tensor([(int(res), ) * 3
                                        for res in resolutions])
        else:
            resolutions = torch.tensor(resolutions)
        self.register_buffer('resolutions', resolutions)

        self.batchsize = self.b_min.size(0)
        assert self.batchsize == 1
        self.balance_value = balance_value
        self.channels = channels
        assert self.channels == 1
        self.align_corners = align_corners
        self.visualize = visualize
        self.debug = debug
        self.use_cuda_impl = use_cuda_impl
        self.faster = faster
        self.use_shadow = use_shadow

        for resolution in resolutions:
            assert resolution[0] % 2 == 1 and resolution[1] % 2 == 1, \
                f"resolution {resolution} need to be odd becuase of align_corner."

        # Coordinates of the coarsest grid, expressed in voxel units of the
        # finest grid (create_grid3D is a project helper; presumably it
        # returns long coordinates in (x, y, z) order -- TODO confirm).
        init_coords = create_grid3D(0,
                                    resolutions[-1] - 1,
                                    steps=resolutions[0])  # [N, 3]
        init_coords = init_coords.unsqueeze(0).repeat(self.batchsize, 1,
                                                      1)  # [bz, N, 3]
        self.register_buffer('init_coords', init_coords)

        # Book-keeping mask at the finest resolution: True once a voxel has
        # been evaluated with query_func.
        final_W, final_H, final_D = self.resolutions[-1].tolist()
        calculated = torch.zeros((final_D, final_H, final_W),
                                 dtype=torch.bool)
        self.register_buffer('calculated', calculated)

        # All 27 offsets of the 3x3x3 neighbourhood (centre included).
        gird8_offsets = torch.stack(
            torch.meshgrid([
                torch.tensor([-1, 0, 1]),
                torch.tensor([-1, 0, 1]),
                torch.tensor([-1, 0, 1])
            ])).int().view(3, -1).t()  # [27, 3]
        self.register_buffer('gird8_offsets', gird8_offsets)

        # smooth convs
        self.smooth_conv3x3 = SmoothConv3D(in_channels=1,
                                           out_channels=1,
                                           kernel_size=3)
        self.smooth_conv5x5 = SmoothConv3D(in_channels=1,
                                           out_channels=1,
                                           kernel_size=5)
        self.smooth_conv7x7 = SmoothConv3D(in_channels=1,
                                           out_channels=1,
                                           kernel_size=7)
        self.smooth_conv9x9 = SmoothConv3D(in_channels=1,
                                           out_channels=1,
                                           kernel_size=9)

    # ------------------------------------------------------------------ #
    # small private helpers shared by both forward variants
    # ------------------------------------------------------------------ #
    @staticmethod
    def _upsample(volume, size):
        """Trilinearly resize a ``[R, C, D, H, W]`` volume to ``size``."""
        # align_corners=True is intentional here ("here true is correct!").
        return F.interpolate(volume.float(),
                             size=size,
                             mode="trilinear",
                             align_corners=True)

    @staticmethod
    def _assign_voxels(volume, coords, value):
        """Write ``value`` into ``volume[z, y, x]`` for ``coords`` [1, N, 3]."""
        # Indices must be integral; coords may arrive as floats.
        idx = coords[0].long()
        volume[idx[:, 2], idx[:, 1], idx[:, 0]] = value

    @staticmethod
    def _flatten_index(point_coords, H, W):
        """Turn (x, y, z) voxel coordinates into indices of a flat D*H*W axis."""
        point_coords = point_coords.long()
        return (point_coords[:, :, 2] * H * W + point_coords[:, :, 1] * W +
                point_coords[:, :, 0])

    @staticmethod
    def _gather_points(occupancys, point_indices):
        """Read the values of ``occupancys`` at flat ``point_indices``."""
        R, C, D, H, W = occupancys.shape
        return torch.gather(occupancys.reshape(R, C, D * H * W), 2,
                            point_indices.unsqueeze(1))

    @staticmethod
    def _scatter_points(occupancys, point_indices, values):
        """Put point predictions to the right places on the upsampled grid."""
        R, C, D, H, W = occupancys.shape
        index = point_indices.unsqueeze(1).expand(-1, C, -1)
        return (occupancys.reshape(R, C, D * H * W).scatter_(
            2, index, values).view(R, C, D, H, W))

    # ------------------------------------------------------------------ #
    # evaluation
    # ------------------------------------------------------------------ #
    def batch_eval(self, coords, **kwargs):
        """Evaluate ``query_func`` at the given voxel coordinates.

        Args:
            coords: ``[bz, N, 3]`` coordinates in voxel units of the last
                (finest) resolution.
            **kwargs: forwarded to ``query_func``.

        Returns:
            The ``[bz, C, N]`` tensor produced by ``query_func``.
        """
        coords = coords.detach()
        final_res = self.resolutions[-1]

        # normalize coords to [0, 1], then map into [b_min, b_max]
        if self.align_corners:
            coords2D = coords.float() / (final_res - 1)
        else:
            step = 1.0 / final_res.float()
            coords2D = coords.float() / final_res + step / 2
        coords2D = coords2D * (self.b_max - self.b_min) + self.b_min

        # query function
        occupancys = self.query_func(**kwargs, points=coords2D)
        if isinstance(occupancys, (list, tuple)):
            occupancys = torch.stack(occupancys)  # [bz, C, N]
        assert len(occupancys.size()) == 3, \
            "query_func should return a occupancy with shape of [bz, C, N]"
        return occupancys

    def forward(self, **kwargs):
        """Run the reconstruction; ``kwargs`` are forwarded to ``query_func``.

        Returns:
            A ``[D, H, W]`` occupancy volume at the finest resolution, or
            ``None`` when the fast path finds no occupied coarse voxel.
        """
        if self.faster:
            return self._forward_faster(**kwargs)
        return self._forward(**kwargs)

    def _forward_faster(self, **kwargs):
        """Fast reconstruction that trades accuracy for speed.

        Compared with ``_forward``:
            1. no conflict checking: 4.88 fps -> 6.56 fps
            2. smooth_conv9x9 ~ smooth_conv3x3 for different resolution
            3. last step no examine (the finest level is only interpolated)

        Returns:
            ``occupancys[0, 0]`` (``[D, H, W]``) or ``None`` if no voxel of
            the coarsest grid exceeds ``balance_value``.
        """
        final_res = self.resolutions[-1]
        final_W, final_H, final_D = final_res.tolist()

        for resolution in self.resolutions:
            W, H, D = resolution.tolist()
            size = (D, H, W)
            # Integer step (in finest-grid voxels) between voxels of this level.
            stride = (final_res - 1) // (resolution - 1)

            # first step: dense evaluation of the coarsest grid
            if torch.equal(resolution, self.resolutions[0]):
                coords = self.init_coords.clone()  # torch.long
                occupancys = self.batch_eval(coords, **kwargs)
                occupancys = occupancys.view(self.batchsize, self.channels, D,
                                             H, W)
                if not (occupancys > self.balance_value).any():
                    return None
                if self.visualize:
                    self.plot(occupancys, coords, final_D, final_H, final_W)
                with torch.no_grad():
                    coords_accum = (coords // stride).long()

            # last step: plain upsampling, nothing is re-evaluated
            elif torch.equal(resolution, final_res):
                occupancys = self._upsample(occupancys, size)

            # next steps
            else:
                # express the already-evaluated voxels in this level's units
                coords_accum *= 2

                with torch.no_grad():
                    valid = self._upsample(occupancys > self.balance_value,
                                           size)
                occupancys = self._upsample(occupancys, size)
                # voxels whose interpolated inside/outside label is mixed
                is_boundary = (valid > 0.0) & (valid < 1.0)

                with torch.no_grad():
                    if torch.equal(resolution, self.resolutions[1]):
                        smooth = self.smooth_conv9x9
                    elif torch.equal(resolution, self.resolutions[2]):
                        smooth = self.smooth_conv7x7
                    else:
                        smooth = self.smooth_conv3x3
                    is_boundary = (smooth(is_boundary.float()) > 0)[0, 0]

                    # never re-evaluate voxels that are already known
                    coords_accum = coords_accum.long()
                    self._assign_voxels(is_boundary, coords_accum, False)

                    point_coords = is_boundary.permute(
                        2, 1, 0).nonzero(as_tuple=False).unsqueeze(0)
                    point_indices = self._flatten_index(point_coords, H, W)
                    # inferred value
                    coords = point_coords * stride

                if coords.size(1) == 0:
                    continue
                occupancys_topk = self.batch_eval(coords, **kwargs)
                occupancys = self._scatter_points(occupancys, point_indices,
                                                  occupancys_topk)

                with torch.no_grad():
                    coords_accum = torch.cat([point_coords, coords_accum],
                                             dim=1).unique(dim=1)

        return occupancys[0, 0]

    def _forward(self, **kwargs):
        """Reconstruction with conflict checking.

        After each refinement the interpolated and the inferred values of the
        re-evaluated voxels are compared; where they fall on different sides
        of ``balance_value`` the 3x3x3 neighbourhood (at the current stride)
        is evaluated as well, until no new conflicts appear.

        Returns:
            ``occupancys[0, 0]``, a ``[D, H, W]`` volume at the finest
            resolution.
        """
        final_res = self.resolutions[-1]
        final_W, final_H, final_D = final_res.tolist()

        calculated = self.calculated.clone()

        for resolution in self.resolutions:
            W, H, D = resolution.tolist()
            size = (D, H, W)
            # Keep stride integral: coordinates are used as tensor indices
            # below and true division of integer tensors would yield floats.
            stride = (final_res - 1) // (resolution - 1)
            is_final = torch.equal(resolution, final_res)

            if self.visualize:
                this_stage_coords = []

            # first step
            if torch.equal(resolution, self.resolutions[0]):
                coords = self.init_coords.clone()  # torch.long
                occupancys = self.batch_eval(coords, **kwargs)
                occupancys = occupancys.view(self.batchsize, self.channels, D,
                                             H, W)

                if self.visualize:
                    self.plot(occupancys, coords, final_D, final_H, final_W)

                with torch.no_grad():
                    coords_accum = (coords // stride).long()
                    self._assign_voxels(calculated, coords, True)

            # next steps
            else:
                coords_accum *= 2

                with torch.no_grad():
                    valid = self._upsample(occupancys > self.balance_value,
                                           size)
                occupancys = self._upsample(occupancys, size)
                is_boundary = (valid > 0.0) & (valid < 1.0)

                with torch.no_grad():
                    # TODO
                    if self.use_shadow and is_final:
                        # larger z means smaller depth here
                        depth_res = D
                        depth_index = torch.linspace(
                            0, depth_res - 1,
                            steps=depth_res).type_as(occupancys)
                        depth_index_max = torch.max(
                            (occupancys > self.balance_value) *
                            (depth_index + 1),
                            dim=-1,
                            keepdim=True)[0] - 1
                        shadow = depth_index < depth_index_max
                        is_boundary[shadow] = False
                        is_boundary = is_boundary[0, 0]
                    else:
                        is_boundary = (self.smooth_conv3x3(is_boundary.float())
                                       > 0)[0, 0]

                    # never re-evaluate voxels that are already known
                    coords_accum = coords_accum.long()
                    self._assign_voxels(is_boundary, coords_accum, False)

                    point_coords = is_boundary.permute(
                        2, 1, 0).nonzero(as_tuple=False).unsqueeze(0)
                    point_indices = self._flatten_index(point_coords, H, W)

                    # interpolated value
                    occupancys_interp = self._gather_points(
                        occupancys, point_indices)

                    # inferred value
                    coords = point_coords * stride

                if coords.size(1) == 0:
                    continue
                occupancys_topk = self.batch_eval(coords, **kwargs)
                if self.visualize:
                    this_stage_coords.append(coords)

                occupancys = self._scatter_points(occupancys, point_indices,
                                                  occupancys_topk)

                with torch.no_grad():
                    # conflicts: interpolation and inference disagree on the
                    # side of the iso-surface
                    conflicts = ((occupancys_interp - self.balance_value) *
                                 (occupancys_topk - self.balance_value) <
                                 0)[0, 0]

                    if self.visualize:
                        self.plot(occupancys, coords, final_D, final_H,
                                  final_W)

                    coords_accum = torch.cat([point_coords, coords_accum],
                                             dim=1).unique(dim=1)
                    self._assign_voxels(calculated, coords, True)

                while conflicts.any():
                    if self.use_shadow and is_final:
                        break

                    with torch.no_grad():
                        conflicts_coords = coords[0, conflicts, :]

                        if self.debug:
                            self.plot(occupancys,
                                      conflicts_coords.unsqueeze(0),
                                      final_D,
                                      final_H,
                                      final_W,
                                      title='conflicts')

                        # 3x3x3 neighbourhood of every conflicting voxel
                        conflicts_boundary = (
                            conflicts_coords.int() +
                            self.gird8_offsets.unsqueeze(1) *
                            stride.int()).reshape(-1, 3).long().unique(dim=0)
                        conflicts_boundary[:, 0] = (
                            conflicts_boundary[:, 0].clamp(
                                0,
                                calculated.size(2) - 1))
                        conflicts_boundary[:, 1] = (
                            conflicts_boundary[:, 1].clamp(
                                0,
                                calculated.size(1) - 1))
                        conflicts_boundary[:, 2] = (
                            conflicts_boundary[:, 2].clamp(
                                0,
                                calculated.size(0) - 1))

                        # keep only neighbours that were not evaluated yet
                        coords = conflicts_boundary[~calculated[
                            conflicts_boundary[:, 2],
                            conflicts_boundary[:, 1],
                            conflicts_boundary[:, 0]]]

                        if self.debug:
                            self.plot(occupancys,
                                      coords.unsqueeze(0),
                                      final_D,
                                      final_H,
                                      final_W,
                                      title='coords')

                        coords = coords.unsqueeze(0)
                        point_coords = coords // stride
                        point_indices = self._flatten_index(
                            point_coords, H, W)

                        # interpolated value
                        occupancys_interp = self._gather_points(
                            occupancys, point_indices)

                        # inferred value
                        coords = point_coords * stride

                    if coords.size(1) == 0:
                        break
                    occupancys_topk = self.batch_eval(coords, **kwargs)
                    if self.visualize:
                        this_stage_coords.append(coords)

                    with torch.no_grad():
                        # conflicts
                        conflicts = ((occupancys_interp - self.balance_value) *
                                     (occupancys_topk - self.balance_value) <
                                     0)[0, 0]

                    occupancys = self._scatter_points(occupancys,
                                                      point_indices,
                                                      occupancys_topk)

                    with torch.no_grad():
                        coords_accum = torch.cat([point_coords, coords_accum],
                                                 dim=1).unique(dim=1)
                        self._assign_voxels(calculated, coords, True)

                if self.visualize:
                    this_stage_coords = torch.cat(this_stage_coords, dim=1)
                    self.plot(occupancys, this_stage_coords, final_D, final_H,
                              final_W)

        return occupancys[0, 0]

    # ------------------------------------------------------------------ #
    # visualisation
    # ------------------------------------------------------------------ #
    def plot(self,
             occupancys,
             coords,
             final_D,
             final_H,
             final_W,
             title='',
             **kwargs):
        """Upsample ``occupancys`` to the final size and hand it, together
        with the (x, y, z) columns of ``coords``, to ``plot_mask3D``."""
        final = self._upsample(occupancys, (final_D, final_H, final_W))
        x = coords[0, :, 0].to("cpu")
        y = coords[0, :, 1].to("cpu")
        z = coords[0, :, 2].to("cpu")

        plot_mask3D(final[0, 0].to("cpu"), title, (x, y, z), **kwargs)

    def find_vertices(self, sdf, direction="front"):
        '''Find the first voxel above ``balance_value`` along each view ray.

        Args:
            sdf: cubic ``[res, res, res]`` volume.
            direction: "front" | "back" | "left" | "right"; any other value
                is treated like "front".

        Returns:
            ``X, Y`` (long pixel coordinates), ``Z`` (interpolated depth of
            the iso-crossing) and ``norm`` (``[N, 3]`` normalised finite
            differences of the volume).
        '''
        resolution = sdf.size(2)
        threshold = self.balance_value
        inv_idx = torch.arange(resolution - 1, -1, -1,
                               device=sdf.device).long()

        if direction == "left":
            sdf = sdf.permute(2, 1, 0)
        elif direction == "back":
            sdf = sdf[inv_idx, :, :]
        elif direction == "right":
            sdf = sdf[:, :, inv_idx]
            sdf = sdf.permute(2, 1, 0)

        sdf = sdf[inv_idx, :, :]
        sdf_all = sdf.permute(2, 1, 0)
        occupied = sdf_all > threshold

        # shadow: along the last axis only the first occupied voxel is kept,
        # everything behind it is hidden
        grad_v = occupied * torch.linspace(
            resolution, 1, steps=resolution).to(sdf.device)
        grad_c = torch.ones_like(sdf_all) * torch.linspace(
            0, resolution - 1, steps=resolution).to(sdf.device)
        _, max_c = grad_v.max(dim=2)
        shadow = grad_c > max_c.view(resolution, resolution, 1)
        keep = occupied & (~shadow)

        p1 = keep.nonzero(as_tuple=False).t()  # [3, N]
        p2 = p1.clone()  # z
        p2[2, :] = (p2[2, :] - 2).clamp(0, resolution)
        p3 = p1.clone()  # y
        p3[1, :] = (p3[1, :] - 2).clamp(0, resolution)
        p4 = p1.clone()  # x
        p4[0, :] = (p4[0, :] - 2).clamp(0, resolution)

        v1 = sdf_all[p1[0, :], p1[1, :], p1[2, :]]
        v2 = sdf_all[p2[0, :], p2[1, :], p2[2, :]]
        v3 = sdf_all[p3[0, :], p3[1, :], p3[2, :]]
        v4 = sdf_all[p4[0, :], p4[1, :], p4[2, :]]

        X = p1[0, :].long()  # [N,]
        Y = p1[1, :].long()  # [N,]

        # Linear interpolation of the iso-crossing between p2 and p1.  When
        # both samples are equal (e.g. the hit lies in the first slice, so p2
        # was clamped onto p1) the division would be 0/0; fall back to p1.
        denom = v2 - v1
        flat = denom == 0
        denom[flat] = 1
        Z = p2[2, :].float() * (threshold - v1) / denom + \
            p1[2, :].float() * (v2 - threshold) / denom  # [N,]
        Z[flat] = p1[2, flat].to(Z.dtype)
        Z = Z.clamp(0, resolution)

        # normal
        norm_z = v2 - v1
        norm_y = v3 - v1
        norm_x = v4 - v1
        norm = torch.stack([norm_x, norm_y, norm_z], dim=1)
        # clamp the length so that an all-zero gradient stays zero (not NaN)
        norm = norm / torch.norm(norm, p=2, dim=1,
                                 keepdim=True).clamp(min=1e-12)

        return X, Y, Z, norm

    def render_normal(self, resolution, X, Y, Z, norm):
        """Paint normals as RGB colours onto a white square image.

        Returns:
            ``[1, 3, resolution, resolution]`` float32 image; pixel (Y, X)
            holds ``(norm + 1) / 2`` clamped to [0, 1].  ``Z`` is not used.
        """
        size = int(resolution)
        image = torch.ones((1, 3, size, size),
                           dtype=torch.float32,
                           device=norm.device)
        color = ((norm + 1) / 2.0).clamp(0, 1)
        image[0, :, Y, X] = color.t().to(image.dtype)
        return image

    def display(self, sdf):
        """Render front/left/right/back normal maps side by side.

        Returns:
            ``uint8`` numpy array of shape ``[res, 4 * res, 3]``.
        """
        resolution = self.resolutions[-1, -1]
        views = []
        for direction in ("front", "left", "right", "back"):
            X, Y, Z, norm = self.find_vertices(sdf, direction=direction)
            views.append(self.render_normal(resolution, X, Y, Z, norm))

        image = torch.cat(views, dim=3)
        image = image.detach().cpu().numpy()[0].transpose(1, 2, 0) * 255.0

        return np.uint8(image)

    def export_mesh(self, occupancys):
        """Extract a triangle mesh at ``balance_value`` with marching cubes.

        The first slice along every axis of ``occupancys`` is dropped before
        meshing.

        Returns:
            ``(verts, faces)`` tensors; vertex columns are reordered with
            ``[2, 1, 0]`` and face columns with ``[0, 2, 1]`` in both
            branches.
        """
        final = occupancys[1:, 1:, 1:].contiguous()

        if final.shape[0] > 256:
            # for voxelgrid larger than 256^3, the required GPU memory will be > 9GB
            # thus we use CPU marching_cube to avoid "CUDA out of memory"
            occu_arr = final.detach().cpu().numpy()  # non-smooth surface
            vertices, triangles = mcubes.marching_cubes(
                occu_arr, self.balance_value)
            verts = torch.as_tensor(vertices[:, [2, 1, 0]])
            # np.long was removed from NumPy; int64 matches torch.long.
            faces = torch.as_tensor(triangles.astype(np.int64),
                                    dtype=torch.long)[:, [0, 2, 1]]
        else:
            torch.cuda.empty_cache()
            # use the same iso-level as the CPU branch
            vertices, triangles = voxelgrids_to_trianglemeshes(
                final.unsqueeze(0), iso_value=self.balance_value)
            verts = vertices[0][:, [2, 1, 0]].cpu()
            faces = triangles[0][:, [0, 2, 1]].cpu()

        return verts, faces