|
import cv2 |
|
import argparse |
|
import numpy as np |
|
import copy |
|
import os, sys, copy, shutil |
|
from kornia import morphology as morph |
|
import math |
|
import gc, time |
|
import torch |
|
import torch.multiprocessing as mp |
|
from torch.nn import functional as F |
|
from multiprocessing import set_start_method |
|
|
|
|
|
root_path = os.path.abspath('.') |
|
sys.path.append(root_path) |
|
from opt import opt |
|
from degradation.ESR.utils import filter2D, np2tensor, tensor2np |
|
|
|
|
|
|
|
|
|
|
|
# Keyword arguments forwarded to XDoG() by gen_xdog_image(); every key matches
# one of XDoG()'s parameter names.
XDoG_config = {
    'size': 0,
    'sigma': 0.6,
    'eps': -15,
    'phi': 10e8,
    'k': 2.5,
    'gamma': 0.97,
}
|
|
|
|
|
|
|
# Add a random offset in [0, 0.01) to gamma once, at import time.
# NOTE: np.random.rand(1) returns a one-element array, so the stored gamma
# becomes a shape-(1,) array rather than a plain float.
XDoG_config['gamma'] = XDoG_config['gamma'] + 0.01 * np.random.rand(1)

# 3x3 all-ones kernel used by active_dilate(); it is moved to the CUDA device
# as soon as this module is imported.
dilation_kernel = torch.tensor([[1] * 3 for _ in range(3)]).cuda()

# Pixel value that the binary-map helpers below treat as "white".
white_color_value = 1
|
|
|
|
|
|
|
def DoG(image, size, sigma, k=1.6, gamma=1.):
    '''
        Difference of Gaussians.

        Blurs `image` twice with cv2.GaussianBlur, once with standard deviation
        `sigma` and once with `sigma * k`, both with kernel size (size, size),
        and returns the first blur minus `gamma` times the second.
    '''
    kernel_size = (size, size)
    fine = cv2.GaussianBlur(image, kernel_size, sigma)
    coarse = cv2.GaussianBlur(image, kernel_size, sigma * k)
    return fine - gamma * coarse
|
|
|
|
|
def XDoG(image, size, sigma, eps, phi, k=1.6, gamma=1.):
    '''
        Extended Difference of Gaussians.

        Args:
            image:          image handed to DoG()
            size, sigma:    kernel size and base standard deviation for DoG()
            eps:            threshold on a 0-255 scale (it is divided by 255 here)
            phi:            steepness of the tanh soft threshold
            k, gamma:       forwarded to DoG()
        Returns:
            e (numpy):      1 + tanh(phi * (d - eps)) capped at 1, where d is the
                            DoG response divided by its maximum
    '''
    eps = eps / 255
    d = DoG(image, size, sigma, k, gamma)

    # A response whose maximum is 0 (e.g. an all-black image) cannot be
    # normalised; dividing anyway would fill the whole map with NaN.
    # Out-of-place division also works when the response has an integer dtype.
    peak = d.max()
    if peak != 0:
        d = d / peak

    e = 1 + np.tanh(phi * (d - eps))
    e[e >= 1] = 1
    return e
|
|
|
|
|
|
|
class USMSharp(torch.nn.Module):
    '''
        Unsharp-mask sharpening; basically the same as Real-ESRGAN.

        Args:
            type (str):     when "cv2", forward() converts its input with np2tensor()
                            and its output with tensor2np(); any other value leaves
                            input and output as they are
            radius (int):   size of the Gaussian kernel; even values are raised by one
            sigma (float):  sigma handed to cv2.getGaussianKernel
    '''

    def __init__(self, type, radius=50, sigma=0):
        super().__init__()

        # Even sizes are bumped to the next odd number
        self.radius = radius + 1 if radius % 2 == 0 else radius

        # Outer product of the 1-D Gaussian with itself gives the 2-D kernel
        gauss_1d = cv2.getGaussianKernel(self.radius, sigma)
        gauss_2d = torch.FloatTensor(np.dot(gauss_1d, gauss_1d.transpose()))
        self.register_buffer('kernel', gauss_2d.unsqueeze_(0).cuda())

        self.type = type

    def forward(self, img, weight=0.5, threshold=10, store=False):
        '''
            Sharpen `img` where the blur residual is large.

            Args:
                img:        image to sharpen (converted with np2tensor() when type is "cv2")
                weight:     multiplier of the residual that is added back to the image
                threshold:  a pixel is sharpened when |residual| * 255 exceeds this value
                store:      when True, every intermediate result is written as a PNG
                            into the current working directory
            Returns:
                The blended image (converted with tensor2np() when type is "cv2")
        '''

        def dump(file_name, tensor):
            # Debug output, produced only on request
            if store:
                cv2.imwrite(file_name, tensor2np(tensor))

        is_cv2_input = self.type == "cv2"
        if is_cv2_input:
            img = np2tensor(img)

        blur = filter2D(img, self.kernel.cuda())
        dump("blur.png", blur)

        residual = img - blur
        dump("residual.png", residual)

        mask = torch.abs(residual) * 255 > threshold
        dump("mask.png", mask)

        # Blur the hard mask so sharpened and untouched regions blend smoothly
        soft_mask = filter2D(mask.float(), self.kernel.cuda())
        dump("soft_mask.png", soft_mask)

        sharp = torch.clip(img + weight * residual, 0, 1)
        dump("sharp.png", sharp)

        output = soft_mask * sharp + (1 - soft_mask) * img
        return tensor2np(output) if is_cv2_input else output
|
|
|
|
|
|
|
def get_xdog_sketch_map(img_bgr, outlier_threshold):
    '''
        Build a 3-channel uint8 sketch map for a BGR image.

        The image is converted to grayscale, passed through gen_xdog_image(), and
        the resulting single-channel map is repeated three times along axis 2.
    '''
    single_channel = gen_xdog_image(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY), outlier_threshold)
    three_channel = np.stack([single_channel] * 3, axis=2)
    return np.uint8(three_channel)
|
|
|
|
|
def process_single_img(queue, usm_sharper, extra_sharpen_time, outlier_threshold):
    '''
        Worker loop: sharpen every image of the job list and write the results.

        Args:
            queue (list):               (input_path, output_path) tuples; a None entry
                                        marks the end of the work
            usm_sharper (USMSharp):     sharpening module applied to each image
            extra_sharpen_time (int):   additional sharpening passes after the first one
            outlier_threshold (int):    forwarded to get_xdog_sketch_map()
    '''

    # Walk the list directly: re-slicing it on every iteration is quadratic, and a
    # list without the None sentinel now simply ends instead of raising IndexError.
    for position, info in enumerate(queue, start=1):
        if info is None:
            break

        # Every 10th job: collect garbage and pause
        if position % 10 == 0:
            gc.collect()
            print("We will sleep here to clear memory")
            time.sleep(5)

        img_dir, store_path = info
        print("We are processing ", img_dir)
        img = cv2.imread(img_dir)
        if img is None:
            # cv2.imread signals an unreadable / non-image file by returning None;
            # skip it so the rest of this worker's share is still processed.
            print("Skip unreadable image ", img_dir)
            continue

        img = usm_sharper(img, store=False, threshold=10)
        first_sharpened_img = copy.deepcopy(img)

        for _ in range(extra_sharpen_time):
            img = usm_sharper(img, store=False, threshold=10)

        # Keep the strongly sharpened pixels where the sketch map is 1 and the
        # once-sharpened pixels everywhere else
        sketch_map = get_xdog_sketch_map(img, outlier_threshold)
        img = (img * sketch_map) + (first_sharpened_img * (1 - sketch_map))

        cv2.imwrite(store_path, img)

    print("Finish all program")
|
|
|
|
|
|
|
def outlier_removal(img, outlier_threshold):
    ''' Remove outlier pixel after finding the sketch
        Here, black(0) means background information; white(1) means hand-drawn line

        Every 8-connected group of white pixels that has fewer than
        `outlier_threshold` members is set to 0.

        Args:
            img (numpy):                2-D binary map; it is not modified
            outlier_threshold (int):    minimum number of pixels a group needs to be kept
        Returns:
            cleaned (numpy):            copy of `img` without the small groups
    '''

    h, w = img.shape
    neighbour_offsets = ((-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1))

    is_line = img == white_color_value
    seen = np.zeros((h, w), dtype=bool)     # pixels already assigned to a group
    cleaned = np.copy(img)

    # Only line pixels can start a group, so background pixels are never visited
    for i, j in np.argwhere(is_line).tolist():
        if seen[i, j]:
            continue

        # Flood fill with an explicit stack; the order in which pixels are
        # reached does not change which pixels belong to the group.
        seen[i, j] = True
        group = [(i, j)]
        pending = [(i, j)]
        while pending:
            row, col = pending.pop()
            for d_row, d_col in neighbour_offsets:
                n_row, n_col = row + d_row, col + d_col
                if n_row < 0 or n_col < 0 or n_row >= h or n_col >= w:
                    continue
                if seen[n_row, n_col] or not is_line[n_row, n_col]:
                    continue
                seen[n_row, n_col] = True
                group.append((n_row, n_col))
                pending.append((n_row, n_col))

        if len(group) < outlier_threshold:
            # Too small to be a real line: paint it black
            for row, col in group:
                cleaned[row, col] = 0

    return cleaned
|
|
|
|
|
def active_dilate(img):
    '''
        Dilate a 2-D map on the GPU with kornia's morphological dilation and the
        module-level `dilation_kernel`.

        The map is divided by 255 on the way in and multiplied by 255 on the way
        out; the result is a 2-D numpy array again.
    '''
    # (H, W) -> (H, W, 1) -> (1, H, W) -> (1, 1, H, W) float CUDA tensor
    channel_first = np.transpose(np.expand_dims(img, 2), (2, 0, 1))
    batch = torch.from_numpy(channel_first).unsqueeze(0).cuda().float() / 255

    dilated_edge_map = morph.dilation(batch, dilation_kernel)

    # First batch entry back to a channel-last numpy array, then drop the channel axis
    channel_last = np.transpose(dilated_edge_map[0].detach().cpu().numpy(), (1, 2, 0)) * 255
    return channel_last.squeeze(2)
|
|
|
|
|
def passive_dilate(img):
    '''
        Fill every non-white pixel that has at least 3 white pixels among its
        8 neighbours. Neighbours outside the image count as non-white.

        The decision is taken on the input map only, so pixels filled here never
        trigger further fills.

        Args:
            img (numpy):    2-D binary map; it is not modified
        Returns:
            filled (numpy): copy of `img` with the selected pixels set to white
    '''

    h, w = img.shape
    is_white = img == white_color_value

    # Zero border so that the eight shifted views below never leave the array
    padded = np.zeros((h + 2, w + 2), dtype=np.uint8)
    padded[1:h + 1, 1:w + 1] = is_white

    # Sum of the eight shifted copies = number of white neighbours per pixel
    white_neighbours = np.zeros((h, w), dtype=np.uint8)
    for row_shift in (0, 1, 2):
        for col_shift in (0, 1, 2):
            if row_shift == 1 and col_shift == 1:
                continue    # the pixel itself is not its own neighbour
            white_neighbours += padded[row_shift:row_shift + h, col_shift:col_shift + w]

    filled = np.copy(img)
    filled[~is_white & (white_neighbours >= 3)] = white_color_value
    return filled
|
|
|
|
|
def gen_xdog_image(gray, outlier_threshold):
    '''
        Turn a grayscale image into a cleaned line map.

        Steps: XDoG with the module-level XDoG_config, inversion (1 - result),
        outlier_removal() with `outlier_threshold`, then passive_dilate().

        Returns:
            dogged (numpy):     binary map in range (1 stands for white pixel)
    '''
    # Invert so that the value 1 marks the pixels the following steps treat as white
    line_map = 1 - XDoG(gray, **XDoG_config)

    line_map = outlier_removal(line_map, outlier_threshold)
    return passive_dilate(line_map)
|
|
|
|
|
|
|
def main():
    '''
        Command-line entry point: sharpen every file of --input_dir and write the
        results, under the same file names, into --store_dir.

        --store_dir is deleted and recreated before processing. The file list is
        split into consecutive shares, one per worker process.
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', '--input_dir', type=str, required=True)
    parser.add_argument('-o', '--store_dir', type=str, required=True)
    parser.add_argument('--outlier_threshold', type=int, default=32)
    parser.add_argument('--num_workers', type=int, default=8)
    parser.add_argument('--extra_sharpen_time', type=int, default=2)
    args = parser.parse_args()

    input_dir = args.input_dir
    store_dir = args.store_dir
    outlier_threshold = args.outlier_threshold
    num_workers = args.num_workers
    extra_sharpen_time = args.extra_sharpen_time

    if num_workers < 1:
        parser.error("--num_workers must be at least 1")
    if not os.path.isdir(input_dir):
        parser.error("--input_dir is not an existing folder: " + input_dir)

    # store_dir is wiped below, so it must neither be input_dir nor contain it
    input_real = os.path.realpath(input_dir)
    store_real = os.path.realpath(store_dir)
    try:
        store_contains_input = os.path.commonpath([input_real, store_real]) == store_real
    except ValueError:
        # No common path at all (e.g. different drives): the folders cannot overlap
        store_contains_input = False
    if store_contains_input:
        parser.error("--store_dir is erased before processing and must not be --input_dir or one of its parent folders")

    print("We are handling Strong USM sharpening on hand-drawn line for Anime images!")

    if os.path.exists(store_dir):
        shutil.rmtree(store_dir)
    os.makedirs(store_dir)

    dir_list = [
        (os.path.join(input_dir, img_name), os.path.join(store_dir, img_name))
        for img_name in sorted(os.listdir(input_dir))
    ]
    if not dir_list:
        print("No images found in ", input_dir)
        return

    # The start method only has to be chosen once, before any process is created
    set_start_method('spawn', force=True)

    usm_sharper = USMSharp(type="cv2").cuda()
    usm_sharper.share_memory()

    # Consecutive shares of equal size; workers that would receive nothing are
    # not started at all
    share_size = math.ceil(len(dir_list) / num_workers)
    workers = []
    for start in range(0, len(dir_list), share_size):
        request_list = dir_list[start:start + share_size]
        request_list.append(None)   # end-of-work marker for process_single_img

        p = mp.Process(target=process_single_img, args=(request_list, usm_sharper, extra_sharpen_time, outlier_threshold))
        p.start()
        workers.append(p)

    print("Submitted all jobs!")

    # Wait for the workers so that the shared sharpening module stays alive
    # until all of them are done
    for p in workers:
        p.join()


if __name__ == "__main__":
    main()