# title: create earthwork train dataset
# author: Taewook Kang
# date: 2024.3.27
# description: create earthwork train dataset
# license: MIT
# version
#   0.1. 2024.3.27. create file
#
import argparse
import json
import logging
import math
import os
import re
import shutil
import traceback
from math import pi

import laspy
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shapely
import trimesh
from scipy.spatial import distance
from shapely.geometry import LineString, Polygon
from tqdm import tqdm, trange

logging.basicConfig(
    level=logging.DEBUG,
    filename='logs.txt',
    format='%(asctime)s %(levelname)s %(message)s',
    datefmt='%H:%M:%S',
)
logger = logging.getLogger("prep")

# Absolute tolerance shared by every floating-point comparison in this file.
_precision = 0.00001


def get_bbox(polyline):
    """Return the axis-aligned bounding box ``(xmin, ymin, xmax, ymax)``.

    ``polyline`` is a non-empty sequence of ``(x, y)`` points.
    """
    polyline_np = np.array(polyline)
    xmin, ymin = np.amin(polyline_np, axis=0)
    xmax, ymax = np.amax(polyline_np, axis=0)
    return (xmin, ymin, xmax, ymax)


def get_center_point(pline):
    """Return the mean of the vertices of ``pline`` as ``(x, y)``.

    An empty polyline yields ``(0, 0)``.
    """
    count = len(pline)
    if count == 0:
        return (0, 0)
    sum_x = sum(p[0] for p in pline)
    sum_y = sum(p[1] for p in pline)
    return (sum_x / count, sum_y / count)


def intersect_line(line1, line2):
    """Intersect the infinite lines through two segments.

    Args:
        line1: ``((x1, y1), (x2, y2))``.
        line2: ``((x3, y3), (x4, y4))``.

    Returns:
        ``(x, y)`` of the intersection, or ``None`` when the lines are
        parallel or the intersection x lies outside the x-extent of either
        segment (within ``_precision``).

    Only the x-extent is tested, not the y-extent. ``get_positions_pline``
    depends on that: it needs hits both above and below the start of its
    vertical probe line.
    """
    (x1, y1), (x2, y2) = line1
    (x3, y3), (x4, y4) = line2

    denominator = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
    if denominator == 0:
        return None  # lines are parallel

    det1 = x1 * y2 - y1 * x2
    det2 = x3 * y4 - y3 * x4
    x = (det1 * (x3 - x4) - (x1 - x2) * det2) / denominator
    y = (det1 * (y3 - y4) - (y1 - y2) * det2) / denominator

    # A tolerance is required here: for a vertical segment (x3 == x4) the
    # computed x can differ from x3 by round-off, and an exact comparison
    # would then reject a genuine intersection.
    if not (min(x1, x2) - _precision <= x <= max(x1, x2) + _precision):
        return None
    if not (min(x3, x4) - _precision <= x <= max(x3, x4) + _precision):
        return None
    return (x, y)


def get_positions_pline(base_pline, target_pline):
    """Classify each vertex of ``target_pline`` relative to ``base_pline``.

    For every target vertex a vertical probe line is intersected with the
    base segments in order; the first segment that yields a hit decides.

    Returns:
        One float per target vertex: ``-1.0`` when the hit is above the
        vertex (vertex below the base line), ``1.0`` when the hit is below
        the vertex, ``0.0`` when the vertex lies on the base line or no
        segment was hit.
    """
    target_pos_marks = []
    for vertex in target_pline:
        # vertical line to check below
        probe = [vertex, (vertex[0], vertex[1] + 1e+10)]
        pos = 0.0
        for seg_start, seg_end in zip(base_pline, base_pline[1:]):
            hit = intersect_line([seg_start, seg_end], probe)
            if hit is None:
                continue
            if not equal(hit[1], vertex[1]):
                pos = -1.0 if hit[1] > vertex[1] else 1.0
            break
        target_pos_marks.append(pos)
    return target_pos_marks


def get_below_pline(base_pline, target_pline):
    """Return True when ``target_pline`` lies, on average, below ``base_pline``.

    The decision is the sign of the mean of ``get_positions_pline``. An empty
    target polyline is reported as not below.
    """
    pos_marks = get_positions_pline(base_pline, target_pline)
    if not pos_marks:
        return False  # nothing to average; avoids ZeroDivisionError
    return sum(pos_marks) / len(pos_marks) < 0.0


def get_geometry(xsec, label):
    """Return the first geometry of ``xsec['geom']`` with ``label``, or None."""
    return next((g for g in xsec['geom'] if g['label'] == label), None)


def is_point_in_rect(point1, point2, perp):
    """Point-tuple wrapper around ``is_point_in_rectangle``."""
    return is_point_in_rectangle(point1[0], point1[1], point2[0], point2[1], perp[0], perp[1])


def is_point_in_rectangle(x1, y1, x2, y2, x, y):
    """Return True when ``(x, y)`` lies inside or on the axis-aligned
    rectangle spanned by the corners ``(x1, y1)`` and ``(x2, y2)``.

    The corners may be given in any order; the comparison is exact.
    """
    left, right = min(x1, x2), max(x1, x2)
    bottom, top = min(y1, y2), max(y1, y2)
    return left <= x <= right and bottom <= y <= top


def sign_distance(a, b, c, p):
    """Signed distance from point ``p`` to the line ``a*x + b*y + c = 0``.

    Returns ``0.0`` when ``a`` and ``b`` are both zero.
    """
    norm = math.sqrt(a * a + b * b)
    if norm == 0.0:
        return 0.0
    return (a * p[0] + b * p[1] + c) / norm


def equal(a, b):
    """Return True when ``a`` and ``b`` differ by less than ``_precision``."""
    return abs(a - b) < _precision


def equal_point(p1, p2):
    """Return True when both coordinates of ``p1`` and ``p2`` are ``equal``."""
    return equal(p1[0], p2[0]) and equal(p1[1], p2[1])


def get_angle(x1, y1, x2, y2):
    """Direction angle, in radians, of the vector ``(x1, y1) -> (x2, y2)``.

    The angle is measured from the +x axis and mapped onto 0..2*pi.
    Returns ``-1.0`` when both points coincide exactly.
    """
    dx = x2 - x1
    dy = y2 - y1
    if dx == 0 and dy == 0:
        return -1.0

    if dx == 0:
        # Vertical direction: atan(dy / dx) is undefined.
        return math.pi / 2 if dy > 0 else math.pi + math.pi / 2

    angle = math.atan(dy / dx)
    # Adjust the angle for the different quadrants.
    if dx < 0:
        angle += math.pi
    elif dy < 0:
        angle += 2 * math.pi
    return angle


def line_coefficients(point1, point2):
    """Return ``(A, B, C)`` of the line ``A*x + B*y + C = 0`` through two points."""
    x1, y1 = point1
    x2, y2 = point2
    return y2 - y1, x1 - x2, x2 * y1 - x1 * y2


def sign_point_on_line(point1, point2, new_point):
    """Return which side of the line ``point1 -> point2`` ``new_point`` is on.

    Returns ``0.0`` when the point coincides with an end point or
    ``|A*x + B*y + C| < _precision``, otherwise ``1.0`` or ``-1.0`` following
    the sign of ``A*x + B*y + C``. The tested value is not normalised by the
    segment length.
    """
    if equal_point(point1, new_point) or equal_point(point2, new_point):
        return 0.0

    line_a, line_b, line_c = line_coefficients(point1, point2)
    value = line_a * new_point[0] + line_b * new_point[1] + line_c
    if math.fabs(value) < _precision:
        return 0.0
    return 1.0 if value > 0.0 else -1.0


def sign_distance_on_line(line_point1, line_point2, point):
    """Signed perpendicular distance from ``point`` to the line through
    ``line_point1`` and ``line_point2``.

    The sign is the one returned by ``sign_point_on_line``. Returns ``0.0``
    when the point is on the line or the two line points coincide.
    """
    direction = sign_point_on_line(line_point1, line_point2, point)
    if direction == 0:
        return 0.0
    if equal_point(line_point1, line_point2):
        return 0.0  # degenerate line: no direction to measure against

    # The general line form handles vertical, horizontal and sloped lines
    # uniformly, whatever the order of the two defining points.
    a, b, c = line_coefficients(line_point1, line_point2)
    dist = abs(a * point[0] + b * point[1] + c) / math.hypot(a, b)
    return dist * float(direction)


def is_point_on_line(point1, point2, perp):
    """Return True when ``perp`` lies on the segment ``point1 -> point2``.

    The point must be inside the segment's bounding box and
    ``sign_point_on_line`` must report it as on the line.
    """
    if not is_point_in_rect(point1, point2, perp):
        return False
    return sign_point_on_line(point1, point2, perp) == 0.0


def is_overlap_line(line, part_seg):
    """Return True when an end point of ``part_seg`` lies on ``line``.

    If exactly one end point lies on ``line`` and that point coincides with
    an end point of ``line``, the segments merely touch and False is
    returned.
    """
    p1, p2 = line
    p3, p4 = part_seg

    f1 = is_point_on_line(p1, p2, p3)
    f2 = is_point_on_line(p1, p2, p4)
    if f1 != f2:
        # dangling point is not overlap.
        touching = p3 if f1 else p4
        if equal_point(p1, touching) or equal_point(p2, touching):
            return False
    return f1 or f2


def is_on_pline(polyline, base_line):
    """Return True when ``base_line`` overlaps any segment of ``polyline``.

    The overlap test is applied in both directions for every segment.
    """
    p1, p2 = base_line[0], base_line[1]
    for p3, p4 in zip(polyline, polyline[1:]):
        if is_overlap_line((p1, p2), (p3, p4)) or is_overlap_line((p3, p4), (p1, p2)):
            return True
    return False


def get_match_line_labels(xsec, base_geom, base_line):
    """Collect labels of open polylines in ``xsec`` that overlap ``base_line``.

    Geometries sharing the label of ``base_geom`` (which includes
    ``base_geom`` itself), closed geometries and the ``'center'`` geometry
    are skipped.
    """
    base_label = base_geom['label']
    labels = []
    for geom in xsec['geom']:
        geom_label = geom['label']
        if geom is base_geom or geom_label == base_label:
            continue
        if geom['closed']:  # only polyline is considered
            continue
        if geom_label == 'center':
            continue
        if is_on_pline(geom['polyline'], base_line):
            labels.append(geom_label)
    return labels


def get_seq_feature_tokens(xsec, geom, closed_type):
    """Return, in segment order, the labels of the polylines that overlap
    each segment of ``geom``.

    Returns an empty list when ``geom['closed']`` differs from
    ``closed_type``.
    """
    if geom['closed'] != closed_type:
        return []

    polyline = geom['polyline']
    geom_tokens = []
    for line in zip(polyline, polyline[1:]):
        geom_tokens.extend(get_match_line_labels(xsec, geom, line))
    return geom_tokens


def translate_geometry(xsec, cp):
    """Shift every polyline of ``xsec`` so that ``cp`` becomes the origin.

    ``xsec`` is modified in place (each polyline is replaced by a new list
    of tuples) and returned.
    """
    for geom in xsec['geom']:
        geom['polyline'] = [(p[0] - cp[0], p[1] - cp[1]) for p in geom['polyline']]
    return xsec


def is_closed(polyline):
    """Return True when the first and last points of ``polyline`` coincide."""
    return equal_point(polyline[0], polyline[-1])


def summery_feature(features):
    """Collapse runs of identical consecutive string tokens.

    A run of ``n > 1`` equal strings becomes ``'token(n)'``. Nested lists are
    summarised recursively, and a nested list that collapses to one element
    is replaced by that element. Other values are copied unchanged.
    """
    sum_features = []
    count = len(features)
    index = 0
    while index < count:
        f = features[index]
        sum_feature = f
        if isinstance(f, list):
            sum_feature = summery_feature(f)
            if len(sum_feature) == 1:
                sum_feature = sum_feature[0]
        elif isinstance(f, str):
            # find last index of the run of identical labels starting here
            last_index = index
            while (last_index + 1 < count
                   and isinstance(features[last_index + 1], str)
                   and features[last_index + 1] == f):
                last_index += 1
            if last_index != index:
                sum_feature = f'{f}({last_index - index + 1})'
                index = last_index
        sum_features.append(sum_feature)
        index += 1
    return sum_features


def get_intersection_count(xsec, base_geom, target_label):
    """Count geometries matching ``target_label`` that are crossed by the
    vertical line rising from the centroid of ``base_geom``.

    ``target_label`` is a regular expression applied with ``re.search``.
    Returns 0 when ``xsec`` has no ``'pave_surface'`` geometry.
    """
    if get_geometry(xsec, 'pave_surface') is None:
        return 0

    centroid = Polygon(base_geom['polyline']).centroid
    vertical_line = LineString([(centroid.x, centroid.y), (centroid.x, centroid.y + 1e+10)])

    count = 0
    for target_geom in xsec['geom']:
        if base_geom == target_geom:
            continue
        if re.search(target_label, target_geom['label']) is None:
            continue

        # check intersection
        # https://shapely.readthedocs.io/en/stable/reference/shapely.intersection.html
        target_line = LineString(target_geom['polyline'])
        if shapely.intersection(target_line, vertical_line).is_empty:
            continue
        count += 1
    return count


def update_xsection_feature(xsec):
    """Compute the ``earthwork_feature`` list of every closed geometry.

    The cross section is first translated so that the mean point of its
    ``'center'`` polyline is the origin. For each closed geometry with more
    than two points the ring is closed if necessary, near-zero-area rings are
    removed from ``xsec['geom']``, and the feature list receives
    ``'below'``/``'above'`` (relative to ``'ground'``), a ``pave_int(n)``
    token for ``pave_*`` labels, and the labels of the polylines overlapping
    its segments. The list is finally compacted with ``summery_feature``.

    Returns:
        The modified ``xsec``, or ``None`` when it has no ``'ground'``
        geometry or no usable ``'center'`` geometry.
    """
    gnd_geom = get_geometry(xsec, 'ground')
    if gnd_geom is None:
        return None
    center = get_geometry(xsec, 'center')
    if center is None or 'polyline' not in center:
        return None

    cp = get_center_point(center['polyline'])
    xsec = translate_geometry(xsec, cp)
    station = xsec['station']

    # An index loop is used because geometries may be removed while iterating.
    index = 0
    while index < len(xsec['geom']):
        geom = xsec['geom'][index]
        label = geom['label']
        polyline = geom['polyline']
        if len(polyline) <= 2 or not geom['closed']:
            index += 1
            continue

        if not equal_point(polyline[0], polyline[-1]):
            polyline.append(polyline[0])  # close the ring explicitly

        # noise filtering: a ring needs at least 4 coordinates to form a
        # polygon, and polygons with (almost) no area carry no information.
        if len(polyline) < 4 or math.fabs(Polygon(polyline).area) < _precision:
            xsec['geom'].pop(index)
            continue

        # processing
        features = geom.setdefault('earthwork_feature', [])
        features.append('below' if get_below_pline(gnd_geom['polyline'], polyline) else 'above')

        if re.search('pave_.*', label):
            pave_int_count = get_intersection_count(xsec, geom, 'pave_.*')
            features.append(f'pave_int({pave_int_count})')

        features.extend(get_seq_feature_tokens(xsec, geom, True))
        geom['earthwork_feature'] = summery_feature(features)
        logger.debug(f'{station}. {label} feature: {geom["earthwork_feature"]}')

        index += 1

    return xsec


def update_xsections_feature(xsections):
    """Flag closed geometries, then compute features for every cross section.

    A geometry is closed when its end points coincide or its label matches
    ``pave_layer.*``. Geometries with fewer than two points keep an existing
    ``'closed'`` value and otherwise default to ``False``.

    Returns:
        The cross sections for which ``update_xsection_feature`` succeeded.
    """
    # update closed polygon
    for xsec in xsections:
        for geom in xsec['geom']:
            polyline = geom['polyline']
            if len(polyline) < 2:
                # Later stages read geom['closed'] unconditionally.
                geom.setdefault('closed', False)
                continue
            # exception case: pavement layers count as closed even when
            # their end points do not coincide.
            geom['closed'] = (is_closed(polyline)
                              or re.search('pave_layer.*', geom['label']) is not None)

    # update features
    out_xsections = []
    for xsec in xsections:
        out_xsec = update_xsection_feature(xsec)
        if out_xsec is not None:
            out_xsections.append(out_xsec)
    return out_xsections


def main():
    """Convert every ``.json`` file of ``--input`` and write it to ``--output``.

    Any exception stops the run; its message and traceback are printed.
    """
    parser = argparse.ArgumentParser(description='create earthwork train dataset')
    parser.add_argument('--input', type=str, default='output/', help='input folder')
    parser.add_argument('--output', type=str, default='dataset/', help='output folder')
    args = parser.parse_args()

    try:
        # The output folder may not exist yet on a first run.
        os.makedirs(args.output, exist_ok=True)

        for file_name in tqdm(os.listdir(args.input)):
            if not file_name.endswith('.json'):
                continue
            print(f'processing {file_name}')

            with open(os.path.join(args.input, file_name), 'r') as f:
                data = json.load(f)

            out_xsections = update_xsections_feature(data)

            output_file = os.path.join(args.output, file_name)
            with open(output_file, 'w') as f:
                json.dump(out_xsections, f, indent=4)
    except Exception as e:
        print(f'error: {e}')
        traceback.print_exc()


if __name__ == '__main__':
    main()