import os
import glob
import yaml
import random
import numpy as np
import cv2
import torch
import torchvision.utils as tvutils
import zipfile
import argparse
from ..render.obj import write_obj, write_textured_obj
import einops
import torch.distributed as dist


def str2bool(v):
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    else:
        raise argparse.ArgumentTypeError('Boolean value expected.')
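
# Illustrative usage sketch: str2bool is intended as an argparse `type`, so a
# boolean flag accepts values like "yes"/"false"/"1". The flag name below is
# hypothetical.
#     parser = argparse.ArgumentParser()
#     parser.add_argument('--resume', type=str2bool, nargs='?', const=True, default=False)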


def setup_runtime(args):
    """Load configs, initialize CUDA, cuDNN and the random seeds."""
    # Setup CUDA
    cuda_device_id = args.gpu
    if cuda_device_id is not None:
        os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
        os.environ["CUDA_VISIBLE_DEVICES"] = str(cuda_device_id)
    if torch.cuda.is_available():
        torch.backends.cudnn.enabled = True
        torch.backends.cudnn.benchmark = True
        torch.backends.cudnn.deterministic = False

    # Setup random seeds for reproducibility
    random.seed(args.seed)
    np.random.seed(args.seed)
    cv2.setRNGSeed(args.seed)
    torch.manual_seed(args.seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(args.seed)

    # Load config
    cfgs = {}
    if args.config is not None and os.path.isfile(args.config):
        cfgs = load_yaml(args.config)

    cfgs['config'] = args.config
    cfgs['seed'] = args.seed
    cfgs['num_workers'] = args.num_workers
    cfgs['device'] = f"cuda:{args.rank}" if torch.cuda.is_available() and cuda_device_id is not None else 'cpu'

    print(f"Environment: GPU {cuda_device_id} - seed {args.seed}")
    return cfgs
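
# Illustrative usage sketch, assuming an argparse namespace that carries the
# attributes this function reads (gpu, seed, config, num_workers, rank); the
# flag names and defaults are assumptions.
#     parser = argparse.ArgumentParser()
#     parser.add_argument('--config', type=str, default=None)
#     parser.add_argument('--gpu', type=int, default=0)
#     parser.add_argument('--seed', type=int, default=0)
#     parser.add_argument('--num_workers', type=int, default=4)
#     parser.add_argument('--rank', type=int, default=0)
#     cfgs = setup_runtime(parser.parse_args())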


def load_yaml(path):
    print(f"Loading configs from {path}")
    with open(path, 'r') as f:
        return yaml.safe_load(f)


def dump_yaml(path, cfgs):
    print(f"Saving configs to {path}")
    xmkdir(os.path.dirname(path))
    with open(path, 'w') as f:
        return yaml.safe_dump(cfgs, f)


def xmkdir(path):
    """Create directory PATH recursively if it does not exist."""
    os.makedirs(path, exist_ok=True)


def clean_checkpoint(checkpoint_dir, keep_num=2):
    if keep_num > 0:
        names = list(sorted(
            glob.glob(os.path.join(checkpoint_dir, 'checkpoint*.pth'))
        ))
        if len(names) > keep_num:
            for name in names[:-keep_num]:
                print(f"Deleting obsolete checkpoint file {name}")
                os.remove(name)


def archive_code(arc_path, filetypes=['.py']):
    print(f"Archiving code to {arc_path}")
    xmkdir(os.path.dirname(arc_path))
    zipf = zipfile.ZipFile(arc_path, 'w', zipfile.ZIP_DEFLATED)
    cur_dir = os.getcwd()
    flist = []
    for ftype in filetypes:
        flist.extend(glob.glob(os.path.join(cur_dir, '[!results]*', '**', '*' + ftype), recursive=True))  # ignore results folder
        flist.extend(glob.glob(os.path.join(cur_dir, '*' + ftype)))
    for f in flist:
        zipf.write(f, arcname=f.replace(cur_dir, 'archived_code', 1))
    zipf.close()


def get_model_device(model):
    return next(model.parameters()).device


def set_requires_grad(nets, requires_grad=False):
    if not isinstance(nets, list):
        nets = [nets]
    for net in nets:
        if net is not None:
            for param in net.parameters():
                param.requires_grad = requires_grad


def save_videos(out_fold, imgs, prefix='', suffix='', fnames=None, ext='.mp4', cycle=False):
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''

    imgs = imgs.transpose(0, 1, 3, 4, 2)  # BxTxCxHxW -> BxTxHxWxC
    for i, fs in enumerate(imgs):
        if cycle:
            fs = np.concatenate([fs, fs[::-1]], 0)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        # fourcc = cv2.VideoWriter_fourcc(*'avc1')
        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        xmkdir(out_fold_i)
        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ext))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]
        fpath = os.path.join(out_fold_i, prefix + fname + suffix + ext)

        vid = cv2.VideoWriter(fpath, fourcc, 5, (fs.shape[2], fs.shape[1]))
        for f in fs:
            vid.write(np.uint8(f[..., ::-1] * 255.))
        vid.release()
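
# Illustrative usage sketch (the array name and output path are hypothetical):
# `frames` is a numpy array of shape (B, T, C, H, W) with float values in
# [0, 1]; one .mp4 per batch element is written into out_fold.
#     save_videos('results/videos', frames, suffix='rgb', cycle=True)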


def save_images(out_fold, imgs, prefix='', suffix='', fnames=None, ext='.png'):
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''

    imgs = imgs.transpose(0, 2, 3, 1)
    for i, img in enumerate(imgs):
        img = np.concatenate([np.flip(img[..., :3], -1), img[..., 3:]], -1)  # RGBA to BGRA
        if 'depth' in suffix:
            im_out = np.uint16(img * 65535.)
        else:
            im_out = np.uint8(img * 255.)

        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        xmkdir(out_fold_i)
        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ext))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]
        fpath = os.path.join(out_fold_i, prefix + fname + suffix + ext)

        cv2.imwrite(fpath, im_out)


def save_txt(out_fold, data, prefix='', suffix='', fnames=None, ext='.txt', fmt='%.6f'):
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''

    for i, d in enumerate(data):
        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        xmkdir(out_fold_i)
        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ext))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]
        fpath = os.path.join(out_fold_i, prefix + fname + suffix + ext)

        np.savetxt(fpath, d, fmt=fmt, delimiter=', ')


def save_obj(out_fold, meshes=None, save_material=True, feat=None, prefix='', suffix='', fnames=None, resolution=[256, 256], prior_shape=None):
    prefix = prefix + '_' if prefix else ''
    suffix = '_' + suffix if suffix else ''

    if meshes.v_pos is None:
        return
    batch_size = meshes.v_pos.shape[0]
    for i in range(batch_size):
        out_fold_i = out_fold[i] if isinstance(out_fold, list) else out_fold
        xmkdir(out_fold_i)
        if fnames is None:
            idx = len(glob.glob(os.path.join(out_fold_i, prefix + '*' + suffix + ".obj"))) + 1
            fname = '%07d' % idx
        else:
            fname = fnames[i]

        if save_material:
            os.makedirs(os.path.join(out_fold_i, fname), exist_ok=True)
            write_textured_obj(out_fold_i, f'{fname}/{prefix+suffix}', meshes, i, save_material=save_material, feat=feat, resolution=resolution, prior_shape=prior_shape)
        else:
            write_obj(out_fold_i, prefix + fname + suffix, meshes, i, save_material=False, feat=feat, resolution=resolution)


def compute_sc_inv_err(d_pred, d_gt, mask=None):
    b = d_pred.size(0)
    diff = d_pred - d_gt
    if mask is not None:
        diff = diff * mask
        avg = diff.view(b, -1).sum(1) / (mask.view(b, -1).sum(1))
        score = (diff - avg.view(b, 1, 1))**2 * mask
    else:
        avg = diff.view(b, -1).mean(1)
        score = (diff - avg.view(b, 1, 1))**2
    return score  # masked error maps


def compute_angular_distance(n1, n2, mask=None):
    dist = (n1 * n2).sum(3).clamp(-1, 1).acos() / np.pi * 180
    return dist * mask if mask is not None else dist


def save_scores(out_path, scores, header=''):
    print('Saving scores to %s' % out_path)
    np.savetxt(out_path, scores, fmt='%.8f', delimiter=',\t', header=header)


def image_grid(tensor, nrow=None):
    # if a list is given, stack it into a numpy array
    if isinstance(tensor, list):
        tensor = np.stack(tensor, 0)
    # if a numpy array is given, convert it to a torch tensor and move channels first
    if isinstance(tensor, np.ndarray):
        tensor = torch.from_numpy(tensor).permute(0, 3, 1, 2)

    b, c, h, w = tensor.shape
    if nrow is None:
        nrow = int(np.ceil(b**0.5))
    if c == 1:
        tensor = tensor.repeat(1, 3, 1, 1)
    tensor = tvutils.make_grid(tensor, nrow=nrow, normalize=False)
    return tensor


def video_grid(tensor, nrow=None):
    return torch.stack([image_grid(t, nrow=nrow) for t in tensor.unbind(1)], 0)
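
# Illustrative usage sketch (tensor names are hypothetical): arrange a batch of
# images (B, C, H, W) into a single grid image, or a batch of clips
# (B, T, C, H, W) into a per-frame grid, e.g. for logging.
#     grid = image_grid(images)        # -> (3, H', W')
#     vid_grid = video_grid(videos)    # -> (T, 3, H', W')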


class LazyClass(object):
    def __init__(self, cls, *args, **kwargs):
        self.cls = cls
        self.args = args
        self.kwargs = kwargs
        self.instance = None

    def get_instance(self):
        if self.instance is None:
            self.instance = self.cls(*self.args, **self.kwargs)
        return self.instance

    def __call__(self, *args, **kwargs):
        return self.get_instance()(*args, **kwargs)

    def __getattribute__(self, name):
        if name in ['cls', 'args', 'kwargs', 'instance', 'get_instance']:
            return super().__getattribute__(name)
        else:
            return getattr(self.get_instance(), name)
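
# Illustrative usage sketch (the wrapped class is hypothetical): construction of
# the wrapped object is deferred until the wrapper is first called or one of its
# attributes is accessed.
#     renderer = LazyClass(SomeExpensiveRenderer, device='cuda')
#     out = renderer(inputs)   # instantiates SomeExpensiveRenderer on first use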


def add_text_to_image(img, text, pos=(12, 12), color=(1, 1, 1), font_scale=1, thickness=2):
    if isinstance(img, torch.Tensor):
        img = img.permute(1, 2, 0).cpu().numpy()
    # if grayscale -> convert to RGB
    if img.shape[2] == 1:
        img = np.repeat(img, 3, 2)
    img = cv2.putText(np.ascontiguousarray(img), text, pos, cv2.FONT_HERSHEY_SIMPLEX, font_scale, color, thickness)
    return img


def image_grid_multi_channel(tensor, pca=False, texts=None, font_scale=0.5):
    """
    Visualize a multi-channel image, treating each channel as a separate greyscale image.
    tensor: (b, c, h, w)
    texts: list of strings of length b
    """
    # rescale each sample in the batch to [0, 1]
    tensor = tensor.detach().cpu()
    min_ = einops.reduce(tensor, 'b c h w -> b 1 1 1', 'min')
    max_ = einops.reduce(tensor, 'b c h w -> b 1 1 1', 'max')
    tensor = (tensor - min_) / (max_ - min_)

    if pca:
        import faiss
        (b, c, h, w) = tensor.shape
        # flatten to ((b h w), c) and project the c channels down to 3 via PCA
        tensor_flat = einops.rearrange(tensor, 'b c h w -> (b h w) c')
        pca_mat = faiss.PCAMatrix(c, 3)
        pca_mat.train(tensor_flat.numpy())
        assert pca_mat.is_trained
        tensor_flat_pca = pca_mat.apply_py(tensor_flat.numpy())
        tensor = einops.rearrange(tensor_flat_pca, '(b h w) c -> b h w c', b=b, c=3, h=h, w=w)
    else:
        tensor = einops.rearrange(tensor, 'b c h w -> (b c) 1 h w')

    if texts is not None:
        # duplicate texts for each channel
        texts = [text for text in texts for _ in range(tensor.shape[0] // len(texts))]
        tensor = [add_text_to_image(img, text, font_scale=font_scale) for img, text in zip(tensor, texts)]
    return image_grid(tensor)
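
# Illustrative usage sketch (the tensor name is hypothetical): visualize a
# (B, C, H, W) feature map either channel by channel, or projected to 3
# channels with PCA; the pca=True path requires the faiss package.
#     grid = image_grid_multi_channel(feature_maps, pca=True)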


########## DDP helpers taken from: https://github.com/fundamentalvision/Deformable-DETR/blob/main/util/misc.py
def is_main_process():
    return get_rank() == 0


def get_rank():
    if not is_dist_avail_and_initialized():
        return 0
    return dist.get_rank()


def is_dist_avail_and_initialized():
    if not dist.is_available():
        return False
    if not dist.is_initialized():
        return False
    return True
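
# Illustrative usage sketch: a typical use of these helpers is to restrict
# logging or checkpoint saving to the main process when running with
# torch.distributed; they fall back to rank 0 when no process group is set up.
#     if is_main_process():
#         print(f"rank {get_rank()}: saving checkpoint")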