import io
from collections import defaultdict
from typing import Tuple, List

import cv2
import numpy as np
from PIL import Image as PImage
from hoho.color_mappings import gestalt_color_mapping
from hoho.read_write_colmap import read_cameras_binary, read_images_binary, read_points3D_binary
from scipy.spatial.distance import cdist

apex_color = gestalt_color_mapping["apex"]
eave_end_point = gestalt_color_mapping["eave_end_point"]
flashing_end_point = gestalt_color_mapping["flashing_end_point"]

apex_color, eave_end_point, flashing_end_point = [np.array(i) for i in [apex_color, eave_end_point, flashing_end_point]]
unclassified = np.array([(215, 62, 138)])
line_classes = ['eave', 'ridge', 'rake', 'valley']


def empty_solution():
    '''Return a minimal valid solution, i.e. 2 vertices and 1 edge.'''
    return np.zeros((2, 3)), [(0, 1)]


def undesired_objects(image):
    '''Keep only the largest connected component of a binary mask.'''
    image = image.astype('uint8')
    nb_components, output, stats, centroids = cv2.connectedComponentsWithStats(image, connectivity=8)
    sizes = stats[:, -1]
    if nb_components < 2:
        # No foreground component at all; return an empty mask.
        return np.zeros(output.shape)
    max_label = 1
    max_size = sizes[1]
    for i in range(2, nb_components):
        if sizes[i] > max_size:
            max_label = i
            max_size = sizes[i]

    img2 = np.zeros(output.shape)
    img2[output == max_label] = 1
    return img2


def clean_image(image_gestalt) -> np.ndarray:
    '''Mask out everything outside the largest non-"unclassified" blob (the house).'''
    image_gestalt = np.array(image_gestalt)
    unclassified_mask = cv2.inRange(image_gestalt, unclassified + 0.0, unclassified + 0.8)
    unclassified_mask = cv2.bitwise_not(unclassified_mask)
    mask = undesired_objects(unclassified_mask).astype(np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, np.ones((11, 11), np.uint8), iterations=11)

    image_gestalt[:, :, 0] *= mask
    image_gestalt[:, :, 1] *= mask
    image_gestalt[:, :, 2] *= mask
    return image_gestalt


def get_vertices(image_gestalt, *, color_range=4., dilations=3, erosions=1, kernel_size=13):
    '''Extract apex and eave/flashing end-point centroids from a gestalt segmentation.'''
    apex_mask = cv2.inRange(image_gestalt, apex_color - color_range, apex_color + color_range)
    eave_end_point_mask = cv2.inRange(image_gestalt, eave_end_point - color_range, eave_end_point + color_range)
    flashing_end_point_mask = cv2.inRange(image_gestalt, flashing_end_point - color_range,
                                          flashing_end_point + color_range)
    eave_end_point_mask = cv2.bitwise_or(eave_end_point_mask, flashing_end_point_mask)

    kernel = np.ones((kernel_size, kernel_size), np.uint8)

    # Dilate then erode each mask to merge nearby blobs of the same class before labelling.
    apex_mask = cv2.morphologyEx(apex_mask, cv2.MORPH_DILATE, kernel, iterations=dilations)
    apex_mask = cv2.morphologyEx(apex_mask, cv2.MORPH_ERODE, kernel, iterations=erosions)

    eave_end_point_mask = cv2.morphologyEx(eave_end_point_mask, cv2.MORPH_DILATE, kernel, iterations=dilations)
    eave_end_point_mask = cv2.morphologyEx(eave_end_point_mask, cv2.MORPH_ERODE, kernel, iterations=erosions)

    # Component 0 is the background, so drop the first centroid of each mask.
    *_, apex_centroids = cv2.connectedComponentsWithStats(apex_mask, connectivity=8, ltype=cv2.CV_32S)
    *_, other_centroids = cv2.connectedComponentsWithStats(eave_end_point_mask, connectivity=8, ltype=cv2.CV_32S)

    return apex_centroids[1:], other_centroids[1:], apex_mask, eave_end_point_mask


def convert_entry_to_human_readable(entry):
    '''Decode the raw binary fields of a dataset entry into usable Python objects.'''
    out = {}
    already_good = {'__key__', 'wf_vertices', 'wf_edges', 'edge_semantics', 'mesh_vertices', 'mesh_faces',
                    'face_semantics', 'K', 'R', 't'}
    for k, v in entry.items():
        if k in already_good:
            out[k] = v
            continue
        match k:
            case 'points3d':
                out[k] = read_points3D_binary(fid=io.BytesIO(v))
            case 'cameras':
                out[k] = read_cameras_binary(fid=io.BytesIO(v))
            case 'images':
                out[k] = read_images_binary(fid=io.BytesIO(v))
            case 'ade20k' | 'gestalt':
                out[k] = [PImage.open(io.BytesIO(x)).convert('RGB') for x in v]
            case 'depthcm':
                out[k] = [PImage.open(io.BytesIO(x)) for x in v]
    return out


def get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=50.0):
    '''Get the vertices and edges from the gestalt segmentation mask of the house.'''
    color_range = 4.
    connections = []
    edge_th = edge_th ** 2  # squared distance; note the fixed per-vertex radius below is what the snapping actually uses

    apex_centroids, eave_end_point_centroids, apex_mask, eave_end_point_mask = get_vertices(gest_seg_np)

    apex_pts = np.concatenate([apex_centroids, eave_end_point_centroids])

    # Each vertex accepts line-segment endpoints within a fixed squared radius.
    scale = 1
    radius = 25
    vertex_size = np.full(apex_pts.shape[0], (scale * radius) ** 2)

    for edge_class in ['eave', 'ridge', 'rake', 'valley', 'flashing', 'step_flashing']:
        if len(apex_pts) < 2:
            break
        edge_color = np.array(gestalt_color_mapping[edge_class])

        mask = cv2.inRange(gest_seg_np,
                           edge_color - color_range,
                           edge_color + color_range)

        mask = cv2.morphologyEx(mask,
                                cv2.MORPH_DILATE, np.ones((3, 3)), iterations=1)

        if np.any(mask):
            # Probabilistic Hough transform parameters for line detection on the class mask.
            rho = 1  # distance resolution in pixels
            theta = np.pi / 180  # angular resolution in radians
            threshold = 20  # minimum number of votes
            min_line_length = 60  # minimum segment length in pixels
            max_line_gap = 40  # maximum gap between joinable segments

            cv2.GaussianBlur(mask, (11, 11), 0, mask)
            lines = cv2.HoughLinesP(mask, rho, theta, threshold, np.array([]),
                                    min_line_length, max_line_gap)

            edges = []

            for line in lines if lines is not None else []:
                for x1, y1, x2, y2 in line:
                    extend = 40
                    if x1 < x2:
                        x1, y1, x2, y2 = x2, y2, x1, y1

                    direction = np.array([x2 - x1, y2 - y1])
                    direction = extend * direction / np.linalg.norm(direction)

                    # Extend both endpoints outwards so they can reach nearby vertices.
                    x1, y1 = (-direction + (x1, y1)).astype(np.int32)
                    x2, y2 = (+direction + (x2, y2)).astype(np.int32)

                    edges.append((x1, y1, x2, y2))

            edges = np.array(edges)
            if len(edges) < 1:
                continue

            # Squared distances from every vertex to every segment endpoint.
            begin_distances = cdist(apex_pts, edges[:, :2], metric="sqeuclidean")
            end_distances = cdist(apex_pts, edges[:, 2:], metric="sqeuclidean")

            begin_closest_points = np.argmin(begin_distances, axis=0)  # closest vertex per segment start
            end_closest_points = np.argmin(end_distances, axis=0)  # closest vertex per segment end

            begin_closest_point_distances = begin_distances[begin_closest_points, np.arange(len(begin_closest_points))]
            end_closest_point_distances = end_distances[end_closest_points, np.arange(len(end_closest_points))]

            # Keep only segments whose both endpoints snap to some vertex.
            begin_in_range_mask = begin_closest_point_distances < vertex_size[begin_closest_points]
            end_in_range_mask = end_closest_point_distances < vertex_size[end_closest_points]

            in_range_connected_mask = np.logical_and(begin_in_range_mask, end_in_range_mask)

            edge_idxs = np.where(in_range_connected_mask)[0]

            edges = np.array([begin_closest_points[edge_idxs], end_closest_points[edge_idxs]]).T
            if len(edges) < 1:
                continue
            edges = np.sort(edges, axis=1)
            unique_edges = np.unique(edges, axis=0)

            # Drop degenerate self-loops.
            unique_edges = unique_edges[unique_edges[:, 0] != unique_edges[:, 1]]

            if len(unique_edges) < 1:
                continue
            connections.extend(unique_edges)

    vertices = [{"xy": v, "type": "apex"} for v in apex_centroids]
    vertices += [{"xy": v, "type": "eave_end_point"} for v in eave_end_point_centroids]
    return vertices, connections


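# Illustrative sketch (not part of the original pipeline): running the 2D
# extraction above on a single gestalt segmentation given as a PIL image.
# The helper name and the optional resize to the depth-map resolution are
# assumptions for this example; `predict` below performs the same steps inline.
def _example_corners_and_edges_2d(gestalt_img, target_size=None):
    if target_size is not None:
        gestalt_img = gestalt_img.resize(target_size)
    gest_seg_np = np.array(gestalt_img).astype(np.uint8)
    vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=60.)
    return vertices, connections

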
def get_uv_depth(vertices, depth):
    '''Get the depth of the vertices from the depth image.'''
    uv = np.array([v['xy'] for v in vertices])
    uv_int = uv.astype(np.int32)
    # Clamp pixel coordinates to the image bounds before sampling the depth map.
    H, W = depth.shape[:2]
    uv_int[:, 0] = np.clip(uv_int[:, 0], 0, W - 1)
    uv_int[:, 1] = np.clip(uv_int[:, 1], 0, H - 1)
    vertex_depth = depth[(uv_int[:, 1], uv_int[:, 0])]
    return uv, vertex_depth


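# Sketch of the unprojection convention used in `predict` below (this helper is
# illustrative only and is not called anywhere): a pixel (u, v) with depth d is
# lifted onto the normalized viewing ray ((u - cx) / fx, (v - cy) / fy, 1) / ||.||,
# i.e. the sampled depth is treated as a distance along the ray, not as a
# z-coordinate.
def _unproject_to_camera(uv, vertex_depth, K):
    rays = np.ones((len(uv), 3))
    rays[:, 0] = (uv[:, 0] - K[0, 2]) / K[0, 0]
    rays[:, 1] = (uv[:, 1] - K[1, 2]) / K[1, 1]
    rays /= np.linalg.norm(rays, axis=1, keepdims=True)
    return vertex_depth[:, None] * rays

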
def merge_vertices_3d(vert_edge_per_image, th=0.1):
    '''Merge vertices that are close to each other in 3D space and are of same types'''
    all_3d_vertices = []
    connections_3d = []
    all_indexes = []
    cur_start = 0
    types = []
    for cimg_idx, (vertices, connections, vertices_3d) in vert_edge_per_image.items():
        types += [int(v['type'] == 'apex') for v in vertices]
        all_3d_vertices.append(vertices_3d)
        connections_3d += [(x + cur_start, y + cur_start) for (x, y) in connections]
        cur_start += len(vertices_3d)
    all_3d_vertices = np.concatenate(all_3d_vertices, axis=0)

    # Vertices merge when they are within `th` of each other and share a type.
    distmat = cdist(all_3d_vertices, all_3d_vertices)
    types = np.array(types).reshape(-1, 1)
    same_types = cdist(types, types)
    mask_to_merge = (distmat <= th) & (same_types == 0)
    new_vertices = []
    new_connections = []
    # Group mutually close vertices of the same type into merge sets.
    to_merge = sorted(list(set([tuple(a.nonzero()[0].tolist()) for a in mask_to_merge])))
    to_merge_final = defaultdict(list)
    for i in range(len(all_3d_vertices)):
        for j in to_merge:
            if i in j:
                to_merge_final[i] += j
    for k, v in to_merge_final.items():
        to_merge_final[k] = list(set(v))
    already_there = set()
    merged = []
    for k, v in to_merge_final.items():
        if k in already_there:
            continue
        merged.append(v)
        for vv in v:
            already_there.add(vv)
    # Average each merge set and remap the old vertex indices onto the merged ones.
    old_idx_to_new = {}
    count = 0
    for idxs in merged:
        new_vertices.append(all_3d_vertices[idxs].mean(axis=0))
        for idx in idxs:
            old_idx_to_new[idx] = count
        count += 1

    new_vertices = np.array(new_vertices)

    # Rewrite the connections in terms of merged indices, dropping self-loops and duplicates.
    for conn in connections_3d:
        new_con = sorted((old_idx_to_new[conn[0]], old_idx_to_new[conn[1]]))
        if new_con[0] == new_con[1]:
            continue
        if new_con not in new_connections:
            new_connections.append(new_con)

    return new_vertices, new_connections


def prune_not_connected(all_3d_vertices, connections_3d):
    '''Prune vertices that are not connected to any other vertex.'''
    connected = defaultdict(list)
    for c in connections_3d:
        connected[c[0]].append(c)
        connected[c[1]].append(c)
    new_indexes = {}
    new_verts = []
    connected_out = []
    # Re-index only the vertices that take part in at least one connection.
    for k, v in connected.items():
        vert = all_3d_vertices[k]
        if tuple(vert) not in new_verts:
            new_verts.append(tuple(vert))
            new_indexes[k] = len(new_verts) - 1
        else:
            # Duplicate coordinates: map this index onto the existing vertex.
            new_indexes[k] = new_verts.index(tuple(vert))
    for k, v in connected.items():
        for vv in v:
            connected_out.append((new_indexes[vv[0]], new_indexes[vv[1]]))
    connected_out = list(set(connected_out))

    return np.array(new_verts), connected_out


def predict(entry, visualize=False) -> Tuple[str, np.ndarray, List[Tuple[int, int]]]:
    good_entry = convert_entry_to_human_readable(entry)
    vert_edge_per_image = {}
    for i, (gest, depth, K, R, t) in enumerate(zip(good_entry['gestalt'],
                                                   good_entry['depthcm'],
                                                   good_entry['K'],
                                                   good_entry['R'],
                                                   good_entry['t']
                                                   )):
        gest_seg = gest.resize(depth.size)
        gest_seg_np = np.array(gest_seg).astype(np.uint8)

        # Rescale the stored depth map to the working scale.
        depth_np = np.array(depth) / 2.5
        vertices, connections = get_vertices_and_edges_from_segmentation(gest_seg_np, edge_th=60.)
        if (len(vertices) < 2) or (len(connections) < 1):
            print(f'Not enough vertices or connections in image {i}')
            vert_edge_per_image[i] = np.empty((0, 2)), [], np.empty((0, 3))
            continue
        uv, depth_vert = get_uv_depth(vertices, depth_np)

        # Back-project each vertex onto its normalized viewing ray.
        xy_local = np.ones((len(uv), 3))
        xy_local[:, 0] = (uv[:, 0] - K[0, 2]) / K[0, 0]
        xy_local[:, 1] = (uv[:, 1] - K[1, 2]) / K[1, 1]

        vertices_3d_local = depth_vert[..., None] * (xy_local / np.linalg.norm(xy_local, axis=1)[..., None])
        # The extrinsics (R, t) map world to camera, so invert to go camera to world.
        world_to_cam = np.eye(4)
        world_to_cam[:3, :3] = R
        world_to_cam[:3, 3] = t.reshape(-1)
        cam_to_world = np.linalg.inv(world_to_cam)
        vertices_3d = cv2.transform(cv2.convertPointsToHomogeneous(vertices_3d_local), cam_to_world)
        vertices_3d = cv2.convertPointsFromHomogeneous(vertices_3d).reshape(-1, 3)
        vert_edge_per_image[i] = vertices, connections, vertices_3d
    all_3d_vertices, connections_3d = merge_vertices_3d(vert_edge_per_image, 3.0)
    all_3d_vertices_clean, connections_3d_clean = prune_not_connected(all_3d_vertices, connections_3d)
    if (len(all_3d_vertices_clean) < 2) or (len(connections_3d_clean) < 1):
        print('Not enough vertices or connections in the 3D vertices')
        return (good_entry['__key__'], *empty_solution())
    if visualize:
        from hoho.viz3d import plot_estimate_and_gt
        plot_estimate_and_gt(all_3d_vertices_clean,
                             connections_3d_clean,
                             good_entry['wf_vertices'],
                             good_entry['wf_edges'])
    return good_entry['__key__'], all_3d_vertices_clean, connections_3d_clean
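

# Minimal driver sketch (not part of the original file): how `predict` might be
# exercised. Obtaining `dataset` is left open here; any iterable of raw entries
# with the keys handled by `convert_entry_to_human_readable` will do.
if __name__ == "__main__":
    dataset = []  # placeholder: replace with the actual entry iterator
    for entry in dataset:
        key, vertices, edges = predict(entry, visualize=False)
        print(key, len(vertices), 'vertices,', len(edges), 'edges')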