Spaces:
Sleeping
Sleeping
File size: 5,402 Bytes
152df72 928873d 152df72 ff2cc46 c842ab7 152df72 c842ab7 ff2cc46 152df72 ff2cc46 152df72 ff2cc46 152df72 928873d 152df72 c842ab7 928873d 152df72 928873d 152df72 ff2cc46 152df72 c842ab7 152df72 c842ab7 928873d 152df72 928873d 152df72 c842ab7 152df72 c842ab7 152df72 928873d c842ab7 928873d c842ab7 152df72 c842ab7 152df72 ff2cc46 152df72 ff2cc46 928873d 152df72 928873d c842ab7 928873d c842ab7 928873d 152df72 ff2cc46 c842ab7 152df72 ff2cc46 c842ab7 152df72 928873d 152df72 c842ab7 928873d c842ab7 928873d c842ab7 928873d c842ab7 152df72 928873d 152df72 928873d 152df72 928873d 152df72 ff2cc46 152df72 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
from shapely.geometry import box as shapely_box, Point
from collections import defaultdict, deque
import math
# Upper bound on the distance between an arrow endpoint and a node's center
# for map_arrows' nearest-node fallback; a nearest node farther away than this
# is rejected and the endpoint stays unmapped.
MAX_FALLBACK_DIST = 150 # pixels
def map_arrows(nodes, arrows):
    """
    Map arrows to node boxes using geometric containment with fallback to nearest box.

    Each arrow endpoint is first tested against every node's box grown by a
    10-unit buffer. If no grown box contains the endpoint, the node whose
    center is nearest is used instead, unless that center is farther away
    than MAX_FALLBACK_DIST.

    Side effects:
        Every dict in ``nodes`` gains a 'shape' key (shapely box built from
        'bbox') and a 'center' key (integer midpoint of 'bbox').
        Progress and warning messages are printed to stdout.

    Args:
        nodes (list): List of node dicts with 'id' and 'bbox' fields. 'bbox' is
            unpacked into shapely's box(), and indices 0/2 and 1/3 are
            averaged for the center.
        arrows (list): List of arrow dicts with 'tail' and 'head' coordinates
            (tuple or list) and an optional 'label'. Anything else is
            reported and skipped.

    Returns:
        list: List of (source, target, label) tuples. Arrows with an
        unresolved endpoint, or whose two endpoints resolve to the same node,
        are omitted.
    """
    for node in nodes:
        node["shape"] = shapely_box(*node["bbox"])
        node["center"] = (
            (node["bbox"][0] + node["bbox"][2]) // 2,
            (node["bbox"][1] + node["bbox"][3]) // 2,
        )

    # The tolerance region depends only on the node, so grow each box once
    # instead of once per node per arrow endpoint.
    padded_shapes = [(node["id"], node["shape"].buffer(10)) for node in nodes]

    def find_containing_node(point):
        """Return the id of the first node whose grown box contains point."""
        for node_id, region in padded_shapes:
            if region.contains(point):
                return node_id
        return None

    def find_nearest_node(pt):
        """Return (id, distance) of the node whose center is closest to pt."""
        min_dist = float("inf")
        nearest_id = None
        for n in nodes:
            cx, cy = n["center"]
            # Only x/y are compared, so list endpoints and endpoints carrying
            # an extra coordinate are handled the same way as 2-tuples.
            dist = math.hypot(pt[0] - cx, pt[1] - cy)
            if dist < min_dist:
                min_dist = dist
                nearest_id = n["id"]
        return nearest_id, min_dist

    def resolve_endpoint(coords):
        """Resolve one arrow endpoint to a node id, or None if nothing fits."""
        node_id = find_containing_node(Point(coords))
        # Compare against None explicitly: a node id of 0 (or "") is a valid
        # match and must not trigger the fallback.
        if node_id is not None:
            return node_id
        node_id, dist = find_nearest_node(coords)
        return None if dist > MAX_FALLBACK_DIST else node_id

    edges = []
    for arrow in arrows:
        if (
            not isinstance(arrow, dict)
            or not isinstance(arrow.get("tail"), (tuple, list))
            or not isinstance(arrow.get("head"), (tuple, list))
        ):
            print(f"⚠️ Skipping malformed arrow: {arrow}")
            continue

        # A missing or None label becomes an empty string.
        label = str(arrow.get("label") or "").strip()
        source = resolve_endpoint(arrow["tail"])
        target = resolve_endpoint(arrow["head"])

        if source is not None and target is not None and source != target:
            print(f"✅ Mapped arrow from {source} → {target} [{label}]")
            edges.append((source, target, label))
        else:
            print(f"⚠️ Could not map arrow endpoints to nodes: tail={arrow.get('tail')} head={arrow.get('head')}")
    return edges
def detect_node_type(text, default_type="process"):
    """
    Heuristically infer the node type from its text.

    Keywords are matched against the words of the label rather than against
    raw substrings, so labels such as "Send email" or "Notify user" are not
    mistaken for end or decision nodes. Checks run in priority order:
    start, then end, then decision.

    Args:
        text (str): Node label. None is treated as an empty label.
        default_type (str): Fallback type.

    Returns:
        str: Inferred node type: "start" if a word begins with "start",
        "end" if a word begins with "end" or "full", "decision" if the
        label contains "?" or the word "yes" or "no", otherwise default_type.
    """
    if text is None:
        return default_type
    text = str(text)

    # Split on every non-alphanumeric character so punctuation and
    # underscores separate words ("Start:", "node_end").
    words = "".join(ch if ch.isalnum() else " " for ch in text.lower()).split()

    # Prefix match keeps inflections such as "Starting" and "Ended" while
    # rejecting mid-word hits such as "restart", "send" or "pending".
    if any(word.startswith("start") for word in words):
        return "start"
    if any(word.startswith(("end", "full")) for word in words):
        return "end"
    # "yes"/"no" must be whole words; "no" is a prefix of too many ordinary
    # words ("notify", "node", "note") for prefix matching to be useful.
    if "?" in text or "yes" in words or "no" in words:
        return "decision"
    return default_type
def build_flowchart_json(nodes, arrows):
    """
    Construct a structured flowchart JSON using breadth-first graph traversal.

    Arrows are resolved to directed edges with map_arrows, then the graph is
    walked breadth-first starting from the nodes that have no incoming edge.
    Node order, successor order and parent order all follow the order of
    ``nodes`` and of the mapped edges, so the result is reproducible.

    Args:
        nodes (list): Detected node dicts. Each needs an 'id' plus whatever
            map_arrows requires; 'text' and 'type' are optional.
        arrows (list): Detected arrow dicts, passed through to map_arrows.

    Returns:
        dict: JSON with 'start' and 'steps'.
            'start' is the id of the first node without incoming edges, or
            the first node when every node has one (e.g. a pure cycle), or
            None when there are no nodes.
            'steps' holds one dict per node with 'id', 'text' and 'type',
            plus, where applicable:
              - 'parent' (single predecessor) or 'parents' (list);
              - 'next' (single successor id, or a list of ids);
              - 'branches' for decision nodes with successors: 'yes' and
                'no' map to one target each, and every other target is
                collected in the 'unknown' list.
    """
    edges = map_arrows(nodes, arrows)

    # Build adjacency and reverse mappings
    graph = defaultdict(list)
    reverse_links = defaultdict(list)
    edge_labels = {}
    for src, tgt, label in edges:
        graph[src].append(tgt)
        reverse_links[tgt].append(src)
        label = label.lower()
        # A duplicate unlabeled arrow must not erase a label seen earlier.
        if label or (src, tgt) not in edge_labels:
            edge_labels[(src, tgt)] = label

    # Dict keys keep insertion order, giving a stable, de-duplicated id list.
    id_to_node = {n["id"]: n for n in nodes}
    start_candidates = [nid for nid in id_to_node if nid not in reverse_links]

    flowchart = {
        "start": start_candidates[0] if start_candidates else next(iter(id_to_node), None),
        "steps": [],
    }

    visited = set()
    queue = deque(start_candidates)
    # Shared iterator over all ids, used to pick up nodes the roots cannot
    # reach (cycles without an entry point, or a graph with no root at all).
    leftovers = iter(id_to_node)

    while True:
        if not queue:
            orphan = next((nid for nid in leftovers if nid not in visited), None)
            if orphan is None:
                break
            queue.append(orphan)

        curr = queue.popleft()
        if curr in visited:
            continue
        visited.add(curr)

        node = id_to_node.get(curr, {})
        text = str(node.get("text") or "").strip()
        node_type = node.get("type") or detect_node_type(text)
        step = {
            "id": curr,
            "text": text,
            "type": node_type,
        }

        # dict.fromkeys removes duplicate edges while keeping edge order.
        parents = list(dict.fromkeys(reverse_links.get(curr, ())))
        if len(parents) == 1:
            step["parent"] = parents[0]
        elif parents:
            step["parents"] = parents

        next_nodes = list(dict.fromkeys(graph.get(curr, ())))
        if node_type == "decision" and next_nodes:
            branches = {}
            for tgt in next_nodes:
                label = edge_labels.get((curr, tgt), "")
                if "yes" in label:
                    key = "yes"
                elif "no" in label:
                    key = "no"
                else:
                    key = None
                # The first edge claims a yes/no slot; later edges with the
                # same answer go to 'unknown' instead of overwriting it.
                if key is not None and key not in branches:
                    branches[key] = tgt
                else:
                    branches.setdefault("unknown", []).append(tgt)
            step["branches"] = branches
        elif len(next_nodes) == 1:
            step["next"] = next_nodes[0]
        elif next_nodes:
            step["next"] = next_nodes

        queue.extend(tgt for tgt in next_nodes if tgt not in visited)
        flowchart["steps"].append(step)

    return flowchart