import torch.nn as nn import torch.nn.functional as F from torch.autograd import Variable import torch import numpy as np import os, time, random import argparse from torch.utils.data import Dataset, DataLoader from PIL import Image as PILImage from glob import glob from tqdm import tqdm from model.model import InvISPNet from dataset.FiveK_dataset import FiveKDatasetTest from config.config import get_arguments from utils.JPEG import DiffJPEG from utils.commons import denorm, preprocess_test_patch os.system("nvidia-smi -q -d Memory |grep -A4 GPU|grep Free >tmp") os.environ["CUDA_VISIBLE_DEVICES"] = str( np.argmax([int(x.split()[2]) for x in open("tmp", "r").readlines()]) ) # os.environ['CUDA_VISIBLE_DEVICES'] = '7' os.system("rm tmp") DiffJPEG = DiffJPEG(differentiable=True, quality=90).cuda() parser = get_arguments() parser.add_argument("--ckpt", type=str, help="Checkpoint path.") parser.add_argument( "--out_path", type=str, default="./exps/", help="Path to save checkpoint. " ) parser.add_argument( "--split_to_patch", dest="split_to_patch", action="store_true", help="Test on patch. ", ) args = parser.parse_args() print("Parsed arguments: {}".format(args)) ckpt_name = args.ckpt.split("/")[-1].split(".")[0] if args.split_to_patch: os.makedirs( args.out_path + "%s/results_metric_%s/" % (args.task, ckpt_name), exist_ok=True ) out_path = args.out_path + "%s/results_metric_%s/" % (args.task, ckpt_name) else: os.makedirs( args.out_path + "%s/results_%s/" % (args.task, ckpt_name), exist_ok=True ) out_path = args.out_path + "%s/results_%s/" % (args.task, ckpt_name) def main(args): # ======================================define the model============================================ net = InvISPNet(channel_in=3, channel_out=3, block_num=8) device = torch.device("cuda:0") net.to(device) net.eval() # load the pretrained weight if there exists one if os.path.isfile(args.ckpt): net.load_state_dict(torch.load(args.ckpt), strict=False) print("[INFO] Loaded checkpoint: {}".format(args.ckpt)) print("[INFO] Start data load and preprocessing") RAWDataset = FiveKDatasetTest(opt=args) dataloader = DataLoader( RAWDataset, batch_size=1, shuffle=False, num_workers=0, drop_last=True ) input_RGBs = sorted(glob(out_path + "pred*jpg")) input_RGBs_names = [path.split("/")[-1].split(".")[0][5:] for path in input_RGBs] print("[INFO] Start test...") for i_batch, sample_batched in enumerate(tqdm(dataloader)): step_time = time.time() input, target_rgb, target_raw = ( sample_batched["input_raw"].to(device), sample_batched["target_rgb"].to(device), sample_batched["target_raw"].to(device), ) file_name = sample_batched["file_name"][0] if args.split_to_patch: input_list, target_rgb_list, target_raw_list = preprocess_test_patch( input, target_rgb, target_raw ) else: # remove [:,:,::2,::2] if you have enough GPU memory to test the full resolution input_list, target_rgb_list, target_raw_list = ( [input[:, :, ::2, ::2]], [target_rgb[:, :, ::2, ::2]], [target_raw[:, :, ::2, ::2]], ) for i_patch in range(len(input_list)): file_name_patch = file_name + "_%05d" % i_patch idx = input_RGBs_names.index(file_name_patch) input_RGB_path = input_RGBs[idx] input_RGB = ( torch.from_numpy(np.array(PILImage.open(input_RGB_path)) / 255.0) .unsqueeze(0) .permute(0, 3, 1, 2) .float() .to(device) ) target_raw_patch = target_raw_list[i_patch] with torch.no_grad(): reconstruct_raw = net(input_RGB, rev=True) pred_raw = reconstruct_raw.detach().permute(0, 2, 3, 1) pred_raw = torch.clamp(pred_raw, 0, 1) target_raw_patch = target_raw_patch.permute(0, 2, 3, 1) pred_raw = denorm(pred_raw, 255) target_raw_patch = denorm(target_raw_patch, 255) pred_raw = pred_raw.cpu().numpy() target_raw_patch = target_raw_patch.cpu().numpy().astype(np.float32) raw_pred = PILImage.fromarray(np.uint8(pred_raw[0, :, :, 0])) raw_tar_pred = PILImage.fromarray( np.hstack( ( np.uint8(target_raw_patch[0, :, :, 0]), np.uint8(pred_raw[0, :, :, 0]), ) ) ) raw_tar = PILImage.fromarray(np.uint8(target_raw_patch[0, :, :, 0])) raw_pred.save(out_path + "raw_pred_%s_%05d.jpg" % (file_name, i_patch)) raw_tar.save(out_path + "raw_tar_%s_%05d.jpg" % (file_name, i_patch)) raw_tar_pred.save( out_path + "raw_gt_pred_%s_%05d.jpg" % (file_name, i_patch) ) np.save( out_path + "raw_pred_%s_%05d.npy" % (file_name, i_patch), pred_raw[0, :, :, :] / 255.0, ) np.save( out_path + "raw_tar_%s_%05d.npy" % (file_name, i_patch), target_raw_patch[0, :, :, :] / 255.0, ) del reconstruct_raw if __name__ == "__main__": torch.set_num_threads(4) main(args)