# Copyright Niantic 2021. Patent Pending. All rights reserved. # # This software is licensed under the terms of the ManyDepth licence # which allows for non-commercial use only, the full terms of which are made # available in the LICENSE file. import os import time import json import argparse import numpy as np from PIL import Image import matplotlib as mpl import matplotlib.cm as cm import torch from torch import Tensor import torchvision from torchvision import transforms import torch.nn.functional as F from src.networks import * from utils import transformation_from_parameters, disp_to_depth, line def load_and_preprocess_image(image, resize_width, resize_height): image_ori = image.convert('RGB') W, H = image_ori.size W_resized = W - W % 32 H_resized = H - H % 32 img_ori_npy = np.array(image_ori)[0:H_resized, 0:W_resized] image = image_ori.resize((resize_width, resize_height), Image.Resampling.LANCZOS) image = transforms.ToTensor()(image) image_ori = transforms.ToTensor()(img_ori_npy).unsqueeze(0) image = line(image).unsqueeze(0) if torch.cuda.is_available(): return image_ori.cuda(), image.cuda(), (H, W) return image_ori, image, (H, W) def load_and_preprocess_intrinsics(intrinsics_path, resize_width, resize_height): K = np.eye(4) with open(intrinsics_path, 'r') as f: K[:3, :3] = np.array(json.load(f)) # Convert normalised intrinsics to 1/4 size unnormalised intrinsics. # (The cost volume construction expects the intrinsics corresponding to 1/4 size images) K[0, :] *= resize_width // 4 K[1, :] *= resize_height // 4 invK = torch.Tensor(np.linalg.pinv(K)).unsqueeze(0) K = torch.Tensor(K).unsqueeze(0) if torch.cuda.is_available(): return K.cuda(), invK.cuda() return K, invK def tensor2img(img: Tensor) -> np.ndarray: return (255.0 * img.permute(1, 2, 0).cpu().detach().numpy()).astype(np.uint8) def test_simple(image: Image): """Function to predict for a single image or folder of images """ device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") # Loading pretrained model encoder_dict = torch.load("src/weights/encoder.pth", map_location=device) encoder = ResnetEncoderMatching(18, False, input_width=encoder_dict['width'], input_height=encoder_dict['height'], adaptive_bins=True, min_depth_bin=encoder_dict['min_depth_bin'], max_depth_bin=encoder_dict['max_depth_bin'], depth_binning='linear', num_depth_bins=96) filtered_dict_enc = {k: v for k, v in encoder_dict.items() if k in encoder.state_dict()} encoder.load_state_dict(filtered_dict_enc) depth_decoder = DepthDecoder(num_ch_enc=encoder.num_ch_enc, scales=range(4)) loaded_dict = torch.load("src/weights/depth.pth", map_location=device) depth_decoder.load_state_dict(loaded_dict) pose_enc_dict = torch.load("src/weights/pose_encoder.pth", map_location=device) pose_dec_dict = torch.load("src/weights/pose.pth", map_location=device) pose_enc = ResnetEncoder(18, False, num_input_images=2) pose_dec = PoseDecoder(pose_enc.num_ch_enc, num_input_features=1, num_frames_to_predict_for=2) pose_enc.load_state_dict(pose_enc_dict, strict=True) pose_dec.load_state_dict(pose_dec_dict, strict=True) restoration_dict = torch.load("src/weights/uie_model.pth", map_location=device) uie_model = MainModel() uie_model.load_state_dict(restoration_dict, strict=False) # Setting states of networks encoder.eval() depth_decoder.eval() pose_enc.eval() pose_dec.eval() uie_model.eval() if torch.cuda.is_available(): encoder.cuda() depth_decoder.cuda() pose_enc.cuda() pose_dec.cuda() uie_model.cuda() # Load input data input_image_ori, input_image, original_size = load_and_preprocess_image(image, resize_width=encoder_dict['width'], resize_height=encoder_dict['height']) source_image_ori, source_image, _ = load_and_preprocess_image(image, resize_width=encoder_dict['width'], resize_height=encoder_dict['height']) K, invK = load_and_preprocess_intrinsics('canyons_intrinsics.json', resize_width=encoder_dict['width'], resize_height=encoder_dict['height']) with torch.no_grad(): # Estimate poses pose_inputs = [source_image, input_image] pose_inputs = [pose_enc(torch.cat(pose_inputs, 1))] axisangle, translation = pose_dec(pose_inputs) pose = transformation_from_parameters(axisangle[:, 0], translation[:, 0], invert=True) pose *= 0 # zero poses are a signal to the encoder not to construct a cost volume source_image *= 0 # Estimate depth output, lowest_cost, _ = encoder(current_image=input_image, lookup_images=source_image.unsqueeze(1), poses=pose.unsqueeze(1), K=K, invK=invK, min_depth_bin=encoder_dict['min_depth_bin'], max_depth_bin=encoder_dict['max_depth_bin']) output = depth_decoder(output) sigmoid_output = output[("disp", 0)] _, depth_output = disp_to_depth(sigmoid_output, min_depth=0.1, max_depth=20) sigmoid_output_resized = F.interpolate( sigmoid_output, original_size, mode="bilinear", align_corners=False) sigmoid_output_resized = sigmoid_output_resized.cpu().numpy()[:, 0] depth = F.interpolate( depth_output, input_image_ori.shape[2:], mode="bilinear", align_corners=False) beta, J, A = uie_model(input_image_ori) beta[0] = 5.0 * beta[0] beta[1] = 5.0 * beta[1] t1 = torch.exp(-beta[0] * depth) D1 = J * t1 B1 = (1 - torch.exp(-beta[1] * depth)) * A I_rec = D1 + B1 J_out = Image.open(tensor2img(J[0])) return J_out