VitalyVorobyev committed · Commit aaa448c · 1 Parent(s): b94eb6b

dl_adapters from dexined and superpoint

README.md CHANGED
@@ -21,7 +21,7 @@ FeatureLab now exposes a production-friendly layout: FastAPI serves the detector
 ```
 FastAPI (/v1/detect/*) <-- shared numpy/CV runtime --> Gradio UI (/)
 ```
-- **Classical path**: Canny, Harris, Probabilistic Hough, contour-based ellipse fitting.
+- **Classical path**: Canny, Harris, Probabilistic Hough, Line Segment Detector (LSD), contour-based ellipse fitting.
 - **Deep path**: ONNX models (HED, SuperPoint, SOLD2, etc.) auto-loaded from `./models`.
 - **Responses**: base64 PNG overlays, rich feature metadata, timings, model info.
 
@@ -38,7 +38,12 @@ python app.py # FastAPI + Gradio on http://localhost:7860
 ```json
 {
   "image": "<base64 png/jpeg>",
-  "params": { "canny_low": 50, "canny_high": 150, "...": "..." },
+  "params": {
+    "canny_low": 50,
+    "canny_high": 150,
+    "line_detector": "lsd",
+    "...": "..."
+  },
   "mode": "classical|dl|both",
   "compare": false,
   "dl_model": "hed.onnx"
@@ -56,6 +61,7 @@ python app.py # FastAPI + Gradio on http://localhost:7860
   "models": { "classical": {...}, "dl": {...} }
 }
 ```
+- Classical line detector toggle: set `params.line_detector` to `"lsd"` to run OpenCV's Line Segment Detector instead of Probabilistic Hough.
 - Multipart uploads: `POST /v1/detect/<detector>/upload` with `file`, optional `params` (JSON string), `mode`, `compare`, `dl_model`.
 
 ## WebSocket API
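For reference, a minimal client sketch of the new toggle (an illustration, not part of the commit): it assumes the server from `app.py` is serving on `localhost:7860` and that `lines` is the detector slug, matching `DETECTOR_KEYS` in `detect.py`; the field names follow the request/response shapes shown above.

```python
# Hypothetical client call exercising the new line_detector toggle.
import base64
import requests

with open("input.png", "rb") as f:
    img_b64 = base64.b64encode(f.read()).decode("ascii")

resp = requests.post(
    "http://localhost:7860/v1/detect/lines",  # slug assumed from DETECTOR_KEYS
    json={
        "image": img_b64,
        "params": {"line_detector": "lsd"},  # "hough" (default) or "lsd"
        "mode": "classical",
        "compare": False,
    },
    timeout=30,
)
print(resp.json()["models"]["classical"])
```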
backend/py/app/api/v1/detect.py CHANGED
@@ -15,7 +15,7 @@ router = APIRouter(prefix="/v1/detect", tags=["detection"])
 DETECTOR_KEYS: Dict[str, str] = {
     "edges": "Edges (Canny)",
     "corners": "Corners (Harris)",
-    "lines": "Lines (Hough)",
+    "lines": "Lines (Hough/LSD)",
     "ellipses": "Ellipses (Contours + fitEllipse)",
 }
 
@@ -259,4 +259,3 @@ async def detection_stream(websocket: WebSocket):
             await websocket.send_json(_format_result(result, runtime_mode).dict())
     except WebSocketDisconnect:
         return
-
 
backend/py/app/gradio_demo/ui.py CHANGED
@@ -13,10 +13,17 @@ DESC = (
 )
 
 
+LINE_METHOD_LABELS = {
+    "Hough (Probabilistic)": "hough",
+    "LSD (Line Segment Detector)": "lsd",
+}
+
+
 def _gradio_runtime(
     image: Optional[np.ndarray],
     detector: str,
     compare: bool,
+    line_method: str,
     dl_choice: str,
     canny_low: int,
     canny_high: int,
@@ -45,6 +52,8 @@ def _gradio_runtime(
         "max_ellipses": int(max_ellipses),
     }
 
+    params["line_detector"] = LINE_METHOD_LABELS.get(line_method, "hough")
+
     mode = "both" if compare else "classical"
     dl_model = dl_choice.strip() or None
     result = run_detection(image, detector, params=params, mode=mode, dl_choice=dl_model)
@@ -62,6 +71,11 @@
 
 def build_demo() -> gr.Blocks:
     defaults = dict(DEFAULT_PARAMS)
+    line_default_key = defaults.get("line_detector", "hough")
+    line_default_label = next(
+        (label for label, key in LINE_METHOD_LABELS.items() if key == line_default_key),
+        "Hough (Probabilistic)",
+    )
 
     with gr.Blocks(title=TITLE) as demo:
         gr.Markdown(f"# {TITLE}\n{DESC}")
@@ -77,7 +91,7 @@ def build_demo() -> gr.Blocks:
             [
                 "Edges (Canny)",
                 "Corners (Harris)",
-                "Lines (Hough)",
+                "Lines (Hough/LSD)",
                 "Ellipses (Contours + fitEllipse)",
             ],
             value="Edges (Canny)",
@@ -88,6 +102,12 @@ def build_demo() -> gr.Blocks:
         dl_choice = gr.Textbox(value="", label="DL model filename (optional, in ./models)")
 
         with gr.Accordion("Parameters", open=False):
+            line_method = gr.Radio(
+                choices=list(LINE_METHOD_LABELS.keys()),
+                value=line_default_label,
+                label="Line detector (classical)",
+                info="Choose LSD to enable OpenCV's Line Segment Detector instead of Probabilistic Hough.",
+            )
             canny_low = gr.Slider(0, 255, value=defaults["canny_low"], step=1, label="Canny low threshold")
             canny_high = gr.Slider(0, 255, value=defaults["canny_high"], step=1, label="Canny high threshold")
             harris_k = gr.Slider(0.02, 0.15, value=defaults["harris_k"], step=0.005, label="Harris k")
@@ -128,6 +148,7 @@ def build_demo() -> gr.Blocks:
             in_img,
             detector,
             compare,
+            line_method,
             dl_choice,
             canny_low,
             canny_high,
@@ -150,6 +171,7 @@ def build_demo() -> gr.Blocks:
             in_img,
             detector,
             compare,
+            line_method,
             dl_choice,
             canny_low,
             canny_high,
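A note on the UI wiring (commentary, not part of the commit): the Radio exposes human-readable labels while the backend only ever sees the canonical keys, and `dict.get` with a `"hough"` default means an unrecognized label degrades gracefully rather than erroring. Sketch of the lookup semantics used in `_gradio_runtime` above:

```python
# Label -> canonical key mapping; unknown labels fall back to "hough".
LINE_METHOD_LABELS = {
    "Hough (Probabilistic)": "hough",
    "LSD (Line Segment Detector)": "lsd",
}
assert LINE_METHOD_LABELS.get("LSD (Line Segment Detector)", "hough") == "lsd"
assert LINE_METHOD_LABELS.get("Unknown label", "hough") == "hough"
```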
backend/py/app/inference/classical.py CHANGED
@@ -19,6 +19,7 @@ def detect_classical(
     hough_max_gap: int,
     ellipse_min_area: int,
     max_ellipses: int,
+    line_detector: str = "hough",
 ) -> Tuple[np.ndarray, Dict[str, Any]]:
     bgr = to_bgr(image)
     gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
@@ -41,17 +42,42 @@
             cv2.circle(overlay, (int(x), int(y)), 2, (0, 255, 255), -1)
         meta["num_corners"] = int(len(corners))
 
-    elif detector == "Lines (Hough)":
-        edges = cv2.Canny(gray, canny_low, canny_high, L2gradient=True)
-        lines = cv2.HoughLinesP(edges, rho=1, theta=np.pi / 180, threshold=hough_thresh,
-                                minLineLength=hough_min_len, maxLineGap=hough_max_gap)
-        n = 0
-        if lines is not None:
-            for l in lines:
-                x1, y1, x2, y2 = l[0]
-                cv2.line(overlay, (x1, y1), (x2, y2), (255, 128, 0), 2)
-            n = len(lines)
-        meta["num_lines"] = int(n)
+    elif detector == "Lines (Hough/LSD)":
+        method = (line_detector or "hough").lower()
+        if method not in {"hough", "lsd"}:
+            method = "hough"
+        meta["line_detector"] = method
+
+        if method == "lsd":
+            if not hasattr(cv2, "createLineSegmentDetector"):
+                meta["error"] = "OpenCV build lacks Line Segment Detector (LSD) support."
+                return to_rgb(overlay), meta
+            lsd = cv2.createLineSegmentDetector(refine=cv2.LSD_REFINE_ADV)
+            lines = lsd.detect(gray)[0]
+            n = 0
+            if lines is not None:
+                for seg in lines:
+                    x1, y1, x2, y2 = map(int, np.round(seg[0]))
+                    cv2.line(overlay, (x1, y1), (x2, y2), (0, 255, 255), 2)
+                n = len(lines)
+            meta["num_lines"] = int(n)
+        else:
+            edges = cv2.Canny(gray, canny_low, canny_high, L2gradient=True)
+            lines = cv2.HoughLinesP(
+                edges,
+                rho=1,
+                theta=np.pi / 180,
+                threshold=hough_thresh,
+                minLineLength=hough_min_len,
+                maxLineGap=hough_max_gap,
+            )
+            n = 0
+            if lines is not None:
+                for l in lines:
+                    x1, y1, x2, y2 = l[0]
+                    cv2.line(overlay, (x1, y1), (x2, y2), (255, 128, 0), 2)
+                n = len(lines)
+            meta["num_lines"] = int(n)
 
     elif detector == "Ellipses (Contours + fitEllipse)":
         edges = cv2.Canny(gray, canny_low, canny_high, L2gradient=True)
@@ -84,4 +110,3 @@ def detect_classical(
         meta["error"] = f"Unknown detector: {detector}"
 
     return to_rgb(overlay), meta
-
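One caveat worth noting (commentary, not part of the commit): the `hasattr(cv2, "createLineSegmentDetector")` guard may not be sufficient on every OpenCV build. In some 4.x releases the Python binding exists but the LSD implementation was removed for licensing reasons (it was restored in later releases), so the constructor raises at call time. A try/except probe is more robust; a minimal sketch:

```python
# Defensive LSD construction: some OpenCV 4.x builds expose the symbol but
# raise cv2.error when it is called, because the implementation was removed
# for license reasons before being restored in later releases.
import cv2

def make_lsd():
    try:
        return cv2.createLineSegmentDetector(refine=cv2.LSD_REFINE_ADV)
    except (cv2.error, AttributeError):
        return None  # caller can fall back to Probabilistic Hough
```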
 
backend/py/app/inference/dl.py CHANGED
@@ -1,10 +1,12 @@
 import os
+import sys
 from typing import Any, Dict, Optional, Tuple
 
 import cv2
 import numpy as np
 
 from .common import to_bgr, to_rgb
+from .dl_adapters import get_adapter
 
 try:
     import onnxruntime as ort  # type: ignore
@@ -17,7 +19,7 @@ MODEL_DIR = os.path.join(os.getcwd(), "models")
 DL_MODELS = {
     "Edges (Canny)": ["hed.onnx", "dexined.onnx"],
     "Corners (Harris)": ["superpoint.onnx"],
-    "Lines (Hough)": ["sold2.onnx", "hawp.onnx"],
+    "Lines (Hough/LSD)": ["sold2.onnx", "hawp.onnx"],
     "Ellipses (Contours + fitEllipse)": ["ellipse_head.onnx"],
 }
 
@@ -36,7 +38,7 @@ def _find_model(detector: str, choice_name: Optional[str]) -> Optional[str]:
 def _load_session(path: str):
     if ort is None:
         raise RuntimeError("onnxruntime not installed. `pip install onnxruntime`.")
-    providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"] if "darwin" in os.sys.platform else ["CPUExecutionProvider"]
+    providers = ["CoreMLExecutionProvider", "CPUExecutionProvider"] if "darwin" in sys.platform else ["CPUExecutionProvider"]
     try:
         return ort.InferenceSession(path, providers=providers)
     except Exception as e:
@@ -50,7 +52,6 @@ def detect_dl(
 ) -> Tuple[np.ndarray, Dict[str, Any]]:
     bgr = to_bgr(image)
     rgb = to_rgb(bgr)
-    h, w = rgb.shape[:2]
     meta: Dict[str, Any] = {"path": "dl"}
 
     model_path = _find_model(detector, model_choice)
@@ -69,103 +70,25 @@ def detect_dl(
         meta["error"] = str(e)
         return rgb, meta
 
-    input_name = sess.get_inputs()[0].name
-    in_shape = sess.get_inputs()[0].shape  # e.g., [1,3,H,W] or dynamic
-    target_h, target_w = None, None
-    if len(in_shape) == 4:
-        target_h = in_shape[2] if isinstance(in_shape[2], int) and in_shape[2] > 0 else 512
-        target_w = in_shape[3] if isinstance(in_shape[3], int) and in_shape[3] > 0 else 512
-    else:
-        target_h, target_w = 512, 512
-
-    img_resized = cv2.resize(rgb, (target_w, target_h), interpolation=cv2.INTER_AREA)
-    x = img_resized.astype(np.float32) / 255.0
-    if x.ndim == 2:
-        x = np.expand_dims(x, axis=-1)
-    if x.shape[2] == 1:
-        x = np.repeat(x, 3, axis=2)
-    x = np.transpose(x, (2, 0, 1))[None, ...]  # NCHW
+    # Dispatch to model-specific adapter
+    adapter = get_adapter(model_path, detector)
+    try:
+        feed, ctx = adapter.preprocess(rgb, sess)
+    except Exception as e:
+        meta["error"] = f"Preprocess failed: {e}"
+        return rgb, meta
 
     try:
-        outputs = sess.run(None, {input_name: x})
+        outputs = sess.run(None, feed)
     except Exception as e:
         meta["error"] = f"ONNX inference failed: {e}"
         return rgb, meta
 
-    overlay = rgb.copy()
-    if detector == "Edges (Canny)":
-        pred = outputs[0]
-        if pred.ndim == 4:
-            prob = pred[0, 0]
-            prob = (prob - prob.min()) / (prob.max() - prob.min() + 1e-8)
-            edges = (prob > 0.5).astype(np.uint8) * 255
-            edges = cv2.resize(edges, (w, h), interpolation=cv2.INTER_NEAREST)
-            bgr2 = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
-            bgr2[edges > 0] = (0, 255, 0)
-            overlay = cv2.cvtColor(bgr2, cv2.COLOR_BGR2RGB)
-            meta["edge_prob_mean"] = float(prob.mean())
-        else:
-            meta["warning"] = "Unexpected model output shape for edges."
-
-    elif detector == "Corners (Harris)":
-        pred = outputs[0]
-        if pred.ndim == 4:
-            heat = pred[0, 0]
-            heat = (heat - heat.min()) / (heat.max() - heat.min() + 1e-8)
-            heat = cv2.resize(heat, (w, h), interpolation=cv2.INTER_CUBIC)
-            ys, xs = np.where(heat > 0.5)
-            overlay = rgb.copy()
-            for (y, x_) in zip(ys.tolist(), xs.tolist()):
-                cv2.circle(overlay, (int(x_), int(y)), 2, (0, 255, 255), -1)
-            meta["num_corners"] = int(len(xs))
-        else:
-            meta["warning"] = "Unexpected model output shape for corners."
-
-    elif detector == "Lines (Hough)":
-        pred = outputs[0]
-        if pred.ndim == 4:
-            heat = pred[0, 0]
-            heat = (heat - heat.min()) / (heat.max() - heat.min() + 1e-8)
-            mask = (heat > 0.5).astype(np.uint8) * 255
-            mask = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
-            lines = cv2.HoughLinesP(mask, 1, np.pi / 180, 50, minLineLength=30, maxLineGap=5)
-            overlay = rgb.copy()
-            n = 0
-            if lines is not None:
-                for l in lines:
-                    x1, y1, x2, y2 = l[0]
-                    cv2.line(overlay, (x1, y1), (x2, y2), (255, 128, 0), 2)
-                n = len(lines)
-            meta["num_lines"] = int(n)
-        else:
-            meta["warning"] = "Unexpected model output for lines."
-
-    elif detector == "Ellipses (Contours + fitEllipse)":
-        pred = outputs[0]
-        if pred.ndim == 4:
-            heat = pred[0, 0]
-            heat = (heat - heat.min()) / (heat.max() - heat.min() + 1e-8)
-            mask = (heat > 0.5).astype(np.uint8) * 255
-            mask = cv2.resize(mask, (w, h), interpolation=cv2.INTER_NEAREST)
-            contours, _ = cv2.findContours(mask, cv2.RETR_LIST, cv2.CHAIN_APPROX_NONE)
-            count = 0
-            for cnt in contours:
-                if len(cnt) < 5:
-                    continue
-                try:
-                    (cx, cy), (MA, ma), angle = cv2.fitEllipse(cnt)
-                    area = float(np.pi * (MA / 2) * (ma / 2))
-                    if area >= 300:
-                        cv2.ellipse(overlay, ((int(cx), int(cy)), (int(MA), int(ma)), float(angle)), (0, 200, 255), 2)
-                        count += 1
-                except cv2.error:
-                    continue
-            meta["num_ellipses"] = int(count)
-        else:
-            meta["warning"] = "Unexpected model output for ellipses."
-
-    else:
-        meta["error"] = f"Unknown detector: {detector}"
+    try:
+        overlay, post_meta = adapter.postprocess(outputs, rgb, ctx, detector)
+        meta.update(post_meta)
+    except Exception as e:
+        meta["error"] = f"Postprocess failed: {e}"
+        return rgb, meta
 
     return overlay, meta
-
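The refactor above reduces `detect_dl` to a fixed pipeline — resolve model, build session, then `adapter.preprocess` → `sess.run` → `adapter.postprocess` — so supporting a new model means writing an adapter, not touching the dispatcher. A hypothetical extension sketch (class and file names invented for illustration; import path assumed from the repo layout):

```python
# Hypothetical adapter for a new edge model whose output needs inverting.
# Reuses EdgesAdapter's preprocessing; only the probability decode changes.
from app.inference.dl_adapters import EdgesAdapter

class InvertedEdgesAdapter(EdgesAdapter):
    def _extract_edge_prob(self, outputs):
        # Low activations mark edges for this (invented) model family;
        # the parent already normalizes to [0, 1], so inverting is safe.
        return 1.0 - super()._extract_edge_prob(outputs)

# get_adapter() would then need one extra filename match, e.g.:
#     if "inverted" in name:
#         return InvertedEdgesAdapter()
```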
 
backend/py/app/inference/dl_adapters.py ADDED
@@ -0,0 +1,227 @@
+from __future__ import annotations
+
+import os
+from dataclasses import dataclass
+from typing import Any, Dict, List, Optional, Tuple
+
+import cv2
+import numpy as np
+
+
+@dataclass
+class AdapterContext:
+    input_name: str
+    in_size: Tuple[int, int]
+    orig_size: Tuple[int, int]
+    resize_size: Tuple[int, int]
+    extra: Dict[str, Any]
+
+
+class DLAdapter:
+    def preprocess(self, rgb: np.ndarray, sess) -> Tuple[Dict[str, np.ndarray], AdapterContext]:  # pragma: no cover - runtime dependent
+        raise NotImplementedError
+
+    def postprocess(
+        self, outputs: List[np.ndarray], rgb: np.ndarray, ctx: AdapterContext, detector: str
+    ) -> Tuple[np.ndarray, Dict[str, Any]]:  # pragma: no cover - runtime dependent
+        raise NotImplementedError
+
+
+def _first_input(sess) -> Tuple[str, Tuple[int, int]]:
+    inp = sess.get_inputs()[0]
+    name = inp.name
+    shape = inp.shape
+    if len(shape) == 4:
+        h = shape[2] if isinstance(shape[2], int) and shape[2] > 0 else None
+        w = shape[3] if isinstance(shape[3], int) and shape[3] > 0 else None
+        if h is None or w is None:
+            return name, (None, None)  # type: ignore
+        return name, (int(h), int(w))
+    return name, (None, None)  # type: ignore
+
+
+def _ensure_3ch(x: np.ndarray) -> np.ndarray:
+    if x.ndim == 2:
+        x = np.expand_dims(x, -1)
+    if x.shape[2] == 1:
+        x = np.repeat(x, 3, axis=2)
+    return x
+
+
+class EdgesAdapter(DLAdapter):
+    """Generic single-channel edge detector (DexiNed/HED-style).
+
+    - Input: RGB float32 in [0,1], NCHW 1x3xHxW
+    - Output: take first (or only) output, expect N x 1 x H x W (or compatible)
+    """
+
+    def preprocess(self, rgb: np.ndarray, sess) -> Tuple[Dict[str, np.ndarray], AdapterContext]:
+        input_name, in_wh = _first_input(sess)
+        H, W = rgb.shape[:2]
+        th, tw = in_wh
+        if th is None or tw is None:
+            # Default to 512x512 if model is dynamic and does not specify
+            th, tw = 512, 512
+        resized = cv2.resize(rgb, (tw, th), interpolation=cv2.INTER_AREA)
+        x = _ensure_3ch(resized.astype(np.float32) / 255.0)
+        x = np.transpose(x, (2, 0, 1))[None, ...]
+        ctx = AdapterContext(input_name=input_name, in_size=(th, tw), orig_size=(H, W), resize_size=(th, tw), extra={})
+        return {input_name: x}, ctx
+
+    def _extract_edge_prob(self, outputs: List[np.ndarray]) -> np.ndarray:
+        pred = outputs[0]
+        if pred.ndim == 4:
+            # N x C x H x W
+            cdim = pred.shape[1]
+            prob = pred[0, 0] if cdim >= 1 else pred[0, 0]
+        elif pred.ndim == 3:
+            # C x H x W or N x H x W
+            if pred.shape[0] in (1, 3):
+                prob = pred[0]
+            else:
+                prob = pred[0]
+        elif pred.ndim == 2:
+            prob = pred
+        else:
+            # Fallback: flatten and fail-safe normalize
+            prob = pred.reshape(-1)
+            prob = prob - prob.min()
+            prob = prob / (prob.max() + 1e-8)
+            prob = prob.reshape(int(np.sqrt(prob.size)), -1)
+        # Normalize to [0,1]
+        pmin, pmax = float(np.min(prob)), float(np.max(prob))
+        if pmax > pmin:
+            prob = (prob - pmin) / (pmax - pmin)
+        else:
+            prob = np.zeros_like(prob)
+        return prob.astype(np.float32)
+
+    def postprocess(
+        self, outputs: List[np.ndarray], rgb: np.ndarray, ctx: AdapterContext, detector: str
+    ) -> Tuple[np.ndarray, Dict[str, Any]]:
+        H, W = ctx.orig_size
+        prob = self._extract_edge_prob(outputs)
+        mask = (prob > 0.5).astype(np.uint8) * 255
+        mask = cv2.resize(mask, (W, H), interpolation=cv2.INTER_NEAREST)
+        bgr = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)
+        bgr[mask > 0] = (0, 255, 0)
+        overlay = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
+        meta: Dict[str, Any] = {
+            "edge_prob_mean": float(np.mean(prob)),
+            "resize": {"h": ctx.in_size[0], "w": ctx.in_size[1]},
+        }
+        return overlay, meta
+
+
+class SuperPointAdapter(DLAdapter):
+    """SuperPoint-style keypoint detector.
+
+    - Input: grayscale float32 [0,1], NCHW 1x1xHxW, H and W divisible by 8.
+    - Outputs: semi (1x65xhxw), desc (1x256xhxw). Extract keypoints from semi.
+    """
+
+    def _make_hw_div8(self, H: int, W: int) -> Tuple[int, int]:
+        H8 = max(8, (H // 8) * 8)
+        W8 = max(8, (W // 8) * 8)
+        return H8, W8
+
+    def preprocess(self, rgb: np.ndarray, sess) -> Tuple[Dict[str, np.ndarray], AdapterContext]:
+        input_name, in_wh = _first_input(sess)
+        H, W = rgb.shape[:2]
+        th, tw = in_wh
+        if th is None or tw is None:
+            th, tw = self._make_hw_div8(H, W)
+        else:
+            th, tw = self._make_hw_div8(th, tw)
+        gray = cv2.cvtColor(rgb, cv2.COLOR_RGB2GRAY)
+        gray_r = cv2.resize(gray, (tw, th), interpolation=cv2.INTER_AREA).astype(np.float32) / 255.0
+        x = gray_r[None, None, ...]
+        ctx = AdapterContext(input_name=input_name, in_size=(th, tw), orig_size=(H, W), resize_size=(th, tw), extra={})
+        return {input_name: x}, ctx
+
+    def _pick_outputs(self, outputs: List[np.ndarray]) -> Tuple[np.ndarray, Optional[np.ndarray]]:
+        semi = None
+        desc = None
+        for o in outputs:
+            if o.ndim == 4 and o.shape[1] == 65:
+                semi = o
+            elif o.ndim == 4 and o.shape[1] == 256:
+                desc = o
+        if semi is None:
+            # fallback: first output
+            semi = outputs[0]
+        return semi, desc
+
+    def _softmax_channel(self, x: np.ndarray, axis: int = 1) -> np.ndarray:
+        x = x - np.max(x, axis=axis, keepdims=True)
+        e = np.exp(x)
+        return e / np.sum(e, axis=axis, keepdims=True)
+
+    def _semi_to_heat(self, semi: np.ndarray) -> np.ndarray:
+        # semi: 1 x 65 x h x w -> heat: (h*8) x (w*8)
+        if semi.ndim != 4:
+            semi = semi.reshape(1, semi.shape[0], semi.shape[1], semi.shape[2])
+        semi = self._softmax_channel(semi, axis=1)
+        semi = semi[0]
+        if semi.shape[0] == 65:
+            semi = semi[:-1, ...]  # drop dustbin
+        Hc, Wc = semi.shape[1], semi.shape[2]
+        semi = semi.transpose(1, 2, 0)  # h x w x 64
+        semi = semi.reshape(Hc, Wc, 8, 8)
+        semi = semi.transpose(0, 2, 1, 3)
+        heat = semi.reshape(Hc * 8, Wc * 8)
+        return heat
+
+    def _nms_points(self, heat: np.ndarray, thresh: float = 0.015, nms_size: int = 3, max_kp: int = 1000) -> Tuple[np.ndarray, np.ndarray]:
+        H, W = heat.shape
+        dil = cv2.dilate(heat, np.ones((nms_size, nms_size), np.float32))
+        maxima = (heat == dil) & (heat > thresh)
+        ys, xs = np.where(maxima)
+        if len(xs) > max_kp:
+            # keep strongest
+            vals = heat[ys, xs]
+            idx = np.argsort(vals)[-max_kp:]
+            ys, xs = ys[idx], xs[idx]
+        return ys, xs
+
+    def postprocess(
+        self, outputs: List[np.ndarray], rgb: np.ndarray, ctx: AdapterContext, detector: str
+    ) -> Tuple[np.ndarray, Dict[str, Any]]:
+        semi, desc = self._pick_outputs(outputs)
+        heat_r = self._semi_to_heat(semi)
+        # Resize heatmap back to original size
+        H0, W0 = ctx.orig_size
+        heat = cv2.resize(heat_r, (W0, H0), interpolation=cv2.INTER_CUBIC)
+        ys, xs = self._nms_points(heat)
+
+        overlay = rgb.copy()
+        for y, x in zip(ys.tolist(), xs.tolist()):
+            cv2.circle(overlay, (int(x), int(y)), 2, (255, 255, 0), -1)
+
+        meta: Dict[str, Any] = {
+            "num_corners": int(len(xs)),
+            "heat_mean": float(np.mean(heat)),
+        }
+        if desc is not None:
+            meta["descriptors_shape"] = list(desc.shape)
+        return overlay, meta
+
+
+def get_adapter(model_path: str, detector: str) -> DLAdapter:
+    name = os.path.basename(model_path).lower()
+    if "superpoint" in name or (detector.startswith("Corners") and "super" in name):
+        return SuperPointAdapter()
+    if any(k in name for k in ("dexined", "hed")) or detector.startswith("Edges"):
+        return EdgesAdapter()
+    # Default fallback: treat like edges
+    return EdgesAdapter()
+
+
+__all__ = [
+    "DLAdapter",
+    "AdapterContext",
+    "EdgesAdapter",
+    "SuperPointAdapter",
+    "get_adapter",
+]
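The least obvious piece above is `_semi_to_heat`: SuperPoint's `semi` head packs each 8x8 block of pixels into 64 channels plus a 65th "dustbin" (no-keypoint) channel, so decoding is a channel softmax, dropping the dustbin, then undoing the space-to-depth packing. A standalone numpy sanity check of that reshape (shapes invented for illustration):

```python
# Verifies the 1x65xhxw -> (8h)x(8w) decode used by _semi_to_heat.
import numpy as np

h, w = 30, 40
semi = np.random.rand(1, 65, h, w).astype(np.float32)

e = np.exp(semi - semi.max(axis=1, keepdims=True))  # channel softmax
dense = e / e.sum(axis=1, keepdims=True)
nodust = dense[0, :-1]                               # drop dustbin: 64 x h x w
heat = nodust.transpose(1, 2, 0)                     # h x w x 64
heat = heat.reshape(h, w, 8, 8).transpose(0, 2, 1, 3).reshape(h * 8, w * 8)
assert heat.shape == (240, 320)
```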
backend/py/app/models/schemas.py CHANGED
@@ -14,6 +14,10 @@ class DetectionParams(BaseModel):
     hough_max_gap: Optional[int] = Field(None, ge=0, le=200)
     ellipse_min_area: Optional[int] = Field(None, ge=10, le=100000)
     max_ellipses: Optional[int] = Field(None, ge=1, le=100)
+    line_detector: Optional[Literal["hough", "lsd"]] = Field(
+        None,
+        description="Classical line detector variant to use: 'hough' (default) or 'lsd'.",
+    )
 
 
 class DetectionRequest(BaseModel):
@@ -34,4 +38,3 @@ class DetectionResponse(BaseModel):
     fps_estimate: Optional[float] = None
     model: Dict[str, Any]
     models: Dict[str, Dict[str, Any]]
-
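One thing to double-check (commentary, not part of the commit): the new field uses `Literal`, which must be imported from `typing` at the top of `schemas.py`; that import sits outside the hunk shown here, so it is assumed to be present. The expected validation behavior, as a sketch:

```python
# Assumes DetectionParams is importable as in the rest of the app.
from pydantic import ValidationError
from app.models.schemas import DetectionParams

DetectionParams(line_detector="lsd")           # accepted
try:
    DetectionParams(line_detector="ransac")    # rejected by the Literal
except ValidationError as e:
    print(e)
```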
 
backend/py/app/services/runtime_adapter.py CHANGED
@@ -21,6 +21,7 @@ DEFAULT_PARAMS: Dict[str, Any] = {
     "hough_max_gap": 5,
     "ellipse_min_area": 300,
     "max_ellipses": 5,
+    "line_detector": "hough",
 }
 
 PARAM_TYPES: Dict[str, Any] = {
@@ -34,6 +35,7 @@ PARAM_TYPES: Dict[str, Any] = {
     "hough_max_gap": int,
     "ellipse_min_area": int,
     "max_ellipses": int,
+    "line_detector": lambda x: str(x).lower(),
 }
 
 CLASSICAL_MODEL_INFO = {"name": "opencv-classical", "version": cv2.__version__}
@@ -105,6 +107,7 @@ def run_detection(
         merged["hough_max_gap"],
         merged["ellipse_min_area"],
         merged["max_ellipses"],
+        merged["line_detector"],
     )
     t_ms = (time.perf_counter() - t0) * 1000.0
     overlays["classical"] = classical_img
@@ -145,4 +148,3 @@ __all__ = [
     "merge_params",
     "run_detection",
 ]
-
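Taken together, the commit threads `line_detector` through every layer: the default here, coercion via `PARAM_TYPES` (the lambda lowercases, so a client-supplied `"LSD"` normalizes to `"lsd"`), and finally the positional argument to `detect_classical`. An end-to-end sketch (import path assumed from the file layout above):

```python
# Runs the classical Lines path with LSD selected; mirrors what the
# Gradio UI and the REST handler do internally.
import numpy as np
from app.services.runtime_adapter import run_detection

img = np.zeros((240, 320, 3), dtype=np.uint8)
result = run_detection(
    img,
    "Lines (Hough/LSD)",              # detector key from DETECTOR_KEYS
    params={"line_detector": "LSD"},  # coerced to "lsd" by PARAM_TYPES
    mode="classical",
)
```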