import os
import glob
import yaml
import random
import numpy as np
import cv2
import torch
import torchvision.utils as tvutils
import zipfile
import argparse
from ..render.obj import write_obj, write_textured_obj
import einops
import torch.distributed as dist
def str2bool(v):
    """Parse a command-line style boolean value.

    Suitable as an ``argparse`` ``type=`` callable.

    Args:
        v: Either a ``bool`` (returned unchanged) or a string. The strings
            ``'yes'``, ``'true'``, ``'t'``, ``'y'``, ``'1'`` map to ``True``
            and ``'no'``, ``'false'``, ``'f'``, ``'n'``, ``'0'`` map to
            ``False``. Matching is case-insensitive and ignores surrounding
            whitespace.

    Returns:
        The parsed ``bool``.

    Raises:
        argparse.ArgumentTypeError: If ``v`` is neither a ``bool`` nor one of
            the recognised spellings (this includes non-string values).
    """
    if isinstance(v, bool):
        return v
    if isinstance(v, str):
        # Normalise once instead of lower-casing for every membership test.
        token = v.strip().lower()
        if token in ('yes', 'true', 't', 'y', '1'):
            return True
        if token in ('no', 'false', 'f', 'n', '0'):
            return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
def setup_runtime(args):
    """Load configs, initialize CUDA, CuDNN and the random seeds.

    Args:
        args: Namespace-like object providing the attributes ``gpu``,
            ``seed``, ``config``, ``num_workers`` and ``rank``.

    Returns:
        dict: The configuration loaded from ``args.config`` (empty when no
        readable config file is given), extended with the keys ``'config'``,
        ``'seed'``, ``'num_workers'`` and ``'device'``.
    """
    # Setup CUDA
    cuda_device_id = args.gpu
    if cuda_device_id is not None:
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        os.environ["CUDA_VISIBLE_DEVICES"] = str(cuda_device_id)
    if torch.cuda.is_available():
        torch.backends.cudnn.enabled = True
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.deterministic = False

    # Setup random seeds for reproducibility
    if args.seed is not None:
        random.seed(args.seed)
        np.random.seed(args.seed)
        torch.manual_seed(args.seed)
        if torch.cuda.is_available():
            torch.cuda.manual_seed_all(args.seed)

    ## Load config
    cfgs = {}
    if args.config is not None and os.path.isfile(args.config):
        # An empty YAML document parses to None; fall back to an empty dict
        # so the key assignments below still work.
        cfgs = load_yaml(args.config) or {}

    cfgs['config'] = args.config
    cfgs['seed'] = args.seed
    cfgs['num_workers'] = args.num_workers
    # A CUDA device is only selected when a GPU id was explicitly requested.
    use_cuda = torch.cuda.is_available() and cuda_device_id is not None
    cfgs['device'] = f"cuda:{args.rank}" if use_cuda else 'cpu'

    print(f"Environment: GPU {cuda_device_id} - seed {args.seed}")
    return cfgs
def load_yaml(path):
    """Read the YAML file at ``path`` and return the parsed content.

    Prints a short notice before reading. Parsing uses ``yaml.safe_load``.
    """
    print(f"Loading configs from {path}")
    with open(path, 'r') as stream:
        parsed = yaml.safe_load(stream)
    return parsed
def dump_yaml(path, cfgs):
    """Serialize ``cfgs`` as YAML into the file at ``path``.

    Prints a short notice before writing. Returns whatever
    ``yaml.safe_dump`` returns when given an output stream.
    """
    print(f"Saving configs to {path}")
    with open(path, 'w') as stream:
        outcome = yaml.safe_dump(cfgs, stream)
    return outcome
def xmkdir(path):
    """Make sure directory ``path`` exists, creating missing parents as needed.

    Calling it on an already existing directory is a no-op.
    """
    os.makedirs(name=path, exist_ok=True)
def clean_checkpoint(checkpoint_dir, keep_num=2):
    """Delete old checkpoint files, keeping only the last ``keep_num``.

    Files matching ``checkpoint*.pth`` directly inside ``checkpoint_dir`` are
    ordered by lexicographically sorted path; everything except the final
    ``keep_num`` entries is removed.

    Args:
        checkpoint_dir: Directory that holds the checkpoint files.
        keep_num: Number of checkpoints to keep. Nothing is deleted when this
            is not positive.
    """
    if keep_num <= 0:
        return
    names = sorted(glob.glob(os.path.join(checkpoint_dir, 'checkpoint*.pth')))
    # The slice is empty whenever there are at most ``keep_num`` files.
    for name in names[:-keep_num]:
        print(f"Deleting obslete checkpoint file {name}")
        os.remove(name)
def archive_code(arc_path, filetypes=['.py']):
    """Zip the source files found under the current working directory.

    Collected are files whose name ends with one of ``filetypes``, located
    either directly in the working directory or anywhere below one of its
    non-hidden sub-directories, except the top-level ``results`` folder.
    Inside the archive the working-directory prefix of each path is replaced
    by ``archived_code``.

    Args:
        arc_path: Path of the zip file to create (overwritten if present).
        filetypes: Iterable of filename suffixes to include. The default list
            is only read, never mutated.
    """
    print(f"Archiving code to {arc_path}")
    cur_dir = os.getcwd()
    root = glob.escape(cur_dir)

    # Enumerate the top-level folders explicitly: a glob such as
    # '[!results]*' is a character class and would drop every folder whose
    # name starts with one of the letters r, e, s, u, l, t.
    subdirs = [
        entry for entry in sorted(os.listdir(cur_dir))
        if entry != 'results'
        and not entry.startswith('.')
        and os.path.isdir(os.path.join(cur_dir, entry))
    ]

    flist = []
    for ftype in filetypes:
        for sub in subdirs:
            pattern = os.path.join(root, glob.escape(sub), '**', '*' + ftype)
            flist.extend(glob.glob(pattern, recursive=True))
        flist.extend(glob.glob(os.path.join(root, '*' + ftype)))

    # The context manager guarantees the archive is finalised and closed.
    with zipfile.ZipFile(arc_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for f in flist:
            zipf.write(f, arcname=f.replace(cur_dir, 'archived_code', 1))
def get_model_device(model):
    """Return the device of the first parameter yielded by ``model.parameters()``."""
    first_param = next(model.parameters())
    return first_param.device
def set_requires_grad(nets, requires_grad=False):
    """Set ``requires_grad`` on every parameter of one or several networks.

    ``nets`` may be a single network or a list of networks; anything that is
    not a list is treated as a single network. ``None`` entries are skipped.
    """
    modules = nets if isinstance(nets, list) else [nets]
    for module in modules:
        if module is None:
            continue
        for weight in module.parameters():
            weight.requires_grad = requires_grad
def save_videos(out_fold, imgs, prefix='', suffix='', fnames=None, ext='.mp4', cycle=False):
    """Write each frame sequence of a batch to its own video file.

    Args:
        out_fold: Output directory, or a list with one directory per batch
            item. Missing directories are created.
        imgs: Array indexed B x T x C x H x W that supports NumPy-style
            ``transpose``. Values are multiplied by 255 and cast to uint8,
            so they are presumably expected in [0, 1] -- confirm with callers.
        prefix: Optional filename prefix (joined with ``_``).
        suffix: Optional filename suffix (joined with ``_``).
        fnames: Optional list with one base filename per batch item. When
            ``None``, a 7-digit running index is derived from the number of
            matching files already present in the output directory.
        ext: File extension including the dot.
        cycle: If true, the reversed sequence is appended so the clip plays
            forwards and then backwards.
    """
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''
    imgs = imgs.transpose(0, 1, 3, 4, 2)  # BxTxCxHxW -> BxTxHxWxC
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')  # loop-invariant codec id
    for i, fs in enumerate(imgs):
        if cycle:
            fs = np.concatenate([fs, fs[::-1]], 0)

        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        # OpenCV does not report a missing directory, so create it up front.
        os.makedirs(out_fold_i, exist_ok=True)

        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ext))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]
        fpath = os.path.join(out_fold_i, prefix + fname + suffix + ext)

        # Fixed rate of 5 frames per second; frame size is (width, height).
        vid = cv2.VideoWriter(fpath, fourcc, 5, (fs.shape[2], fs.shape[1]))
        try:
            for f in fs:
                # Reverse the channel axis (RGB -> BGR) for OpenCV.
                vid.write(np.uint8(f[..., ::-1] * 255.))
        finally:
            # Releasing finalises the container; without it the file may be
            # left incomplete.
            vid.release()
def save_images(out_fold, imgs, prefix='', suffix='', fnames=None, ext='.png'):
    """Write each image of a batch to its own file using OpenCV.

    Args:
        out_fold: Output directory, or a list with one directory per batch
            item. Missing directories are created.
        imgs: Array indexed B x C x H x W that supports NumPy-style
            ``transpose``. Values are scaled by 255 (or 65535 for depth) and
            cast to an unsigned integer type, so they are presumably expected
            in [0, 1] -- confirm with callers.
        prefix: Optional filename prefix (joined with ``_``).
        suffix: Optional filename suffix (joined with ``_``). If it contains
            ``'depth'`` the image is stored as 16-bit, otherwise as 8-bit.
        fnames: Optional list with one base filename per batch item. When
            ``None``, a 7-digit running index is derived from the number of
            matching files already present in the output directory.
        ext: File extension including the dot.
    """
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''
    is_depth = 'depth' in suffix
    imgs = imgs.transpose(0, 2, 3, 1)  # BxCxHxW -> BxHxWxC
    for i, img in enumerate(imgs):
        # RGBA to BGRA: flip the first three channels, keep the rest as is.
        img = np.concatenate([np.flip(img[..., :3], -1), img[..., 3:]], -1)
        if is_depth:
            im_out = np.uint16(img * 65535.)
        else:
            im_out = np.uint8(img * 255.)

        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        # cv2.imwrite does not raise on a missing directory, so create it.
        os.makedirs(out_fold_i, exist_ok=True)

        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ext))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]
        fpath = os.path.join(out_fold_i, prefix + fname + suffix + ext)
        cv2.imwrite(fpath, im_out)
def save_txt(out_fold, data, prefix='', suffix='', fnames=None, ext='.txt', fmt='%.6f'):
    """Write each item of ``data`` to its own text file via ``np.savetxt``.

    Args:
        out_fold: Output directory, or a list with one directory per item.
            Missing directories are created.
        data: Iterable of array-like items accepted by ``np.savetxt``.
        prefix: Optional filename prefix (joined with ``_``).
        suffix: Optional filename suffix (joined with ``_``).
        fnames: Optional list with one base filename per item. When ``None``,
            a 7-digit running index is derived from the number of matching
            files already present in the output directory.
        ext: File extension including the dot.
        fmt: Number format passed to ``np.savetxt``.
    """
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''
    for i, d in enumerate(data):
        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        os.makedirs(out_fold_i, exist_ok=True)

        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ext))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]
        fpath = os.path.join(out_fold_i, prefix + fname + suffix + ext)
        np.savetxt(fpath, d, fmt=fmt, delimiter=', ')
def save_obj(out_fold, meshes=None, save_material=True, feat=None, prefix='', suffix='', fnames=None, resolution=[256, 256], prior_shape=None):
    """Export every mesh of a batch as an OBJ file.

    Args:
        out_fold: Output directory, or a list with one directory per batch
            item. Missing directories are created.
        meshes: Batched mesh object exposing ``v_pos``; the batch size is read
            from ``v_pos.shape[0]``. Nothing is written when ``meshes`` or
            ``meshes.v_pos`` is ``None``.
        save_material: If true, each mesh is written with
            ``write_textured_obj`` into a sub-folder named after the file;
            otherwise ``write_obj`` writes a plain OBJ.
        feat: Forwarded unchanged to the writer.
        prefix: Optional filename prefix (joined with ``_``).
        suffix: Optional filename suffix (joined with ``_``).
        fnames: Optional list with one base filename per batch item. When
            ``None``, a 7-digit running index is derived from the number of
            matching ``.obj`` files already present in the output directory.
        resolution: Forwarded unchanged to the writer (the default list is
            never mutated here).
        prior_shape: Forwarded to ``write_textured_obj`` only.
    """
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''
    if meshes is None or meshes.v_pos is None:
        return

    batch_size = meshes.v_pos.shape[0]
    for i in range(batch_size):
        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        os.makedirs(out_fold_i, exist_ok=True)

        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ".obj"))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]

        if save_material:
            # Textured export goes into its own sub-folder per mesh.
            os.makedirs(os.path.join(out_fold_i, fname), exist_ok=True)
            write_textured_obj(out_fold_i, f'{fname}/{prefix+suffix}', meshes, i, save_material=save_material, feat=feat, resolution=resolution, prior_shape=prior_shape)
        else:
            write_obj(out_fold_i, prefix + fname + suffix, meshes, i, save_material=False, feat=feat, resolution=resolution)
def compute_sc_inv_err(d_pred, d_gt, mask=None):
    """Compute a per-pixel squared error after removing the mean offset.

    For every sample the mean of ``d_pred - d_gt`` is subtracted from the
    difference before squaring. With a mask, the mean is taken over masked
    pixels only and the result is zeroed outside the mask.

    Args:
        d_pred: Predicted tensor; the first dimension is the batch. The mean
            is broadcast as ``(b, 1, 1)``, so a ``(b, H, W)`` layout is
            assumed.
        d_gt: Ground-truth tensor, same shape as ``d_pred``.
        mask: Optional tensor of the same shape that weights valid pixels.
            A sample whose mask sums to zero yields NaN from the division.

    Returns:
        Tensor of per-pixel error maps (masked when ``mask`` is given).
    """
    b = d_pred.size(0)
    diff = d_pred - d_gt
    if mask is not None:
        diff = diff * mask
        # Mean over valid pixels only: masked sum divided by mask area.
        avg = diff.reshape(b, -1).sum(1) / (mask.reshape(b, -1).sum(1))
        score = (diff - avg.view(b, 1, 1)) ** 2 * mask
    else:
        avg = diff.reshape(b, -1).mean(1)
        score = (diff - avg.view(b, 1, 1)) ** 2
    return score  # masked error maps
def compute_angular_distance(n1, n2, mask=None):
    """Return the angle in degrees between ``n1`` and ``n2``.

    The dot product is taken over dimension 3, clamped to [-1, 1] and passed
    through ``acos``. When ``mask`` is given the result is multiplied by it.
    """
    cosine = (n1 * n2).sum(3).clamp(-1, 1)
    angle_deg = cosine.acos() / np.pi * 180
    if mask is None:
        return angle_deg
    return angle_deg * mask
def save_scores(out_path, scores, header=''):
    """Write ``scores`` to ``out_path`` as text with 8 decimal places.

    Columns are separated by a comma followed by a tab; ``header`` is handed
    to ``np.savetxt`` as the header line. A notice is printed first.
    """
    print('Saving scores to %s' % out_path)
    write_options = {'fmt': '%.8f', 'delimiter': ',\t', 'header': header}
    np.savetxt(out_path, scores, **write_options)
def image_grid(tensor, nrow=None):
    """Arrange a batch of images into a single grid image.

    A list input is stacked with ``np.stack``; a NumPy array is converted to
    a torch tensor and its axes are permuted ``(0, 3, 1, 2)``. Single-channel
    batches are repeated to three channels. ``nrow`` is passed to
    ``torchvision.utils.make_grid`` and defaults to ``ceil(sqrt(batch))``.
    """
    # Lists are stacked along a new leading axis first.
    if isinstance(tensor, list):
        tensor = np.stack(tensor, 0)
    # NumPy arrays become torch tensors with the last axis moved to position 1.
    if isinstance(tensor, np.ndarray):
        tensor = torch.from_numpy(tensor).permute(0, 3, 1, 2)

    batch, channels, _, _ = tensor.shape
    per_row = int(np.ceil(batch ** 0.5)) if nrow is None else nrow
    if channels == 1:
        tensor = tensor.repeat(1, 3, 1, 1)
    return tvutils.make_grid(tensor, nrow=per_row, normalize=False)
def video_grid(tensor, nrow=None):
    """Build one image grid per slice along dimension 1 and stack the grids.

    Each slice obtained from ``tensor.unbind(1)`` goes through ``image_grid``;
    the resulting grids are stacked along a new leading dimension.
    """
    grids = []
    for frame_batch in tensor.unbind(1):
        grids.append(image_grid(frame_batch, nrow=nrow))
    return torch.stack(grids, 0)
class LazyClass(object):
    """Proxy that delays building ``cls(*args, **kwargs)`` until first use.

    The wrapped object is created on the first call to ``get_instance`` and
    reused afterwards. Calling the proxy calls the wrapped object. Attribute
    access is forwarded to the wrapped object for every name except the
    proxy's own ``cls``, ``args``, ``kwargs``, ``instance`` and
    ``get_instance``.
    """

    def __init__(self, cls, *args, **kwargs):
        self.cls, self.args, self.kwargs = cls, args, kwargs
        # Filled in lazily by get_instance().
        self.instance = None

    def get_instance(self):
        """Return the wrapped object, constructing it if it is still ``None``."""
        built = self.instance
        if built is None:
            built = self.cls(*self.args, **self.kwargs)
            self.instance = built
        return built

    def __call__(self, *args, **kwargs):
        target = self.get_instance()
        return target(*args, **kwargs)

    def __getattribute__(self, name):
        # Anything that is not one of the proxy's own names is looked up on
        # the (possibly freshly constructed) wrapped object.
        if name not in ('cls', 'args', 'kwargs', 'instance', 'get_instance'):
            return getattr(self.get_instance(), name)
        return super().__getattribute__(name)
def add_text_to_image(img, text, pos=(12, 12), color=(1, 1, 1), font_scale=1, thickness=2):
    """Draw ``text`` onto an image with ``cv2.putText`` and return the result.

    A torch tensor input is permuted ``(1, 2, 0)`` and converted to a NumPy
    array on the CPU. An image whose third axis has size 1 is repeated to
    three channels before drawing. The text is rendered with
    ``cv2.FONT_HERSHEY_SIMPLEX``.
    """
    canvas = img
    if isinstance(canvas, torch.Tensor):
        canvas = canvas.permute(1, 2, 0).cpu().numpy()
    # if grayscale -> convert to RGB
    single_channel = canvas.shape[2] == 1
    if single_channel:
        canvas = np.repeat(canvas, 3, 2)
    # putText is handed a contiguous copy/view of the array.
    canvas = np.ascontiguousarray(canvas)
    return cv2.putText(canvas, text, pos, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness)
def image_grid_multi_channel(tensor, pca=False, texts=None, font_scale=0.5):
    """Visualize a multi-channel image batch as a grid.

    Without PCA every channel is shown as its own greyscale image. With PCA
    the channel dimension is projected to 3 components (using ``faiss``) and
    each sample is shown as one 3-channel image.

    Args:
        tensor: Tensor of shape (b, c, h, w).
        pca: If true, reduce the ``c`` channels to 3 via PCA.
        texts: Optional list of strings of length b; each text is drawn on
            all images that belong to the corresponding sample.
        font_scale: Font scale forwarded to ``add_text_to_image``.

    Returns:
        The grid produced by ``image_grid``.
    """
    # rescale to [0, 1] per sample in the batch
    tensor = tensor.detach().cpu()
    min_ = einops.reduce(tensor, 'b c h w -> b 1 1 1', 'min')
    max_ = einops.reduce(tensor, 'b c h w -> b 1 1 1', 'max')
    span = max_ - min_
    # A constant sample has zero range; divide by 1 there so it maps to 0
    # instead of NaN.
    span = torch.where(span == 0, torch.ones_like(span), span)
    tensor = (tensor - min_) / span

    if pca:
        import faiss
        (b, c, h, w) = tensor.shape
        # Treat every pixel as one c-dimensional sample.
        tensor_flat = einops.rearrange(tensor, 'b c h w -> (b h w) c')
        pca_mat = faiss.PCAMatrix(c, 3)
        # The projection has to be fitted before it can be applied.
        pca_mat.train(tensor_flat.numpy())
        assert pca_mat.is_trained
        tensor_flat_pca = pca_mat.apply_py(tensor_flat.numpy())
        # Channels-last NumPy array, which image_grid handles directly.
        tensor = einops.rearrange(tensor_flat_pca, '(b h w) c -> b h w c', b=b, c=3, h=h, w=w)
    else:
        tensor = einops.rearrange(tensor, 'b c h w -> (b c) 1 h w')

    if texts is not None:
        # duplicate texts for each channel
        texts = [text for text in texts for _ in range(tensor.shape[0] // len(texts))]
        tensor = [add_text_to_image(img, text, font_scale=font_scale) for img, text in zip(tensor, texts)]
    return image_grid(tensor)
########## DDP Part Taken from:
def is_main_process():
    """Return ``True`` when ``get_rank()`` reports rank 0."""
    rank = get_rank()
    return rank == 0
def get_rank():
    """Return the ``torch.distributed`` rank, or 0 when it is not in use.

    "Not in use" means ``is_dist_avail_and_initialized()`` returns ``False``.
    """
    if is_dist_avail_and_initialized():
        return dist.get_rank()
    return 0
def is_dist_avail_and_initialized():
    """Return ``True`` only if ``torch.distributed`` is available and initialized."""
    # Short-circuit: is_initialized() is only consulted when the package is
    # available.
    if dist.is_available() and dist.is_initialized():
        return True
    return False