Spaces:
Runtime error
Runtime error
import torch | |
from torch.nn import init | |
import torch.nn as nn | |
import torch.nn.functional as F | |
import functools | |
import numpy as np | |
from .mesh_util import * | |
from .sample_util import * | |
from .geometry import index | |
import cv2 | |
from PIL import Image | |
from tqdm import tqdm | |
def reshape_multiview_tensors(image_tensor, calib_tensor): | |
# Careful here! Because we put single view and multiview together, | |
# the returned tensor.shape is 5-dim: [B, num_views, C, W, H] | |
# So we need to convert it back to 4-dim [B*num_views, C, W, H] | |
# Don't worry classifier will handle multi-view cases | |
image_tensor = image_tensor.view( | |
image_tensor.shape[0] * image_tensor.shape[1], | |
image_tensor.shape[2], | |
image_tensor.shape[3], | |
image_tensor.shape[4] | |
) | |
calib_tensor = calib_tensor.view( | |
calib_tensor.shape[0] * calib_tensor.shape[1], | |
calib_tensor.shape[2], | |
calib_tensor.shape[3] | |
) | |
return image_tensor, calib_tensor | |
def reshape_sample_tensor(sample_tensor, num_views): | |
if num_views == 1: | |
return sample_tensor | |
# Need to repeat sample_tensor along the batch dim num_views times | |
sample_tensor = sample_tensor.unsqueeze(dim=1) | |
sample_tensor = sample_tensor.repeat(1, num_views, 1, 1) | |
sample_tensor = sample_tensor.view( | |
sample_tensor.shape[0] * sample_tensor.shape[1], | |
sample_tensor.shape[2], | |
sample_tensor.shape[3] | |
) | |
return sample_tensor | |
def gen_mesh(opt, net, cuda, data, save_path, use_octree=True): | |
image_tensor = data['img'].to(device=cuda) | |
calib_tensor = data['calib'].to(device=cuda) | |
net.filter(image_tensor) | |
b_min = data['b_min'] | |
b_max = data['b_max'] | |
try: | |
save_img_path = save_path[:-4] + '.png' | |
save_img_list = [] | |
for v in range(image_tensor.shape[0]): | |
save_img = (np.transpose(image_tensor[v].detach().cpu().numpy(), (1, 2, 0)) * 0.5 + 0.5)[:, :, ::-1] * 255.0 | |
save_img_list.append(save_img) | |
save_img = np.concatenate(save_img_list, axis=1) | |
Image.fromarray(np.uint8(save_img[:,:,::-1])).save(save_img_path) | |
verts, faces, _, _ = reconstruction( | |
net, cuda, calib_tensor, opt.resolution, b_min, b_max, use_octree=use_octree) | |
verts_tensor = torch.from_numpy(verts.T).unsqueeze(0).to(device=cuda).float() | |
xyz_tensor = net.projection(verts_tensor, calib_tensor[:1]) | |
uv = xyz_tensor[:, :2, :] | |
color = index(image_tensor[:1], uv).detach().cpu().numpy()[0].T | |
color = color * 0.5 + 0.5 | |
save_obj_mesh_with_color(save_path, verts, faces, color) | |
except Exception as e: | |
print(e) | |
print('Can not create marching cubes at this time.') | |
def gen_mesh_color(opt, netG, netC, cuda, data, save_path, use_octree=True): | |
image_tensor = data['img'].to(device=cuda) | |
calib_tensor = data['calib'].to(device=cuda) | |
netG.filter(image_tensor) | |
netC.filter(image_tensor) | |
netC.attach(netG.get_im_feat()) | |
b_min = data['b_min'] | |
b_max = data['b_max'] | |
try: | |
save_img_path = save_path[:-4] + '.png' | |
save_img_list = [] | |
for v in range(image_tensor.shape[0]): | |
save_img = (np.transpose(image_tensor[v].detach().cpu().numpy(), (1, 2, 0)) * 0.5 + 0.5)[:, :, ::-1] * 255.0 | |
save_img_list.append(save_img) | |
save_img = np.concatenate(save_img_list, axis=1) | |
Image.fromarray(np.uint8(save_img[:,:,::-1])).save(save_img_path) | |
verts, faces, _, _ = reconstruction( | |
netG, cuda, calib_tensor, opt.resolution, b_min, b_max, use_octree=use_octree) | |
# Now Getting colors | |
verts_tensor = torch.from_numpy(verts.T).unsqueeze(0).to(device=cuda).float() | |
verts_tensor = reshape_sample_tensor(verts_tensor, opt.num_views) | |
color = np.zeros(verts.shape) | |
interval = opt.num_sample_color | |
for i in range(len(color) // interval): | |
left = i * interval | |
right = i * interval + interval | |
if i == len(color) // interval - 1: | |
right = -1 | |
netC.query(verts_tensor[:, :, left:right], calib_tensor) | |
rgb = netC.get_preds()[0].detach().cpu().numpy() * 0.5 + 0.5 | |
color[left:right] = rgb.T | |
save_obj_mesh_with_color(save_path, verts, faces, color) | |
except Exception as e: | |
print(e) | |
print('Can not create marching cubes at this time.') | |
def adjust_learning_rate(optimizer, epoch, lr, schedule, gamma): | |
"""Sets the learning rate to the initial LR decayed by schedule""" | |
if epoch in schedule: | |
lr *= gamma | |
for param_group in optimizer.param_groups: | |
param_group['lr'] = lr | |
return lr | |
def compute_acc(pred, gt, thresh=0.5): | |
''' | |
return: | |
IOU, precision, and recall | |
''' | |
with torch.no_grad(): | |
vol_pred = pred > thresh | |
vol_gt = gt > thresh | |
union = vol_pred | vol_gt | |
inter = vol_pred & vol_gt | |
true_pos = inter.sum().float() | |
union = union.sum().float() | |
if union == 0: | |
union = 1 | |
vol_pred = vol_pred.sum().float() | |
if vol_pred == 0: | |
vol_pred = 1 | |
vol_gt = vol_gt.sum().float() | |
if vol_gt == 0: | |
vol_gt = 1 | |
return true_pos / union, true_pos / vol_pred, true_pos / vol_gt | |
def calc_error(opt, net, cuda, dataset, num_tests): | |
if num_tests > len(dataset): | |
num_tests = len(dataset) | |
with torch.no_grad(): | |
erorr_arr, IOU_arr, prec_arr, recall_arr = [], [], [], [] | |
for idx in tqdm(range(num_tests)): | |
data = dataset[idx * len(dataset) // num_tests] | |
# retrieve the data | |
image_tensor = data['img'].to(device=cuda) | |
calib_tensor = data['calib'].to(device=cuda) | |
sample_tensor = data['samples'].to(device=cuda).unsqueeze(0) | |
if opt.num_views > 1: | |
sample_tensor = reshape_sample_tensor(sample_tensor, opt.num_views) | |
label_tensor = data['labels'].to(device=cuda).unsqueeze(0) | |
res, error = net.forward(image_tensor, sample_tensor, calib_tensor, labels=label_tensor) | |
IOU, prec, recall = compute_acc(res, label_tensor) | |
# print( | |
# '{0}/{1} | Error: {2:06f} IOU: {3:06f} prec: {4:06f} recall: {5:06f}' | |
# .format(idx, num_tests, error.item(), IOU.item(), prec.item(), recall.item())) | |
erorr_arr.append(error.item()) | |
IOU_arr.append(IOU.item()) | |
prec_arr.append(prec.item()) | |
recall_arr.append(recall.item()) | |
return np.average(erorr_arr), np.average(IOU_arr), np.average(prec_arr), np.average(recall_arr) | |
def calc_error_color(opt, netG, netC, cuda, dataset, num_tests): | |
if num_tests > len(dataset): | |
num_tests = len(dataset) | |
with torch.no_grad(): | |
error_color_arr = [] | |
for idx in tqdm(range(num_tests)): | |
data = dataset[idx * len(dataset) // num_tests] | |
# retrieve the data | |
image_tensor = data['img'].to(device=cuda) | |
calib_tensor = data['calib'].to(device=cuda) | |
color_sample_tensor = data['color_samples'].to(device=cuda).unsqueeze(0) | |
if opt.num_views > 1: | |
color_sample_tensor = reshape_sample_tensor(color_sample_tensor, opt.num_views) | |
rgb_tensor = data['rgbs'].to(device=cuda).unsqueeze(0) | |
netG.filter(image_tensor) | |
_, errorC = netC.forward(image_tensor, netG.get_im_feat(), color_sample_tensor, calib_tensor, labels=rgb_tensor) | |
# print('{0}/{1} | Error inout: {2:06f} | Error color: {3:06f}' | |
# .format(idx, num_tests, errorG.item(), errorC.item())) | |
error_color_arr.append(errorC.item()) | |
return np.average(error_color_arr) | |
def conv3x3(in_planes, out_planes, strd=1, padding=1, bias=False): | |
"3x3 convolution with padding" | |
return nn.Conv2d(in_planes, out_planes, kernel_size=3, | |
stride=strd, padding=padding, bias=bias) | |
def init_weights(net, init_type='normal', init_gain=0.02): | |
"""Initialize network weights. | |
Parameters: | |
net (network) -- network to be initialized | |
init_type (str) -- the name of an initialization method: normal | xavier | kaiming | orthogonal | |
init_gain (float) -- scaling factor for normal, xavier and orthogonal. | |
We use 'normal' in the original pix2pix and CycleGAN paper. But xavier and kaiming might | |
work better for some applications. Feel free to try yourself. | |
""" | |
def init_func(m): # define the initialization function | |
classname = m.__class__.__name__ | |
if hasattr(m, 'weight') and (classname.find('Conv') != -1 or classname.find('Linear') != -1): | |
if init_type == 'normal': | |
init.normal_(m.weight.data, 0.0, init_gain) | |
elif init_type == 'xavier': | |
init.xavier_normal_(m.weight.data, gain=init_gain) | |
elif init_type == 'kaiming': | |
init.kaiming_normal_(m.weight.data, a=0, mode='fan_in') | |
elif init_type == 'orthogonal': | |
init.orthogonal_(m.weight.data, gain=init_gain) | |
else: | |
raise NotImplementedError('initialization method [%s] is not implemented' % init_type) | |
if hasattr(m, 'bias') and m.bias is not None: | |
init.constant_(m.bias.data, 0.0) | |
elif classname.find( | |
'BatchNorm2d') != -1: # BatchNorm Layer's weight is not a matrix; only normal distribution applies. | |
init.normal_(m.weight.data, 1.0, init_gain) | |
init.constant_(m.bias.data, 0.0) | |
print('initialize network with %s' % init_type) | |
net.apply(init_func) # apply the initialization function <init_func> | |
def init_net(net, init_type='normal', init_gain=0.02, gpu_ids=[]): | |
"""Initialize a network: 1. register CPU/GPU device (with multi-GPU support); 2. initialize the network weights | |
Parameters: | |
net (network) -- the network to be initialized | |
init_type (str) -- the name of an initialization method: normal | xavier | kaiming | orthogonal | |
gain (float) -- scaling factor for normal, xavier and orthogonal. | |
gpu_ids (int list) -- which GPUs the network runs on: e.g., 0,1,2 | |
Return an initialized network. | |
""" | |
if len(gpu_ids) > 0: | |
assert (torch.cuda.is_available()) | |
net.to(gpu_ids[0]) | |
net = torch.nn.DataParallel(net, gpu_ids) # multi-GPUs | |
init_weights(net, init_type, init_gain=init_gain) | |
return net | |
def imageSpaceRotation(xy, rot): | |
''' | |
args: | |
xy: (B, 2, N) input | |
rot: (B, 2) x,y axis rotation angles | |
rotation center will be always image center (other rotation center can be represented by additional z translation) | |
''' | |
disp = rot.unsqueeze(2).sin().expand_as(xy) | |
return (disp * xy).sum(dim=1) | |
def cal_gradient_penalty(netD, real_data, fake_data, device, type='mixed', constant=1.0, lambda_gp=10.0): | |
"""Calculate the gradient penalty loss, used in WGAN-GP paper https://arxiv.org/abs/1704.00028 | |
Arguments: | |
netD (network) -- discriminator network | |
real_data (tensor array) -- real images | |
fake_data (tensor array) -- generated images from the generator | |
device (str) -- GPU / CPU: from torch.device('cuda:{}'.format(self.gpu_ids[0])) if self.gpu_ids else torch.device('cpu') | |
type (str) -- if we mix real and fake data or not [real | fake | mixed]. | |
constant (float) -- the constant used in formula ( | |gradient||_2 - constant)^2 | |
lambda_gp (float) -- weight for this loss | |
Returns the gradient penalty loss | |
""" | |
if lambda_gp > 0.0: | |
if type == 'real': # either use real images, fake images, or a linear interpolation of two. | |
interpolatesv = real_data | |
elif type == 'fake': | |
interpolatesv = fake_data | |
elif type == 'mixed': | |
alpha = torch.rand(real_data.shape[0], 1) | |
alpha = alpha.expand(real_data.shape[0], real_data.nelement() // real_data.shape[0]).contiguous().view( | |
*real_data.shape) | |
alpha = alpha.to(device) | |
interpolatesv = alpha * real_data + ((1 - alpha) * fake_data) | |
else: | |
raise NotImplementedError('{} not implemented'.format(type)) | |
interpolatesv.requires_grad_(True) | |
disc_interpolates = netD(interpolatesv) | |
gradients = torch.autograd.grad(outputs=disc_interpolates, inputs=interpolatesv, | |
grad_outputs=torch.ones(disc_interpolates.size()).to(device), | |
create_graph=True, retain_graph=True, only_inputs=True) | |
gradients = gradients[0].view(real_data.size(0), -1) # flat the data | |
gradient_penalty = (((gradients + 1e-16).norm(2, dim=1) - constant) ** 2).mean() * lambda_gp # added eps | |
return gradient_penalty, gradients | |
else: | |
return 0.0, None | |
def get_norm_layer(norm_type='instance'): | |
"""Return a normalization layer | |
Parameters: | |
norm_type (str) -- the name of the normalization layer: batch | instance | none | |
For BatchNorm, we use learnable affine parameters and track running statistics (mean/stddev). | |
For InstanceNorm, we do not use learnable affine parameters. We do not track running statistics. | |
""" | |
if norm_type == 'batch': | |
norm_layer = functools.partial(nn.BatchNorm2d, affine=True, track_running_stats=True) | |
elif norm_type == 'instance': | |
norm_layer = functools.partial(nn.InstanceNorm2d, affine=False, track_running_stats=False) | |
elif norm_type == 'group': | |
norm_layer = functools.partial(nn.GroupNorm, 32) | |
elif norm_type == 'none': | |
norm_layer = None | |
else: | |
raise NotImplementedError('normalization layer [%s] is not found' % norm_type) | |
return norm_layer | |
class Flatten(nn.Module): | |
def forward(self, input): | |
return input.view(input.size(0), -1) | |
class ConvBlock(nn.Module): | |
def __init__(self, in_planes, out_planes, norm='batch'): | |
super(ConvBlock, self).__init__() | |
self.conv1 = conv3x3(in_planes, int(out_planes / 2)) | |
self.conv2 = conv3x3(int(out_planes / 2), int(out_planes / 4)) | |
self.conv3 = conv3x3(int(out_planes / 4), int(out_planes / 4)) | |
if norm == 'batch': | |
self.bn1 = nn.BatchNorm2d(in_planes) | |
self.bn2 = nn.BatchNorm2d(int(out_planes / 2)) | |
self.bn3 = nn.BatchNorm2d(int(out_planes / 4)) | |
self.bn4 = nn.BatchNorm2d(in_planes) | |
elif norm == 'group': | |
self.bn1 = nn.GroupNorm(32, in_planes) | |
self.bn2 = nn.GroupNorm(32, int(out_planes / 2)) | |
self.bn3 = nn.GroupNorm(32, int(out_planes / 4)) | |
self.bn4 = nn.GroupNorm(32, in_planes) | |
if in_planes != out_planes: | |
self.downsample = nn.Sequential( | |
self.bn4, | |
nn.ReLU(True), | |
nn.Conv2d(in_planes, out_planes, | |
kernel_size=1, stride=1, bias=False), | |
) | |
else: | |
self.downsample = None | |
def forward(self, x): | |
residual = x | |
out1 = self.bn1(x) | |
out1 = F.relu(out1, True) | |
out1 = self.conv1(out1) | |
out2 = self.bn2(out1) | |
out2 = F.relu(out2, True) | |
out2 = self.conv2(out2) | |
out3 = self.bn3(out2) | |
out3 = F.relu(out3, True) | |
out3 = self.conv3(out3) | |
out3 = torch.cat((out1, out2, out3), 1) | |
if self.downsample is not None: | |
residual = self.downsample(residual) | |
out3 += residual | |
return out3 | |