|
""" |
|
https://github.com/Trinkle23897/Fast-Poisson-Image-Editing |
|
MIT License |
|
|
|
Copyright (c) 2022 Jiayi Weng |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy |
|
of this software and associated documentation files (the "Software"), to deal |
|
in the Software without restriction, including without limitation the rights |
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
|
copies of the Software, and to permit persons to whom the Software is |
|
furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all |
|
copies or substantial portions of the Software. |
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
|
SOFTWARE. |
|
""" |
|
|
|
import time |
|
import argparse |
|
import os |
|
import fpie |
|
from process import ALL_BACKEND, CPU_COUNT, DEFAULT_BACKEND |
|
from fpie.io import read_images, write_image |
|
from process import BaseProcessor, EquProcessor, GridProcessor |
|
|
|
from PIL import Image |
|
import numpy as np |
|
import skimage |
|
import skimage.measure |
|
import scipy |
|
import scipy.signal |
|
|
|
|
|
class PhotometricCorrection: |
|
def __init__(self,quite=False): |
|
self.get_parser("cli") |
|
args=self.parser.parse_args(["--method","grid","-g","src","-s","a","-t","a","-o","a"]) |
|
args.mpi_sync_interval = getattr(args, "mpi_sync_interval", 0) |
|
self.backend=args.backend |
|
self.args=args |
|
self.quite=quite |
|
proc: BaseProcessor |
|
proc = GridProcessor( |
|
args.gradient, |
|
args.backend, |
|
args.cpu, |
|
args.mpi_sync_interval, |
|
args.block_size, |
|
args.grid_x, |
|
args.grid_y, |
|
) |
|
print( |
|
f"[PIE]Successfully initialize PIE {args.method} solver " |
|
f"with {args.backend} backend" |
|
) |
|
self.proc=proc |
|
|
|
def run(self, original_image, inpainted_image, mode="mask_mode"): |
|
print(f"[PIE] start") |
|
if mode=="disabled": |
|
return inpainted_image |
|
input_arr=np.array(original_image) |
|
if input_arr[:,:,-1].sum()<1: |
|
return inpainted_image |
|
output_arr=np.array(inpainted_image) |
|
mask=input_arr[:,:,-1] |
|
mask=255-mask |
|
if mask.sum()<1 and mode=="mask_mode": |
|
mode="" |
|
if mode=="mask_mode": |
|
mask = skimage.measure.block_reduce(mask, (8, 8), np.max) |
|
mask = mask.repeat(8, axis=0).repeat(8, axis=1) |
|
else: |
|
mask[8:-9,8:-9]=255 |
|
mask = mask[:,:,np.newaxis].repeat(3,axis=2) |
|
nmask=mask.copy() |
|
output_arr2=output_arr[:,:,0:3].copy() |
|
input_arr2=input_arr[:,:,0:3].copy() |
|
output_arr2[nmask<128]=0 |
|
input_arr2[nmask>=128]=0 |
|
output_arr2+=input_arr2 |
|
src = output_arr2[:,:,0:3] |
|
tgt = src.copy() |
|
proc=self.proc |
|
args=self.args |
|
if proc.root: |
|
n = proc.reset(src, mask, tgt, (args.h0, args.w0), (args.h1, args.w1)) |
|
proc.sync() |
|
if proc.root: |
|
result = tgt |
|
t = time.time() |
|
if args.p == 0: |
|
args.p = args.n |
|
|
|
for i in range(0, args.n, args.p): |
|
if proc.root: |
|
result, err = proc.step(args.p) |
|
print(f"[PIE] Iter {i + args.p}, abs_err {err}") |
|
else: |
|
proc.step(args.p) |
|
|
|
if proc.root: |
|
dt = time.time() - t |
|
print(f"[PIE] Time elapsed: {dt:.4f}s") |
|
|
|
return Image.fromarray(result) |
|
|
|
|
|
def get_parser(self,gen_type: str) -> argparse.Namespace: |
|
parser = argparse.ArgumentParser() |
|
parser.add_argument( |
|
"-v", "--version", action="store_true", help="show the version and exit" |
|
) |
|
parser.add_argument( |
|
"--check-backend", action="store_true", help="print all available backends" |
|
) |
|
if gen_type == "gui" and "mpi" in ALL_BACKEND: |
|
|
|
ALL_BACKEND.remove("mpi") |
|
parser.add_argument( |
|
"-b", |
|
"--backend", |
|
type=str, |
|
choices=ALL_BACKEND, |
|
default=DEFAULT_BACKEND, |
|
help="backend choice", |
|
) |
|
parser.add_argument( |
|
"-c", |
|
"--cpu", |
|
type=int, |
|
default=CPU_COUNT, |
|
help="number of CPU used", |
|
) |
|
parser.add_argument( |
|
"-z", |
|
"--block-size", |
|
type=int, |
|
default=1024, |
|
help="cuda block size (only for equ solver)", |
|
) |
|
parser.add_argument( |
|
"--method", |
|
type=str, |
|
choices=["equ", "grid"], |
|
default="equ", |
|
help="how to parallelize computation", |
|
) |
|
parser.add_argument("-s", "--source", type=str, help="source image filename") |
|
if gen_type == "cli": |
|
parser.add_argument( |
|
"-m", |
|
"--mask", |
|
type=str, |
|
help="mask image filename (default is to use the whole source image)", |
|
default="", |
|
) |
|
parser.add_argument("-t", "--target", type=str, help="target image filename") |
|
parser.add_argument("-o", "--output", type=str, help="output image filename") |
|
if gen_type == "cli": |
|
parser.add_argument( |
|
"-h0", type=int, help="mask position (height) on source image", default=0 |
|
) |
|
parser.add_argument( |
|
"-w0", type=int, help="mask position (width) on source image", default=0 |
|
) |
|
parser.add_argument( |
|
"-h1", type=int, help="mask position (height) on target image", default=0 |
|
) |
|
parser.add_argument( |
|
"-w1", type=int, help="mask position (width) on target image", default=0 |
|
) |
|
parser.add_argument( |
|
"-g", |
|
"--gradient", |
|
type=str, |
|
choices=["max", "src", "avg"], |
|
default="max", |
|
help="how to calculate gradient for PIE", |
|
) |
|
parser.add_argument( |
|
"-n", |
|
type=int, |
|
help="how many iteration would you perfer, the more the better", |
|
default=5000, |
|
) |
|
if gen_type == "cli": |
|
parser.add_argument( |
|
"-p", type=int, help="output result every P iteration", default=0 |
|
) |
|
if "mpi" in ALL_BACKEND: |
|
parser.add_argument( |
|
"--mpi-sync-interval", |
|
type=int, |
|
help="MPI sync iteration interval", |
|
default=100, |
|
) |
|
parser.add_argument( |
|
"--grid-x", type=int, help="x axis stride for grid solver", default=8 |
|
) |
|
parser.add_argument( |
|
"--grid-y", type=int, help="y axis stride for grid solver", default=8 |
|
) |
|
self.parser=parser |
|
|
|
if __name__ =="__main__": |
|
import sys |
|
import io |
|
import base64 |
|
from PIL import Image |
|
def base64_to_pil(base64_str): |
|
data = base64.b64decode(str(base64_str)) |
|
pil = Image.open(io.BytesIO(data)) |
|
return pil |
|
|
|
def pil_to_base64(out_pil): |
|
out_buffer = io.BytesIO() |
|
out_pil.save(out_buffer, format="PNG") |
|
out_buffer.seek(0) |
|
base64_bytes = base64.b64encode(out_buffer.read()) |
|
base64_str = base64_bytes.decode("ascii") |
|
return base64_str |
|
correction_func=PhotometricCorrection(quite=True) |
|
while True: |
|
buffer = sys.stdin.readline() |
|
print(f"[PIE] suprocess {len(buffer)} {type(buffer)} ") |
|
if len(buffer)==0: |
|
break |
|
if isinstance(buffer,str): |
|
lst=buffer.strip().split(",") |
|
else: |
|
lst=buffer.decode("ascii").strip().split(",") |
|
img0=base64_to_pil(lst[0]) |
|
img1=base64_to_pil(lst[1]) |
|
ret=correction_func.run(img0,img1,mode=lst[2]) |
|
ret_base64=pil_to_base64(ret) |
|
if isinstance(buffer,str): |
|
sys.stdout.write(f"{ret_base64}\n") |
|
else: |
|
sys.stdout.write(f"{ret_base64}\n".encode()) |
|
sys.stdout.flush() |