|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os, math, argparse, json, re, traceback, numpy as np, pandas as pd, trimesh, laspy, shutil
|
|
import logging, matplotlib.pyplot as plt, shapely
|
|
from shapely.geometry import Polygon, LineString
|
|
from scipy.spatial import distance
|
|
from tqdm import trange, tqdm
|
|
from math import pi
|
|
|
|
logging.basicConfig(level=logging.DEBUG, filename='logs.txt',
|
|
format='%(asctime)s %(levelname)s %(message)s',
|
|
datefmt='%H:%M:%S')
|
|
logger = logging.getLogger("prep")
|
|
|
|
_precision = 0.00001
|
|
|
|
def get_bbox(polyline):
|
|
polyline_np = np.array(polyline)
|
|
xmin, ymin = np.amin(polyline_np, axis=0)
|
|
xmax, ymax = np.amax(polyline_np, axis=0)
|
|
return (xmin, ymin, xmax, ymax)
|
|
|
|
def get_center_point(pline):
|
|
if len(pline) == 0:
|
|
return (0, 0)
|
|
xs = [p[0] for p in pline]
|
|
ys = [p[1] for p in pline]
|
|
return (sum(xs) / len(pline), sum(ys) / len(pline))
|
|
|
|
def intersect_line(line1, line2):
|
|
(x1, y1), (x2, y2) = line1
|
|
(x3, y3), (x4, y4) = line2
|
|
|
|
denominator = (x1 - x2) * (y3 - y4) - (y1 - y2) * (x3 - x4)
|
|
if denominator == 0:
|
|
return None
|
|
|
|
x = ((x1 * y2 - y1 * x2) * (x3 - x4) - (x1 - x2) * (x3 * y4 - y3 * x4)) / denominator
|
|
y = ((x1 * y2 - y1 * x2) * (y3 - y4) - (y1 - y2) * (x3 * y4 - y3 * x4)) / denominator
|
|
|
|
|
|
if x < min(x1, x2) or x > max(x1, x2) or x < min(x3, x4) or x > max(x3, x4):
|
|
return None
|
|
|
|
return (x, y)
|
|
|
|
def get_positions_pline(base_pline, target_pline):
|
|
target_pos_marks = []
|
|
for i in range(len(target_pline)):
|
|
target = [target_pline[i], (target_pline[i][0], target_pline[i][1] + 1e+10)]
|
|
pos = 0.0
|
|
for j in range(len(base_pline) - 1):
|
|
base = [base_pline[j], base_pline[j + 1]]
|
|
intersect = intersect_line(base, target)
|
|
if intersect == None:
|
|
continue
|
|
|
|
if equal(intersect[1], target[0][1]):
|
|
pos = 0.0
|
|
break
|
|
|
|
pos = -1.0 if intersect[1] > target[0][1] else 1.0
|
|
break
|
|
target_pos_marks.append(pos)
|
|
|
|
return target_pos_marks
|
|
|
|
def get_below_pline(base_pline, target_pline):
|
|
pos_marks = get_positions_pline(base_pline, target_pline)
|
|
average = sum(pos_marks) / len(pos_marks)
|
|
return average < 0.0
|
|
|
|
def get_geometry(xsec, label):
|
|
for geom in xsec['geom']:
|
|
if geom['label'] == label:
|
|
return geom
|
|
return None
|
|
|
|
def is_point_in_rect(point1, point2, perp):
|
|
return is_point_in_rectangle(point1[0], point1[1], point2[0], point2[1], perp[0], perp[1])
|
|
|
|
def is_point_in_rectangle(x1, y1, x2, y2, x, y):
|
|
|
|
x1, x2 = min(x1, x2), max(x1, x2)
|
|
y1, y2 = min(y1, y2), max(y1, y2)
|
|
|
|
|
|
return x1 <= x <= x2 and y1 <= y <= y2
|
|
|
|
def sign_distance(a, b, c, p):
|
|
d = math.sqrt(a*a + b*b)
|
|
if d == 0.0:
|
|
lp = 0.0
|
|
else:
|
|
lp = (a * p[0] + b * p[1] + c) / d
|
|
return lp
|
|
|
|
def equal(a, b):
|
|
return abs(a - b) < _precision
|
|
|
|
def equal_point(p1, p2):
|
|
return equal(p1[0], p2[0]) and equal(p1[1], p2[1])
|
|
|
|
def get_angle(x1, y1, x2, y2):
|
|
pi = math.acos(-1.0)
|
|
|
|
|
|
dx = x2 - x1
|
|
dy = y2 - y1
|
|
|
|
|
|
if dx == 0 and dy == 0:
|
|
return -1.0
|
|
if dy < 0 and dx == 0:
|
|
angle_radius = pi + pi / 2
|
|
elif dy > 0 and dx == 0:
|
|
angle_radius = pi / 2
|
|
else:
|
|
angle_radius = math.atan(dy / dx)
|
|
|
|
|
|
if dy >= 0 and dx > 0:
|
|
pass
|
|
if dy < 0 and dx > 0:
|
|
angle_radius += 2 * pi
|
|
elif dx < 0:
|
|
angle_radius += pi
|
|
|
|
return angle_radius
|
|
|
|
def line_coefficients(point1, point2):
|
|
x1, y1 = point1
|
|
x2, y2 = point2
|
|
|
|
A = y2 - y1
|
|
B = x1 - x2
|
|
C = x2*y1 - x1*y2
|
|
|
|
return A, B, C
|
|
|
|
def sign_point_on_line(point1, point2, new_point):
|
|
if equal(point1[0], new_point[0]) and equal(point1[1], new_point[1]):
|
|
return 0.0
|
|
if equal(point2[0], new_point[0]) and equal(point2[1], new_point[1]):
|
|
return 0.0
|
|
|
|
line_A, line_B, line_C = line_coefficients(point1, point2)
|
|
x, y = new_point
|
|
value = line_A * x + line_B * y + line_C
|
|
if math.fabs(value) < _precision:
|
|
return 0.0
|
|
elif value > 0.0:
|
|
return 1.0
|
|
return -1.0
|
|
|
|
def sign_distance_on_line(line_point1, line_point2, point):
|
|
direction = sign_point_on_line(line_point1, line_point2, point)
|
|
if direction == 0:
|
|
return 0.0
|
|
|
|
if math.fabs(line_point1[0] - line_point2[0]) < _precision and math.fabs(line_point1[1] <= line_point2[1]) < _precision:
|
|
return 0.0
|
|
|
|
|
|
x = point[0]
|
|
y = point[1]
|
|
x1 = line_point1[0]
|
|
y1 = line_point1[1]
|
|
x2 = line_point2[0]
|
|
y2 = line_point2[1]
|
|
|
|
if x1 <= x2:
|
|
a = 1
|
|
b = 0
|
|
c = -x1
|
|
else:
|
|
m = (y2 - y1) / (x2 - x1)
|
|
a = -m
|
|
b = 1
|
|
c = -y1 + (m * x1)
|
|
|
|
dist = abs(a * x + b * y + c) / math.sqrt(a * a + b * b)
|
|
dist *= float(direction)
|
|
|
|
return dist
|
|
|
|
def is_point_on_line(point1, point2, perp):
|
|
if is_point_in_rect(point1, point2, perp) == False:
|
|
return False
|
|
direction = sign_point_on_line(point1, point2, perp)
|
|
if math.fabs(direction) < _precision:
|
|
return True
|
|
return False
|
|
|
|
def is_overlap_line(line, part_seg):
|
|
p1, p2 = line
|
|
p3, p4 = part_seg
|
|
|
|
f1 = is_point_on_line(p1, p2, p3)
|
|
f2 = is_point_on_line(p1, p2, p4)
|
|
|
|
if (f1 or f2) and f1 != f2:
|
|
if f1 and (equal_point(p1, p3) or equal_point(p2, p3)):
|
|
return False
|
|
if f2 and (equal_point(p1, p4) or equal_point(p2, p4)):
|
|
return False
|
|
|
|
return f1 or f2
|
|
|
|
def is_on_pline(polyline, base_line):
|
|
p1 = base_line[0]
|
|
p2 = base_line[1]
|
|
|
|
for i in range(len(polyline) - 1):
|
|
p3 = polyline[i]
|
|
p4 = polyline[i + 1]
|
|
if is_overlap_line((p1, p2), (p3, p4)) or is_overlap_line((p3, p4), (p1, p2)):
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_match_line_labels(xsec, base_geom, base_line):
|
|
labels = []
|
|
for geom in xsec['geom']:
|
|
if geom == base_geom:
|
|
continue
|
|
geom_label = geom['label']
|
|
base_label = base_geom['label']
|
|
if geom_label == base_label:
|
|
continue
|
|
closed = geom['closed']
|
|
if closed == True:
|
|
continue
|
|
if geom_label == 'center':
|
|
continue
|
|
|
|
polyline = geom['polyline']
|
|
if is_on_pline(polyline, base_line):
|
|
labels.append(geom['label'])
|
|
return labels
|
|
|
|
def get_seq_feature_tokens(xsec, geom, closed_type):
|
|
polyline = geom['polyline']
|
|
closed = geom['closed']
|
|
if closed != closed_type:
|
|
return []
|
|
|
|
lines = []
|
|
for i in range(len(polyline) - 1):
|
|
line = (polyline[i], polyline[i + 1])
|
|
lines.append(line)
|
|
|
|
geom_tokens = []
|
|
for line in lines:
|
|
labels = get_match_line_labels(xsec, geom, line)
|
|
if len(labels) == 0:
|
|
continue
|
|
|
|
|
|
|
|
|
|
geom_tokens.extend(labels)
|
|
|
|
return geom_tokens
|
|
|
|
def translate_geometry(xsec, cp):
|
|
for geom in xsec['geom']:
|
|
polyline = geom['polyline']
|
|
geom['polyline'] = [(p[0] - cp[0], p[1] - cp[1]) for p in polyline]
|
|
|
|
return xsec
|
|
|
|
def is_closed(polyline):
|
|
if equal_point(polyline[0], polyline[-1]):
|
|
return True
|
|
return False
|
|
|
|
def summery_feature(features):
|
|
sum_features = []
|
|
if len(features) == 0:
|
|
return sum_features
|
|
|
|
index = 0
|
|
while index < len(features):
|
|
f = features[index]
|
|
sum_feature = f
|
|
if type(f) == list:
|
|
sum_feature = summery_feature(f)
|
|
if len(sum_feature) == 1:
|
|
sum_feature = sum_feature[0]
|
|
elif type(f) == str:
|
|
label = f
|
|
|
|
last_index = index
|
|
for i in range(index + 1, len(features)):
|
|
if type(features[i]) == str:
|
|
if features[i] == label:
|
|
last_index = i
|
|
else:
|
|
break
|
|
else:
|
|
break
|
|
if last_index != index:
|
|
sum_feature = (f'{f}({last_index - index + 1})')
|
|
index = last_index
|
|
else:
|
|
pass
|
|
|
|
sum_features.append(sum_feature)
|
|
index += 1
|
|
|
|
return sum_features
|
|
|
|
def get_intersection_count(xsec, base_geom, target_label):
|
|
pave_top = get_geometry(xsec, 'pave_surface')
|
|
if pave_top == None:
|
|
return 0
|
|
|
|
polyline = base_geom['polyline']
|
|
polygon = Polygon(polyline)
|
|
base_p1 = polygon.centroid
|
|
base_p2 = (base_p1.x, base_p1.y + 1e+10)
|
|
vertical_line = LineString([base_p1, base_p2])
|
|
|
|
count = 0
|
|
for target_geom in xsec['geom']:
|
|
if base_geom == target_geom:
|
|
continue
|
|
label = target_geom['label']
|
|
if re.search(target_label, label) == None:
|
|
continue
|
|
|
|
|
|
target_polyline = target_geom['polyline']
|
|
polyline = LineString(target_polyline)
|
|
ip = shapely.intersection(polyline, vertical_line)
|
|
if ip.is_empty:
|
|
continue
|
|
count += 1
|
|
|
|
return count
|
|
|
|
def update_xsection_feature(xsec):
|
|
gnd_geom = get_geometry(xsec, 'ground')
|
|
if gnd_geom == None:
|
|
return None
|
|
|
|
center = get_geometry(xsec, 'center')
|
|
if center == None or 'polyline' not in center:
|
|
return None
|
|
cp = get_center_point(center['polyline'])
|
|
|
|
xsec = translate_geometry(xsec, cp)
|
|
station = xsec['station']
|
|
|
|
index = 0
|
|
while index < len(xsec['geom']):
|
|
geom = xsec['geom'][index]
|
|
label = geom['label']
|
|
polyline = geom['polyline']
|
|
closed = geom['closed']
|
|
if len(polyline) <= 2 or closed == False:
|
|
index += 1
|
|
continue
|
|
|
|
pt1 = polyline[0]
|
|
pt2 = polyline[-1]
|
|
if equal_point(pt1, pt2) == False:
|
|
polyline.append(pt1)
|
|
|
|
|
|
polygon = Polygon(polyline)
|
|
if math.fabs(polygon.area) < _precision:
|
|
xsec['geom'].pop(index)
|
|
continue
|
|
|
|
if station == '1+660.00000' and label == 'cut_ditch':
|
|
label = 'cut_ditch'
|
|
|
|
|
|
if get_below_pline(gnd_geom['polyline'], polyline):
|
|
geom['earthwork_feature'].append('below')
|
|
else:
|
|
geom['earthwork_feature'].append('above')
|
|
|
|
if re.search('pave_.*', label):
|
|
pave_int_count = get_intersection_count(xsec, geom, 'pave_.*')
|
|
geom['earthwork_feature'].append(f'pave_int({pave_int_count})')
|
|
|
|
tokens = get_seq_feature_tokens(xsec, geom, True)
|
|
if len(tokens) == 1:
|
|
geom['earthwork_feature'].append(tokens[0])
|
|
else:
|
|
geom['earthwork_feature'].extend(tokens)
|
|
|
|
geom['earthwork_feature'] = summery_feature(geom['earthwork_feature'])
|
|
|
|
|
|
logger.debug(f'{station}. {label} feature: {geom["earthwork_feature"]}')
|
|
index += 1
|
|
|
|
return xsec
|
|
|
|
def update_xsections_feature(xsections):
|
|
|
|
for xsec in xsections:
|
|
for geom in xsec['geom']:
|
|
label = geom['label']
|
|
polyline = geom['polyline']
|
|
if len(polyline) < 2:
|
|
continue
|
|
closed = is_closed(polyline)
|
|
if closed == False:
|
|
closed = False if re.search('pave_layer.*', label) == None else True
|
|
geom['closed'] = closed
|
|
|
|
|
|
out_xsections = []
|
|
for xsec in xsections:
|
|
out_xsec = update_xsection_feature(xsec)
|
|
if out_xsec == None:
|
|
continue
|
|
out_xsections.append(out_xsec)
|
|
|
|
return out_xsections
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='create earthwork train dataset')
|
|
parser.add_argument('--input', type=str, default='output/', help='input folder')
|
|
parser.add_argument('--output', type=str, default='dataset/', help='output folder')
|
|
|
|
args = parser.parse_args()
|
|
try:
|
|
file_names = os.listdir(args.input)
|
|
for file_name in tqdm(file_names):
|
|
if file_name.endswith('.json') == False:
|
|
continue
|
|
print(f'processing {file_name}')
|
|
data = None
|
|
with open(os.path.join(args.input, file_name), 'r') as f:
|
|
data = json.load(f)
|
|
|
|
out_xsections = update_xsections_feature(data)
|
|
|
|
output_file = os.path.join(args.output, file_name)
|
|
with open(output_file, 'w') as f:
|
|
json.dump(out_xsections, f, indent=4)
|
|
|
|
except Exception as e:
|
|
print(f'error: {e}')
|
|
traceback.print_exc()
|
|
|
|
if __name__ == '__main__':
|
|
main() |