Spaces:
Sleeping
Sleeping
import math | |
from skimage.morphology import skeletonize | |
import copy | |
import numpy as np | |
import cv2 | |
from .models import Line, Point, Place, Transition, Arc | |
def get_hough_lines(img, config): | |
""" | |
Detects lines in the image using Hough Transform. | |
Returns the detected lines. | |
""" | |
# Default values, will be overridden by config if available | |
rho = config.get('connection_processing', {}).get('hough_rho', 1) | |
theta = config.get('connection_processing', {}).get('hough_theta_degrees', 1) * np.pi / 180 | |
threshold = config.get('connection_processing', {}).get('hough_threshold', 10) | |
min_line_length = config.get('connection_processing', {}).get('hough_min_line_length', 10) | |
max_line_gap = config.get('connection_processing', {}).get('hough_max_line_gap', 20) | |
min_line_length = max(min_line_length, 1) # Ensure it's at least 1 | |
# Skeletonize the image | |
skeleton = skeletonize(img / 255).astype(np.uint8)*255 | |
hough_lines = cv2.HoughLinesP(skeleton, rho, theta, threshold, minLineLength=min_line_length, maxLineGap=max_line_gap) | |
return hough_lines | |
class HoughBundler: | |
def __init__(self,min_distance=5,min_angle=2): | |
self.min_distance = min_distance | |
self.min_angle = min_angle | |
def get_orientation(self, line): | |
orientation = math.atan2(abs((line[3] - line[1])), abs((line[2] - line[0]))) | |
return math.degrees(orientation) | |
def check_is_line_different(self, line_1, groups, min_distance_to_merge, min_angle_to_merge): | |
for group in groups: | |
for line_2 in group: | |
if self.get_distance(line_2, line_1) < min_distance_to_merge: | |
orientation_1 = self.get_orientation(line_1) | |
orientation_2 = self.get_orientation(line_2) | |
if abs(orientation_1 - orientation_2) < min_angle_to_merge: | |
group.append(line_1) | |
return False | |
return True | |
def distance_point_to_line(self, point, line): | |
px, py = point | |
x1, y1, x2, y2 = line | |
def line_magnitude(x1, y1, x2, y2): | |
line_magnitude = math.sqrt(math.pow((x2 - x1), 2) + math.pow((y2 - y1), 2)) | |
return line_magnitude | |
lmag = line_magnitude(x1, y1, x2, y2) | |
if lmag < 0.00000001: | |
distance_point_to_line = 9999 | |
return distance_point_to_line | |
u1 = (((px - x1) * (x2 - x1)) + ((py - y1) * (y2 - y1))) | |
u = u1 / (lmag * lmag) | |
if (u < 0.00001) or (u > 1): | |
#// closest point does not fall within the line segment, take the shorter distance | |
#// to an endpoint | |
ix = line_magnitude(px, py, x1, y1) | |
iy = line_magnitude(px, py, x2, y2) | |
if ix > iy: | |
distance_point_to_line = iy | |
else: | |
distance_point_to_line = ix | |
else: | |
# Intersecting point is on the line, use the formula | |
ix = x1 + u * (x2 - x1) | |
iy = y1 + u * (y2 - y1) | |
distance_point_to_line = line_magnitude(px, py, ix, iy) | |
return distance_point_to_line | |
def get_distance(self, a_line, b_line): | |
dist1 = self.distance_point_to_line(a_line[:2], b_line) | |
dist2 = self.distance_point_to_line(a_line[2:], b_line) | |
dist3 = self.distance_point_to_line(b_line[:2], a_line) | |
dist4 = self.distance_point_to_line(b_line[2:], a_line) | |
return min(dist1, dist2, dist3, dist4) | |
def merge_lines_into_groups(self, lines): | |
groups = [] # all lines groups are here | |
# first line will create new group every time | |
groups.append([lines[0]]) | |
# if line is different from existing gropus, create a new group | |
for line_new in lines[1:]: | |
if self.check_is_line_different(line_new, groups, self.min_distance, self.min_angle): | |
groups.append([line_new]) | |
return groups | |
def merge_line_segments(self, lines): | |
orientation = self.get_orientation(lines[0]) | |
if(len(lines) == 1): | |
return np.block([[lines[0][:2], lines[0][2:]]]) | |
points = [] | |
for line in lines: | |
points.append(line[:2]) | |
points.append(line[2:]) | |
if 45 < orientation <= 90: | |
#sort by y | |
points = sorted(points, key=lambda point: point[1]) | |
else: | |
#sort by x | |
points = sorted(points, key=lambda point: point[0]) | |
return np.block([[points[0],points[-1]]]) | |
def process_lines(self, lines): | |
lines_horizontal = [] | |
lines_vertical = [] | |
for line_i in [l[0] for l in lines]: | |
orientation = self.get_orientation(line_i) | |
# if vertical | |
if 45 < orientation <= 90: | |
lines_vertical.append(line_i) | |
else: | |
lines_horizontal.append(line_i) | |
lines_vertical = sorted(lines_vertical , key=lambda line: line[1]) | |
lines_horizontal = sorted(lines_horizontal , key=lambda line: line[0]) | |
merged_lines_all = [] | |
# for each cluster in vertical and horizantal lines leave only one line | |
for i in [lines_horizontal, lines_vertical]: | |
if len(i) > 0: | |
groups = self.merge_lines_into_groups(i) | |
merged_lines = [] | |
for group in groups: | |
merged_lines.append(self.merge_line_segments(group)) | |
merged_lines_all.extend(merged_lines) | |
return np.asarray(merged_lines_all) | |
def assign_proximity_nodes( | |
original_lines: list[Line], | |
original_places: list[Place], | |
original_transitions: list[Transition], | |
config: dict, | |
) -> tuple[list[Line], list[Place], list[Transition]]: | |
""" | |
Assigns a 'proximity_node' attribute to points of lines if they are close | |
to a place or transition. Operates on deep copies of the input objects. | |
Args: | |
original_lines: A list of Line objects. | |
original_places: A list of Place objects. | |
original_transitions: A list of Transition objects. | |
proximity_threshold: A factor to expand node boundaries for proximity checks. | |
For Places, it scales the radius. | |
For Transitions, it scales the height and width. | |
Returns: | |
A tuple containing: | |
- lines_copy: Copied lines, where points may have a 'proximity_node' attribute. | |
- places_copy: Deep copies of the original places. | |
- transitions_copy: Deep copies of the original transitions. | |
The 'proximity_node' attributes will refer to objects within places_copy or transitions_copy. | |
""" | |
proximity_thres_place = config.get('connection_processing', {}).get('proximity_thres_place', 1.5) | |
proximity_thres_trans_width = config.get('connection_processing', {}).get('proximity_thres_trans_width', 3) | |
proximity_thres_trans_height = config.get('connection_processing', {}).get('proximity_thres_trans_height', 1.2) | |
# 1. Create deep copies of all input object lists | |
lines_copy = copy.deepcopy(original_lines) | |
places_copy = copy.deepcopy(original_places) | |
transitions_copy = copy.deepcopy(original_transitions) | |
all_copied_node_centers = [node.center for node in places_copy] + \ | |
[node.center for node in transitions_copy] | |
# 3. Iterate through copied lines and their points | |
for line in lines_copy: | |
for line_point in [line.point1, line.point2]: | |
for node_center_copy in all_copied_node_centers: | |
node_copy = node_center_copy.part_of | |
# print(f"Checking proximity for line point {line_point} to node {node_copy}") | |
if isinstance(node_copy, Place): | |
# print(f"Node {node_copy} is a place") | |
distance = line_point.get_distance_between_points(node_center_copy) | |
if distance < proximity_thres_place * node_copy.radius: | |
line_point.proximity_node = node_copy | |
elif isinstance(node_copy, Transition): | |
# print(f"Node {node_copy} is a transition") | |
expanded_height = node_copy.height * proximity_thres_trans_height | |
expanded_width = node_copy.width * proximity_thres_trans_width | |
expanded_bbox_contour = cv2.boxPoints(((float(node_center_copy.x), float(node_center_copy.y)), | |
(expanded_height, expanded_width), node_copy.angle)) | |
current_line_point_coords = (float(line_point.x), float(line_point.y)) | |
if cv2.pointPolygonTest(expanded_bbox_contour, current_line_point_coords, False) >= 0: | |
line_point.proximity_node = node_copy | |
else: | |
print(f"Node {node_copy} is not a recognized type") | |
# This case should ideally not be reached if inputs are as expected. | |
raise ValueError(f"Unknown node type encountered: {type(node_copy)}") | |
return lines_copy, places_copy, transitions_copy | |
def get_entry_points_from_lines(lines_list): | |
""" | |
Original function provided by user, slightly adapted to use a local list. | |
Extracts all unique points marked as 'is_entry' from a list of lines. | |
""" | |
entry_points_set = set() | |
for line in lines_list: | |
if hasattr(line.point1, "proximity_node") and line.point1.proximity_node: | |
entry_points_set.add(line.point1) | |
if hasattr(line.point2, "proximity_node") and line.point2.proximity_node: | |
entry_points_set.add(line.point2) | |
return list(entry_points_set) | |
def cosine_similarity(vec1_norm: np.ndarray, vec2_norm: np.ndarray) -> float: | |
"""Computes the cosine similarity (dot product of normalized vectors).""" | |
return np.dot(vec1_norm, vec2_norm) | |
def find_line_paths( | |
initial_lines_list: list[Line], | |
proximity_threshold: float = 30.0, | |
dot_product_weight: float = 0.6, | |
distance_to_line_weight: float = 0.2, | |
endpoint_distance_weight: float = 0.2 | |
) -> list[dict]: | |
""" | |
Connects lines from a pool into paths, starting from an entry point | |
and ending at another entry point. | |
Args: | |
initial_lines_list: A list of Line objects. | |
proximity_threshold: Maximum distance to search for next point. | |
dot_product_weight: Weight for vector alignment score. | |
distance_to_line_weight: Weight for point-to-line distance score. | |
endpoint_distance_weight: Weight for endpoint-to-endpoint distance score. | |
Returns: | |
A list of paths. Each path is a dictionary with 'lines' (list of Line) | |
and 'points' (ordered list of Point forming the path). | |
""" | |
lines_pool = set(initial_lines_list) # Use a set for efficient removal (O(1) on average) | |
all_paths_found = [] | |
# Keep track of entry points that have successfully started a path to avoid re-processing | |
# or entry points that have been used as an end of a path. | |
consumed_entry_points = set() | |
while True: | |
current_start_line = None | |
current_start_entry_point = None | |
# Find a new starting line with an available entry point | |
# Iterate over a temporary list as lines_pool can be modified | |
for line in list(lines_pool): | |
potential_start_points = [] | |
if hasattr(line.point1, "proximity_node") and line.point1.proximity_node and line.point1 not in consumed_entry_points: | |
potential_start_points.append(line.point1) | |
if hasattr(line.point2, "proximity_node") and line.point2.proximity_node and line.point2 not in consumed_entry_points: | |
potential_start_points.append(line.point2) | |
if potential_start_points: | |
current_start_line = line | |
# Prefer point1 if both are entries and available, or just take the first one. | |
current_start_entry_point = potential_start_points[0] | |
break | |
if not current_start_line: | |
break # No more available entry points or lines to start a path | |
current_path_lines = [current_start_line] | |
current_path_points = [current_start_entry_point] | |
lines_pool.remove(current_start_line) | |
consumed_entry_points.add(current_start_entry_point) # Mark this entry point as used for path initiation | |
last_line_in_path = current_start_line | |
# The current tip of the path is the other point of the start_line | |
current_tip_of_path = last_line_in_path.get_other_point(current_start_entry_point) | |
current_path_points.append(current_tip_of_path) | |
# Inner loop to extend the current path | |
while True: | |
# Check if the current_tip_of_path is a destination entry point | |
if hasattr(current_tip_of_path, "proximity_node") and current_tip_of_path.proximity_node: | |
all_paths_found.append({"lines": list(current_path_lines), "points": list(current_path_points)}) | |
consumed_entry_points.add(current_tip_of_path) # Mark end entry point | |
break # Path successfully found, break from inner loop | |
candidate_extensions = [] | |
# Vector of the last segment, oriented towards the current tip | |
vec_last_segment_norm = last_line_in_path.get_normalized_vector( | |
start_point=last_line_in_path.get_other_point(current_tip_of_path), | |
end_point=current_tip_of_path | |
) | |
for candidate_line in list(lines_pool): # Iterate over a copy of the pool for safe removal | |
for point_on_candidate in [candidate_line.point1, candidate_line.point2]: | |
# Must not connect via an intermediate entry point | |
if hasattr(point_on_candidate, "proximity_node") and point_on_candidate.proximity_node: | |
continue | |
endpoint_dist = current_tip_of_path.get_distance_between_points(point_on_candidate) | |
if endpoint_dist <= proximity_threshold: | |
# Scoring Criterion 1: Dot product of normalized vectors | |
# Vector of candidate_line, oriented away from point_on_candidate | |
vec_candidate_norm = candidate_line.get_normalized_vector( | |
start_point=point_on_candidate, | |
end_point=candidate_line.get_other_point(point_on_candidate) | |
) | |
dot_prod_score = (cosine_similarity(vec_last_segment_norm, vec_candidate_norm) + 1) / 2 # Scale to [0,1] | |
# Scoring Criterion 2: Start point of "to be merged" line is close to the infinite line | |
# formed by our last merged line. | |
dist_to_prev_line_inf = last_line_in_path.distance_point_to_infinite_line(point_on_candidate) | |
# Score: higher is better (closer to 0 distance) | |
# Avoid division by zero; add 1. Max possible distance could normalize this. | |
# For simplicity, using 1 / (1 + dist). | |
dist_line_score = 1.0 / (1.0 + dist_to_prev_line_inf) if proximity_threshold > 0 else 1.0 | |
# Bonus: endpoint_distance score (closer is better) | |
endpoint_dist_score = (proximity_threshold - endpoint_dist) / proximity_threshold \ | |
if proximity_threshold > 0 else 1.0 | |
# Combined score | |
total_score = (dot_product_weight * dot_prod_score + | |
distance_to_line_weight * dist_line_score + | |
endpoint_distance_weight * endpoint_dist_score) | |
candidate_extensions.append({ | |
"line": candidate_line, | |
"connection_point_on_candidate": point_on_candidate, | |
"score": total_score | |
}) | |
if not candidate_extensions: | |
# No suitable extension found, path terminates here (not at an entry point). | |
# This path is considered "noise" or incomplete. | |
break # Break from inner loop | |
# Select the best candidate extension | |
candidate_extensions.sort(key=lambda x: x["score"], reverse=True) | |
best_extension = candidate_extensions[0] | |
# Add best extension to the current path | |
lines_pool.remove(best_extension["line"]) # Remove from available lines | |
current_path_lines.append(best_extension["line"]) | |
last_line_in_path = best_extension["line"] | |
# The connection point on the candidate becomes part of the path | |
current_path_points.append(best_extension["connection_point_on_candidate"]) | |
# The new tip is the other end of the newly added line | |
current_tip_of_path = last_line_in_path.get_other_point(best_extension["connection_point_on_candidate"]) | |
current_path_points.append(current_tip_of_path) | |
# Continue extending this path | |
return all_paths_found | |
def assign_arrowheads(found_paths_original: list[Line], arrowhead_result: dict, config) -> tuple[list[Line], int]: | |
arrowhead_proximity_thres = config.get('connection_processing', {}).get('arrowhead_proximity_threshold', 30) | |
found_paths_copy = copy.deepcopy(found_paths_original) | |
path_endpoints = [] | |
for path in found_paths_copy: | |
if path["points"]: | |
path_endpoints.extend([path["points"][0], path["points"][-1]]) | |
else: | |
raise ValueError("Path points list is empty. Cannot assign arrowheads.") | |
rejected_arrowhead_count = 0 | |
for arrowhead in arrowhead_result["predictions"]: | |
arrowhead_center = Point(arrowhead["x"], arrowhead["y"]) | |
closest_point = None | |
closest_distance = float("inf") | |
for endpoint in path_endpoints: | |
distance = arrowhead_center.get_distance_between_points(endpoint) | |
if distance < closest_distance and distance < arrowhead_proximity_thres: | |
closest_distance = distance | |
closest_point = endpoint | |
### check if the closest point is None and throw error | |
if closest_point is None: | |
print("No closest point found for the arrowhead center.") | |
rejected_arrowhead_count += 1 | |
else: | |
closest_point.is_arrow = True | |
### remove endpoint from endpoints to avoid reusing | |
path_endpoints.remove(closest_point) | |
return found_paths_copy, rejected_arrowhead_count | |
def get_arcs(paths): | |
""" | |
Links the nodes of the paths based on the proximity_node attribute of the points. | |
This function assumes that the paths are already processed and contain points with proximity_node. | |
""" | |
arcs = [] | |
for path in paths: | |
if not path["points"][0].proximity_node or not path["points"][-1].proximity_node: | |
raise ValueError("Path must start and end with a proximity node.") | |
if len(path["points"]) < 2: | |
raise ValueError("Path must contain at least two points.") | |
if len(path["lines"]) < 1: | |
raise ValueError("Path must contain at least one line.") | |
# Assuming a path of N points connected sequentially has N-1 lines. | |
if len(path["points"]) != len(path["lines"]) * 2: | |
raise ValueError("Path points and lines are inconsistent.") | |
start_point = path["points"][0] | |
end_point = path["points"][-1] | |
# Add arc from start to end unless start is an arrow and end is not | |
if not (start_point.is_arrow and not end_point.is_arrow): | |
arcs.append(Arc( | |
source=start_point.proximity_node, | |
target=end_point.proximity_node, | |
start_point=start_point, | |
end_point=end_point, | |
points=path["points"], | |
lines=path["lines"] | |
)) | |
# Add arc from end to start if start is an arrow | |
if start_point.is_arrow: | |
arcs.append(Arc( | |
source=end_point.proximity_node, | |
target=start_point.proximity_node, | |
start_point=end_point, | |
end_point=start_point, | |
points=path["points"], | |
lines=path["lines"] | |
)) | |
return arcs | |