import math from random import choice, choices, randint import cv2 import numpy as np from PIL import Image from torch.utils.data.dataset import Dataset from utils import USMSharp_npy, cvtColor, preprocess_input from .degradations import (circular_lowpass_kernel, random_add_gaussian_noise, random_add_poisson_noise, random_mixed_kernels) from .transforms import augment, paired_random_crop def cv_show(image): image = np.array(image) image = cv2.resize(image, (256, 128), interpolation=cv2.INTER_CUBIC) cv2.imshow('image', image) cv2.waitKey(0) cv2.destroyAllWindows() def get_new_img_size(width, height, img_min_side=600): if width <= height: f = float(img_min_side) / width resized_height = int(f * height) resized_width = int(img_min_side) else: f = float(img_min_side) / height resized_width = int(f * width) resized_height = int(img_min_side) return resized_width, resized_height class SRGANDataset(Dataset): def __init__(self, train_lines, lr_shape, hr_shape): super(SRGANDataset, self).__init__() self.train_lines = train_lines self.train_batches = len(train_lines) self.lr_shape = lr_shape self.hr_shape = hr_shape self.scale = int(hr_shape[0]/lr_shape[0]) self.usmsharp = USMSharp_npy() # 第一次滤波的参数 self.blur_kernel_size = 21 self.kernel_list = ['iso', 'aniso', 'generalized_iso', 'generalized_aniso', 'plateau_iso', 'plateau_aniso'] self.kernel_prob = [0.45, 0.25, 0.12, 0.03, 0.12, 0.03] self.sinc_prob = 0.1 self.blur_sigma = [0.2, 3] self.betag_range = [0.5, 4] self.betap_range = [1, 2] # 第二次滤波的参数 self.blur_kernel_size2 = 21 self.kernel_list2 = ['iso', 'aniso', 'generalized_iso', 'generalized_aniso', 'plateau_iso', 'plateau_aniso'] self.kernel_prob2 = [0.45, 0.25, 0.12, 0.03, 0.12, 0.03] self.sinc_prob2 = 0.1 self.blur_sigma2 = [0.2, 3] self.betag_range2 = [0.5, 4] self.betap_range2 = [1, 2] # 最后的sinc滤波 self.final_sinc_prob = 0.8 # 卷积核大小从7到21分布 self.kernel_range = [2 * v + 1 for v in range(3, 11)] # 使用脉冲张量进行卷积不会产生模糊效果 self.pulse_tensor = np.zeros(shape=[21, 21], dtype='float32') self.pulse_tensor[10, 10] = 1 # 第一次退化的参数 self.resize_prob = [0.2, 0.7, 0.1] # up, down, keep self.resize_range = [0.15, 1.5] self.gaussian_noise_prob = 0.5 self.noise_range = [1, 30] self.poisson_scale_range = [0.05, 3] self.gray_noise_prob = 0.4 self.jpeg_range = [30, 95] # 第二次退化的参数 self.second_blur_prob = 0.8 self.resize_prob2 = [0.3, 0.4, 0.3] # up, down, keep self.resize_range2 = [0.3, 1.2] self.gaussian_noise_prob2= 0.5 self.noise_range2 = [1, 25] self.poisson_scale_range2= [0.05, 2.5] self.gray_noise_prob2 = 0.4 self.jpeg_range2 = [30, 95] def __len__(self): return self.train_batches def __getitem__(self, index): index = index % self.train_batches image_origin = Image.open(self.train_lines[index].split()[0]) lq, gt = self.get_random_data(image_origin, self.hr_shape) gt = np.transpose(preprocess_input(np.array(gt, dtype=np.float32), [0.5,0.5,0.5], [0.5,0.5,0.5]), [2,0,1]) lq = np.transpose(preprocess_input(np.array(lq, dtype=np.float32), [0.5,0.5,0.5], [0.5,0.5,0.5]), [2,0,1]) return lq, gt def rand(self, a=0, b=1): return np.random.rand()*(b-a) + a def get_random_data(self, image, input_shape): #------------------------------# # 读取图像并转换成RGB图像 # cvtColor将np转Image #------------------------------# image = cvtColor(image) #------------------------------# # 获得图像的高宽与目标高宽 #------------------------------# iw, ih = image.size h, w = input_shape scale = min(w/iw, h/ih) nw = int(iw*scale) nh = int(ih*scale) dx = (w-nw)//2 dy = (h-nh)//2 #---------------------------------# # 将图像多余的部分加上灰条 #---------------------------------# image = image.resize((nw,nh), Image.BICUBIC) new_image = Image.new('RGB', (w,h), (128,128,128)) new_image.paste(image, (dx, dy)) image = np.array(new_image, np.float32) rotate = self.rand()<.5 if rotate: angle = np.random.randint(-15,15) a,b = w/2,h/2 M = cv2.getRotationMatrix2D((a,b),angle,1) image = cv2.warpAffine(np.array(image), M, (w,h), borderValue=[128,128,128]) # ------------------------ 生成卷积核以进行第一次退化处理 ------------------------ # kernel_size = choice(self.kernel_range) if np.random.uniform() < self.sinc_prob: # 此sinc过滤器设置适用于[7,21]范围内的内核 if kernel_size < 13: omega_c = np.random.uniform(np.pi / 3, np.pi) else: omega_c = np.random.uniform(np.pi / 5, np.pi) kernel = circular_lowpass_kernel(omega_c, kernel_size, pad_to=False) else: kernel = random_mixed_kernels( self.kernel_list, self.kernel_prob, kernel_size, self.blur_sigma, self.blur_sigma, [-math.pi, math.pi], self.betag_range, self.betap_range, noise_range=None) # pad kernel pad_size = (21 - kernel_size) // 2 kernel = np.pad(kernel, ((pad_size, pad_size), (pad_size, pad_size))) kernel = kernel.astype(np.float32) # ------------------------ 生成卷积核以进行第二次退化处理 ------------------------ # kernel_size = choice(self.kernel_range) if np.random.uniform() < self.sinc_prob2: if kernel_size < 13: omega_c = np.random.uniform(np.pi / 3, np.pi) else: omega_c = np.random.uniform(np.pi / 5, np.pi) kernel2 = circular_lowpass_kernel(omega_c, kernel_size, pad_to=False) else: kernel2 = random_mixed_kernels( self.kernel_list2, self.kernel_prob2, kernel_size, self.blur_sigma2, self.blur_sigma2, [-math.pi, math.pi], self.betag_range2, self.betap_range2, noise_range=None) # pad kernel pad_size = (21 - kernel_size) // 2 kernel2 = np.pad(kernel2, ((pad_size, pad_size), (pad_size, pad_size))) kernel2 = kernel2.astype(np.float32) # ----------------------the final sinc kernel ------------------------- # if np.random.uniform() < self.final_sinc_prob: kernel_size = choice(self.kernel_range) omega_c = np.random.uniform(np.pi / 3, np.pi) sinc_kernel = circular_lowpass_kernel(omega_c, kernel_size, pad_to=21) else: sinc_kernel = self.pulse_tensor sinc_kernel = sinc_kernel.astype(np.float32) lq, gt = self.feed_data(image, kernel, kernel2, sinc_kernel) return lq, gt def feed_data(self, img_gt, kernel1, kernel2, sinc_kernel): img_gt = np.array(img_gt, dtype=np.float32) # 对gt进行锐化 img_gt = np.clip(img_gt / 255, 0, 1) gt = self.usmsharp.filt(img_gt) [ori_w, ori_h, _] = gt.shape # ---------------------- 根据参数进行第一次退化 -------------------- # # 模糊处理 out = cv2.filter2D(img_gt, -1, kernel1) # 随机 resize updown_type = choices(['up', 'down', 'keep'], self.resize_prob)[0] if updown_type == 'up': scale = np.random.uniform(1, self.resize_range[1]) elif updown_type == 'down': scale = np.random.uniform(self.resize_range[0], 1) else: scale = 1 mode = choice(['area', 'bilinear', 'bicubic']) if mode=='area': out = cv2.resize(out, (int(ori_h * scale), int(ori_w * scale)), interpolation=cv2.INTER_AREA) elif mode=='bilinear': out = cv2.resize(out, (int(ori_h * scale), int(ori_w * scale)), interpolation=cv2.INTER_LINEAR) else: out = cv2.resize(out, (int(ori_h * scale), int(ori_w * scale)), interpolation=cv2.INTER_CUBIC) # 灰度噪声 gray_noise_prob = self.gray_noise_prob if np.random.uniform() < self.gaussian_noise_prob: out = random_add_gaussian_noise( out, sigma_range=self.noise_range, clip=True, rounds=False, gray_prob=gray_noise_prob) else: out = random_add_poisson_noise( out, scale_range=self.poisson_scale_range, gray_prob=gray_noise_prob, clip=True, rounds=False) # JPEG 压缩 jpeg_p = np.random.uniform(low=self.jpeg_range[0], high=self.jpeg_range[1]) jpeg_p = int(jpeg_p) out = np.clip(out, 0, 1) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_p] _, encimg = cv2.imencode('.jpg', out * 255., encode_param) out = np.float32(cv2.imdecode(encimg, 1))/255 # ---------------------- 根据参数进行第一次退化 -------------------- # # 模糊 if np.random.uniform() < self.second_blur_prob: out = cv2.filter2D(out, -1, kernel2) # 随机 resize updown_type = choices(['up', 'down', 'keep'], self.resize_prob2)[0] if updown_type == 'up': scale = np.random.uniform(1, self.resize_range2[1]) elif updown_type == 'down': scale = np.random.uniform(self.resize_range2[0], 1) else: scale = 1 mode = choice(['area', 'bilinear', 'bicubic']) if mode == 'area': out = cv2.resize(out, (int(ori_h / self.scale * scale), int(ori_w / self.scale * scale)), interpolation=cv2.INTER_AREA) elif mode == 'bilinear': out = cv2.resize(out, (int(ori_h / self.scale * scale), int(ori_w / self.scale * scale)), interpolation=cv2.INTER_LINEAR) else: out = cv2.resize(out, (int(ori_h / self.scale * scale), int(ori_w / self.scale * scale)), interpolation=cv2.INTER_CUBIC) # 灰度噪声 gray_noise_prob = self.gray_noise_prob2 if np.random.uniform() < self.gaussian_noise_prob2: out = random_add_gaussian_noise( out, sigma_range=self.noise_range2, clip=True, rounds=False, gray_prob=gray_noise_prob) else: out = random_add_poisson_noise( out, scale_range=self.poisson_scale_range2, gray_prob=gray_noise_prob, clip=True, rounds=False) # JPEG压缩+最后的sinc滤波器 # 我们还需要将图像的大小调整到所需的尺寸。我们把[调整大小+sinc过滤器]组合在一起 # 作为一个操作。 # 我们考虑两个顺序。 # 1. [调整大小+sinc filter] + JPEG压缩 # 2. 2. JPEG压缩+[调整大小+sinc过滤]。 # 根据经验,我们发现其他组合(sinc + JPEG + Resize)会引入扭曲的线条。 if np.random.uniform() < 0.5: # resize back + the final sinc filter mode = choice(['area', 'bilinear', 'bicubic']) if mode == 'area': out = cv2.resize(out, (ori_h // self.scale, ori_w // self.scale), interpolation=cv2.INTER_AREA) elif mode == 'bilinear': out = cv2.resize(out, (ori_h // self.scale, ori_w // self.scale), interpolation=cv2.INTER_LINEAR) else: out = cv2.resize(out, (ori_h // self.scale, ori_w // self.scale), interpolation=cv2.INTER_CUBIC) out = cv2.filter2D(out, -1, sinc_kernel) # JPEG 压缩 jpeg_p = np.random.uniform(low=self.jpeg_range[0], high=self.jpeg_range[1]) jpeg_p = jpeg_p out = np.clip(out, 0, 1) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_p] _, encimg = cv2.imencode('.jpg', out * 255., encode_param) out = np.float32(cv2.imdecode(encimg, 1)) / 255 else: # JPEG 压缩 jpeg_p = np.random.uniform(low=self.jpeg_range[0], high=self.jpeg_range[1]) jpeg_p = jpeg_p out = np.clip(out, 0, 1) encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), jpeg_p] _, encimg = cv2.imencode('.jpg', out * 255., encode_param) out = np.float32(cv2.imdecode(encimg, 1)) / 255 # resize back + the final sinc filter mode = choice(['area', 'bilinear', 'bicubic']) if mode == 'area': out = cv2.resize(out, (ori_h // self.scale, ori_w // self.scale),interpolation=cv2.INTER_AREA) elif mode == 'bilinear': out = cv2.resize(out, (ori_h // self.scale, ori_w // self.scale),interpolation=cv2.INTER_LINEAR) else: out = cv2.resize(out, (ori_h // self.scale, ori_w // self.scale),interpolation=cv2.INTER_CUBIC) lq = np.clip((out * 255.0), 0, 255) gt = np.clip((gt * 255.0), 0, 255) return Image.fromarray(np.uint8(lq)), Image.fromarray(np.uint8(gt)) def SRGAN_dataset_collate(batch): images_l = [] images_h = [] for img_l, img_h in batch: images_l.append(img_l) images_h.append(img_h) return np.array(images_l), np.array(images_h)