import multiprocessing as mp
import os
import time
import csv
import json

import cv2
import torch
import numpy as np
from shapely.geometry import Point, Polygon

from detectron2.config import get_cfg
from detectron2.data.detection_utils import read_image
from detectron2.structures import BitMasks, Boxes, PolygonMasks, RotatedBoxes
from src.sts.demo.genericmask import GenericMask
from src.sts.demo.predictor import VisualizationDemo

# constants
WINDOW_NAME = "COCO detections"

# Vietnamese characters in runs of six: a base letter followed by its five
# tonal variants (grave, acute, dot-below, hook-above, tilde).
dictionary = "aàáạảãâầấậẩẫăằắặẳẵAÀÁẠẢÃĂẰẮẶẲẴÂẦẤẬẨẪeèéẹẻẽêềếệểễEÈÉẸẺẼÊỀẾỆỂỄoòóọỏõôồốộổỗơờớợởỡOÒÓỌỎÕÔỒỐỘỔỖƠỜỚỢỞỠiìíịỉĩIÌÍỊỈĨuùúụủũưừứựửữƯỪỨỰỬỮUÙÚỤỦŨyỳýỵỷỹYỲÝỴỶỸ"


def make_groups():
    groups = []
    i = 0
    while i < len(dictionary) - 5:
        group = [c for c in dictionary[i: i + 6]]
        i += 6
        groups.append(group)
    return groups


groups = make_groups()

TONES = ["", "ˋ", "ˊ", "﹒", "ˀ", "˜"]
SOURCES = ["ă", "â", "Ă", "Â", "ê", "Ê", "ô", "ơ", "Ô", "Ơ", "ư", "Ư", "Đ", "đ"]
TARGETS = ["aˇ", "aˆ", "Aˇ", "Aˆ", "eˆ", "Eˆ", "oˆ", "o˒", "Oˆ", "O˒", "u˒", "U˒", "D‑", "d‑"]


def parse_tone(word):
    """Strip the first tone mark found in `word` and append it at the end."""
    res = ""
    tone = ""
    for char in word:
        if char in dictionary:
            for group in groups:
                if char in group:
                    if tone == "":
                        tone = TONES[group.index(char)]
                    res += group[0]
        else:
            res += char
    res += tone
    return res


def full_parse(word):
    """Rewrite a Vietnamese word into the flat label alphabet of the recognizer."""
    word = parse_tone(word)
    res = ""
    for char in word:
        if char in SOURCES:
            res += TARGETS[SOURCES.index(char)]
        else:
            res += char
    return res


def correct_tone_position(word):
    """Pick the vowel of `word` that should carry the trailing tone mark."""
    word = word[:-1]
    first_ord_char = ""
    second_order_char = ""
    for char in word:
        for group in groups:
            if char in group:
                second_order_char = first_ord_char
                first_ord_char = group[0]
    if word[-1] == first_ord_char and second_order_char != "":
        pair_chars = ["qu", "Qu", "qU", "QU", "gi", "Gi", "gI", "GI"]
        for pair in pair_chars:
            if pair in word and second_order_char in ["u", "U", "i", "I"]:
                return first_ord_char
        return second_order_char
    return first_ord_char


def decoder(recognition):
    """Invert `full_parse`: restore diacritics and re-attach the trailing tone."""
    for char in TARGETS:
        recognition = recognition.replace(char, SOURCES[TARGETS.index(char)])
    if len(recognition) < 1:
        return recognition
    if recognition[-1] in TONES:
        if len(recognition) < 2:
            return recognition
        replace_char = correct_tone_position(recognition)
        tone = recognition[-1]
        recognition = recognition[:-1]
        for group in groups:
            if replace_char in group:
                recognition = recognition.replace(
                    replace_char, group[TONES.index(tone)])
    return recognition


def decode_recognition(rec):
    """Map a sequence of class indices from the recognition head to a string."""
    CTLABELS = [
        " ", "!", '"', "#", "$", "%", "&", "'", "(", ")", "*", "+", ",", "-",
        ".", "/", "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", ":", ";",
        "<", "=", ">", "?", "@", "A", "B", "C", "D", "E", "F", "G", "H", "I",
        "J", "K", "L", "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W",
        "X", "Y", "Z", "[", "\\", "]", "^", "_", "`", "a", "b", "c", "d", "e",
        "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s",
        "t", "u", "v", "w", "x", "y", "z", "{", "|", "}", "~", "ˋ", "ˊ", "﹒",
        "ˀ", "˜", "ˇ", "ˆ", "˒", "‑",
    ]
    last_char = False
    s = ''
    for c in rec:
        c = int(c)
        if 0 < c < 107:
            s += CTLABELS[c - 1]
            last_char = c
        elif c == 0:
            s += u''
        else:
            last_char = False
    if len(s) == 0:
        s = ' '
    s = decoder(s)
    return s
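
# A quick round-trip sketch of the tone helpers above (illustrative only; the
# word "hồ" is an arbitrary example, not model output). `full_parse` flattens
# diacritics into the recognizer's alphabet; `decoder` restores them:
#
#   >>> full_parse("hồ")   # circumflex becomes "ˆ", the grave tone moves to the end
#   'hoˆˋ'
#   >>> decoder("hoˆˋ")    # the tone is re-attached to the correct vowel
#   'hồ'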
def get_mini_boxes(contour, max_x, min_x, thr):
    # Fit a rotated rectangle around the contour and scale one of its sides
    # (chosen by the box angle) by `thr`, so neighbouring words on the same
    # text line overlap and can be merged later.
    bounding_box = cv2.minAreaRect(contour)
    bounding_box = list(bounding_box)
    bounding_box[1] = list(bounding_box[1])
    if bounding_box[2] <= 45:
        bounding_box[1][0] = bounding_box[1][0] * thr
    else:
        bounding_box[1][1] = bounding_box[1][1] * thr
    bounding_box[1] = tuple(bounding_box[1])
    bounding_box = tuple(bounding_box)
    points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])

    index_1, index_2, index_3, index_4 = 0, 1, 2, 3
    if points[1][1] > points[0][1]:
        index_1 = 0
        index_4 = 1
    else:
        index_1 = 1
        index_4 = 0
    if points[3][1] > points[2][1]:
        index_2 = 2
        index_3 = 3
    else:
        index_2 = 3
        index_3 = 2

    # Alternative kept from the original (clamps the box to the signboard
    # extent given by min_x / max_x):
    # p1 = np.array([min_x, points[index_1][1]])
    # p2 = np.array([max_x, points[index_2][1]])
    # p3 = np.array([max_x, points[index_3][1]])
    # p4 = np.array([min_x, points[index_4][1]])
    # box = [p1, p2, p3, p4]
    box = [points[index_1], points[index_2], points[index_3], points[index_4]]
    return box


def get_mini_boxes_1(contour):
    # Same corner ordering as `get_mini_boxes`, without the stretching step:
    # top-left, top-right, bottom-right, bottom-left.
    bounding_box = cv2.minAreaRect(contour)
    points = sorted(list(cv2.boxPoints(bounding_box)), key=lambda x: x[0])
    index_1, index_2, index_3, index_4 = 0, 1, 2, 3
    if points[1][1] > points[0][1]:
        index_1 = 0
        index_4 = 1
    else:
        index_1 = 1
        index_4 = 0
    if points[3][1] > points[2][1]:
        index_2 = 2
        index_3 = 3
    else:
        index_2 = 3
        index_3 = 2
    box = [points[index_1], points[index_2], points[index_3], points[index_4]]
    return box


def calculate_iou(box_1, box_2):
    poly_1 = Polygon(box_1)
    poly_2 = Polygon(box_2)
    try:
        iou = poly_1.intersection(poly_2).area / poly_1.union(poly_2).area
    except Exception:
        # Degenerate or self-intersecting polygons make shapely raise.
        iou = 0
    return iou


def merge_boxes(boxes, recs, trh):
    """Group word boxes whose IoU exceeds `trh` into text lines."""
    dict_bbox = {}
    x = 0
    # Note: these ranges skip the last two boxes as group seeds; this looks
    # like an off-by-one in the original and is preserved as-is.
    for i in range(len(boxes) - 2):
        tmp_box = [i]
        # If box i already belongs to a group, take that group over and
        # re-register it under a fresh key.
        db_copy1 = dict_bbox.copy()
        for key, value in db_copy1.items():
            if i in value:
                tmp_box = db_copy1[key]
                del dict_bbox[key]
                break
        for j in range(i + 1, len(boxes) - 1):
            ba = cv2.minAreaRect(boxes[i].reshape(-1, 1, 2).astype(int))
            bb = cv2.minAreaRect(boxes[j].reshape(-1, 1, 2).astype(int))
            iou = calculate_iou(boxes[i], boxes[j])
            # scr = min(ba[1][0], bb[1][0]) / max(ba[1][0], bb[1][0])
            if iou > trh:
                db_copy = dict_bbox.copy()
                check = False
                for key, value in db_copy.items():
                    if i in value:
                        check = True
                        tmp_box.remove(i)
                        tmp_box.extend(db_copy[key])
                        del dict_bbox[key]
                        break
                if not check:
                    tmp_box.append(j)
        dict_bbox[x] = tmp_box
        x += 1

    recs_out = []
    db_clone = {}
    for key, value in dict_bbox.items():
        db_clone[key] = list(set(value))
    for key, value in db_clone.items():
        tmp_str = []
        for i in value:
            # Keep each recognition together with its box centre x, so lines
            # can be re-ordered left-to-right in `rec_to_str`.
            tmp_str.append([recs[i], cv2.minAreaRect(
                boxes[i].reshape(-1, 1, 2).astype(int))[0][0]])
        recs_out.append(tmp_str)
    return db_clone, recs_out


def combine(dict_box, h, w, boxes):
    """Merge each group of boxes into one enclosing quadrilateral."""
    bboxs = []
    for key, db in dict_box.items():
        list_box = []
        for j in db:
            list_box.append(boxes[j])
        # Corner extremes across the group; h1..h4 hold x coordinates and
        # w1..w4 hold y coordinates, following the original naming.
        h1, h4 = h, h
        h2, h3 = 0, 0
        w1, w2 = w, w
        w3, w4 = 0, 0
        for box in list_box:
            if box[0, 0] < h1:
                h1 = box[0, 0]
            if box[1, 0] > h2:
                h2 = box[1, 0]
            if box[2, 0] > h3:
                h3 = box[2, 0]
            if box[3, 0] < h4:
                h4 = box[3, 0]
            if box[0, 1] < w1:
                w1 = box[0, 1]
            if box[1, 1] < w2:
                w2 = box[1, 1]
            if box[2, 1] > w3:
                w3 = box[2, 1]
            if box[3, 1] > w4:
                w4 = box[3, 1]
        tmp = np.array([[h1, w1], [h2, w2], [h3, w3], [h4, w4]])
        bboxs.append(tmp.astype(np.int16))
    return bboxs


def rec_to_str(recs):
    """Sort each group left-to-right and join the decoded words into one string."""
    rec_1 = []
    for rec in recs:
        i = sorted(rec, key=lambda x: x[1])
        i = " ".join(decoder(item[0]) for item in i)
        rec_1.append(i)
    return rec_1
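
# A small sanity check for `calculate_iou` (illustrative, not part of the
# pipeline): two axis-aligned 2x2 squares shifted by one unit overlap in a
# 1x2 strip, so IoU = 2 / 6 = 1/3.
#
#   >>> calculate_iou([(0, 0), (2, 0), (2, 2), (0, 2)],
#   ...               [(1, 0), (3, 0), (3, 2), (1, 2)])
#   0.3333333333333333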
def scale_points(mask):
    # Grow every labelled pixel into its 5x5 neighbourhood so that thin
    # segmentation masks survive contour extraction. `mask` is assumed to be
    # a 2-D numpy array (e.g. from np.loadtxt); the slice assignment below is
    # equivalent to the original long chain of element-wise writes.
    mask_tmp = mask.copy()
    for i in range(2, len(mask_tmp) - 2):
        for j in range(2, len(mask_tmp[i]) - 2):
            if mask_tmp[i][j] != 0:
                mask[i - 2:i + 3, j - 2:j + 3] = mask_tmp[i][j]
    return mask
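
# Note: for a binary mask whose foreground lies at least two pixels inside the
# border, `scale_points` should agree with a plain 5x5 morphological dilation.
# A hedged equivalence sketch (an assumption for testing, not used anywhere in
# the pipeline; cv2.dilate does not accept int32, hence the cast):
#
#   kernel = np.ones((5, 5), np.uint8)
#   m8 = mask.astype(np.uint8)
#   assert (scale_points(m8.copy()) == cv2.dilate(m8, kernel)).all()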

def convert_boxes(boxes):
    if isinstance(boxes, (Boxes, RotatedBoxes)):
        return boxes.tensor.numpy()
    return np.asarray(boxes)


def convert_masks(masks_or_polygons, h, w):
    m = masks_or_polygons
    if isinstance(m, PolygonMasks):
        m = m.polygons
    if isinstance(m, BitMasks):
        m = m.tensor.numpy()
    if isinstance(m, torch.Tensor):
        m = m.numpy()
    ret = []
    for x in m:
        if isinstance(x, GenericMask):
            ret.append(x)
        else:
            ret.append(GenericMask(x, h, w))
    return ret


def setup_cfg(args):
    # load config from file and command-line arguments
    cfg = get_cfg()
    from projects.SWINTS.swints import add_SWINTS_config
    add_SWINTS_config(cfg)
    cfg.merge_from_file(args["config_file"])
    cfg.merge_from_list(args["opts"])
    # Set score_threshold for builtin models
    cfg.MODEL.RETINANET.SCORE_THRESH_TEST = args["confidence_threshold"]
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = args["confidence_threshold"]
    cfg.MODEL.PANOPTIC_FPN.COMBINE.INSTANCES_CONFIDENCE_THRESH = args["confidence_threshold"]
    cfg.freeze()
    return cfg


def handle_sts(image, output, seg_boxes, seg_ids):
    args_ = {
        "config_file": "src/sts/projects/SWINTS/configs/SWINTS-swin-finetune-vintext.yaml",
        "confidence_threshold": 0.5,
        "opts": ["MODEL.WEIGHTS", "./checkpoints/sts/sts.pth"],
    }
    mp.set_start_method("spawn", force=True)
    cfg = setup_cfg(args_)
    demo = VisualizationDemo(cfg)

    # Signboard rectangles from the segmentation stage, one quadrilateral per id.
    box_sign = []
    if seg_boxes:
        for seg_box in seg_boxes:
            x1, y1, x2, y2 = (int(v) for v in seg_box)  # int(): np.int is deprecated
            box_sign.append(np.array([[x1, y1], [x2, y1], [x2, y2], [x1, y2]]))

    # Initialise the per-signboard results up front so the function always has
    # something to return, even when `image` is empty or an error is swallowed.
    dict_box_sign = {}
    dict_box_sign_out = {}
    dict_rec_sign = {}
    dict_rec_sign_out = {}
    for i in range(len(box_sign)):
        dict_box_sign[seg_ids[i]] = []
        dict_box_sign_out[seg_ids[i]] = []
        dict_rec_sign[seg_ids[i]] = []
        dict_rec_sign_out[seg_ids[i]] = []

    if image:
        # use PIL, to be consistent with evaluation
        img = read_image(image, format="BGR")
        h, w, _ = img.shape
        predictions, visualized_output = demo.run_on_image(
            img, args_["confidence_threshold"], image)

        # Alternative kept from the original (disabled): derive box_sign from a
        # segmentation mask file instead of `seg_boxes`.
        # mask = np.loadtxt(path_segment, dtype=np.int32)
        # mmax = np.amax(mask)
        # if mmax == 0:
        #     mmax = 1
        # mask = scale_points(mask)
        # outs = cv2.findContours(
        #     (mask * int(255 / mmax)).astype(np.uint8),
        #     cv2.RETR_LIST, cv2.CHAIN_APPROX_SIMPLE)
        # if len(outs) == 3:
        #     img, contours, _ = outs
        # elif len(outs) == 2:
        #     contours, _ = outs
        # for contour in contours:
        #     points = get_mini_boxes_1(contour)
        #     box_sign.append(np.array(points))

        in_signboard = 0
        list_limit = []
        try:
            # Horizontal extent of each signboard, later used to widen text boxes.
            for sig in box_sign:
                max_x = max(sig[0][0], sig[1][0], sig[2][0], sig[3][0])
                min_x = min(sig[0][0], sig[1][0], sig[2][0], sig[3][0])
                list_limit.append([max_x, min_x])

            if "instances" in predictions:
                instances = predictions["instances"].to(torch.device("cpu"))
                instances = instances[instances.scores > args_["confidence_threshold"]]
                boxes = instances.pred_boxes if instances.has("pred_boxes") else None
                scores = instances.scores if instances.has("scores") else None
                # classes = instances.pred_classes if instances.has("pred_classes") else None
                recs = instances.pred_rec if instances.has("pred_rec") else None
                # rec_score = instances.pred_rec_score if instances.has("pred_rec_score") else None
                masks = np.asarray(instances.pred_masks)
                masks = [GenericMask(x, h, w) for x in masks]
                masks = convert_masks(masks, h, w)
                polys = []
                for mask in masks:
                    polys.append(np.concatenate(
                        mask.polygons).reshape(-1, 2).tolist())

                # Assign each detected text polygon to the signboard that
                # contains the centre of its minimum-area rectangle.
                for bezier, rec, score in zip(polys, recs, scores):
                    if score >= 0.5:
                        bezier = np.array(bezier, dtype='int').reshape(-1, 1, 2)
                        bounding_box = cv2.minAreaRect(bezier)
                        midpoint = Point(bounding_box[0])
                        for i in range(len(box_sign)):
                            poly = Polygon(box_sign[i])
                            if midpoint.within(poly):
                                in_signboard += 1
                                dict_box_sign[seg_ids[i]].append(bezier)
                                dict_rec_sign[seg_ids[i]].append(
                                    full_parse(decode_recognition(rec)))

            # Per signboard: stretch word boxes, merge overlapping ones into
            # lines, and keep the merged quadrilaterals and strings.
            for i in range(len(dict_box_sign)):
                boxes = []
                reces = []
                for bezier, rec in zip(dict_box_sign[seg_ids[i]],
                                       dict_rec_sign[seg_ids[i]]):
                    bezier = bezier.reshape(-1, 1, 2)
                    points = get_mini_boxes(
                        bezier, list_limit[i][0], list_limit[i][1], 3)
                    box = np.array(points, dtype=np.int16)
                    box[:, 0] = np.clip(np.round(box[:, 0]), 0, w)
                    box[:, 1] = np.clip(np.round(box[:, 1]), 0, h)
                    boxes.append(box.astype(np.int16))
                    reces.append(rec)
                dict_box, rec_out = merge_boxes(boxes, reces, 0.1)
                rec_outs = rec_to_str(rec_out)
                bboxs = combine(dict_box, h, w, boxes)
                dict_box_sign_out[seg_ids[i]] = bboxs
                dict_rec_sign_out[seg_ids[i]] = rec_outs

            # Visualize image after merging boxes (disabled in the original;
            # the cv2.putText text argument was already missing there).
            img_draw = cv2.imread(image)
            # for key, values in dict_box_sign_out.items():
            #     for value in values:
            #         pts = value
            #         x, y = pts[0][0], pts[0][1]
            #         pts = np.array(pts, np.int32).reshape((-1, 1, 2))
            #         img_draw = cv2.polylines(
            #             img_draw, [pts], True, (255, 0, 0), 2)
            #         cv2.putText(img_draw, , (x, y - 10),
            #                     cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36, 255, 12), 2)

            if output:
                tags = ["Tag", "Signboard", "Frame", "Name", "Number",
                        "Street", "Ward", "City_District", "City"]
                name = os.path.basename(image).split(".")[0]
                for key, values in dict_rec_sign_out.items():
                    labels = ["Value", key, name, values, "", "", "", "", ""]
                    csv_path = os.path.join(
                        output, str(key) + "_" + str(name) + ".csv")
                    if os.path.isfile(csv_path):
                        os.remove(csv_path)
                    # newline='' keeps the csv module from writing blank rows.
                    with open(csv_path, "a", encoding="utf-8", newline="") as f:
                        writer = csv.writer(f)
                        for i in range(len(tags)):
                            writer.writerow([tags[i], labels[i]])

            # Optional raw dumps, kept from the original (disabled):
            # txt_name1 = str(image.split("/")[-1].split(".")[0]) + '_box.txt'
            # txt_name2 = str(image.split("/")[-1].split(".")[0]) + '_text.txt'
            # img_name = str(image.split("/")[-1].split(".")[0]) + '.jpg'
            # output_path_box = os.path.join(output, txt_name1)
            # output_path_text = os.path.join(output, txt_name2)
            # output_path_image = os.path.join(output, img_name)
            # with open(output_path_box, 'w+', encoding='utf-8') as output_file_box:
            #     for index, box in dict_box_sign_out.items():
            #         dict_box_sign_out[index] = [box_.tolist() for box_ in box]
            #     json.dump(dict_box_sign_out, output_file_box, ensure_ascii=False)
            # with open(output_path_text, 'w+', encoding='utf-8') as output_file_box:
            #     json.dump(dict_rec_sign_out, output_file_box, ensure_ascii=False)
            # cv2.imwrite(output_path_image, img_draw)
        except Exception:
            # Swallow per-frame failures so one bad frame does not kill a
            # batch; the dicts initialised above are returned as-is.
            pass
    return dict_box_sign_out, dict_rec_sign_out
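

if __name__ == "__main__":
    # Minimal invocation sketch. The paths, the signboard rectangle and its id
    # below are placeholders (assumptions), not values shipped with the repo:
    # `seg_boxes` holds (x1, y1, x2, y2) rectangles from the segmentation
    # stage and `seg_ids` the matching keys used in the returned dicts; one
    # CSV per signboard is written into `output`.
    boxes_out, recs_out = handle_sts(
        image="samples/frame_0001.jpg",   # hypothetical input frame
        output="outputs",                 # hypothetical directory for the CSVs
        seg_boxes=[(120, 80, 640, 260)],  # one hypothetical signboard box
        seg_ids=[0],
    )
    print(recs_out)  # e.g. {0: ["merged text of signboard 0", ...]}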