# Copyright Niantic 2021. Patent Pending. All rights reserved.
#
# This software is licensed under the terms of the ManyDepth licence
# which allows for non-commercial use only, the full terms of which are made
# available in the LICENSE file.
import json

import numpy as np
import torch
import torch.nn.functional as F
from PIL import Image
from torch import Tensor
from torchvision import transforms

from src.networks import (DepthDecoder, MainModel, PoseDecoder,
                          ResnetEncoder, ResnetEncoderMatching)
from utils import transformation_from_parameters, disp_to_depth, line


def load_and_preprocess_image(image, resize_width, resize_height):
    """Return (full-resolution tensor, network-sized tensor, original (H, W))."""
    image_ori = image.convert('RGB')
    W, H = image_ori.size
    # Crop the full-resolution copy so both sides are multiples of 32
    # (likely required by the networks' downsampling stages).
    W_resized = W - W % 32
    H_resized = H - H % 32
    img_ori_npy = np.array(image_ori)[0:H_resized, 0:W_resized]
    # The depth network expects the fixed input size stored in the encoder checkpoint.
    image = image_ori.resize((resize_width, resize_height), Image.Resampling.LANCZOS)
    image = transforms.ToTensor()(image)
    image_ori = transforms.ToTensor()(img_ori_npy).unsqueeze(0)
    image = line(image).unsqueeze(0)

    if torch.cuda.is_available():
        return image_ori.cuda(), image.cuda(), (H, W)
    return image_ori, image, (H, W)


def load_and_preprocess_intrinsics(intrinsics_path, resize_width, resize_height):
    """Load normalised 3x3 intrinsics from a JSON file; return (K, invK) as 1x4x4 tensors."""
    K = np.eye(4)
    with open(intrinsics_path, 'r') as f:
        K[:3, :3] = np.array(json.load(f))

    # Convert normalised intrinsics to 1/4-size unnormalised intrinsics
    # (the cost volume construction expects intrinsics for 1/4-size images).
    K[0, :] *= resize_width // 4
    K[1, :] *= resize_height // 4

    invK = torch.Tensor(np.linalg.pinv(K)).unsqueeze(0)
    K = torch.Tensor(K).unsqueeze(0)

    if torch.cuda.is_available():
        return K.cuda(), invK.cuda()
    return K, invK
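
# Note: the intrinsics file must contain a 3x3 nested list, with the first row
# normalised by image width and the second by image height. A hypothetical
# example, not the actual contents of canyons_intrinsics.json:
#
#     [[0.58, 0.0,  0.5],
#      [0.0,  1.92, 0.5],
#      [0.0,  0.0,  1.0]]
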
def tensor2img(img: Tensor) -> np.ndarray:
    """Convert a CHW float tensor in [0, 1] to an HWC uint8 array."""
    return (255.0 * img.permute(1, 2, 0).cpu().detach().numpy()).astype(np.uint8)


def test_simple(image: Image.Image) -> Image.Image:
    """Predict depth and a restored image for a single input image."""
    device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    # Load the pretrained matching encoder; its checkpoint also stores the
    # training input resolution and the adaptive depth-bin range.
    encoder_dict = torch.load("src/weights/encoder.pth", map_location=device)
    encoder = ResnetEncoderMatching(18, False,
                                    input_width=encoder_dict['width'],
                                    input_height=encoder_dict['height'],
                                    adaptive_bins=True,
                                    min_depth_bin=encoder_dict['min_depth_bin'],
                                    max_depth_bin=encoder_dict['max_depth_bin'],
                                    depth_binning='linear',
                                    num_depth_bins=96)
    filtered_dict_enc = {k: v for k, v in encoder_dict.items() if k in encoder.state_dict()}
    encoder.load_state_dict(filtered_dict_enc)

    depth_decoder = DepthDecoder(num_ch_enc=encoder.num_ch_enc, scales=range(4))
    loaded_dict = torch.load("src/weights/depth.pth", map_location=device)
    depth_decoder.load_state_dict(loaded_dict)

    pose_enc_dict = torch.load("src/weights/pose_encoder.pth", map_location=device)
    pose_dec_dict = torch.load("src/weights/pose.pth", map_location=device)
    pose_enc = ResnetEncoder(18, False, num_input_images=2)
    pose_dec = PoseDecoder(pose_enc.num_ch_enc,
                           num_input_features=1,
                           num_frames_to_predict_for=2)
    pose_enc.load_state_dict(pose_enc_dict, strict=True)
    pose_dec.load_state_dict(pose_dec_dict, strict=True)

    # Underwater image enhancement (restoration) network.
    restoration_dict = torch.load("src/weights/uie_model.pth", map_location=device)
    uie_model = MainModel()
    uie_model.load_state_dict(restoration_dict, strict=False)
    # Setting states of networks
    encoder.eval()
    depth_decoder.eval()
    pose_enc.eval()
    pose_dec.eval()
    uie_model.eval()
    if torch.cuda.is_available():
        encoder.cuda()
        depth_decoder.cuda()
        pose_enc.cuda()
        pose_dec.cuda()
        uie_model.cuda()
    # Load input data. With a single image there is no separate source frame,
    # so the same image is used for both; the pose is zeroed out below to match.
    input_image_ori, input_image, original_size = load_and_preprocess_image(
        image,
        resize_width=encoder_dict['width'],
        resize_height=encoder_dict['height'])
    source_image_ori, source_image, _ = load_and_preprocess_image(
        image,
        resize_width=encoder_dict['width'],
        resize_height=encoder_dict['height'])
    K, invK = load_and_preprocess_intrinsics(
        'canyons_intrinsics.json',
        resize_width=encoder_dict['width'],
        resize_height=encoder_dict['height'])
    with torch.no_grad():
        # Estimate the relative pose between the source and input frames.
        pose_inputs = [source_image, input_image]
        pose_inputs = [pose_enc(torch.cat(pose_inputs, 1))]
        axisangle, translation = pose_dec(pose_inputs)
        pose = transformation_from_parameters(axisangle[:, 0], translation[:, 0], invert=True)

        # Zero poses are a signal to the encoder not to construct a cost volume
        # (single-image mode); the lookup frame is zeroed to match.
        pose *= 0
        source_image *= 0

        # Estimate depth.
        output, lowest_cost, _ = encoder(current_image=input_image,
                                         lookup_images=source_image.unsqueeze(1),
                                         poses=pose.unsqueeze(1),
                                         K=K,
                                         invK=invK,
                                         min_depth_bin=encoder_dict['min_depth_bin'],
                                         max_depth_bin=encoder_dict['max_depth_bin'])
        output = depth_decoder(output)

        sigmoid_output = output[("disp", 0)]
        _, depth_output = disp_to_depth(sigmoid_output, min_depth=0.1, max_depth=20)
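
        # For reference, disp_to_depth in the ManyDepth codebase maps the sigmoid
        # output into [1/max_depth, 1/min_depth] and inverts it, roughly (a sketch
        # of the upstream helper, assuming the standard implementation):
        #     scaled_disp = 1/max_depth + (1/min_depth - 1/max_depth) * disp
        #     depth = 1 / scaled_disp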
        sigmoid_output_resized = F.interpolate(
            sigmoid_output, original_size, mode="bilinear", align_corners=False)
        sigmoid_output_resized = sigmoid_output_resized.cpu().numpy()[:, 0]
        # Upsample depth to the cropped full input resolution for the image formation model.
        depth = F.interpolate(
            depth_output, input_image_ori.shape[2:], mode="bilinear", align_corners=False)
        # The restoration network predicts attenuation coefficients beta, the
        # restored scene radiance J, and the veiling light A.
        beta, J, A = uie_model(input_image_ori)
        beta[0] = 5.0 * beta[0]
        beta[1] = 5.0 * beta[1]

        # Underwater image formation: a direct signal attenuated with depth,
        # plus backscatter that saturates towards the veiling light A.
        t1 = torch.exp(-beta[0] * depth)
        D1 = J * t1
        B1 = (1 - torch.exp(-beta[1] * depth)) * A
        I_rec = D1 + B1  # reconstruction of the input; not returned

        J_out = Image.fromarray(tensor2img(J[0]))
        return J_out
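

# A minimal usage sketch, assuming canyons_intrinsics.json and the
# src/weights/*.pth checkpoints are present; the input path is hypothetical.
if __name__ == "__main__":
    input_img = Image.open("example.png")  # hypothetical input path
    restored = test_simple(input_img)
    restored.save("example_restored.png")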