Spaces:
Sleeping
Sleeping
import os | |
import glob | |
import yaml | |
import random | |
import numpy as np | |
import cv2 | |
import torch | |
import torchvision.utils as tvutils | |
import zipfile | |
import argparse | |
from ..render.obj import write_obj, write_textured_obj | |
import einops | |
import torch.distributed as dist | |
def str2bool(v): | |
if isinstance(v, bool): | |
return v | |
if v.lower() in ('yes', 'true', 't', 'y', '1'): | |
return True | |
elif v.lower() in ('no', 'false', 'f', 'n', '0'): | |
return False | |
else: | |
raise argparse.ArgumentTypeError('Boolean value expected.') | |
def setup_runtime(args): | |
"""Load configs, initialize CUDA, CuDNN and the random seeds.""" | |
# Setup CUDA | |
cuda_device_id = args.gpu | |
if cuda_device_id is not None: | |
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" | |
os.environ["CUDA_VISIBLE_DEVICES"] = str(cuda_device_id) | |
if torch.cuda.is_available(): | |
torch.backends.cudnn.enabled = True | |
torch.backends.cudnn.benchmark = True | |
torch.backends.cudnn.deterministic = False | |
# Setup random seeds for reproducibility | |
random.seed(args.seed) | |
np.random.seed(args.seed) | |
cv2.setRNGSeed(args.seed) | |
torch.manual_seed(args.seed) | |
if torch.cuda.is_available(): | |
torch.cuda.manual_seed_all(args.seed) | |
## Load config | |
cfgs = {} | |
if args.config is not None and os.path.isfile(args.config): | |
cfgs = load_yaml(args.config) | |
cfgs['config'] = args.config | |
cfgs['seed'] = args.seed | |
cfgs['num_workers'] = args.num_workers | |
cfgs['device'] = f"cuda:{args.rank}" if torch.cuda.is_available() and cuda_device_id is not None else 'cpu' | |
print(f"Environment: GPU {cuda_device_id} - seed {args.seed}") | |
return cfgs | |
def load_yaml(path): | |
print(f"Loading configs from {path}") | |
with open(path, 'r') as f: | |
return yaml.safe_load(f) | |
def dump_yaml(path, cfgs): | |
print(f"Saving configs to {path}") | |
xmkdir(os.path.dirname(path)) | |
with open(path, 'w') as f: | |
return yaml.safe_dump(cfgs, f) | |
def xmkdir(path): | |
"""Create directory PATH recursively if it does not exist.""" | |
os.makedirs(path, exist_ok=True) | |
def clean_checkpoint(checkpoint_dir, keep_num=2): | |
if keep_num > 0: | |
names = list(sorted( | |
glob.glob(os.path.join(checkpoint_dir, 'checkpoint*.pth')) | |
)) | |
if len(names) > keep_num: | |
for name in names[:-keep_num]: | |
print(f"Deleting obslete checkpoint file {name}") | |
os.remove(name) | |
def archive_code(arc_path, filetypes=['.py']): | |
print(f"Archiving code to {arc_path}") | |
xmkdir(os.path.dirname(arc_path)) | |
zipf = zipfile.ZipFile(arc_path, 'w', zipfile.ZIP_DEFLATED) | |
cur_dir = os.getcwd() | |
flist = [] | |
for ftype in filetypes: | |
flist.extend(glob.glob(os.path.join(cur_dir, '[!results]*', '**', '*'+ftype), recursive=True)) # ignore results folder | |
flist.extend(glob.glob(os.path.join(cur_dir, '*'+ftype))) | |
[zipf.write(f, arcname=f.replace(cur_dir,'archived_code', 1)) for f in flist] | |
zipf.close() | |
def get_model_device(model): | |
return next(model.parameters()).device | |
def set_requires_grad(nets, requires_grad=False): | |
if not isinstance(nets, list): | |
nets = [nets] | |
for net in nets: | |
if net is not None: | |
for param in net.parameters(): | |
param.requires_grad = requires_grad | |
def save_videos(out_fold, imgs, prefix='', suffix='', fnames=None, ext='.mp4', cycle=False): | |
prefix = prefix + '_' if prefix else '' | |
suffix = '_' + suffix if suffix else '' | |
imgs = imgs.transpose(0,1,3,4,2) # BxTxCxHxW -> BxTxHxWxC | |
for i, fs in enumerate(imgs): | |
if cycle: | |
fs = np.concatenate([fs, fs[::-1]], 0) | |
fourcc = cv2.VideoWriter_fourcc(*'mp4v') | |
# fourcc = cv2.VideoWriter_fourcc(*'avc1') | |
out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold | |
xmkdir(out_fold_i) | |
if fnames is None: | |
idx = len(glob.glob(os.path.join(out_fold_i, prefix+'*'+suffix+ext))) +1 | |
fname = '%07d' % idx | |
else: | |
fname = fnames[i] | |
fpath = os.path.join(out_fold_i, prefix+fname+suffix+ext) | |
vid = cv2.VideoWriter(fpath, fourcc, 5, (fs.shape[2], fs.shape[1])) | |
[vid.write(np.uint8(f[...,::-1]*255.)) for f in fs] | |
vid.release() | |
def save_images(out_fold, imgs, prefix='', suffix='', fnames=None, ext='.png'): | |
prefix = prefix + '_' if prefix else '' | |
suffix = '_' + suffix if suffix else '' | |
imgs = imgs.transpose(0,2,3,1) | |
for i, img in enumerate(imgs): | |
img = np.concatenate([np.flip(img[...,:3], -1), img[...,3:]], -1) # RGBA to BGRA | |
if 'depth' in suffix: | |
im_out = np.uint16(img*65535.) | |
else: | |
im_out = np.uint8(img*255.) | |
out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold | |
xmkdir(out_fold_i) | |
if fnames is None: | |
idx = len(glob.glob(os.path.join(out_fold_i, prefix+'*'+suffix+ext))) +1 | |
fname = '%07d' % idx | |
else: | |
fname = fnames[i] | |
fpath = os.path.join(out_fold_i, prefix+fname+suffix+ext) | |
cv2.imwrite(fpath, im_out) | |
def save_txt(out_fold, data, prefix='', suffix='', fnames=None, ext='.txt', fmt='%.6f'): | |
prefix = prefix + '_' if prefix else '' | |
suffix = '_' + suffix if suffix else '' | |
for i, d in enumerate(data): | |
out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold | |
xmkdir(out_fold_i) | |
if fnames is None: | |
idx = len(glob.glob(os.path.join(out_fold_i, prefix+'*'+suffix+ext))) +1 | |
fname = '%07d' % idx | |
else: | |
fname = fnames[i] | |
fpath = os.path.join(out_fold_i, prefix+fname+suffix+ext) | |
np.savetxt(fpath, d, fmt=fmt, delimiter=', ') | |
def save_obj(out_fold, meshes=None, save_material=True, feat=None, prefix='', suffix='', fnames=None, resolution=[256, 256], prior_shape=None): | |
prefix = prefix + '_' if prefix else '' | |
suffix = '_' + suffix if suffix else '' | |
if meshes.v_pos is None: | |
return | |
batch_size = meshes.v_pos.shape[0] | |
for i in range(batch_size): | |
out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold | |
xmkdir(out_fold_i) | |
if fnames is None: | |
idx = len(glob.glob(os.path.join(out_fold_i, prefix+'*'+suffix+".obj"))) + 1 | |
fname = '%07d' % idx | |
else: | |
fname = fnames[i] | |
if save_material: | |
os.makedirs(os.path.join(out_fold_i, fname), exist_ok=True) | |
write_textured_obj(out_fold_i, f'{fname}/{prefix+suffix}', meshes, i, save_material=save_material, feat=feat, resolution=resolution, prior_shape=prior_shape) | |
else: | |
write_obj(out_fold_i, prefix+fname+suffix, meshes, i, save_material=False, feat=feat, resolution=resolution) | |
def compute_sc_inv_err(d_pred, d_gt, mask=None): | |
b = d_pred.size(0) | |
diff = d_pred - d_gt | |
if mask is not None: | |
diff = diff * mask | |
avg = diff.view(b, -1).sum(1) / (mask.view(b, -1).sum(1)) | |
score = (diff - avg.view(b,1,1))**2 * mask | |
else: | |
avg = diff.view(b, -1).mean(1) | |
score = (diff - avg.view(b,1,1))**2 | |
return score # masked error maps | |
def compute_angular_distance(n1, n2, mask=None): | |
dist = (n1*n2).sum(3).clamp(-1,1).acos() /np.pi*180 | |
return dist*mask if mask is not None else dist | |
def save_scores(out_path, scores, header=''): | |
print('Saving scores to %s' %out_path) | |
np.savetxt(out_path, scores, fmt='%.8f', delimiter=',\t', header=header) | |
def image_grid(tensor, nrow=None): | |
# check if list -> stack to numpy array | |
if isinstance(tensor, list): | |
tensor = np.stack(tensor, 0) | |
# check if numpy array -> convert to torch tensor and swap axes | |
if isinstance(tensor, np.ndarray): | |
tensor = torch.from_numpy(tensor).permute(0, 3, 1, 2) | |
b, c, h, w = tensor.shape | |
if nrow is None: | |
nrow = int(np.ceil(b**0.5)) | |
if c == 1: | |
tensor = tensor.repeat(1, 3, 1, 1) | |
tensor = tvutils.make_grid(tensor, nrow=nrow, normalize=False) | |
return tensor | |
def video_grid(tensor, nrow=None): | |
return torch.stack([image_grid(t, nrow=nrow) for t in tensor.unbind(1)], 0) | |
class LazyClass(object): | |
def __init__(self, cls, *args, **kwargs): | |
self.cls = cls | |
self.args = args | |
self.kwargs = kwargs | |
self.instance = None | |
def get_instance(self): | |
if self.instance is None: | |
self.instance = self.cls(*self.args, **self.kwargs) | |
return self.instance | |
def __call__(self, *args, **kwargs): | |
return self.get_instance()(*args, **kwargs) | |
def __getattribute__(self, name): | |
if name in ['cls', 'args', 'kwargs', 'instance', 'get_instance']: | |
return super().__getattribute__(name) | |
else: | |
return getattr(self.get_instance(), name) | |
def add_text_to_image(img, text, pos=(12, 12), color=(1, 1, 1), font_scale=1, thickness=2): | |
if isinstance(img, torch.Tensor): | |
img = img.permute(1,2,0).cpu().numpy() | |
# if grayscale -> convert to RGB | |
if img.shape[2] == 1: | |
img = np.repeat(img, 3, 2) | |
img = cv2.putText(np.ascontiguousarray(img), text, pos, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness) | |
return img | |
def image_grid_multi_channel(tensor, pca=False, texts=None, font_scale=0.5): | |
""" | |
visualize multi-channel image, each channel is a different greyscale image | |
tensor: (b, c, h, w) | |
texts: list of strings of length b | |
""" | |
# rescale to [0, 1] for per each sample in batch | |
tensor = tensor.detach().cpu() | |
min_ = einops.reduce(tensor, 'b c h w -> b 1 1 1', 'min') | |
max_ = einops.reduce(tensor, 'b c h w -> b 1 1 1', 'max') | |
tensor = (tensor - min_) / (max_ - min_) | |
if pca: | |
import faiss | |
(b, c, h, w) = tensor.shape | |
# reshape the tensor to (b, c*h*w) | |
# tensor = tensor.reshape(b, c*h*w) | |
tensor_flat = einops.rearrange(tensor, 'b c h w -> (b h w) c') | |
pca_mat = faiss.PCAMatrix(c, 3) | |
pca_mat.train(tensor_flat.numpy()) | |
assert pca_mat.is_trained | |
tensor_flat_pca = pca_mat.apply_py(tensor_flat.numpy()) | |
tensor = einops.rearrange(tensor_flat_pca, '(b h w) c -> b h w c', b=b, c=3, h=h, w=w) | |
else: | |
tensor = einops.rearrange(tensor, 'b c h w -> (b c) 1 h w') | |
if texts is not None: | |
# duplicate texts for each channel | |
texts = [text for text in texts for _ in range(tensor.shape[0] // len(texts))] | |
tensor = [add_text_to_image(img, text, font_scale=font_scale) for img, text in zip(tensor, texts)] | |
return image_grid(tensor) | |
########## DDP Part Taken from: https://github.com/fundamentalvision/Deformable-DETR/blob/main/util/misc.py | |
def is_main_process(): | |
return get_rank() == 0 | |
def get_rank(): | |
if not is_dist_avail_and_initialized(): | |
return 0 | |
return dist.get_rank() | |
def is_dist_avail_and_initialized(): | |
if not dist.is_available(): | |
return False | |
if not dist.is_initialized(): | |
return False | |
return True | |