# References:
# https://sashamaps.net/docs/resources/20-colors/
import numpy as np
import cv2
from scipy import ndimage as ndi
from PIL import Image, ImageDraw, ImageCms, ExifTags, ImageEnhance
import requests
from pathlib import Path
import pandas as pd
from scipy.sparse import coo_matrix
from skimage.feature import peak_local_max
from skimage.morphology import local_maxima
from skimage.segmentation import watershed
from moviepy.video.io.bindings import mplfig_to_npimage
import io
import os
from enum import Enum

COLORS = (
    (230, 25, 75),
    (60, 180, 75),
    (255, 255, 25),
    (0, 130, 200),
    (245, 130, 48),
    (145, 30, 180),
    (70, 240, 250),
    (240, 50, 230),
    (210, 255, 60),
    (250, 190, 212),
    (0, 128, 128),
    (220, 190, 255),
    (170, 110, 40),
    (255, 250, 200),
    (128, 0, 0),
    (170, 255, 195),
    (128, 128, 0),
    (255, 215, 180),
    (0, 0, 128),
    (128, 128, 128),
)


class PC_TYPE(Enum):
    HARRIS = 1
    EDGES_CONTOURS = 2
    GFTT = 3
    FAST = 4
    KAZE = 5


def _to_2d(img):
    # Uses only the first channel. For a true RGB-to-gray conversion,
    # use _to_grayscale instead.
    if img.ndim == 3:
        return img[:, :, 0]
    else:
        return img


def _to_3d(img):
    if img.ndim == 2:
        return np.dstack([img, img, img])
    else:
        return img


def _to_byte(img: Image.Image, format) -> bytes:
    # BytesIO is a file-like buffer stored in memory.
    imgByteArr = io.BytesIO()
    # Image.save expects a file-like object as its argument.
    img.save(imgByteArr, format=format)
    # Turn the BytesIO object back into a bytes object.
    imgByteArr = imgByteArr.getvalue()
    return imgByteArr


def _get_width_and_height(img):
    if img.ndim == 2:
        h, w = img.shape
    else:
        h, w, _ = img.shape
    return w, h


def _get_resolution(img):
    w, h = _get_width_and_height(img)
    res = w * h
    return res


def _to_pil(img):
    if not isinstance(img, Image.Image):
        img = Image.fromarray(img, mode="RGB")
    return img


def _to_array(img):
    img = np.array(img)
    return img


def _bool_to_uint8(img):
    uint8 = img.astype("uint8")
    if (
        np.array_equal(np.unique(uint8), np.array([0, 1]))
        or np.array_equal(np.unique(uint8), np.array([0]))
        or np.array_equal(np.unique(uint8), np.array([1]))
    ):
        return uint8 * 255
    else:
        return uint8


def _figure_to_array(fig):
    arr = mplfig_to_npimage(fig)
    return arr


def _preprocess_image(img):
    if img.dtype == "int32":
        img = _repaint_segmentation_map(img)
    if img.dtype == "bool":
        img = img.astype("uint8") * 255
    if img.ndim == 2:
        if (
            np.array_equal(np.unique(img), np.array([0, 255]))
            or np.array_equal(np.unique(img), np.array([0]))
            or np.array_equal(np.unique(img), np.array([255]))
        ):
            img = _to_3d(img)
        else:
            img = _apply_jet_colormap(img)
    return img


def _blend_two_images(img1, img2, alpha=0.5):
    img1 = _to_pil(img1)
    img2 = _to_pil(img2)
    img_blended = Image.blend(im1=img1, im2=img2, alpha=alpha)
    return _to_array(img_blended)


def _repaint_segmentation_map(seg_map):
    canvas_r = _get_canvas_same_size_as_image(seg_map, black=True)
    canvas_g = _get_canvas_same_size_as_image(seg_map, black=True)
    canvas_b = _get_canvas_same_size_as_image(seg_map, black=True)

    remainder_map = seg_map % len(COLORS) + 1
    for remainder, (r, g, b) in enumerate(COLORS, start=1):
        canvas_r[remainder_map == remainder] = r
        canvas_g[remainder_map == remainder] = g
        canvas_b[remainder_map == remainder] = b
    canvas_r[seg_map == 0] = 0
    canvas_g[seg_map == 0] = 0
    canvas_b[seg_map == 0] = 0

    dstacked = np.dstack([canvas_r, canvas_g, canvas_b])
    return dstacked


def _get_canvas_same_size_as_image(img, black=False):
    if black:
        return np.zeros_like(img).astype("uint8")
    else:
        return (np.ones_like(img) * 255).astype("uint8")


def _get_canvas(w, h, black=False):
    if black:
        return np.zeros((h, w, 3)).astype("uint8")
    else:
        return (np.ones((h, w, 3)) * 255).astype("uint8")
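
# A minimal sketch of how the palette above is applied: `_repaint_segmentation_map`
# cycles through the 20 entries of COLORS, so labels 20 apart share a color, and
# label 0 (the background) always renders black. `_example_repaint_toy_seg_map`
# is a hypothetical helper for illustration only.
def _example_repaint_toy_seg_map():
    seg_map = np.zeros((4, 4), dtype="int32")
    seg_map[:2, :2] = 1
    seg_map[2:, 2:] = 21  # 21 % 20 == 1 % 20, so same palette entry as label 1
    colored = _repaint_segmentation_map(seg_map)
    assert colored.shape == (4, 4, 3)
    assert (colored[0, 0] == colored[3, 3]).all()  # palette wraps around
    assert (colored[0, 3] == 0).all()  # background stays black
    return colored
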
def _invert_image(mask):
    return cv2.bitwise_not(mask.astype("uint8"))


def _to_grayscale(img):
    gray_img = cv2.cvtColor(src=img, code=cv2.COLOR_RGB2GRAY)
    return gray_img


def _erode_mask(mask, kernel_size=3):
    kernel = cv2.getStructuringElement(
        shape=cv2.MORPH_RECT, ksize=(kernel_size, kernel_size)
    )
    if mask.dtype == "bool":
        mask = mask.astype("uint8") * 255
    mask = cv2.erode(src=mask, kernel=kernel)
    return mask


def _dilate_mask(mask, kernel_size=3):
    if kernel_size == 0:
        return mask
    kernel = cv2.getStructuringElement(
        shape=cv2.MORPH_RECT, ksize=(kernel_size, kernel_size)
    )
    if mask.dtype == "bool":
        mask = mask.astype("uint8") * 255
    mask = cv2.dilate(src=mask, kernel=kernel)
    return mask


def _gaussian_blur_mask(mask, kernel_size=5):
    blurred_mask = cv2.GaussianBlur(
        src=mask, ksize=(kernel_size, kernel_size), sigmaX=0
    )
    # mask = (blurred_mask >= 32).astype("uint8") * 255
    mask = (blurred_mask != 0).astype("uint8") * 255
    return mask


def _blur(img, v=0.04):
    w, h = _get_width_and_height(img)
    kernel_size = round(min(w, h) * v)
    bl = cv2.GaussianBlur(
        src=img.copy(order="C"),
        ksize=(kernel_size // 2 * 2 + 1, kernel_size // 2 * 2 + 1),
        sigmaX=0,
    )
    return bl


def _get_adaptive_thresholded_image(img, invert=False, block_size=3):
    gray_img = cv2.cvtColor(src=img, code=cv2.COLOR_RGB2GRAY)
    thrsh_type = cv2.THRESH_BINARY if not invert else cv2.THRESH_BINARY_INV
    img_thr = cv2.adaptiveThreshold(
        src=gray_img,
        maxValue=255,
        adaptiveMethod=cv2.ADAPTIVE_THRESH_MEAN_C,
        thresholdType=thrsh_type,
        blockSize=block_size,
        C=0,
    )
    return img_thr


def _make_segmentation_map_rectangle(seg_map):
    seg_map_copied = seg_map.copy(order="C")
    for idx in range(1, np.max(seg_map_copied) + 1):
        seg_map_sub = seg_map_copied == idx
        nonzero_x = np.where((seg_map_sub != 0).any(axis=0))[0]
        nonzero_y = np.where((seg_map_sub != 0).any(axis=1))[0]
        if nonzero_x.size != 0 and nonzero_y.size != 0:
            seg_map_copied[
                nonzero_y[0] : nonzero_y[-1], nonzero_x[0] : nonzero_x[-1]
            ] = idx
    return seg_map_copied


def _apply_jet_colormap(img):
    img_jet = cv2.applyColorMap(src=(255 - img), colormap=cv2.COLORMAP_JET)
    return img_jet


def _reverse_jet_colormap(img):
    gray_values = np.arange(256, dtype=np.uint8)
    color_values = list(map(tuple, _apply_jet_colormap(gray_values).reshape(256, 3)))
    color_to_gray_map = dict(zip(color_values, gray_values))

    out = np.apply_along_axis(
        lambda bgr: color_to_gray_map[tuple(bgr)], axis=2, arr=img
    )
    return out


def _get_pixel_counts(arr, sort=False, include_zero=False):
    unique, cnts = np.unique(arr, return_counts=True)
    idx2cnt = dict(zip(unique, cnts))
    if not include_zero:
        if 0 in idx2cnt:
            idx2cnt.pop(0)
    if not sort:
        return idx2cnt
    else:
        return dict(sorted(idx2cnt.items(), key=lambda x: x[1], reverse=True))


def _combine_masks(masks):
    canvas = _get_canvas_same_size_as_image(img=masks[0], black=True)
    for mask in masks:
        canvas = np.maximum(_to_3d(canvas), _to_3d(mask))
    return canvas
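
# A minimal sketch of chaining the mask helpers, assuming uint8 0/255 masks:
# `_dilate_mask` grows each blob by the structuring element, and `_combine_masks`
# takes a pixel-wise maximum and returns a 3-channel mask.
# `_example_dilate_and_combine` is a hypothetical helper for illustration only.
def _example_dilate_and_combine():
    mask_a = np.zeros((32, 32), dtype="uint8")
    mask_a[8:12, 8:12] = 255
    mask_b = np.zeros((32, 32), dtype="uint8")
    mask_b[20:24, 20:24] = 255
    grown = _dilate_mask(mask_a, kernel_size=5)  # the 4x4 blob grows to 8x8
    merged = _combine_masks([grown, mask_b])  # pixel-wise maximum, (32, 32, 3)
    return merged
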
def _get_local_maxima_coordinates(region_score_map, region_seg_map=None, th=150):
    # A threshold of 150 works better when `src_lang="ja"`.
    if region_seg_map is None:
        _, region_mask = cv2.threshold(
            src=region_score_map, thresh=th, maxval=255, type=cv2.THRESH_BINARY
        )
        _, region_seg_map = cv2.connectedComponents(image=region_mask, connectivity=4)
    local_max = peak_local_max(
        image=region_score_map,
        min_distance=5,
        labels=region_seg_map,
        num_peaks_per_label=24,
    )
    local_max = local_max[:, ::-1]  # yx to xy
    return local_max


def _get_local_maxima_array(region_score_map, region_seg_map=None, th=150):
    local_max_coor = _get_local_maxima_coordinates(
        region_score_map, region_seg_map=region_seg_map, th=th
    )
    # local_max_coor is (N, 2); _get_width_and_height returns (2, N),
    # so `h` here is the number of peaks.
    _, h = _get_width_and_height(local_max_coor)
    vals = np.array([1] * h)
    rows = local_max_coor[:, 1]
    cols = local_max_coor[:, 0]
    local_max = (
        coo_matrix((vals, (rows, cols)), shape=region_score_map.shape)
        .toarray()
        .astype("bool")
    )
    return local_max


def _mask_image(img, mask, invert=False):
    """Extract only the part of `img` that falls inside the `mask` region.

    Args:
        img (PIL.Image or np.ndarray): image.
        mask (PIL.Image or np.ndarray): mask; (H, W) or (H, W, C), in which
            case only the first channel is used.
        invert (bool, optional): whether to extract using the inverted mask.

    Returns:
        np.ndarray: resulting image.
    """
    img = _to_array(img)
    mask = _to_2d(_to_array(mask))
    if invert:
        mask = _invert_image(mask)
    return cv2.bitwise_and(src1=img, src2=img, mask=mask.astype("uint8"))


def _ignore_small_regions_in_mask(mask, area_thresh=10):
    mask = _to_2d(mask)
    _, seg_map, stats, _ = cv2.connectedComponentsWithStats(
        mask.astype("uint8"), connectivity=4
    )
    is_large = np.isin(
        seg_map, np.where(stats[:, cv2.CC_STAT_AREA] >= area_thresh)[0][1:]
    )
    new_mask = is_large.astype("uint8") * 255
    new_mask = _to_3d(new_mask)
    return new_mask


def _crop_image(img, l, t, r, b):
    w, h = _get_width_and_height(img)
    return img[
        int(max(0, t)) : int(min(h, b)),
        int(max(0, l)) : int(min(w, r)),
        ...,
    ]


def _bboxes_to_mask(img, bboxes):
    canvas = _get_canvas_same_size_as_image(img=img, black=True)
    for row in bboxes.itertuples():
        canvas[row.bbox_y1 : row.bbox_y2, row.bbox_x1 : row.bbox_x2] = 255
    return _to_3d(canvas)


def _apply_watershed(mask, region_score_map, th=150):
    local_max_arr = _get_local_maxima_array(region_score_map, th=th)
    _, markers = cv2.connectedComponents(
        image=local_max_arr.astype("uint8"), connectivity=4
    )
    seg_map = watershed(image=-region_score_map, markers=markers, mask=_to_2d(mask))
    return seg_map


def _perform_watershed(score_map, score_thresh=80):
    trimmed_score_map = score_map.copy()
    trimmed_score_map[trimmed_score_map < 190] = 0
    markers = local_maxima(image=trimmed_score_map, allow_borders=False)
    _, markers = cv2.connectedComponents(image=markers.astype("uint8"), connectivity=8)
    _, region_mask = cv2.threshold(
        src=score_map, thresh=score_thresh, maxval=255, type=cv2.THRESH_BINARY
    )
    watersheded = watershed(image=-score_map, markers=markers, mask=_to_2d(region_mask))
    return watersheded


def _get_region_segmentation_map(region_score_map, region_thresh=30):
    _, region_mask = cv2.threshold(
        src=region_score_map, thresh=region_thresh, maxval=255, type=cv2.THRESH_BINARY
    )
    region_seg_map = _apply_watershed(
        region_score_map=region_score_map, mask=region_mask
    )
    return region_seg_map


def _combine_two_segmentation_maps(seg_map1, seg_map2):
    seg_map = seg_map1 + _mask_image(
        img=seg_map2 + len(np.unique(seg_map1)) - 1, mask=(seg_map2 != 0)
    )
    px_cnts = _get_pixel_counts(seg_map, sort=True, include_zero=True)
    seg_map = _mask_image(img=seg_map, mask=(seg_map != list(px_cnts)[0]))
    return seg_map
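
# A minimal sketch of `_mask_image`, assuming a uint8 image and a 0/255 mask:
# only the pixels under the mask survive, and `invert=True` flips the
# selection. `_example_mask_image` is a hypothetical helper for illustration only.
def _example_mask_image():
    img = np.full((16, 16, 3), 200, dtype="uint8")
    mask = np.zeros((16, 16), dtype="uint8")
    mask[:, :8] = 255
    left_kept = _mask_image(img, mask)  # right half zeroed
    right_kept = _mask_image(img, mask, invert=True)  # left half zeroed
    assert left_kept[0, 0, 0] == 200 and left_kept[0, 15, 0] == 0
    assert right_kept[0, 0, 0] == 0 and right_kept[0, 15, 0] == 200
    return left_kept, right_kept
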
def _get_image_segmentation_map(img, region_score_map=None, block_size=3):
    if region_score_map is not None:
        _, region_mask = cv2.threshold(
            src=region_score_map, thresh=20, maxval=255, type=cv2.THRESH_BINARY
        )
        region_mask = _dilate_mask(mask=region_mask, kernel_size=16)
        img_masked = _mask_image(img=img, mask=region_mask)
    else:
        img_masked = img

    img_thr1 = _get_adaptive_thresholded_image(
        img=img_masked, invert=False, block_size=block_size
    )
    img_thr2 = _get_adaptive_thresholded_image(
        img=img_masked, invert=True, block_size=block_size
    )
    _, seg_map1 = cv2.connectedComponents(image=img_thr1, connectivity=4)
    _, seg_map2 = cv2.connectedComponents(image=img_thr2, connectivity=4)
    seg_map = _combine_two_segmentation_maps(seg_map1=seg_map1, seg_map2=seg_map2)
    return seg_map


def _get_segmentation_map_overlapping_mask(seg_map, mask, overlap_thresh=0.6):
    img_pixel_counts = _get_pixel_counts(seg_map, sort=True, include_zero=False)
    overlapping_seg_map = _mask_image(img=seg_map, mask=(mask != 0))
    overlapping_counts = _get_pixel_counts(
        overlapping_seg_map, sort=False, include_zero=False
    )
    df_counts = pd.DataFrame.from_dict(
        img_pixel_counts, orient="index", columns=["total_pixel_count"]
    )
    df_counts["overlap_pixel_count"] = df_counts.apply(
        lambda x: overlapping_counts.get(x.name, 0), axis=1
    )
    df_counts["ratio"] = (
        df_counts["overlap_pixel_count"] / df_counts["total_pixel_count"]
    )
    region_is_inside = df_counts[df_counts["ratio"] > overlap_thresh].index.tolist()
    mask = np.isin(seg_map, region_is_inside).astype("uint8")
    mask = _to_3d(mask * 255)
    return mask


def _split_segmentation_map(seg_map, pccs):
    ls_idx = (
        pccs[pccs["inside"]]
        .apply(lambda x: seg_map[x["y"], x["x"]], axis=1)
        .values.tolist()
    )
    seg_map1 = _mask_image(img=seg_map, mask=np.isin(seg_map, ls_idx))
    seg_map2 = _mask_image(img=seg_map, mask=~np.isin(seg_map, ls_idx))
    return seg_map1, seg_map2


def _segmentation_map_to_mask(seg_map):
    return _to_3d((seg_map != 0).astype("uint8") * 255)


def _get_pseudo_character_centers_from_mask(mask, bboxes: pd.DataFrame = None):
    """Compute the center coordinates of each label (character) from a mask image."""
    center_coords = []
    num_labels, labels, stats, centroids = cv2.connectedComponentsWithStats(
        image=_to_2d(mask), connectivity=8
    )
    for i in range(1, num_labels):
        center_coords.append((int(centroids[i][0]), int(centroids[i][1])))
    pccs = pd.DataFrame(center_coords, columns=["x", "y"])
    if bboxes is not None and not bboxes.empty:
        # Vectorized check of whether each center lies inside any bbox.
        pccs["inside"] = (
            (pccs["x"].values[:, None] > bboxes["bbox_x1"].values)
            & (pccs["x"].values[:, None] < bboxes["bbox_x2"].values)
            & (pccs["y"].values[:, None] > bboxes["bbox_y1"].values)
            & (pccs["y"].values[:, None] < bboxes["bbox_y2"].values)
        ).any(axis=1)
    else:
        pccs["inside"] = True
    return pccs


def _get_pseudo_character_centers(
    region_score_map, region_seg_map=None, bboxes=pd.DataFrame()
):
    local_max_coor = _get_local_maxima_coordinates(
        region_score_map, region_seg_map=region_seg_map
    )
    pccs = pd.DataFrame(local_max_coor, columns=["x", "y"])
    if not bboxes.empty:
        # Vectorized check of whether each center lies inside any bbox.
        pccs["inside"] = (
            (pccs["x"].values[:, None] > bboxes["bbox_x1"].values)
            & (pccs["x"].values[:, None] < bboxes["bbox_x2"].values)
            & (pccs["y"].values[:, None] > bboxes["bbox_y1"].values)
            & (pccs["y"].values[:, None] < bboxes["bbox_y2"].values)
        ).any(axis=1)
    else:
        pccs["inside"] = True
    return pccs
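
# A minimal sketch of `_get_pseudo_character_centers_from_mask` on a toy mask,
# assuming a bboxes DataFrame with bbox_x1/bbox_y1/bbox_x2/bbox_y2 columns:
# two blobs yield two centroids, and only the one covered by the bbox is
# flagged `inside`. `_example_pccs_from_mask` is a hypothetical helper for
# illustration only.
def _example_pccs_from_mask():
    mask = np.zeros((32, 32), dtype="uint8")
    mask[4:8, 4:8] = 255
    mask[20:24, 20:24] = 255
    bboxes = pd.DataFrame(
        [{"bbox_x1": 0, "bbox_y1": 0, "bbox_x2": 12, "bbox_y2": 12}]
    )
    pccs = _get_pseudo_character_centers_from_mask(mask, bboxes=bboxes)
    # Expected rows: (x=5, y=5, inside=True) and (x=21, y=21, inside=False).
    return pccs
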
def _convert_region_score_map_to_region_mask(region_score_map, region_score_thresh=170):
    _, region_mask = cv2.threshold(
        src=region_score_map, thresh=30, maxval=255, type=cv2.THRESH_BINARY
    )
    new_mask = _get_canvas_same_size_as_image(img=region_mask, black=True)
    n_labels, seg_map, _, _ = cv2.connectedComponentsWithStats(
        image=_to_2d(region_mask), connectivity=4
    )
    for k in range(1, n_labels):
        if np.max(region_score_map[seg_map == k]) < region_score_thresh:
            continue
        new_mask[seg_map == k] = 255
    new_mask = _to_3d(new_mask)
    return new_mask


def _split_mask(mask, region_score_map=None, bboxes=pd.DataFrame(), th=30):
    """Split `mask` into two masks: the regions that inpainting should erase
    and the regions it should restore. `mask1` and `mask2` may overlap.

    How it works: binarize `region_score_map` (or `dst_mask_map` when it is
    not given) at `th` and turn it into a segmentation map via connected
    components. Treat the local-maximum points of each label region as
    watershed markers, run watershed, and take the result as the segmentation
    map. Compute the pseudo character centers (pccs) with skimage's
    `peak_local_max`, using the region score map and the segmentation map.
    Bring in the bbox information to check whether each pcc falls inside a
    box, then split the segmentation map into the label regions whose pccs
    lie inside a bbox (`seg_map1`) and the remaining label regions
    (`seg_map2`).

    Args:
        mask (np.ndarray): (H, W, 3) mask with values 0 or 255.
        region_score_map (np.ndarray): region score map (a CRAFT output); a
            heat map emphasizing character centers.
        bboxes (pd.DataFrame): DataFrame with box coordinates
            (bbox_x1, bbox_y1, bbox_x2, bbox_y2).

    Returns:
        np.ndarray: mask1, the regions to erase, and mask2, the regions to
        restore.
    """
    if region_score_map is None:
        dst_mask_map = _to_2d(get_dst_mask(mask))
        seg_map = _apply_watershed(mask=mask, region_score_map=dst_mask_map, th=th)
        pccs = _get_pseudo_character_centers(
            region_score_map=dst_mask_map, region_seg_map=seg_map, bboxes=bboxes
        )
    else:
        seg_map = _apply_watershed(mask, region_score_map, th=th)
        pccs = _get_pseudo_character_centers(
            region_score_map=region_score_map, region_seg_map=seg_map, bboxes=bboxes
        )
    box_mask = _bboxes_to_mask(seg_map, bboxes)
    seg_map1, seg_map2 = _split_segmentation_map(seg_map=seg_map, pccs=pccs)
    mask1 = _segmentation_map_to_mask(seg_map1)
    mask2 = _segmentation_map_to_mask(seg_map2)
    mask3 = _to_3d(_mask_image(mask1, box_mask, invert=True))
    mask2 = _combine_masks([mask2, mask3])
    return mask1, mask2


def get_word_segmentation_map(region_score_map, affinity_score_map):
    _, region_mask = cv2.threshold(
        src=region_score_map, thresh=70, maxval=255, type=cv2.THRESH_BINARY
    )
    _, affinity_mask = cv2.threshold(
        src=affinity_score_map, thresh=70, maxval=255, type=cv2.THRESH_BINARY
    )
    word_mask = region_mask + affinity_mask
    _, segmentation_map_word = cv2.connectedComponents(image=word_mask, connectivity=4)
    return segmentation_map_word


def get_line_segmentation_map(line_score_map):
    _, line_mask = cv2.threshold(
        src=line_score_map, thresh=130, maxval=255, type=cv2.THRESH_BINARY
    )
    _, line_segmentation_map = cv2.connectedComponents(image=line_mask, connectivity=4)
    return line_segmentation_map


def _get_3d_block_segmentation_map(img, bboxes):
    segmentation_map_block = np.zeros(
        shape=(img.shape[0], img.shape[1], len(bboxes) + 1)
    )
    for idx, (xmin, ymin, xmax, ymax) in enumerate(
        bboxes[["xmin", "ymin", "xmax", "ymax"]].values, start=1
    ):
        segmentation_map_block[ymin:ymax, xmin:xmax, idx] = 255
    return segmentation_map_block
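
# A minimal sketch of `_split_mask` without a region score map, assuming a
# (H, W, 3) 0/255 mask: the distance transform stands in for the CRAFT score
# map, so the blob covered by the bbox should end up in mask1 (erase) while
# the other blob lands in mask2 (restore). `_example_split_mask` is a
# hypothetical helper for illustration only.
def _example_split_mask():
    mask = np.zeros((64, 64, 3), dtype="uint8")
    mask[10:20, 10:20] = 255
    mask[40:50, 40:50] = 255
    bboxes = pd.DataFrame(
        [{"bbox_x1": 5, "bbox_y1": 5, "bbox_x2": 25, "bbox_y2": 25}]
    )
    mask1, mask2 = _split_mask(mask, bboxes=bboxes, th=30)
    return mask1, mask2
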
def compare_images(img1, img2, flag=cv2.CMP_EQ):
    # Regions where the comparison holds become 255, others 0. See the
    # cv2.CMP_XX flags (CMP_EQ: true where equal, CMP_NE: true where different).
    return cv2.compare(img1, img2, flag)


def convert_webp_png_get_data(img: np.ndarray):
    pil_img = _to_pil(img)
    convert_pil_img = pil_img.convert("RGB")
    convert_pil_img.save("temp.png")
    _, byte, format = load_image("temp.png", with_byte=True, with_format=True)
    os.remove("temp.png")
    return byte


def add_water_mark(original_img, water_mark_img_path):
    if isinstance(original_img, np.ndarray):
        original_img = _to_pil(original_img)
        return_np = True
    else:
        return_np = False
    watermark = Image.open(water_mark_img_path).convert("RGBA")

    width_o, height_o = original_img.size
    width_wm, height_wm = watermark.size
    position = ((width_o - width_wm) // 2, (height_o - height_wm) // 2)

    # Scale the watermark down proportionally only when it is larger than the
    # original image.
    if width_wm > width_o or height_wm > height_o:
        # Width and height ratios of the original to the watermark.
        ratio_w = width_o / width_wm
        ratio_h = height_o / height_wm
        # Use the smaller ratio so the watermark fits inside the original.
        ratio = min(ratio_w, ratio_h)
        new_width = int(width_wm * ratio)
        new_height = int(height_wm * ratio)
        watermark = watermark.resize((new_width, new_height), Image.Resampling.LANCZOS)
        width_wm, height_wm = watermark.size
        # Recompute the paste position.
        position = ((width_o - width_wm) // 2, (height_o - height_wm) // 2)

    original_img.paste(watermark, position, watermark)
    rgb_image = original_img.convert("RGB")
    if return_np:
        return _to_array(rgb_image)
    return rgb_image


def load_image(url_or_path, with_byte=False, with_format=False):
    if "http" in url_or_path:
        url_or_path = str(url_or_path)
        response = requests.get(url_or_path)
        PIL_image = Image.open(io.BytesIO(response.content))
        format = PIL_image.format
        image_bytes = response.content
        if format == "GIF":
            img_exif = None
        else:
            img_exif = PIL_image._getexif()
        if PIL_image.mode in ["L", "P", "PA", "RGBA"]:
            PIL_image = Image.open(io.BytesIO(response.content)).convert("RGB")
        if img_exif:
            for k in img_exif.keys():
                attr = ExifTags.TAGS.get(k, "no_key")
                if attr != "no_key":
                    if ExifTags.TAGS[k] == "Orientation":
                        if img_exif[k] == 3:
                            PIL_image = PIL_image.rotate(180, expand=True)
                        elif img_exif[k] == 6:
                            PIL_image = PIL_image.rotate(270, expand=True)
                        elif img_exif[k] == 8:
                            PIL_image = PIL_image.rotate(90, expand=True)
                        break
        if PIL_image.mode == "CMYK":
            cmyk_profile = ImageCms.ImageCmsProfile("resources/USWebCoatedSWOP.icc")
            srgb_profile = ImageCms.ImageCmsProfile(
                "resources/sRGB Color Space Profile.icm"
            )
            PIL_image = ImageCms.profileToProfile(
                PIL_image, cmyk_profile, srgb_profile, outputMode="RGB"
            )
        img = np.array(PIL_image)
    else:
        PIL_image = Image.open(url_or_path)
        format = PIL_image.format
        byte_arr = io.BytesIO()
        if PIL_image.mode == "RGBA":
            PIL_image = PIL_image.convert("RGB")
        PIL_image.save(byte_arr, format="JPEG")
        image_bytes = byte_arr.getvalue()
        img = np.array(PIL_image)
    if with_byte:
        if with_format:
            return img, image_bytes, format
        else:
            return img, image_bytes
    return img
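
# A minimal sketch of `load_image`; the URL and path below are hypothetical
# placeholders. For URL inputs the function also applies EXIF orientation and
# converts CMYK sources to sRGB; local RGBA inputs are flattened to RGB before
# the JPEG bytes are produced. `_example_load_image` is for illustration only.
def _example_load_image():
    img = load_image("photos/sample.jpg")  # RGB np.ndarray
    img, raw_bytes, fmt = load_image(
        "https://example.com/sample.jpg", with_byte=True, with_format=True
    )
    return img, raw_bytes, fmt
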
def save_image(img1, img2=None, alpha=0.5, path=""):
    copied_img1 = _preprocess_image(_to_array(img1.copy(order="C")))
    if img2 is None:
        img_arr = copied_img1
    else:
        copied_img2 = _to_array(_preprocess_image(_to_array(img2.copy(order="C"))))
        img_arr = _to_array(
            _blend_two_images(img1=copied_img1, img2=copied_img2, alpha=alpha)
        )

    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    if os.path.splitext(str(path))[1] == ".gif":
        pil = _to_pil(img1)
        pil.save(str(path))
        return True
    if img_arr.ndim == 3:
        cv2.imwrite(
            filename=str(path),
            img=img_arr[:, :, ::-1],
            params=[cv2.IMWRITE_JPEG_QUALITY, 100],
        )
    elif img_arr.ndim == 2:
        cv2.imwrite(
            filename=str(path), img=img_arr, params=[cv2.IMWRITE_JPEG_QUALITY, 100]
        )


def show_image(img1, img2=None, alpha=0.5):
    img1 = _to_pil(_preprocess_image(_to_array(img1)))
    if img2 is None:
        img1.show()
    else:
        img2 = _to_pil(_preprocess_image(_to_array(img2)))
        img_blended = Image.blend(im1=img1, im2=img2, alpha=alpha)
        img_blended.show()


def draw_bboxes(img, bboxes: pd.DataFrame, index=False):
    """Visualize `bboxes` on top of the original image (before attribute
    extraction)."""
    canvas = _to_pil(_get_canvas_same_size_as_image(img=img, black=True))
    draw = ImageDraw.Draw(canvas)

    dic = dict()
    for row in bboxes.itertuples():
        h = row.bbox_y2 - row.bbox_y1
        w = row.bbox_x2 - row.bbox_x1
        smaller = min(w, h)
        thickness = max(1, smaller // 22)
        dic[row.Index] = ((0, 255, 0), (0, 100, 0), thickness)
    for row in bboxes.itertuples():
        _, fill, thickness = dic[row.Index]
        draw.rectangle(
            xy=(row.bbox_x1, row.bbox_y1, row.bbox_x2, row.bbox_y2),
            outline=None,
            fill=fill,
            width=thickness,
        )
    for row in bboxes.itertuples():
        outline, _, thickness = dic[row.Index]
        draw.rectangle(
            xy=(row.bbox_x1, row.bbox_y1, row.bbox_x2, row.bbox_y2),
            outline=outline,
            fill=None,
            width=thickness,
        )
    if index:
        from data_utils.rendering_utils import _get_font

        max_len = max(map(len, map(str, bboxes.index)))
        for row in bboxes.itertuples():
            h = row.bbox_y2 - row.bbox_y1
            w = row.bbox_x2 - row.bbox_x1
            smaller = min(w, h)
            font_size = max(10, min(40, smaller // 4))
            draw.text(
                xy=(row.bbox_x1, row.bbox_y1 - 4),
                text=str(row.Index).zfill(max_len),
                fill="white",
                stroke_fill="black",
                stroke_width=2,
                font=_get_font(lang="en", font_size=font_size),
                anchor="ls",
            )
    return _blend_two_images(img1=canvas, img2=img, alpha=0.4)


def visualize_clusters(img, bboxes, index=False):
    from data_utils.rendering_utils import _get_font

    canvas = _to_pil(_get_canvas_same_size_as_image(img=img, black=True))
    draw = ImageDraw.Draw(canvas)

    dic = dict()
    for row in bboxes.itertuples():
        h = row.bbox_y2 - row.bbox_y1
        w = row.bbox_x2 - row.bbox_x1
        smaller = min(w, h)
        thickness = max(1, smaller // 22)
        dic[row.Index] = ((255, 255, 255), COLORS[row.cluster % len(COLORS)], thickness)
    for row in bboxes.itertuples():
        _, fill, thickness = dic[row.Index]
        draw.rectangle(
            xy=(row.bbox_x1, row.bbox_y1, row.bbox_x2, row.bbox_y2),
            outline=None,
            fill=fill,
            width=1,
        )
    for row in bboxes.itertuples():
        outline, _, thickness = dic[row.Index]
        draw.rectangle(
            xy=(row.bbox_x1, row.bbox_y1, row.bbox_x2, row.bbox_y2),
            outline=outline,
            fill=None,
            width=1,
        )
    if index:
        for row in bboxes.itertuples():
            h = row.bbox_y2 - row.bbox_y1
            w = row.bbox_x2 - row.bbox_x1
            smaller = min(w, h)
            font_size = max(14, min(40, smaller * 0.35))
            draw.text(
                xy=(row.bbox_x1, row.bbox_y1 - 4),
                text=str(row.cluster),
                fill="white",
                stroke_fill="black",
                stroke_width=2,
                font=_get_font(lang="en", font_size=font_size),
                anchor="ls",
            )
    return _blend_two_images(img1=canvas, img2=img, alpha=0.25)
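
# A minimal sketch of `draw_bboxes`, assuming a DataFrame with
# bbox_x1/bbox_y1/bbox_x2/bbox_y2 columns: each box is drawn as a dark green
# fill with a bright green outline, blended over the source image.
# `_example_draw_bboxes` is a hypothetical helper for illustration only.
def _example_draw_bboxes():
    img = np.full((100, 100, 3), 255, dtype="uint8")
    bboxes = pd.DataFrame(
        [
            {"bbox_x1": 10, "bbox_y1": 10, "bbox_x2": 40, "bbox_y2": 30},
            {"bbox_x1": 50, "bbox_y1": 50, "bbox_x2": 90, "bbox_y2": 80},
        ]
    )
    return draw_bboxes(img, bboxes)
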
def draw_bboxes_and_textboxes(bboxes, img):
    canvas = img.copy(order="C")
    for row in bboxes.itertuples():
        cv2.rectangle(
            img=canvas,
            pt1=(row.bbox_x1, row.bbox_y1),
            pt2=(row.bbox_x2, row.bbox_y2),
            color=(0, 255, 0),
            thickness=4,
        )
        cv2.rectangle(
            img=canvas,
            pt1=(row.tbox_x1, row.tbox_y1),
            pt2=(row.tbox_x2, row.tbox_y2),
            color=(255, 0, 0),
            thickness=2,
        )
    return canvas


def draw_pseudo_character_centers(img, pccs, margin=4):
    canvas = _to_pil(_get_canvas_same_size_as_image(img=img, black=True))
    draw = ImageDraw.Draw(canvas)
    for row in pccs.itertuples():
        draw.ellipse(
            xy=(row.x - margin, row.y - margin, row.x + margin, row.y + margin),
            outline=(255, 0, 0),
            fill=(100, 0, 0),
        )
    return _blend_two_images(img1=canvas, img2=img, alpha=0.3)


def _resize_image(img, w, h):
    ori_w, ori_h = _get_width_and_height(img)
    if w < ori_w or h < ori_h:
        interpolation = cv2.INTER_AREA
    else:
        interpolation = cv2.INTER_LANCZOS4
    resized_img = cv2.resize(src=img, dsize=(w, h), interpolation=interpolation)
    return resized_img


def _resize_image_using_shorter_side(img, img_size=1530):
    ori_w, ori_h = _get_width_and_height(img)
    shorter = min(ori_w, ori_h)
    if shorter <= img_size:
        return img
    if ori_w < ori_h:
        resized_img = cv2.resize(
            src=img,
            dsize=(img_size, round(ori_h * (img_size / ori_w))),
            interpolation=cv2.INTER_AREA,
        )
    else:
        resized_img = cv2.resize(
            src=img,
            dsize=(round(ori_w * (img_size / ori_h)), img_size),
            interpolation=cv2.INTER_AREA,
        )
    return resized_img


def _resize_image_using_longer_side(img, img_size=2560):
    ori_w, ori_h = _get_width_and_height(img)
    longer = max(ori_w, ori_h)
    if longer <= img_size:
        return img
    if ori_w < ori_h:
        resized_img = cv2.resize(
            src=img,
            dsize=(round(ori_w * (img_size / ori_h)), img_size),
            interpolation=cv2.INTER_AREA,
        )
    else:
        resized_img = cv2.resize(
            src=img,
            dsize=(img_size, round(ori_h * (img_size / ori_w))),
            interpolation=cv2.INTER_AREA,
        )
    return resized_img


def _split_image_3(img, verbose=False):
    if img.ndim == 2:
        is_2d = True
    else:
        is_2d = False
    img = _to_3d(img)
    w, h = _get_width_and_height(img)
    if h >= w:
        if verbose:
            print(f"Resolution: {w}, {h} -> {w}, {h // 2}")
        img1 = img[: h // 2, :, :]
        img2 = img[h // 4 : h // 4 + h // 2, :, :]
        img3 = img[-h // 2 :, :, :]
    else:
        if verbose:
            print(f"Resolution: {w}, {h} -> {w // 2}, {h}")
        img1 = img[:, : w // 2, :]
        img2 = img[:, w // 2 // 2 : w // 2 // 2 + w // 2, :]
        img3 = img[:, -w // 2 :, :]
    if is_2d:
        img1 = _to_2d(img1)
        img2 = _to_2d(img2)
        img3 = _to_2d(img3)
    return img1, img2, img3


def _split_image_2(img, verbose=False):
    if img.ndim == 2:
        is_2d = True
    else:
        is_2d = False
    img = _to_3d(img)
    w, h = _get_width_and_height(img)
    if h >= w:
        if verbose:
            print(f"Resolution: {w}, {h} -> {w}, {h // 2}")
        img1 = img[: h // 2, :, :]
        img3 = img[-h // 2 :, :, :]
    else:
        if verbose:
            print(f"Resolution: {w}, {h} -> {w // 2}, {h}")
        img1 = img[:, : w // 2, :]
        img3 = img[:, -w // 2 :, :]
    if is_2d:
        img1 = _to_2d(img1)
        img3 = _to_2d(img3)
    return img1, img3


def _combine_images_3(img, img1, img2, img3):
    if (img1 is None) and (img2 is None) and (img3 is None):
        canvas = None
    else:
        img1 = _to_2d(img1)
        img2 = _to_2d(img2)
        img3 = _to_2d(img3)
        canvas = _get_canvas_same_size_as_image(_to_2d(img), black=True)
        w, h = _get_width_and_height(img)
        if h >= w:
            canvas[: h // 2, :] = img1
            canvas[h // 2 // 2 : h // 2 // 2 + h // 2, :] = np.maximum(
                canvas[h // 2 // 2 : h // 2 // 2 + h // 2, :], img2
            )
            canvas[-h // 2 :, :] = np.maximum(canvas[-h // 2 :, :], img3)
        else:
            canvas[:, : w // 2] = img1
            canvas[:, w // 2 // 2 : w // 2 // 2 + w // 2] = np.maximum(
                canvas[:, w // 2 // 2 : w // 2 // 2 + w // 2], img2
            )
            canvas[:, -w // 2 :] = np.maximum(canvas[:, -w // 2 :], img3)
    return canvas
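
# A minimal sketch of the split/combine round trip, assuming even dimensions
# so the three overlapping slices tile the image exactly: combining with
# pixel-wise maxima reassembles the original. `_example_split_and_combine_3`
# is a hypothetical helper for illustration only.
def _example_split_and_combine_3():
    img = np.random.randint(0, 256, size=(200, 100), dtype="uint8")
    part1, part2, part3 = _split_image_3(img)  # top, middle, bottom halves
    restored = _combine_images_3(img, part1, part2, part3)
    assert np.array_equal(restored, img)
    return restored
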
def _combine_images_2(img, img1, img2):
    if (img1 is None) and (img2 is None):
        canvas = None
    else:
        canvas = _get_canvas_same_size_as_image(img, black=True)
        w, h = _get_width_and_height(img)
        if h >= w:
            canvas[: h // 2, :] = img1
            canvas[-h // 2 :, :] = np.maximum(canvas[-h // 2 :, :], img2)
        else:
            canvas[:, : w // 2] = img1
            canvas[:, -w // 2 :] = np.maximum(canvas[:, -w // 2 :], img2)
    return canvas


def _rotate_90_degrees(img, counterclockwise=False):
    return cv2.rotate(
        src=img,
        rotateCode=cv2.ROTATE_90_COUNTERCLOCKWISE
        if counterclockwise
        else cv2.ROTATE_90_CLOCKWISE,
    )


def save_image_patches(img, bboxes, dir):
    for row in bboxes.itertuples():
        patch = _crop_image(
            img=img,
            l=row.bbox_x1,
            t=row.bbox_y1,
            r=row.bbox_x2,
            b=row.bbox_y2,
        )
        patch_w = row.bbox_x2 - row.bbox_x1
        patch_h = row.bbox_y2 - row.bbox_y1
        if patch_h > patch_w:
            patch = _rotate_90_degrees(patch, counterclockwise=False)
        save_image(img1=patch, path=Path(dir) / f"{str(row.Index).zfill(4)}.jpg")


def get_minimum_area_bounding_rectangle(mask):
    nonzero = _to_2d(mask.astype("uint8")) != 0
    nonzero_x = np.where(nonzero.any(axis=0))[0]
    nonzero_y = np.where(nonzero.any(axis=1))[0]
    if len(nonzero_x) != 0 and len(nonzero_y) != 0:
        bbox_x1 = nonzero_x[0]
        bbox_x2 = nonzero_x[-1]
        bbox_y1 = nonzero_y[0]
        bbox_y2 = nonzero_y[-1]
        return int(bbox_x1), int(bbox_y1), int(bbox_x2), int(bbox_y2)
    else:
        return 0, 0, 0, 0


def get_minimum_area_bounding_rectangle2(mask, l, t, r, b):
    nonzero = _to_2d(mask.astype("uint8")) != 0
    nonzero_x = np.where(nonzero.any(axis=0))[0]
    nonzero_y = np.where(nonzero.any(axis=1))[0]
    try:
        new_l = nonzero_x[np.where(l < nonzero_x)][0]
    except Exception:
        new_l = l
    try:
        new_t = nonzero_y[np.where(t < nonzero_y)][0]
    except Exception:
        new_t = t
    try:
        new_r = nonzero_x[np.where(nonzero_x < r)][-1]
    except Exception:
        new_r = r
    try:
        new_b = nonzero_y[np.where(nonzero_y < b)][-1]
    except Exception:
        new_b = b
    return new_l, new_t, new_r, new_b


def _downsample_image(img):
    ori_w, ori_h = _get_width_and_height(img)
    resized = _resize_image(img, w=ori_w // 2, h=ori_h // 2)
    return resized


def _upsample_image(img):
    ori_w, ori_h = _get_width_and_height(img)
    resized = _resize_image(img, w=ori_w * 2, h=ori_h * 2)
    return resized


def _get_pseudo_image(img, mask, invert=False):
    if invert:
        mask = _invert_image(mask)
    rows, cols = np.nonzero(_to_2d(mask))
    pseudo_outer = img[rows, cols, :].reshape((1, -1, 3))
    return pseudo_outer


def resize_coordinates_and_image_to_fit_to_maximum_pixel_counts(
    bboxes, img, max_pixel_counts=1530
):
    w, h = _get_width_and_height(img)
    ratio = min(max_pixel_counts / h, max_pixel_counts / w)
    if ratio < 1:
        for col in ["xmin", "ymin", "xmax", "ymax"]:
            bboxes[col] = bboxes[col].apply(lambda x: int(x * ratio))
        img = cv2.resize(
            src=img,
            dsize=(int(w * ratio), int(h * ratio)),
            interpolation=cv2.INTER_LANCZOS4,
        )
    return bboxes, img


def get_image_patches_3(img, text_stroke_mask, mask1, mask2):
    splitting_mask = get_splitting_mask(text_stroke_mask)
    _, _, stats, _ = cv2.connectedComponentsWithStats(
        image=_to_2d(splitting_mask), connectivity=4
    )
    ls_patches = list()
    for xmin, ymin, width, height, px_cnt in stats[1:, :]:
        xmax = xmin + width
        ymax = ymin + height
        cropped_img = _crop_image(img=img, l=xmin, t=ymin, r=xmax, b=ymax)
        cropped_mask1 = _crop_image(img=mask1, l=xmin, t=ymin, r=xmax, b=ymax)
        cropped_mask2 = _crop_image(img=mask2, l=xmin, t=ymin, r=xmax, b=ymax)
        ls_patches.append(
            {
                "xmin": xmin,
                "ymin": ymin,
                "xmax": xmax,
                "ymax": ymax,
                "img": cropped_img,
                "mask1": cropped_mask1,
                "mask2": cropped_mask2,
            }
        )
    return ls_patches
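
# A minimal sketch of `get_minimum_area_bounding_rectangle`: the returned
# (x1, y1, x2, y2) are the inclusive indices of the first and last nonzero
# columns/rows, and an empty mask yields all zeros. `_example_min_area_bbox`
# is a hypothetical helper for illustration only.
def _example_min_area_bbox():
    mask = np.zeros((32, 32), dtype="uint8")
    mask[5:10, 8:20] = 255
    assert get_minimum_area_bounding_rectangle(mask) == (8, 5, 19, 9)
    assert get_minimum_area_bounding_rectangle(np.zeros((4, 4))) == (0, 0, 0, 0)
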
def get_image_patches_2(img, mask1, mask2):
    splitting_mask = get_splitting_mask(mask1)
    _, _, stats, _ = cv2.connectedComponentsWithStats(
        image=_to_2d(splitting_mask), connectivity=4
    )
    ls_patches = list()
    for x1, y1, w, h, _ in stats[1:, :]:
        x2 = x1 + w
        y2 = y1 + h
        cropped_img = _crop_image(img=img, l=x1, t=y1, r=x2, b=y2)
        cropped_mask1 = _crop_image(img=mask1, l=x1, t=y1, r=x2, b=y2)
        cropped_mask2 = _crop_image(img=mask2, l=x1, t=y1, r=x2, b=y2)
        ls_patches.append(
            {
                "x1": x1,
                "y1": y1,
                "x2": x2,
                "y2": y2,
                "img": cropped_img,
                "mask1": cropped_mask1,
                "mask2": cropped_mask2,
            }
        )
    return ls_patches


def get_splitting_mask(text_stroke_mask):
    splitting_mask = _dilate_mask(text_stroke_mask, kernel_size=200)
    return splitting_mask


def enhance_sharpness(img):
    """Increase the sharpness of `img`.

    Three approaches were tried (sharpening filter, unsharp mask, PIL
    sharpening); the PIL approach changes the original colors the least.

    Args:
        img (np.ndarray): image.

    Returns:
        np.ndarray: resulting image.
    """
    # sharpening_k = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])
    # hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
    # sharpened_v = cv2.filter2D(hsv[..., 2], -1, sharpening_k)
    # hsv[..., 2] = sharpened_v
    # img_patch2 = cv2.cvtColor(hsv, cv2.COLOR_HSV2RGB)

    # src_ycrcb = cv2.cvtColor(img, cv2.COLOR_RGB2YCrCb)
    # src_f = src_ycrcb[:, :, 0].astype(np.float32)
    # blr = cv2.GaussianBlur(src_f, (0, 0), 2.0)
    # src_ycrcb[:, :, 0] = np.clip(2. * src_f - blr, 0, 255).astype(np.uint8)
    # img_patch3 = cv2.cvtColor(src_ycrcb, cv2.COLOR_YCrCb2RGB)

    pil_img = _to_pil(img)
    sharpness_img = ImageEnhance.Sharpness(pil_img).enhance(2)
    result_img = _to_array(sharpness_img)
    return result_img


def mask2point(mask):
    # mask: (H, W, 3) with values 0 or 255 -> (N, 2) array of (y, x) indices.
    mask = _to_2d(mask)
    indices = np.argwhere(mask == 255)
    return indices


def get_corner(corner_coords):
    # corner_coords: (N, 2) array where each point is (y, x).
    cy, cx = np.mean(corner_coords, axis=0)
    quadrant_1 = corner_coords[(corner_coords[:, 0] < cy) & (corner_coords[:, 1] >= cx)]
    rt = quadrant_1[:, 1].max(), quadrant_1[:, 0].min()
    quadrant_2 = corner_coords[(corner_coords[:, 0] < cy) & (corner_coords[:, 1] < cx)]
    lt = quadrant_2[:, 1].min(), quadrant_2[:, 0].min()
    quadrant_3 = corner_coords[(corner_coords[:, 0] >= cy) & (corner_coords[:, 1] < cx)]
    lb = quadrant_3[:, 1].min(), quadrant_3[:, 0].max()
    quadrant_4 = corner_coords[(corner_coords[:, 0] >= cy) & (corner_coords[:, 1] >= cx)]
    rb = quadrant_4[:, 1].max(), quadrant_4[:, 0].max()
    return lt, rt, rb, lb


def get_dst_mask(mask):
    mask = _to_2d(mask)
    dst = cv2.distanceTransform(mask, cv2.DIST_L2, 5)
    # Normalize the distance values to the 0-255 range.
    dist_transform_normalized = cv2.normalize(
        dst, None, 0, 255, cv2.NORM_MINMAX, dtype=cv2.CV_8U
    )
    return _to_3d(dist_transform_normalized)


def unwarp(img, src, dst):
    h, w = img.shape[:2]
    # cv2.getPerspectiveTransform() gives M, the perspective transform matrix.
    M = cv2.getPerspectiveTransform(src, dst)
    # cv2.warpPerspective() warps the image to a top-down view.
    warped = cv2.warpPerspective(img, M, (w, h), flags=cv2.INTER_LINEAR)
    return warped, M
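
# A minimal sketch of `unwarp`, assuming float32 quads ordered lt, rt, rb, lb:
# the source quadrilateral is mapped onto the full frame, which is how
# `perspective_correction` below uses it. `_example_unwarp` is a hypothetical
# helper for illustration only.
def _example_unwarp():
    img = np.zeros((100, 100, 3), dtype="uint8")
    src = np.float32([(10, 10), (90, 20), (85, 90), (5, 80)])  # (x, y) corners
    dst = np.float32([(0, 0), (99, 0), (99, 99), (0, 99)])
    warped, M = unwarp(img, src, dst)
    assert warped.shape == img.shape and M.shape == (3, 3)
    return warped
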
def perspective_correction(img, src=None, vis=False, method: PC_TYPE = PC_TYPE.HARRIS):
    # img: (H, W, C) array with values 0-255.
    # src: [[ltx, lty], [rtx, rty], [rbx, rby], [lbx, lby]]
    if src is None:
        gray = _to_grayscale(img)
        if not isinstance(method, PC_TYPE):
            raise ValueError(
                f"Invalid method: {method}. Expected one of {list(PC_TYPE)}."
            )
        if method == PC_TYPE.HARRIS:
            corner = cv2.cornerHarris(gray, 5, 3, 0.04)  # (H, W), value: corner score
            threshold = 0.005 * corner.max()
            corner_coords = np.argwhere(corner > threshold)
        elif method == PC_TYPE.EDGES_CONTOURS:
            blurred = cv2.GaussianBlur(gray, (5, 5), 0)
            edges = cv2.Canny(blurred, 50, 150)
            contours, _ = cv2.findContours(
                edges, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
            )
            contour_points = []
            for contour in contours:
                contour_points.extend(contour)
            corner_coords = np.array(contour_points).reshape(-1, 2)[..., ::-1]
        elif method == PC_TYPE.GFTT:
            corners = cv2.goodFeaturesToTrack(
                gray, 0, 0.01, 5, blockSize=3, useHarrisDetector=True, k=0.03
            )
            corner_coords = corners.reshape(corners.shape[0], 2)[..., ::-1]
        elif method == PC_TYPE.FAST:
            th = 50
            fast = cv2.FastFeatureDetector_create(th)
            keypoints = fast.detect(gray)
            corner_coords = np.array([[kp.pt[1], kp.pt[0]] for kp in keypoints])
        elif method == PC_TYPE.KAZE:
            # feature = cv2.SIFT_create()
            feature = cv2.KAZE_create()
            keypoints = feature.detect(gray)
            corner_coords = np.array([[kp.pt[1], kp.pt[0]] for kp in keypoints])
        if vis:
            view_img = img.copy()
            for corner in corner_coords:
                y, x = corner
                cv2.circle(view_img, (int(x), int(y)), 3, (255, 0, 0), 2)
            save_image(view_img, path="vis_corner.png")
        lt, rt, rb, lb = get_corner(corner_coords)
        src = np.float32([lt, rt, rb, lb])
    # cv2.getPerspectiveTransform requires float32 input.
    src = np.float32(src)
    dst = np.float32(
        [
            (0, 0),
            (img.shape[1] - 1, 0),
            (img.shape[1] - 1, img.shape[0] - 1),
            (0, img.shape[0] - 1),
        ]
    )
    result, M = unwarp(img, src, dst)
    save_image(result, path="cv_result.png")
    return result


if __name__ == "__main__":
    image_url = "https://d2reotjpatzlok.cloudfront.net/qr-place/item/QR_20240726_2441_2_LZ1ZFCT38HN7PPCEZR8H.jpg"
    img, imgdata, format = load_image(image_url, with_byte=True, with_format=True)
    perspective_correction(img, vis=True)