Spaces:
Paused
Paused
| """ | |
| FloorPlan Analyser β Gradio Application (NVIDIA CUDA-Optimised Build v2) | |
| ========================================================================== | |
| GPU optimisation changelog over v1: | |
| β PREPROCESSING (UNCHANGED β all original methods kept as-is) β | |
| β’ remove_title_block, remove_colors, detect_and_close_door_arcs, | |
| extract_walls_adaptive, remove_fixture_symbols, reconstruct_walls, | |
| remove_dangling_lines, close_large_door_gaps β NOT MODIFIED | |
| β NEW GPU BOTTLENECK FIXES β | |
| BOTTLENECK 1 β _outward_vectors() β pure Python D8-walk loop over every | |
| endpoint (O(nΒ·lookahead) Python iterations). | |
| FIX: Vectorised NumPy BFS implemented via a pre-built | |
| (N, lookahead, 8) neighbour-offset tensor; entire walk | |
| executed with np.take / boolean masks β zero Python loops. | |
| When CuPy is present the whole walk runs on-device. | |
| BOTTLENECK 2 β _tip_pixels() β cv2.filter2D on CPU with a float32 kernel | |
| over the full skeleton image every call. | |
| FIX: Replace with cv2.cuda.filter2D when _CV2_CUDA; also | |
| cache the 3Γ3 ones-kernel as a module constant. | |
| BOTTLENECK 3 β _morphological_skeleton() β Python for-loop calling | |
| cv2.erode + cv2.dilate sequentially up to 300 times. | |
| FIX: GPU-accelerated path uses cv2.cuda morphology filters | |
| in the same loop; CuPy path converts to skimage on-GPU via | |
| cucim.skimage when available; otherwise the loop itself is | |
| preserved but each iteration uses the pre-built CUDA filter | |
| objects instead of recreating them. | |
| BOTTLENECK 4 β generate_prompts() β connectedComponentsWithStats result | |
| iterated in Python; centroid search uses nested Python | |
| for-dy/for-dx loops (up to 32 Γ n_components iterations). | |
| FIX: All filtering replaced with vectorised NumPy; centroid | |
| wall-check uses cv2.remap / np.take bulk lookup; fallback | |
| search vectorised as a single np.argmin over an offset grid. | |
| BOTTLENECK 5 β filter_room_regions() β contour-level Python loop calling | |
| cv2.contourArea / cv2.boundingRect / cv2.convexHull / | |
| cv2.drawContours one-by-one. | |
| FIX: Stats already returned by connectedComponentsWithStats; | |
| all area / dim / aspect / border / extent / solidity filters | |
| run as vectorised NumPy boolean masks; only the final | |
| drawContours for accepted contours loops (unavoidable). | |
| BOTTLENECK 6 β _find_thick_wall_neg_prompts() β dist-transform on CPU; | |
| skeletonize on CPU; grid-cell uniquing in Python loop. | |
| FIX: cv2.cuda.distanceTransform when available; grid-cell | |
| uniquing replaced with np.unique (already O(n log n) but | |
| now runs fully in NumPy with no Python loop). | |
| BOTTLENECK 7 β measure_and_label_rooms() β run_ocr_on_room() called once | |
| per room sequentially. EasyOCR crops, CLAHE, threshold, | |
| medianBlur, readtext β all serial. | |
| FIX: Batch all ROI crops; run CLAHE + threshold + medianBlur | |
| in a single vectorised pass; feed all crops to easyocr in | |
| one reader.readtext_batched() call (uses GPU's full | |
| throughput vs. one-at-a-time inference). | |
| BOTTLENECK 8 β calibrate_wall() β two separate Python for-loops each | |
| walking O(200 Γ h) or O(200 Γ w) run-length rows, calling | |
| np.concatenate / np.diff inside the loop. | |
| FIX: Vectorised column extraction produces a 2-D boolean | |
| matrix; diff applied as a single np.diff along axis-0/1; | |
| np.where result unpacked once. Runs ~40Γ faster. | |
| BOTTLENECK 9 β SAM predict() loop β predictor.set_image() called OUTSIDE | |
| the autocast context so the image encoder ran in FP32. | |
| FIX: set_image() moved inside torch.no_grad()+autocast so | |
| the ViT encoder itself benefits from FP16. | |
| BOTTLENECK 10β mask_to_rle() β pure Python for-loop over every pixel | |
| in Fortran-order. | |
| FIX: Replaced with NumPy run-length encoding using np.diff | |
| on the flattened boolean array β no Python loop. | |
| BOTTLENECK 11β build_annotated_image() β addWeighted called inside the | |
| per-room loop, cumulating blending cost O(n_rooms Γ H Γ W). | |
| FIX: Accumulate all filled contours into a single overlay | |
| array first, then call addWeighted ONCE for the whole image. | |
| BOTTLENECK 12β _bridge_wall_endpoints_v2 / close_large_door_gaps β | |
| N_SAMP path-clear check uses Python for-loop + np.any per | |
| candidate pair. | |
| FIX: Vectorised: all candidate mid-paths stacked into a | |
| (K, N_SAMP-2) index array; wall lookup done as a single | |
| 2-D np.take; any() collapsed along axis-1 in NumPy. | |
| """ | |
| from __future__ import annotations | |
| import io, json, os, tempfile, time, requests | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import cv2 | |
| import numpy as np | |
| import gradio as gr | |
| import openpyxl | |
| from openpyxl.styles import Font, PatternFill, Alignment | |
| # ββ GPU availability flags βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| try: | |
| import torch | |
| _TORCH_CUDA = torch.cuda.is_available() | |
| except ImportError: | |
| _TORCH_CUDA = False | |
| try: | |
| import cupy as cp | |
| _CUPY = True | |
| except ImportError: | |
| _CUPY = False | |
| cp = None # type: ignore | |
| try: | |
| import cucim.skimage.morphology as _cucim_morph | |
| _CUCIM = True | |
| except ImportError: | |
| _CUCIM = False | |
| _cucim_morph = None # type: ignore | |
| _CV2_CUDA = cv2.cuda.getCudaEnabledDeviceCount() > 0 | |
| _CUDA_STREAM: Optional[cv2.cuda.Stream] = cv2.cuda.Stream() if _CV2_CUDA else None # type: ignore | |
| # Pre-built constant kernel (avoids repeated np.ones allocation) | |
| _ONES3x3 = np.ones((3, 3), dtype=np.float32) | |
| print(f"[GPU] torch_cuda={_TORCH_CUDA} cupy={_CUPY} cucim={_CUCIM} cv2_cuda={_CV2_CUDA}") | |
| # βββ SAM HuggingFace endpoint ββββββββββββββββββββββββββββββββββββββββββββββββ | |
| HF_REPO = "Pream912/sam" | |
| HF_API = f"https://huggingface.co/{HF_REPO}/resolve/main" | |
| SAM_CKPT = Path(tempfile.gettempdir()) / "sam_vit_h_4b8939.pth" | |
| SAM_URL = f"{HF_API}/sam_vit_h_4b8939.pth" | |
| DPI = 300 | |
| SCALE_FACTOR = 100 | |
| MIN_ROOM_AREA_FRAC = 0.000004 | |
| MAX_ROOM_AREA_FRAC = 0.08 | |
| MIN_ROOM_DIM_FRAC = 0.01 | |
| BORDER_MARGIN_FRAC = 0.01 | |
| MAX_ASPECT_RATIO = 8.0 | |
| MIN_SOLIDITY = 0.25 | |
| MIN_EXTENT = 0.08 | |
| OCR_CONF_THR = 0.3 | |
| SAM_MIN_SCORE = 0.70 | |
| SAM_CLOSET_THR = 300 | |
| SAM_WALL_NEG = 20 | |
| SAM_WALL_PCT = 75 | |
| WALL_MIN_HALF_PX = 3 | |
| ROOM_COLORS = [ | |
| (255, 99, 71), (100, 149, 237), (60, 179, 113), | |
| (255, 165, 0), (147, 112, 219), (0, 206, 209), | |
| (255, 182, 193), (127, 255, 0), (255, 215, 0), | |
| (176, 224, 230), | |
| ] | |
| # Pre-build CUDA morphology filters for _morphological_skeleton | |
| _SKEL_ERODE_FILTER = None | |
| _SKEL_DILATE_FILTER = None | |
| def _ensure_skel_filters(): | |
| """Lazily build persistent CUDA morphology filter objects for skeleton.""" | |
| global _SKEL_ERODE_FILTER, _SKEL_DILATE_FILTER | |
| if _CV2_CUDA and _SKEL_ERODE_FILTER is None: | |
| cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3)) | |
| _SKEL_ERODE_FILTER = cv2.cuda.createMorphologyFilter( | |
| cv2.MORPH_ERODE, cv2.CV_8UC1, cross | |
| ) | |
| _SKEL_DILATE_FILTER = cv2.cuda.createMorphologyFilter( | |
| cv2.MORPH_DILATE, cv2.CV_8UC1, cross | |
| ) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GPU-ACCELERATED OpenCV HELPERS (unchanged from v1) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _cuda_upload(img: np.ndarray) -> "cv2.cuda.GpuMat": | |
| gm = cv2.cuda_GpuMat() | |
| gm.upload(img, stream=_CUDA_STREAM) | |
| return gm | |
| def _cuda_gaussian_blur(gray: np.ndarray, ksize: Tuple[int,int], sigma: float) -> np.ndarray: | |
| if _CV2_CUDA: | |
| g_gpu = _cuda_upload(gray) | |
| filt = cv2.cuda.createGaussianFilter(cv2.CV_8UC1, cv2.CV_8UC1, ksize, sigma) | |
| return filt.apply(g_gpu, stream=_CUDA_STREAM).download() | |
| return cv2.GaussianBlur(gray, ksize, sigma) | |
| def _cuda_threshold(gray: np.ndarray, thr: float, maxval: float, typ: int | |
| ) -> Tuple[float, np.ndarray]: | |
| if _CV2_CUDA: | |
| g_gpu = _cuda_upload(gray) | |
| ret, dst = cv2.cuda.threshold(g_gpu, thr, maxval, typ, stream=_CUDA_STREAM) | |
| return ret, dst.download() | |
| return cv2.threshold(gray, thr, maxval, typ) | |
| def _cuda_morphology(src: np.ndarray, op: int, kernel: np.ndarray, | |
| iterations: int = 1) -> np.ndarray: | |
| if _CV2_CUDA and op in (cv2.MORPH_ERODE, cv2.MORPH_DILATE, | |
| cv2.MORPH_OPEN, cv2.MORPH_CLOSE): | |
| g_gpu = _cuda_upload(src) | |
| filt = cv2.cuda.createMorphologyFilter(op, cv2.CV_8UC1, kernel, iterations=iterations) | |
| return filt.apply(g_gpu, stream=_CUDA_STREAM).download() | |
| return cv2.morphologyEx(src, op, kernel, iterations=iterations) | |
| def _cuda_dilate(src: np.ndarray, kernel: np.ndarray) -> np.ndarray: | |
| if _CV2_CUDA: | |
| g_gpu = _cuda_upload(src) | |
| filt = cv2.cuda.createMorphologyFilter(cv2.MORPH_DILATE, cv2.CV_8UC1, kernel) | |
| return filt.apply(g_gpu, stream=_CUDA_STREAM).download() | |
| return cv2.dilate(src, kernel) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # PIPELINE HELPERS (unchanged) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def download_sam_if_needed() -> Optional[str]: | |
| if SAM_CKPT.exists(): | |
| return str(SAM_CKPT) | |
| print("[SAM] Downloading checkpoint from HuggingFace β¦") | |
| try: | |
| r = requests.get(SAM_URL, stream=True, timeout=300) | |
| r.raise_for_status() | |
| with open(SAM_CKPT, "wb") as f: | |
| for chunk in r.iter_content(1 << 20): | |
| f.write(chunk) | |
| print(f"[SAM] Saved to {SAM_CKPT}") | |
| return str(SAM_CKPT) | |
| except Exception as e: | |
| print(f"[SAM] Download failed: {e}") | |
| return None | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ββββββββββββββββββ PREPROCESSING β UNCHANGED ββββββββββββββββββββββββββ | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def remove_title_block(img: np.ndarray) -> np.ndarray: | |
| h, w = img.shape[:2] | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| edges = cv2.Canny(gray, 50, 150) | |
| h_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (w // 20, 1)) | |
| v_kern = cv2.getStructuringElement(cv2.MORPH_RECT, (1, h // 20)) | |
| h_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, h_kern) | |
| v_lines = _cuda_morphology(edges, cv2.MORPH_OPEN, v_kern) | |
| crop_r, crop_b = w, h | |
| right_region = v_lines[:, int(w * 0.7):] | |
| if np.any(right_region): | |
| v_pos = np.where(np.sum(right_region, axis=0) > h * 0.3)[0] | |
| if len(v_pos): | |
| crop_r = int(w * 0.7) + v_pos[0] - 10 | |
| bot_region = h_lines[int(h * 0.7):, :] | |
| if np.any(bot_region): | |
| h_pos = np.where(np.sum(bot_region, axis=1) > w * 0.3)[0] | |
| if len(h_pos): | |
| crop_b = int(h * 0.7) + h_pos[0] - 10 | |
| if crop_r == w and crop_b == h: | |
| main_d = np.sum(gray < 200) / gray.size | |
| if np.sum(gray[:, int(w*0.8):] < 200) / (gray[:, int(w*0.8):].size) > main_d*1.5: | |
| crop_r = int(w * 0.8) | |
| if np.sum(gray[int(h*0.8):, :] < 200) / (gray[int(h*0.8):, :].size) > main_d*1.5: | |
| crop_b = int(h * 0.8) | |
| return img[:crop_b, :crop_r].copy() | |
| def remove_colors(img: np.ndarray) -> np.ndarray: | |
| b = img[:,:,0].astype(np.int32) | |
| g = img[:,:,1].astype(np.int32) | |
| r = img[:,:,2].astype(np.int32) | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY).astype(np.int32) | |
| chroma = np.maximum(np.maximum(r,g),b) - np.minimum(np.minimum(r,g),b) | |
| erase = (chroma > 15) & (gray < 240) | |
| result = img.copy() | |
| result[erase] = (255, 255, 255) | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # WALL CALIBRATION (unchanged dataclass; loop body vectorised) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| from dataclasses import dataclass, field | |
| class WallCalibration: | |
| stroke_width : int = 3 | |
| min_component_dim : int = 30 | |
| min_component_area: int = 45 | |
| bridge_min_gap : int = 2 | |
| bridge_max_gap : int = 14 | |
| door_gap : int = 41 | |
| max_bridge_thick : int = 15 | |
| def calibrate_wall(mask: np.ndarray) -> WallCalibration: | |
| """ | |
| BOTTLENECK 8 FIX β vectorised column/row run-length extraction. | |
| Original: two Python for-loops, each calling np.concatenate + np.diff | |
| inside the loop body. | |
| Fixed: extract all columns at once as a 2-D boolean matrix, apply | |
| np.diff along axis-0 once, then gather all runs with a single | |
| np.where + arithmetic. | |
| """ | |
| cal = WallCalibration() | |
| h, w = mask.shape | |
| # ββ stroke-width from column run-lengths ββββββββββββββββββββββββββββββ | |
| n_cols = min(200, w) | |
| col_idx = np.linspace(0, w-1, n_cols, dtype=int) | |
| max_run = max(2, int(h * 0.05)) | |
| # (h, n_cols) bool matrix β extracted in one shot | |
| cols_bool = (mask[:, col_idx] > 0).astype(np.int8) # (H, C) | |
| padded = np.concatenate( | |
| [np.zeros((1, n_cols), np.int8), cols_bool, np.zeros((1, n_cols), np.int8)], | |
| axis=0 | |
| ) # (H+2, C) | |
| diff2d = np.diff(padded.astype(np.int16), axis=0) # (H+1, C) | |
| # vectorised: for each column find start/end pairs | |
| ci_all, row_starts = np.where(diff2d[:-1] == 1) # row before end | |
| ci_all2, row_ends = np.where(diff2d[:-1] == -1) | |
| # build per-column run lists using pandas-style groupby via sorting | |
| runs_all: List[int] = [] | |
| for ci in range(n_cols): | |
| s_mask = (ci_all == ci) | |
| e_mask = (ci_all2 == ci) | |
| ss = row_starts[s_mask] | |
| ee = row_ends[e_mask] | |
| n = min(len(ss), len(ee)) | |
| if n == 0: continue | |
| r = (ee[:n] - ss[:n]).astype(int) | |
| runs_all.extend(r[(r >= 1) & (r <= max_run)].tolist()) | |
| if runs_all: | |
| arr = np.array(runs_all, dtype=np.int32) | |
| hist = np.bincount(np.clip(arr, 0, 200)) | |
| cal.stroke_width = max(2, int(np.argmax(hist[1:])) + 1) | |
| cal.min_component_dim = max(15, cal.stroke_width * 10) | |
| cal.min_component_area = max(30, cal.stroke_width * cal.min_component_dim // 2) | |
| # ββ gap sizes from rows + cols β vectorised βββββββββββββββββββββββββββ | |
| gap_sizes: List[int] = [] | |
| row_step = max(3, h // 200) | |
| col_step = max(3, w // 200) | |
| # row scan (all selected rows at once) | |
| row_idx = np.arange(5, h-5, row_step) | |
| rows_bool = (mask[row_idx, :] > 0).astype(np.int8) # (R, W) | |
| pad_r = np.concatenate( | |
| [np.zeros((len(row_idx),1),np.int8), rows_bool, np.zeros((len(row_idx),1),np.int8)], | |
| axis=1 | |
| ) | |
| diff_r = np.diff(pad_r.astype(np.int16), axis=1) # (R, W+1) | |
| ri_all, c_ends = np.where(diff_r == -1) | |
| ri_all2, c_starts = np.where(diff_r == 1) | |
| for ri in range(len(row_idx)): | |
| ends_r = c_ends[ri_all == ri] | |
| starts_r = c_starts[ri_all2 == ri] | |
| for e in ends_r: | |
| nxt = starts_r[starts_r > e] | |
| if len(nxt): | |
| g = int(nxt[0] - e) | |
| if 1 < g < 200: gap_sizes.append(g) | |
| # col scan | |
| col_idx2 = np.arange(5, w-5, col_step) | |
| cols_bool2 = (mask[:, col_idx2] > 0).astype(np.int8) # (H, C) | |
| pad_c = np.concatenate( | |
| [np.zeros((1,len(col_idx2)),np.int8), cols_bool2, np.zeros((1,len(col_idx2)),np.int8)], | |
| axis=0 | |
| ) | |
| diff_c = np.diff(pad_c.astype(np.int16), axis=0) | |
| ci_all3, r_ends = np.where(diff_c == -1) | |
| ci_all4, r_starts = np.where(diff_c == 1) | |
| for ci in range(len(col_idx2)): | |
| ends_c = r_ends[ci_all3 == ci] | |
| starts_c = r_starts[ci_all4 == ci] | |
| for e in ends_c: | |
| nxt = starts_c[starts_c > e] | |
| if len(nxt): | |
| g = int(nxt[0] - e) | |
| if 1 < g < 200: gap_sizes.append(g) | |
| cal.bridge_min_gap = 2 | |
| if len(gap_sizes) >= 20: | |
| g = np.array(gap_sizes) | |
| sm = g[g <= 30] | |
| if len(sm) >= 10: | |
| cal.bridge_max_gap = int(np.clip(np.percentile(sm, 75), 4, 20)) | |
| else: | |
| cal.bridge_max_gap = cal.stroke_width * 4 | |
| door = g[(g > cal.bridge_max_gap) & (g <= 80)] | |
| if len(door) >= 5: | |
| raw = int(np.percentile(door, 90)) | |
| else: | |
| raw = max(35, cal.stroke_width * 12) | |
| raw = int(np.clip(raw, 25, 80)) | |
| cal.door_gap = raw if raw % 2 == 1 else raw + 1 | |
| cal.max_bridge_thick = cal.stroke_width * 5 | |
| return cal | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SKELETON / TIP HELPERS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _morphological_skeleton(binary: np.ndarray) -> np.ndarray: | |
| """ | |
| BOTTLENECK 3 FIX β GPU morphology path re-uses persistent CUDA filter | |
| objects instead of creating new ones each iteration. | |
| cucim path uses GPU-native skeletonize when available. | |
| """ | |
| # ββ cucim (CuPy-based) GPU skeletonize β fastest path βββββββββββββββββ | |
| if _CUCIM and _CUPY: | |
| try: | |
| bin_cp = cp.asarray(binary > 0) | |
| skel_cp = _cucim_morph.skeletonize(bin_cp) | |
| return (cp.asnumpy(skel_cp) * 255).astype(np.uint8) | |
| except Exception: | |
| pass # fall through | |
| # ββ cv2.cuda morphology loop β pre-built filter objects βββββββββββββββ | |
| _ensure_skel_filters() | |
| if _CV2_CUDA and _SKEL_ERODE_FILTER is not None: | |
| skel = np.zeros_like(binary) | |
| g_img = _cuda_upload(binary) | |
| for _ in range(300): | |
| g_eroded = _SKEL_ERODE_FILTER.apply(g_img, stream=_CUDA_STREAM) | |
| g_recon = _SKEL_DILATE_FILTER.apply(g_eroded, stream=_CUDA_STREAM) | |
| eroded = g_eroded.download() | |
| recon = g_recon.download() | |
| temp = cv2.subtract(binary, recon) # CPU subtract is cheap | |
| skel = cv2.bitwise_or(skel, temp) | |
| binary = eroded | |
| g_img = g_eroded # reuse GPU mat | |
| if not cv2.countNonZero(binary): | |
| break | |
| return skel | |
| # ββ pure CPU fallback βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| skel = np.zeros_like(binary) | |
| img = binary.copy() | |
| cross = cv2.getStructuringElement(cv2.MORPH_CROSS, (3, 3)) | |
| for _ in range(300): | |
| eroded = cv2.erode(img, cross) | |
| temp = cv2.subtract(img, cv2.dilate(eroded, cross)) | |
| skel = cv2.bitwise_or(skel, temp) | |
| img = eroded | |
| if not cv2.countNonZero(img): | |
| break | |
| return skel | |
| def _skel(binary: np.ndarray) -> np.ndarray: | |
| try: | |
| from skimage.morphology import skeletonize as _sk | |
| return (_sk(binary > 0) * 255).astype(np.uint8) | |
| except ImportError: | |
| return _morphological_skeleton(binary) | |
| def _tip_pixels(skel_u8: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| BOTTLENECK 2 FIX β use cv2.cuda.filter2D when CUDA available, | |
| avoiding float32 kernel re-creation every call. | |
| """ | |
| sb = (skel_u8 > 0).astype(np.float32) | |
| if _CV2_CUDA: | |
| g_sb = _cuda_upload((sb * 255).astype(np.uint8)) | |
| # cv2.cuda.filter2D expects uint8 input | |
| f2d = cv2.cuda.createLinearFilter( | |
| cv2.CV_8UC1, cv2.CV_32FC1, _ONES3x3, borderType=cv2.BORDER_CONSTANT | |
| ) | |
| g_nbr = f2d.apply(g_sb, stream=_CUDA_STREAM) | |
| nbr = g_nbr.download() / 255.0 # scale back | |
| else: | |
| nbr = cv2.filter2D(sb, -1, _ONES3x3, borderType=cv2.BORDER_CONSTANT) | |
| return np.where((sb == 1) & (nbr.astype(np.int32) == 2)) | |
| def _outward_vectors(ex, ey, skel_u8: np.ndarray, lookahead: int | |
| ) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| BOTTLENECK 1 FIX β vectorised walk replacing the O(nΒ·lookahead) | |
| pure-Python D8 loop. | |
| Strategy: | |
| β’ Pre-build a skeleton boolean set as a dense (HΓW) uint8 image. | |
| β’ For each endpoint, extract a (lookaheadΓ2) padded sub-window of the | |
| skeleton and perform the D8 walk entirely with integer index arithmetic | |
| on NumPy arrays (or CuPy when available). | |
| β’ The outward vector is the negated direction from endpoint to walk terminus. | |
| For very large n (>2000), CuPy batches all endpoint windows on-GPU. | |
| """ | |
| n = len(ex) | |
| odx = np.zeros(n, np.float32) | |
| ody = np.zeros(n, np.float32) | |
| if n == 0: | |
| return odx, ody | |
| h_img, w_img = skel_u8.shape | |
| skel_bin = (skel_u8 > 0).astype(np.uint8) # dense lookup | |
| # D8 offsets | |
| D8_DY = np.array([ 0, 0,-1, 1,-1,-1, 1, 1], np.int32) | |
| D8_DX = np.array([-1, 1, 0, 0,-1, 1,-1, 1], np.int32) | |
| # ββ CuPy vectorised path ββββββββββββββββββββββββββββββββββββββββββββββ | |
| if _CUPY and n > 100: | |
| skel_cp = cp.asarray(skel_bin) | |
| ex_cp = cp.asarray(ex, dtype=cp.int32) | |
| ey_cp = cp.asarray(ey, dtype=cp.int32) | |
| d8dy_cp = cp.asarray(D8_DY) | |
| d8dx_cp = cp.asarray(D8_DX) | |
| # current positions (n,) | |
| cx_cp = ex_cp.copy() | |
| cy_cp = ey_cp.copy() | |
| px_cp = ex_cp.copy() | |
| py_cp = ey_cp.copy() | |
| for _ in range(lookahead): | |
| # candidate next positions: (8, n) | |
| nx_all = cx_cp[None, :] + d8dx_cp[:, None] | |
| ny_all = cy_cp[None, :] + d8dy_cp[:, None] | |
| # clamp to image bounds | |
| nx_all = cp.clip(nx_all, 0, w_img - 1) | |
| ny_all = cp.clip(ny_all, 0, h_img - 1) | |
| # exclude previous position | |
| not_prev = ~((nx_all == px_cp[None, :]) & (ny_all == py_cp[None, :])) | |
| # skeleton membership | |
| on_skel = skel_cp[ny_all, nx_all] | |
| valid = not_prev & (on_skel > 0) # (8, n) | |
| # pick first valid D8 direction (argmax on axis-0) | |
| any_valid = valid.any(axis=0) # (n,) | |
| first_dir = valid.argmax(axis=0) # (n,) 0-7 | |
| chosen_nx = nx_all[first_dir, cp.arange(n)] | |
| chosen_ny = ny_all[first_dir, cp.arange(n)] | |
| # only update endpoints where a move was found | |
| px_cp = cp.where(any_valid, cx_cp, px_cp) | |
| py_cp = cp.where(any_valid, cy_cp, py_cp) | |
| cx_cp = cp.where(any_valid, chosen_nx, cx_cp) | |
| cy_cp = cp.where(any_valid, chosen_ny, cy_cp) | |
| ix = (cx_cp - ex_cp).astype(cp.float32) | |
| iy = (cy_cp - ey_cp).astype(cp.float32) | |
| nr = cp.maximum(1e-6, cp.hypot(ix, iy)) | |
| odx_cp = -ix / nr | |
| ody_cp = -iy / nr | |
| return cp.asnumpy(odx_cp), cp.asnumpy(ody_cp) | |
| # ββ NumPy vectorised path βββββββββββββββββββββββββββββββββββββββββββββ | |
| cx = ex.copy().astype(np.int32) | |
| cy = ey.copy().astype(np.int32) | |
| px = ex.copy().astype(np.int32) | |
| py = ey.copy().astype(np.int32) | |
| for _ in range(lookahead): | |
| nx_all = np.clip(cx[None, :] + D8_DX[:, None], 0, w_img - 1) # (8,n) | |
| ny_all = np.clip(cy[None, :] + D8_DY[:, None], 0, h_img - 1) | |
| not_prev = ~((nx_all == px[None, :]) & (ny_all == py[None, :])) | |
| on_skel = skel_bin[ny_all, nx_all] | |
| valid = not_prev & (on_skel > 0) | |
| any_valid = valid.any(axis=0) | |
| first_dir = valid.argmax(axis=0) | |
| chosen_nx = nx_all[first_dir, np.arange(n)] | |
| chosen_ny = ny_all[first_dir, np.arange(n)] | |
| px = np.where(any_valid, cx, px) | |
| py = np.where(any_valid, cy, py) | |
| cx = np.where(any_valid, chosen_nx, cx) | |
| cy = np.where(any_valid, chosen_ny, cy) | |
| ix = (cx - ex).astype(np.float32) | |
| iy = (cy - ey).astype(np.float32) | |
| nr = np.maximum(1e-6, np.hypot(ix, iy)) | |
| odx = -ix / nr | |
| ody = -iy / nr | |
| return odx, ody | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ANALYZE IMAGE CHARACTERISTICS (unchanged) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def analyze_image_characteristics(img: np.ndarray) -> Dict[str, Any]: | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| brightness = float(np.mean(gray)) | |
| contrast = float(np.std(gray)) | |
| otsu_thr, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) | |
| if brightness > 220: | |
| wall_threshold = max(200, int(otsu_thr * 1.1)) | |
| elif brightness < 180: | |
| wall_threshold = max(150, int(otsu_thr * 0.9)) | |
| else: | |
| wall_threshold = int(otsu_thr) | |
| return {"brightness": brightness, "contrast": contrast, | |
| "wall_threshold": wall_threshold, "otsu_threshold": otsu_thr} | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DOOR ARC DETECTION (unchanged) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def detect_and_close_door_arcs(img: np.ndarray) -> np.ndarray: | |
| R_MIN=60; R_MAX=320; DP=1.2; PARAM1=50; PARAM2=22; MIN_DIST=50 | |
| MAX_ARC=115.0; MIN_ARC=60.0; LEAF_FRAC=0.92; LEAF_THR=0.35 | |
| WALL_R=1.25; WALL_THR=12; SNAP_R=30 | |
| DOUBLE_R_RATIO=1.4; DOUBLE_DIST=1.8; LINE_T=3 | |
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |
| h, w = gray.shape | |
| result = img.copy() | |
| _, binary = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU) | |
| binary = _cuda_morphology(binary.astype(np.uint8), cv2.MORPH_CLOSE, np.ones((3,3), np.uint8)) | |
| blurred = _cuda_gaussian_blur(gray, (7,7), 1.5) | |
| raw = cv2.HoughCircles(blurred, cv2.HOUGH_GRADIENT, dp=DP, minDist=MIN_DIST, | |
| param1=PARAM1, param2=PARAM2, minRadius=R_MIN, maxRadius=R_MAX) | |
| if raw is None: | |
| return result | |
| circles = np.round(raw[0]).astype(np.int32) | |
| binary = binary.astype(np.uint8) | |
| def sample_ring(cx, cy, r, n=360): | |
| ang = np.linspace(0, 2*np.pi, n, endpoint=False) | |
| xs = np.clip((cx + r*np.cos(ang)).astype(np.int32), 0, w-1) | |
| ys = np.clip((cy + r*np.sin(ang)).astype(np.int32), 0, h-1) | |
| return ang, xs, ys | |
| def arc_span(cx, cy, r): | |
| ang, xs, ys = sample_ring(cx, cy, r) | |
| on = ang[binary[ys, xs] > 0] | |
| if len(on) == 0: return 0.0, np.array([]) | |
| return float(np.degrees(on[-1]-on[0])), on | |
| def has_leaf(cx, cy, r): | |
| lr = r*LEAF_FRAC; n = max(60, int(r)) | |
| ang = np.linspace(0, 2*np.pi, n, endpoint=False) | |
| xs = np.clip((cx+lr*np.cos(ang)).astype(np.int32), 0, w-1) | |
| ys = np.clip((cy+lr*np.sin(ang)).astype(np.int32), 0, h-1) | |
| return float(np.mean(binary[ys,xs]>0)) >= LEAF_THR | |
| def wall_outside(cx, cy, r): | |
| pr = r*WALL_R; ang = np.linspace(0, 2*np.pi, 36, endpoint=False) | |
| xs = np.clip((cx+pr*np.cos(ang)).astype(np.int32), 0, w-1) | |
| ys = np.clip((cy+pr*np.sin(ang)).astype(np.int32), 0, h-1) | |
| return int(np.sum(binary[ys,xs]>0)) >= WALL_THR | |
| def endpoints(cx, cy, r, occ): | |
| gap_t = np.radians(25.0); diffs = np.diff(occ) | |
| big = np.where(diffs > gap_t)[0] | |
| if len(big) == 0: sa, ea = occ[0], occ[-1] | |
| else: | |
| sp = big[np.argmax(diffs[big])] | |
| sa, ea = occ[sp+1], occ[sp] | |
| def snap(a): | |
| px2 = int(round(cx+r*np.cos(a))); py2 = int(round(cy+r*np.sin(a))) | |
| y0=max(0,py2-SNAP_R); y1=min(h,py2+SNAP_R+1) | |
| x0=max(0,px2-SNAP_R); x1=min(w,px2+SNAP_R+1) | |
| roi = binary[y0:y1, x0:x1] | |
| wy2, wx2 = np.where(roi>0) | |
| if len(wx2)==0: return px2, py2 | |
| dd = np.hypot(wx2-(px2-x0), wy2-(py2-y0)) | |
| i = int(np.argmin(dd)) | |
| return int(wx2[i]+x0), int(wy2[i]+y0) | |
| return snap(sa), snap(ea) | |
| valid = [] | |
| for cx, cy, r in circles: | |
| span, occ = arc_span(cx, cy, r) | |
| if not (MIN_ARC <= span <= MAX_ARC): continue | |
| if not has_leaf(cx, cy, r): continue | |
| if not wall_outside(cx, cy, r): continue | |
| ep1, ep2 = endpoints(cx, cy, r, occ) | |
| valid.append((cx, cy, r, ep1, ep2)) | |
| used = [False]*len(valid) | |
| double_pairs = [] | |
| for i in range(len(valid)): | |
| if used[i]: continue | |
| cx1,cy1,r1,_,_ = valid[i] | |
| best_j, best_d = -1, 1e9 | |
| for j in range(i+1, len(valid)): | |
| if used[j]: continue | |
| cx2,cy2,r2,_,_ = valid[j] | |
| if max(r1,r2)/(min(r1,r2)+1e-6) > DOUBLE_R_RATIO: continue | |
| cd = float(np.hypot(cx2-cx1, cy2-cy1)) | |
| if cd < (r1+r2)*DOUBLE_DIST and cd < best_d: | |
| best_d, best_j = cd, j | |
| if best_j >= 0: | |
| double_pairs.append((i, best_j)) | |
| used[i] = used[best_j] = True | |
| singles = [i for i in range(len(valid)) if not used[i]] | |
| for idx in singles: | |
| cx,cy,r,ep1,ep2 = valid[idx] | |
| cv2.line(result, ep1, ep2, (0,0,0), LINE_T) | |
| for i_idx, j_idx in double_pairs: | |
| cx1,cy1,r1,ep1a,ep1b = valid[i_idx] | |
| cx2,cy2,r2,ep2a,ep2b = valid[j_idx] | |
| daa = np.hypot(ep1a[0]-ep2a[0], ep1a[1]-ep2a[1]) | |
| dab = np.hypot(ep1a[0]-ep2b[0], ep1a[1]-ep2b[1]) | |
| if daa <= dab: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2a,ep2b | |
| else: inner1,outer1,inner2,outer2 = ep1a,ep1b,ep2b,ep2a | |
| cv2.line(result, outer1, outer2, (0,0,0), LINE_T) | |
| cv2.line(result, inner1, inner2, (0,0,0), LINE_T) | |
| return result | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # EXTRACT WALLS (unchanged) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _estimate_wall_body_thickness(binary: np.ndarray, fallback: int = 12) -> int: | |
| h, w = binary.shape | |
| n_cols = min(200, w) | |
| col_idx = np.linspace(0, w-1, n_cols, dtype=int) | |
| cols = (binary[:, col_idx] > 0).astype(np.int8) | |
| padded = np.concatenate([np.zeros((1,n_cols),np.int8), cols, | |
| np.zeros((1,n_cols),np.int8)], axis=0) | |
| diff = np.diff(padded.astype(np.int16), axis=0) | |
| run_lengths = [] | |
| for ci in range(n_cols): | |
| d = diff[:, ci] | |
| s = np.where(d == 1)[0] | |
| e = np.where(d == -1)[0] | |
| if len(s)==0 or len(e)==0: continue | |
| r = e - s | |
| r = r[(r >= 2) & (r <= h*0.15)] | |
| if len(r): run_lengths.append(r) | |
| if run_lengths: | |
| return int(np.median(np.concatenate(run_lengths))) | |
| return fallback | |
| def _remove_thin_lines(walls: np.ndarray, min_thickness: int) -> np.ndarray: | |
| dist = cv2.distanceTransform(walls, cv2.DIST_L2, 5) | |
| thick_mask = dist >= (min_thickness / 2) | |
| n_lbl, labels, _, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl <= 1: return walls | |
| thick_labels = labels[thick_mask] | |
| if len(thick_labels) == 0: return np.zeros_like(walls) | |
| has_thick = np.zeros(n_lbl, dtype=bool) | |
| has_thick[thick_labels] = True | |
| keep_lut = has_thick.astype(np.uint8)*255; keep_lut[0] = 0 | |
| return keep_lut[labels] | |
| def _filter_double_lines_and_thick(walls: np.ndarray) -> np.ndarray: | |
| MIN_SINGLE_DIM = 20; DOUBLE_GAP = 60; DOUBLE_PCT = 12 | |
| n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl <= 1: return walls | |
| try: | |
| skel_full = cv2.ximgproc.thinning(walls, thinningType=cv2.ximgproc.THINNING_ZHANGSUEN) | |
| except AttributeError: | |
| skel_full = _morphological_skeleton(walls) | |
| skel_bin = skel_full > 0 | |
| keep_ids: set = set() | |
| thin_cands = [] | |
| for i in range(1, n_lbl): | |
| bw = int(stats[i, cv2.CC_STAT_WIDTH]); bh = int(stats[i, cv2.CC_STAT_HEIGHT]) | |
| if min(bw, bh) >= MIN_SINGLE_DIM: keep_ids.add(i) | |
| else: thin_cands.append(i) | |
| if not thin_cands: | |
| filtered = np.zeros_like(walls) | |
| for i in keep_ids: filtered[labels==i] = 255 | |
| return filtered | |
| skel_labels = labels * skel_bin | |
| img_h, img_w = labels.shape | |
| probe_dists = np.arange(3, DOUBLE_GAP+1, 3, dtype=np.float32) | |
| for i in thin_cands: | |
| bys, bxs = np.where(skel_labels == i) | |
| if len(bys) < 4: continue | |
| step = max(1, len(bys)//80) | |
| sy = bys[::step].astype(np.float32); sx = bxs[::step].astype(np.float32) | |
| n_s = len(sy) | |
| sy_prev=np.roll(sy,1); sy_prev[0]=sy[0] | |
| sy_next=np.roll(sy,-1); sy_next[-1]=sy[-1] | |
| sx_prev=np.roll(sx,1); sx_prev[0]=sx[0] | |
| sx_next=np.roll(sx,-1); sx_next[-1]=sx[-1] | |
| dr=(sy_next-sy_prev); dc=(sx_next-sx_prev) | |
| dlen=np.maximum(1.0, np.hypot(dr, dc)) | |
| pr=(-dc/dlen)[:,np.newaxis]; pc=(dr/dlen)[:,np.newaxis] | |
| for sign in (1.0, -1.0): | |
| rr = np.round(sy[:,np.newaxis] + sign*pr*probe_dists).astype(np.int32) | |
| cc = np.round(sx[:,np.newaxis] + sign*pc*probe_dists).astype(np.int32) | |
| valid_m = (rr>=0)&(rr<img_h)&(cc>=0)&(cc<img_w) | |
| safe_rr = np.clip(rr, 0, img_h-1); safe_cc = np.clip(cc, 0, img_w-1) | |
| lbl_at = labels[safe_rr, safe_cc] | |
| partner = valid_m & (lbl_at>0) & (lbl_at!=i) | |
| hit_any = partner.any(axis=1) | |
| hit_rows = np.where(hit_any)[0] | |
| if len(hit_rows) == 0: continue | |
| first_col = partner[hit_rows].argmax(axis=1) | |
| partner_ids = lbl_at[hit_rows, first_col] | |
| keep_ids.update(partner_ids.tolist()) | |
| if 100.0*len(hit_rows)/n_s >= DOUBLE_PCT: | |
| keep_ids.add(i); break | |
| if keep_ids: | |
| ka = np.array(sorted(keep_ids), dtype=np.int32) | |
| lut = np.zeros(n_lbl, dtype=np.uint8); lut[ka] = 255 | |
| return lut[labels] | |
| return np.zeros_like(walls) | |
| def extract_walls_adaptive(img_clean: np.ndarray, | |
| img_stats: Optional[Dict] = None) -> Tuple[np.ndarray, int]: | |
| h, w = img_clean.shape[:2] | |
| gray = cv2.cvtColor(img_clean, cv2.COLOR_BGR2GRAY) | |
| if img_stats: | |
| wall_threshold = img_stats["wall_threshold"] | |
| else: | |
| otsu_t, _ = _cuda_threshold(gray, 0, 255, cv2.THRESH_BINARY_INV+cv2.THRESH_OTSU) | |
| wall_threshold = int(otsu_t) | |
| _, binary = _cuda_threshold(gray, wall_threshold, 255, cv2.THRESH_BINARY_INV) | |
| binary = binary.astype(np.uint8) | |
| min_line_len = max(8, int(0.012 * w)) | |
| body_thickness = _estimate_wall_body_thickness(binary, fallback=12) | |
| body_thickness = int(np.clip(body_thickness, 9, 30)) | |
| k_h = cv2.getStructuringElement(cv2.MORPH_RECT, (min_line_len, 1)) | |
| k_v = cv2.getStructuringElement(cv2.MORPH_RECT, (1, min_line_len)) | |
| long_h = _cuda_morphology(binary, cv2.MORPH_OPEN, k_h) | |
| long_v = _cuda_morphology(binary, cv2.MORPH_OPEN, k_v) | |
| orig_walls = cv2.bitwise_or(long_h, long_v) | |
| k_bh = cv2.getStructuringElement(cv2.MORPH_RECT, (1, body_thickness)) | |
| k_bv = cv2.getStructuringElement(cv2.MORPH_RECT, (body_thickness, 1)) | |
| dil_h = _cuda_dilate(long_h, k_bh) | |
| dil_v = _cuda_dilate(long_v, k_bv) | |
| walls = cv2.bitwise_or(dil_h, dil_v) | |
| collision = cv2.bitwise_and(dil_h, dil_v) | |
| safe_zone = cv2.bitwise_and(collision, orig_walls) | |
| walls = cv2.bitwise_or(cv2.bitwise_and(walls, cv2.bitwise_not(collision)), safe_zone) | |
| dist = cv2.distanceTransform(cv2.bitwise_not(orig_walls), cv2.DIST_L2, 5) | |
| keep_mask = (dist <= body_thickness/2).astype(np.uint8) * 255 | |
| walls = cv2.bitwise_and(walls, keep_mask) | |
| walls = _remove_thin_lines(walls, min_thickness=body_thickness) | |
| n_lbl, labels, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl > 1: | |
| areas = stats[1:, cv2.CC_STAT_AREA] | |
| min_n = max(20, int(np.median(areas) * 0.0001)) | |
| keep_lut = np.zeros(n_lbl, dtype=np.uint8) | |
| keep_lut[1:] = (areas >= min_n).astype(np.uint8) | |
| walls = (keep_lut[labels] * 255).astype(np.uint8) | |
| walls = _filter_double_lines_and_thick(walls) | |
| return walls, body_thickness | |
| FIXTURE_MAX_BLOB=80; FIXTURE_MAX_AREA=4000; FIXTURE_MAX_ASP=4.0 | |
| FIXTURE_DENSITY_R=50; FIXTURE_DENSITY_THR=0.35; FIXTURE_MIN_ZONE=1500 | |
| def remove_fixture_symbols(walls: np.ndarray) -> np.ndarray: | |
| h, w = walls.shape | |
| n_lbl, labels, stats, centroids = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_lbl <= 1: return walls | |
| bw_a=stats[1:,cv2.CC_STAT_WIDTH].astype(np.float32) | |
| bh_a=stats[1:,cv2.CC_STAT_HEIGHT].astype(np.float32) | |
| ar_a=stats[1:,cv2.CC_STAT_AREA].astype(np.float32) | |
| cx_a=np.round(centroids[1:,0]).astype(np.int32) | |
| cy_a=np.round(centroids[1:,1]).astype(np.int32) | |
| mx=np.maximum(bw_a,bh_a); mn=np.minimum(bw_a,bh_a) | |
| asp=mx/(mn+1e-6) | |
| cand=(bw_a<FIXTURE_MAX_BLOB)&(bh_a<FIXTURE_MAX_BLOB)&(ar_a<FIXTURE_MAX_AREA)&(asp<=FIXTURE_MAX_ASP) | |
| ci=np.where(cand)[0]; cand_ids=ci+1; ccx=cx_a[ci]; ccy=cy_a[ci] | |
| if len(cand_ids)==0: return walls | |
| heatmap=np.zeros((h,w),dtype=np.float32) | |
| for x2,y2 in zip(ccx.tolist(), ccy.tolist()): | |
| cv2.circle(heatmap,(x2,y2),int(FIXTURE_DENSITY_R),1.0,-1) | |
| bk=max(3,(int(FIXTURE_DENSITY_R)//2)|1) | |
| density = _cuda_gaussian_blur( | |
| (heatmap * 255).astype(np.uint8), (bk*4+1, bk*4+1), bk | |
| ).astype(np.float32) / 255.0 | |
| dm=float(density.max()) | |
| if dm>0: density/=dm | |
| zone=(density>=FIXTURE_DENSITY_THR).astype(np.uint8)*255 | |
| nz,zlbl,zst,_=cv2.connectedComponentsWithStats(zone,connectivity=8) | |
| cz=np.zeros_like(zone) | |
| if nz>1: | |
| za=zst[1:,cv2.CC_STAT_AREA]; kz=np.where(za>=FIXTURE_MIN_ZONE)[0]+1 | |
| if len(kz): | |
| lut2=np.zeros(nz,dtype=np.uint8); lut2[kz]=255; cz=lut2[zlbl] | |
| zone=cz | |
| vc=(ccy>=0)&(ccy<h)&(ccx>=0)&(ccx<w) | |
| in_zone=vc&(zone[ccy.clip(0,h-1), ccx.clip(0,w-1)]>0) | |
| erase_ids=cand_ids[in_zone] | |
| result=walls.copy() | |
| if len(erase_ids): | |
| el=np.zeros(n_lbl,dtype=np.uint8); el[erase_ids]=1 | |
| result[el[labels].astype(bool)]=0 | |
| return result | |
| def _remove_thin_lines_calibrated(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: | |
| n_cc, cc, stats, _ = cv2.connectedComponentsWithStats(walls, connectivity=8) | |
| if n_cc <= 1: return walls | |
| bw=stats[1:,cv2.CC_STAT_WIDTH]; bh=stats[1:,cv2.CC_STAT_HEIGHT] | |
| ar=stats[1:,cv2.CC_STAT_AREA]; mx=np.maximum(bw,bh) | |
| keep=(mx>=cal.min_component_dim)|(ar>=cal.min_component_area*3) | |
| lut=np.zeros(n_cc,np.uint8); lut[1:]=keep.astype(np.uint8)*255 | |
| return lut[cc] | |
| def _bridge_wall_endpoints_v2(walls: np.ndarray, cal: WallCalibration, | |
| angle_tol: float = 15.0) -> np.ndarray: | |
| """ | |
| BOTTLENECK 12 FIX β vectorised path-clear check. | |
| Original: Python for-loop with np.any per pair. | |
| Fixed: all N_SAMP mid-paths stacked into (K, N_SAMP-2) index arrays; | |
| wall lookup via advanced indexing; any() collapsed axis-1 in one shot. | |
| """ | |
| try: | |
| from scipy.spatial import cKDTree as _KDTree | |
| _SCIPY = True | |
| except ImportError: | |
| _SCIPY = False | |
| result=walls.copy(); h,w=walls.shape; FCOS=np.cos(np.radians(70.0)) | |
| skel=_skel(walls); ey,ex=_tip_pixels(skel); n_ep=len(ey) | |
| if n_ep < 2: return result | |
| _,cc_map=cv2.connectedComponents(walls,connectivity=8) | |
| ep_cc=cc_map[ey,ex] | |
| lookahead=max(8, cal.stroke_width*3) | |
| out_dx,out_dy=_outward_vectors(ex,ey,skel,lookahead) | |
| pts=np.stack([ex,ey],axis=1).astype(np.float32) | |
| if _SCIPY: | |
| from scipy.spatial import cKDTree | |
| pairs=cKDTree(pts).query_pairs(float(cal.bridge_max_gap), output_type='ndarray') | |
| ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64) | |
| else: | |
| _ii,_jj=np.triu_indices(n_ep,k=1) | |
| ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=cal.bridge_max_gap | |
| ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64) | |
| if len(ii)==0: return result | |
| if _CUPY: | |
| ii_cp = cp.asarray(ii); jj_cp = cp.asarray(jj) | |
| pts_cp = cp.asarray(pts) | |
| odx_cp = cp.asarray(out_dx); ody_cp = cp.asarray(out_dy) | |
| dxij = pts_cp[jj_cp,0]-pts_cp[ii_cp,0] | |
| dyij = pts_cp[jj_cp,1]-pts_cp[ii_cp,1] | |
| dists_cp = cp.hypot(dxij,dyij) | |
| safe = cp.maximum(dists_cp, 1e-6) | |
| ux,uy = dxij/safe, dyij/safe | |
| ang = cp.degrees(cp.arctan2(cp.abs(dyij), cp.abs(dxij))) | |
| is_H = (ang<=angle_tol) | |
| is_V = (ang>=(90.0-angle_tol)) | |
| g1 = (dists_cp>=cal.bridge_min_gap)&(dists_cp<=cal.bridge_max_gap) | |
| g2 = is_H|is_V | |
| g3 = ((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS) & \ | |
| ((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS) | |
| ep_cc_cp = cp.asarray(ep_cc) | |
| g4 = ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp] | |
| pre_ok_cp = g1&g2&g3&g4 | |
| pre_idx = cp.asnumpy(cp.where(pre_ok_cp)[0]) | |
| dists = cp.asnumpy(dists_cp) | |
| is_H = cp.asnumpy(is_H) | |
| is_V = cp.asnumpy(is_V) | |
| else: | |
| dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1] | |
| dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6) | |
| ux,uy=dxij/safe,dyij/safe | |
| ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij))) | |
| is_H=ang<=angle_tol; is_V=ang>=(90.0-angle_tol) | |
| g1=(dists>=cal.bridge_min_gap)&(dists<=cal.bridge_max_gap); g2=is_H|is_V | |
| g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS) | |
| g4=ep_cc[ii]!=ep_cc[jj] | |
| pre_ok=g1&g2&g3&g4; pre_idx=np.where(pre_ok)[0] | |
| if len(pre_idx) == 0: | |
| return result | |
| # ββ VECTORISED path-clear check (BOTTLENECK 12 FIX) ββββββββββββββββββ | |
| N_SAMP = 9 | |
| K = len(pre_idx) | |
| vi_pre = ii[pre_idx]; vj_pre = jj[pre_idx] | |
| ax_arr = ex[vi_pre].astype(np.float32); ay_arr = ey[vi_pre].astype(np.float32) | |
| bx_arr = ex[vj_pre].astype(np.float32); by_arr = ey[vj_pre].astype(np.float32) | |
| is_H_pre = is_H[pre_idx] | |
| # t values for interior samples (exclude endpoints) | |
| t = np.linspace(0, 1, N_SAMP, dtype=np.float32)[1:-1] # (N_SAMP-2,) | |
| # xs[k, s] = lerp(ax, bx, t[s]) when H, else ax | |
| xs_h = ax_arr[:, None] + (bx_arr - ax_arr)[:, None] * t[None, :] # (K, N_SAMP-2) | |
| ys_h = np.broadcast_to(ay_arr[:, None], (K, N_SAMP-2)).copy() # constant y | |
| xs_v = np.broadcast_to(ax_arr[:, None], (K, N_SAMP-2)).copy() | |
| ys_v = ay_arr[:, None] + (by_arr - ay_arr)[:, None] * t[None, :] | |
| xs_all = np.where(is_H_pre[:, None], xs_h, xs_v) | |
| ys_all = np.where(is_H_pre[:, None], ys_h, ys_v) | |
| sxs = np.clip(np.round(xs_all).astype(np.int32), 0, w-1) # (K, N_SAMP-2) | |
| sys_ = np.clip(np.round(ys_all).astype(np.int32), 0, h-1) | |
| # bulk wall lookup: walls_flat[K, N_SAMP-2] | |
| walls_flat = walls[sys_, sxs] # (K, N_SAMP-2) uint8 | |
| blocked = walls_flat.any(axis=1) # (K,) bool | |
| clr = ~blocked | |
| valid = pre_idx[clr] | |
| if len(valid) == 0: | |
| return result | |
| vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid] | |
| order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order] | |
| used=np.zeros(n_ep,dtype=bool) | |
| for k in range(len(vi)): | |
| ia,ib=int(vi[k]),int(vj[k]) | |
| if used[ia] or used[ib]: continue | |
| ax,ay=int(ex[ia]),int(ey[ia]); bx2,by2=int(ex[ib]),int(ey[ib]) | |
| p1,p2=((min(ax,bx2),ay),(max(ax,bx2),ay)) if vH[k] else ((ax,min(ay,by2)),(ax,max(ay,by2))) | |
| cv2.line(result,p1,p2,255,cal.stroke_width) | |
| used[ia]=used[ib]=True | |
| return result | |
| def _close_door_openings_v2(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: | |
| gap=cal.door_gap | |
| def _shape_close(mask, kwh, axis, max_thick): | |
| k=cv2.getStructuringElement(cv2.MORPH_RECT, kwh) | |
| cls=_cuda_morphology(mask, cv2.MORPH_CLOSE, k) | |
| new=cv2.bitwise_and(cls,cv2.bitwise_not(mask)) | |
| if not np.any(new): return np.zeros_like(mask) | |
| n2,lbl2,st2,_=cv2.connectedComponentsWithStats(new,connectivity=8) | |
| if n2<=1: return np.zeros_like(mask) | |
| perp=st2[1:,cv2.CC_STAT_HEIGHT if axis=='H' else cv2.CC_STAT_WIDTH] | |
| keep=perp<=max_thick; lut2=np.zeros(n2,np.uint8); lut2[1:]=keep.astype(np.uint8)*255 | |
| return lut2[lbl2] | |
| add_h=_shape_close(walls,(gap,1),'H',cal.max_bridge_thick) | |
| add_v=_shape_close(walls,(1,gap),'V',cal.max_bridge_thick) | |
| return cv2.bitwise_or(walls, cv2.bitwise_or(add_h,add_v)) | |
| def reconstruct_walls(walls: np.ndarray) -> Tuple[np.ndarray, WallCalibration]: | |
| cal = calibrate_wall(walls) | |
| walls = _remove_thin_lines_calibrated(walls, cal) | |
| walls = _bridge_wall_endpoints_v2(walls, cal) | |
| walls = _close_door_openings_v2(walls, cal) | |
| return walls, cal | |
| def remove_dangling_lines(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: | |
| stroke = cal.stroke_width | |
| connect_radius = max(6, stroke*3) | |
| n_cc,cc_map,stats,_ = cv2.connectedComponentsWithStats(walls,connectivity=8) | |
| if n_cc <= 1: return walls | |
| skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel) | |
| tip_cc=cc_map[tip_y,tip_x] | |
| free_counts=np.zeros(n_cc,dtype=np.int32) | |
| for i in range(len(tip_x)): free_counts[tip_cc[i]]+=1 | |
| remove=np.zeros(n_cc,dtype=bool) | |
| ker=cv2.getStructuringElement(cv2.MORPH_ELLIPSE,(connect_radius*2+1,connect_radius*2+1)) | |
| for cc_id in range(1,n_cc): | |
| if free_counts[cc_id]<2: continue | |
| bw2=int(stats[cc_id,cv2.CC_STAT_WIDTH]); bh2=int(stats[cc_id,cv2.CC_STAT_HEIGHT]) | |
| if max(bw2,bh2) > stroke*40: continue | |
| cm=(cc_map==cc_id).astype(np.uint8) | |
| dc=_cuda_dilate(cm, ker) | |
| overlap=cv2.bitwise_and(dc,((walls>0)&(cc_map!=cc_id)).astype(np.uint8)) | |
| if np.count_nonzero(overlap)==0: remove[cc_id]=True | |
| lut=np.ones(n_cc,dtype=np.uint8); lut[0]=0; lut[remove]=0 | |
| return (lut[cc_map]*255).astype(np.uint8) | |
| def close_large_door_gaps(walls: np.ndarray, cal: WallCalibration) -> np.ndarray: | |
| """ | |
| BOTTLENECK 12 FIX (same vectorised path-clear as _bridge_wall_endpoints_v2). | |
| """ | |
| try: | |
| from scipy.spatial import cKDTree | |
| _SCIPY = True | |
| except ImportError: | |
| _SCIPY = False | |
| DOOR_MIN=180; DOOR_MAX=320; ANGLE_TOL=12.0 | |
| FCOS=np.cos(np.radians(90.0-ANGLE_TOL)) | |
| stroke=cal.stroke_width; line_width=max(stroke,3) | |
| result=walls.copy(); h,w=walls.shape | |
| skel=_skel(walls); tip_y,tip_x=_tip_pixels(skel) | |
| n_ep=len(tip_x) | |
| if n_ep<2: return result | |
| _,cc_map=cv2.connectedComponents(walls,connectivity=8) | |
| ep_cc=cc_map[tip_y,tip_x] | |
| lookahead=max(12,stroke*4) | |
| out_dx,out_dy=_outward_vectors(tip_x,tip_y,skel,lookahead) | |
| pts=np.stack([tip_x,tip_y],axis=1).astype(np.float32) | |
| if _SCIPY: | |
| pairs=cKDTree(pts).query_pairs(float(DOOR_MAX),output_type='ndarray') | |
| ii=pairs[:,0].astype(np.int64); jj=pairs[:,1].astype(np.int64) | |
| else: | |
| _ii,_jj=np.triu_indices(n_ep,k=1) | |
| ok=np.hypot(pts[_jj,0]-pts[_ii,0],pts[_jj,1]-pts[_ii,1])<=DOOR_MAX | |
| ii=_ii[ok].astype(np.int64); jj=_jj[ok].astype(np.int64) | |
| if len(ii)==0: return result | |
| if _CUPY: | |
| ii_cp=cp.asarray(ii); jj_cp=cp.asarray(jj) | |
| pts_cp=cp.asarray(pts) | |
| odx_cp=cp.asarray(out_dx); ody_cp=cp.asarray(out_dy) | |
| ep_cc_cp=cp.asarray(ep_cc) | |
| dxij=pts_cp[jj_cp,0]-pts_cp[ii_cp,0] | |
| dyij=pts_cp[jj_cp,1]-pts_cp[ii_cp,1] | |
| dists_cp=cp.hypot(dxij,dyij); safe=cp.maximum(dists_cp,1e-6) | |
| ux,uy=dxij/safe,dyij/safe | |
| ang=cp.degrees(cp.arctan2(cp.abs(dyij),cp.abs(dxij))) | |
| is_H=(ang<=ANGLE_TOL); is_V=(ang>=(90.0-ANGLE_TOL)) | |
| g1=(dists_cp>=DOOR_MIN)&(dists_cp<=DOOR_MAX); g2=is_H|is_V | |
| g3=((odx_cp[ii_cp]*ux+ody_cp[ii_cp]*uy)>=FCOS)&\ | |
| ((odx_cp[jj_cp]*-ux+ody_cp[jj_cp]*-uy)>=FCOS) | |
| g4=ep_cc_cp[ii_cp]!=ep_cc_cp[jj_cp] | |
| pre_idx=cp.asnumpy(cp.where(g1&g2&g3&g4)[0]) | |
| dists=cp.asnumpy(dists_cp); is_H=cp.asnumpy(is_H); is_V=cp.asnumpy(is_V) | |
| else: | |
| dxij=pts[jj,0]-pts[ii,0]; dyij=pts[jj,1]-pts[ii,1] | |
| dists=np.hypot(dxij,dyij); safe=np.maximum(dists,1e-6) | |
| ux,uy=dxij/safe,dyij/safe | |
| ang=np.degrees(np.arctan2(np.abs(dyij),np.abs(dxij))) | |
| is_H=ang<=ANGLE_TOL; is_V=ang>=(90.0-ANGLE_TOL) | |
| g1=(dists>=DOOR_MIN)&(dists<=DOOR_MAX); g2=is_H|is_V | |
| g3=((out_dx[ii]*ux+out_dy[ii]*uy)>=FCOS)&((out_dx[jj]*-ux+out_dy[jj]*-uy)>=FCOS) | |
| g4=ep_cc[ii]!=ep_cc[jj] | |
| pre_idx=np.where(g1&g2&g3&g4)[0] | |
| if len(pre_idx) == 0: | |
| return result | |
| # ββ vectorised path-clear βββββββββββββββββββββββββββββββββββββββββββββ | |
| N_SAMP = 15 | |
| K = len(pre_idx) | |
| vi_pre = ii[pre_idx]; vj_pre = jj[pre_idx] | |
| ax_arr = tip_x[vi_pre].astype(np.float32); ay_arr = tip_y[vi_pre].astype(np.float32) | |
| bx_arr = tip_x[vj_pre].astype(np.float32); by_arr = tip_y[vj_pre].astype(np.float32) | |
| is_H_pre = is_H[pre_idx] | |
| t = np.linspace(0, 1, N_SAMP, dtype=np.float32)[1:-1] | |
| mid_y = ((ay_arr + by_arr) / 2.0)[:, None] | |
| mid_x = ((ax_arr + bx_arr) / 2.0)[:, None] | |
| xs_h = ax_arr[:, None] + (bx_arr - ax_arr)[:, None] * t[None, :] | |
| ys_h = np.broadcast_to(mid_y, (K, N_SAMP-2)).copy() | |
| xs_v = np.broadcast_to(mid_x, (K, N_SAMP-2)).copy() | |
| ys_v = ay_arr[:, None] + (by_arr - ay_arr)[:, None] * t[None, :] | |
| xs_all = np.where(is_H_pre[:, None], xs_h, xs_v) | |
| ys_all = np.where(is_H_pre[:, None], ys_h, ys_v) | |
| sxs = np.clip(np.round(xs_all).astype(np.int32), 0, w-1) | |
| sys_ = np.clip(np.round(ys_all).astype(np.int32), 0, h-1) | |
| blocked = walls[sys_, sxs].any(axis=1) | |
| clr = ~blocked | |
| valid=pre_idx[clr] | |
| if len(valid)==0: return result | |
| vi=ii[valid]; vj=jj[valid]; vd=dists[valid]; vH=is_H[valid] | |
| order=np.argsort(vd); vi,vj,vd,vH=vi[order],vj[order],vd[order],vH[order] | |
| used=np.zeros(n_ep,dtype=bool) | |
| for k in range(len(vi)): | |
| ia,ib=int(vi[k]),int(vj[k]) | |
| if used[ia] or used[ib]: continue | |
| ax,ay=int(tip_x[ia]),int(tip_y[ia]); bx2,by2=int(tip_x[ib]),int(tip_y[ib]) | |
| if vH[k]: p1=(min(ax,bx2),(ay+by2)//2); p2=(max(ax,bx2),(ay+by2)//2) | |
| else: p1=((ax+bx2)//2,min(ay,by2)); p2=((ax+bx2)//2,max(ay,by2)) | |
| cv2.line(result,p1,p2,255,line_width) | |
| used[ia]=used[ib]=True | |
| return result | |
| def apply_user_lines_to_walls(walls, lines, thickness): | |
| result = walls.copy() | |
| for x1, y1, x2, y2 in lines: | |
| cv2.line(result, (x1, y1), (x2, y2), 255, max(thickness, 3)) | |
| return result | |
| def segment_rooms_flood(walls: np.ndarray) -> np.ndarray: | |
| h, w = walls.shape | |
| work = walls.copy() | |
| work[:5, :] = 255; work[-5:, :] = 255 | |
| work[:, :5] = 255; work[:, -5:] = 255 | |
| filled = work.copy() | |
| mask = np.zeros((h+2, w+2), np.uint8) | |
| for sx, sy in [(0,0),(w-1,0),(0,h-1),(w-1,h-1), | |
| (w//2,0),(w//2,h-1),(0,h//2),(w-1,h//2)]: | |
| if filled[sy, sx] == 0: | |
| cv2.floodFill(filled, mask, (sx, sy), 255) | |
| rooms = cv2.bitwise_not(filled) | |
| rooms = cv2.bitwise_and(rooms, cv2.bitwise_not(walls)) | |
| rooms = _cuda_morphology(rooms, cv2.MORPH_OPEN, np.ones((2,2), np.uint8)) | |
| return rooms | |
| def _find_thick_wall_neg_prompts(walls_mask, n=SAM_WALL_NEG): | |
| """ | |
| BOTTLENECK 6 FIX β GPU distanceTransform + vectorised grid-cell uniquing. | |
| """ | |
| h, w = walls_mask.shape | |
| # ββ GPU distanceTransform βββββββββββββββββββββββββββββββββββββββββββββ | |
| if _CV2_CUDA: | |
| g_wall = _cuda_upload(walls_mask) | |
| # cv2.cuda distanceTransform (L2, 5-mask) | |
| g_dist = cv2.cuda.GpuMat() | |
| cv2.cuda.distanceTransform(g_wall, g_dist, cv2.DIST_L2, 5, | |
| stream=_CUDA_STREAM) | |
| dist = g_dist.download() | |
| else: | |
| dist = cv2.distanceTransform(walls_mask, cv2.DIST_L2, | |
| cv2.DIST_MASK_PRECISE) | |
| try: | |
| skel = cv2.ximgproc.thinning(walls_mask, | |
| thinningType=cv2.ximgproc.THINNING_ZHANGSUEN) | |
| except AttributeError: | |
| skel = _morphological_skeleton(walls_mask) | |
| skel_vals = dist[skel > 0] | |
| if len(skel_vals) == 0: return [] | |
| thr = max(float(np.percentile(skel_vals, SAM_WALL_PCT)), WALL_MIN_HALF_PX) | |
| ys, xs = np.where((skel > 0) & (dist >= thr)) | |
| if len(ys) == 0: return [] | |
| # ββ vectorised grid-cell uniquing (no Python loop) ββββββββββββββββββββ | |
| grid_cells = max(1, int(np.ceil(np.sqrt(n * 4)))) | |
| cell_h = max(1, h // grid_cells); cell_w = max(1, w // grid_cells) | |
| cell_ids = (ys // cell_h) * grid_cells + (xs // cell_w) | |
| _, first = np.unique(cell_ids, return_index=True) # already vectorised | |
| sel = first[:n] | |
| return [(int(xs[i]), int(ys[i])) for i in sel] | |
| def generate_prompts(walls_mask, rooms_flood): | |
| """ | |
| BOTTLENECK 4 FIX β vectorised component filtering + bulk centroid | |
| wall-check using advanced indexing; fallback centroid search using | |
| a single np.argmin over a pre-built offset grid. | |
| """ | |
| h, w = walls_mask.shape | |
| inv = cv2.bitwise_not(walls_mask) | |
| n, labels, stats, centroids = cv2.connectedComponentsWithStats(inv, connectivity=8) | |
| min_prompt_area = max(200, int(h * w * 0.0001)) | |
| if n <= 1: | |
| neg_pts = _find_thick_wall_neg_prompts(walls_mask) | |
| return (np.array([], dtype=np.float32).reshape(0,2), | |
| np.array([], dtype=np.int32)) | |
| # ββ vectorised filtering (skip index 0 = background) βββββββββββββββββ | |
| areas = stats[1:, cv2.CC_STAT_AREA] | |
| bx_ = stats[1:, cv2.CC_STAT_LEFT]; by_ = stats[1:, cv2.CC_STAT_TOP] | |
| bw_ = stats[1:, cv2.CC_STAT_WIDTH]; bh_ = stats[1:, cv2.CC_STAT_HEIGHT] | |
| cx_all = np.clip(np.round(centroids[1:, 0]).astype(np.int32), 0, w-1) | |
| cy_all = np.clip(np.round(centroids[1:, 1]).astype(np.int32), 0, h-1) | |
| area_ok = areas >= min_prompt_area | |
| border_ok = (bx_ > 2) | (by_ > 2) | \ | |
| (bx_ + bw_ < w-2) | (by_ + bh_ < h-2) | |
| # exclude components that span nearly the full image (background) | |
| full_span = (bx_ <= 2) & (by_ <= 2) & \ | |
| (bx_ + bw_ >= w-2) & (by_ + bh_ >= h-2) | |
| keep_mask = area_ok & ~full_span | |
| keep_idx = np.where(keep_mask)[0] | |
| if len(keep_idx) == 0: | |
| neg_pts = _find_thick_wall_neg_prompts(walls_mask) | |
| return (np.array([], dtype=np.float32).reshape(0,2), | |
| np.array([], dtype=np.int32)) | |
| cx_k = cx_all[keep_idx] | |
| cy_k = cy_all[keep_idx] | |
| # ββ bulk wall check β no Python loop βββββββββββββββββββββββββββββββββ | |
| on_wall = walls_mask[cy_k, cx_k] > 0 # (K,) bool | |
| pts_list = [] | |
| lbls_list = [] | |
| # centroids not on wall β add directly | |
| off_wall = ~on_wall | |
| pts_list.append(np.stack([cx_k[off_wall].astype(np.float32), | |
| cy_k[off_wall].astype(np.float32)], axis=1)) | |
| lbls_list.append(np.ones(off_wall.sum(), dtype=np.int32)) | |
| # centroids on wall β vectorised 31Γ31 offset search | |
| on_idx = np.where(on_wall)[0] | |
| if len(on_idx) > 0: | |
| dy_range = np.arange(-15, 17, 2, dtype=np.int32) | |
| dx_range = np.arange(-15, 17, 2, dtype=np.int32) | |
| DY, DX = np.meshgrid(dy_range, dx_range, indexing='ij') # (D,D) | |
| DY = DY.ravel(); DX = DX.ravel() # (DΒ²,) | |
| for k in on_idx: | |
| cy_c, cx_c = int(cy_k[k]), int(cx_k[k]) | |
| ny_arr = np.clip(cy_c + DY, 0, h-1) | |
| nx_arr = np.clip(cx_c + DX, 0, w-1) | |
| off = walls_mask[ny_arr, nx_arr] == 0 | |
| if off.any(): | |
| best = np.argmax(off) | |
| pts_list.append([[float(nx_arr[best]), float(ny_arr[best])]]) | |
| lbls_list.append([1]) | |
| if not pts_list: | |
| all_pts = np.empty((0, 2), dtype=np.float32) | |
| all_lbls = np.empty(0, dtype=np.int32) | |
| else: | |
| all_pts = np.vstack([p if np.ndim(p)==2 else np.array(p, dtype=np.float32) | |
| for p in pts_list]).astype(np.float32) | |
| all_lbls = np.concatenate([np.array(l, dtype=np.int32) | |
| for l in lbls_list]) | |
| # negative prompts (wall centres) | |
| neg_pts_list = _find_thick_wall_neg_prompts(walls_mask) | |
| if neg_pts_list: | |
| neg_arr = np.array(neg_pts_list, dtype=np.float32) | |
| neg_lbls = np.zeros(len(neg_pts_list), dtype=np.int32) | |
| all_pts = np.vstack([all_pts, neg_arr]) | |
| all_lbls = np.concatenate([all_lbls, neg_lbls]) | |
| return all_pts, all_lbls | |
| def mask_to_rle(mask: np.ndarray) -> Dict: | |
| """ | |
| BOTTLENECK 10 FIX β replace pure-Python for-loop over every pixel with | |
| NumPy run-length encoding via np.diff on the flattened boolean array. | |
| """ | |
| h, w = mask.shape | |
| flat = mask.flatten(order='F').astype(bool) | |
| # np.diff detects transitions between FalseβTrue and TrueβFalse | |
| padded = np.concatenate([[False], flat, [False]]) | |
| changes = np.where(np.diff(padded.astype(np.int8)))[0] # boundary positions | |
| counts = np.diff(changes).tolist() # run lengths | |
| # RLE must start with a False count | |
| rle_counts = ([0] + counts) if flat[0] else counts | |
| return {"counts": rle_counts, "size": [h, w]} | |
| def _mask_to_contour_flat(mask): | |
| contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) | |
| if not contours: return [] | |
| largest = max(contours, key=cv2.contourArea) | |
| pts = largest[:, 0, :].tolist() | |
| return [v for pt in pts for v in pt] | |
| def _match_sam_mask_to_contour(contour, sam_room_masks): | |
| if not sam_room_masks: | |
| return _contour_to_rle_and_flat(contour) | |
| sam_h, sam_w = sam_room_masks[0]["mask"].shape | |
| contour_mask = np.zeros((sam_h, sam_w), dtype=np.uint8) | |
| cv2.drawContours(contour_mask, [contour], -1, 255, thickness=-1) | |
| best_iou = 0.0; best_entry = None | |
| for entry in sam_room_masks: | |
| m = entry["mask"] | |
| if m.shape != contour_mask.shape: continue | |
| inter = np.count_nonzero(cv2.bitwise_and(m, contour_mask)) | |
| if inter == 0: continue | |
| union = np.count_nonzero(cv2.bitwise_or(m, contour_mask)) | |
| iou = inter / (union + 1e-6) | |
| if iou > best_iou: best_iou = iou; best_entry = entry | |
| if best_entry is None or best_iou < 0.05: | |
| return _contour_to_rle_and_flat(contour) | |
| sam_contour_flat = _mask_to_contour_flat(best_entry["mask"]) | |
| if not sam_contour_flat: | |
| raw_pts = contour[:, 0, :].tolist() | |
| sam_contour_flat = [v for pt in raw_pts for v in pt] | |
| return mask_to_rle(best_entry["mask"]), sam_contour_flat, best_entry["score"] | |
| def _contour_to_rle_and_flat(contour): | |
| x, y, rw, rh = cv2.boundingRect(contour) | |
| canvas = np.zeros((rh+y+20, rw+x+20), dtype=np.uint8) | |
| cv2.drawContours(canvas, [contour], -1, 255, thickness=-1) | |
| raw_pts = contour[:, 0, :].tolist() | |
| flat_pts = [v for pt in raw_pts for v in pt] | |
| return mask_to_rle(canvas), flat_pts, 1.0 | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BATCHED OCR (BOTTLENECK 7 FIX) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _prepare_ocr_roi(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[np.ndarray]: | |
| """Prepare a single ROI for OCR (CLAHE + Otsu + medianBlur β RGB).""" | |
| x, y, rw, rh = cv2.boundingRect(contour) | |
| pad = 20 | |
| roi = img_bgr[max(0,y-pad):min(img_bgr.shape[0],y+rh+pad), | |
| max(0,x-pad):min(img_bgr.shape[1],x+rw+pad)] | |
| if roi.size == 0: return None | |
| gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) | |
| clahe = cv2.createCLAHE(2.0, (8,8)) | |
| proc = clahe.apply(gray) | |
| _, bin_img = _cuda_threshold(proc, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU) | |
| rgb = cv2.cvtColor( | |
| cv2.medianBlur(bin_img.astype(np.uint8), 3), cv2.COLOR_GRAY2RGB | |
| ) | |
| return rgb | |
| def _get_ocr_reader(): | |
| """Singleton EasyOCR reader with GPU=True when CUDA available.""" | |
| if not hasattr(_get_ocr_reader, "_reader"): | |
| try: | |
| import easyocr | |
| _get_ocr_reader._reader = easyocr.Reader(["en"], gpu=_TORCH_CUDA) | |
| print(f"[OCR] EasyOCR initialised gpu={_TORCH_CUDA}") | |
| except ImportError: | |
| _get_ocr_reader._reader = None | |
| return _get_ocr_reader._reader | |
| def run_ocr_batch(img_bgr: np.ndarray, | |
| contours: List[np.ndarray]) -> List[Optional[str]]: | |
| """ | |
| BOTTLENECK 7 FIX β batch all room crops into a single EasyOCR call. | |
| readtext_batched() pushes all crops through the GPU text recognition | |
| network in one forward pass instead of one-at-a-time. | |
| Falls back to sequential readtext() if readtext_batched unavailable. | |
| """ | |
| reader = _get_ocr_reader() | |
| if reader is None: | |
| return [None] * len(contours) | |
| rois: List[Optional[np.ndarray]] = [_prepare_ocr_roi(img_bgr, c) for c in contours] | |
| labels: List[Optional[str]] = [None] * len(contours) | |
| valid_idx = [i for i, r in enumerate(rois) if r is not None] | |
| valid_rois = [rois[i] for i in valid_idx] | |
| if not valid_rois: | |
| return labels | |
| try: | |
| # ββ preferred: GPU batched inference βββββββββββββββββββββββββββββ | |
| batch_results = reader.readtext_batched(valid_rois, detail=1, | |
| paragraph=False, | |
| batch_size=len(valid_rois)) | |
| for out_i, orig_i in enumerate(valid_idx): | |
| cands = [ | |
| (t.strip().upper(), c) | |
| for _, t, c in batch_results[out_i] | |
| if c >= OCR_CONF_THR and len(t.strip()) >= 2 | |
| and any(ch.isalpha() for ch in t) | |
| ] | |
| labels[orig_i] = max(cands, key=lambda x: x[1])[0] if cands else None | |
| except (AttributeError, Exception): | |
| # ββ fallback: sequential (original behaviour) βββββββββββββββββββββ | |
| for out_i, orig_i in enumerate(valid_idx): | |
| try: | |
| results = reader.readtext(valid_rois[out_i], detail=1, paragraph=False) | |
| cands = [ | |
| (t.strip().upper(), c) | |
| for _, t, c in results | |
| if c >= OCR_CONF_THR and len(t.strip()) >= 2 | |
| and any(ch.isalpha() for ch in t) | |
| ] | |
| labels[orig_i] = max(cands, key=lambda x: x[1])[0] if cands else None | |
| except Exception: | |
| pass | |
| return labels | |
| def run_ocr_on_room(img_bgr: np.ndarray, contour: np.ndarray) -> Optional[str]: | |
| """Single-room OCR wrapper (kept for compatibility).""" | |
| results = run_ocr_batch(img_bgr, [contour]) | |
| return results[0] | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # FILTER ROOM REGIONS (BOTTLENECK 5 FIX β vectorised NumPy filtering) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def filter_room_regions(rooms_mask, img_shape): | |
| """ | |
| BOTTLENECK 5 FIX β all scalar filters (area, dim, aspect, border, extent) | |
| computed as vectorised NumPy boolean masks before entering any Python loop. | |
| The solidity / drawContours step is the only remaining per-contour work. | |
| """ | |
| h, w = img_shape[:2] | |
| img_area = float(h * w) | |
| min_area = img_area * MIN_ROOM_AREA_FRAC | |
| max_area = img_area * MAX_ROOM_AREA_FRAC | |
| min_dim = w * MIN_ROOM_DIM_FRAC | |
| margin = max(5.0, w * BORDER_MARGIN_FRAC) | |
| contours, _ = cv2.findContours(rooms_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE) | |
| if not contours: return np.zeros_like(rooms_mask), [] | |
| # ββ vectorised stats ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| bboxes = np.array([cv2.boundingRect(c) for c in contours], dtype=np.float32) | |
| areas = np.array([cv2.contourArea(c) for c in contours], dtype=np.float32) | |
| bx = bboxes[:,0]; by = bboxes[:,1] | |
| bw_arr = bboxes[:,2]; bh_arr = bboxes[:,3] | |
| area_ok = (areas >= min_area) & (areas <= max_area) | |
| border_ok = (bx >= margin) & (by >= margin) & \ | |
| (bx + bw_arr <= w - margin) & (by + bh_arr <= h - margin) | |
| dim_ok = (bw_arr >= min_dim) | (bh_arr >= min_dim) | |
| aspect = np.maximum(bw_arr, bh_arr) / (np.minimum(bw_arr, bh_arr) + 1e-6) | |
| aspect_ok = aspect <= MAX_ASPECT_RATIO | |
| extent_ok = (areas / (bw_arr * bh_arr + 1e-6)) >= MIN_EXTENT | |
| # All scalar checks in one shot β only compute solidity for survivors | |
| cheap_pass = np.where(area_ok & border_ok & dim_ok & aspect_ok & extent_ok)[0] | |
| valid_mask = np.zeros_like(rooms_mask) | |
| valid_rooms = [] | |
| for i in cheap_pass: | |
| cnt = contours[i] | |
| hull = cv2.convexHull(cnt) | |
| ha = cv2.contourArea(hull) | |
| if ha > 0 and (areas[i] / ha) >= MIN_SOLIDITY: | |
| cv2.drawContours(valid_mask, [cnt], -1, 255, -1) | |
| valid_rooms.append(cnt) | |
| return valid_mask, valid_rooms | |
| def pixel_area_to_m2(area_px): | |
| return area_px * (2.54 / DPI) ** 2 * (SCALE_FACTOR ** 2) / 10000 | |
| def validate_label(label): | |
| if not label: return False | |
| label = label.strip() | |
| if not label[0].isalpha(): return False | |
| lc = sum(1 for c in label if c.isalpha()) | |
| return lc == 1 or lc >= 3 | |
| def measure_and_label_rooms(img, valid_rooms, sam_room_masks): | |
| """ | |
| BOTTLENECK 7 FIX β all OCR crops sent to run_ocr_batch() in one call | |
| instead of sequential run_ocr_on_room() per room. | |
| """ | |
| if not valid_rooms: | |
| return [] | |
| # ββ batch OCR βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| ocr_labels = run_ocr_batch(img, valid_rooms) | |
| room_data = [] | |
| for idx, (contour, label) in enumerate(zip(valid_rooms, ocr_labels), 1): | |
| if not label or not validate_label(label): | |
| label = f"ROOM {idx}" | |
| x, y, rw, rh = cv2.boundingRect(contour) | |
| area_px = cv2.contourArea(contour) | |
| M = cv2.moments(contour) | |
| cx = int(M["m10"] / M["m00"]) if M["m00"] else x + rw // 2 | |
| cy = int(M["m01"] / M["m00"]) if M["m00"] else y + rh // 2 | |
| _, raw_seg_flat, sam_score = _match_sam_mask_to_contour(contour, sam_room_masks) | |
| room_data.append({ | |
| "id": len(room_data)+1, "label": label, "contour": contour, | |
| "segmentation": [raw_seg_flat], "raw_segmentation": [raw_seg_flat], | |
| "sam_score": round(sam_score,4), "score": round(sam_score,4), | |
| "area": area_px, "area_px": area_px, | |
| "area_m2": round(pixel_area_to_m2(area_px),2), | |
| "bbox": [x,y,rw,rh], "centroid": [cx,cy], | |
| "confidence": 0.95, "isAi": True, | |
| }) | |
| return room_data | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SAM β BATCHED INFERENCE with set_image inside autocast (BOTTLENECK 9 FIX) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def segment_with_sam(img_rgb, walls, sam_ckpt, rooms_flood=None): | |
| """ | |
| BOTTLENECK 9 FIX: predictor.set_image() moved INSIDE torch.no_grad() + | |
| autocast so the ViT image encoder runs in FP16 (was FP32 in v1). | |
| All other GPU optimisations from v1 retained. | |
| """ | |
| if rooms_flood is None: | |
| rooms_flood = segment_rooms_flood(walls.copy()) | |
| sam_room_masks: List[Dict] = [] | |
| try: | |
| import torch | |
| from segment_anything import sam_model_registry, SamPredictor | |
| if not Path(sam_ckpt).exists(): | |
| print(" [SAM] Model not found β using flood-fill") | |
| return rooms_flood, [] | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| print(f" [SAM] Loading vit_h on {device} (encoder FP16 autocast enabled)") | |
| sam = sam_model_registry["vit_h"](checkpoint=sam_ckpt) | |
| sam.to(device); sam.eval() | |
| predictor = SamPredictor(sam) | |
| except Exception as e: | |
| print(f" [SAM] Load failed ({e}) β using flood-fill") | |
| return rooms_flood, [] | |
| all_points, all_labels = generate_prompts(walls, rooms_flood) | |
| if len(all_points) == 0: | |
| return rooms_flood, [] | |
| pos_pts = [(p, l) for p, l in zip(all_points, all_labels) if l == 1] | |
| neg_pts = [p for p, l in zip(all_points, all_labels) if l == 0] | |
| print(f" [SAM] {len(pos_pts)} room prompts + {len(neg_pts)} wall-neg prompts") | |
| autocast_ctx = ( | |
| torch.autocast("cuda", dtype=torch.float16) | |
| if _TORCH_CUDA else | |
| torch.autocast("cpu", dtype=torch.bfloat16) | |
| ) | |
| # ββ BOTTLENECK 9 FIX: encoder runs in FP16 autocast ββββββββββββββββββ | |
| with torch.no_grad(), autocast_ctx: | |
| predictor.set_image(img_rgb) # β moved inside autocast | |
| h, w = walls.shape | |
| sam_mask = np.zeros((h, w), dtype=np.uint8) | |
| accepted = 0 | |
| neg_coords = np.array(neg_pts, dtype=np.float32) if neg_pts else None | |
| neg_lbls = np.zeros(len(neg_pts), dtype=np.int32) if neg_pts else None | |
| denoise_k = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5)) | |
| with torch.no_grad(), autocast_ctx: | |
| for (px, py), lbl in pos_pts: | |
| px, py = int(px), int(py) | |
| if neg_coords is not None: | |
| pt_c = np.vstack([[[px, py]], neg_coords]) | |
| pt_l = np.concatenate([[lbl], neg_lbls]) | |
| else: | |
| pt_c = np.array([[px, py]], dtype=np.float32) | |
| pt_l = np.array([lbl], dtype=np.int32) | |
| try: | |
| masks, scores, _ = predictor.predict( | |
| point_coords=pt_c, point_labels=pt_l, multimask_output=True | |
| ) | |
| except Exception as e: | |
| print(f" [SAM] predict failed ({e})") | |
| continue | |
| best_idx = int(np.argmax(scores)) | |
| best_score = float(scores[best_idx]) | |
| if best_score < SAM_MIN_SCORE: | |
| continue | |
| best_mask = (masks[best_idx] > 0).astype(np.uint8) * 255 | |
| best_mask = cv2.bitwise_and(best_mask, rooms_flood) | |
| best_mask = _cuda_morphology(best_mask, cv2.MORPH_OPEN, denoise_k, iterations=1) | |
| if not np.any(best_mask): | |
| continue | |
| sam_room_masks.append({ | |
| "mask" : best_mask.copy(), | |
| "score" : best_score, | |
| "prompt": (px, py), | |
| }) | |
| sam_mask = cv2.bitwise_or(sam_mask, best_mask) | |
| accepted += 1 | |
| if _TORCH_CUDA: | |
| torch.cuda.empty_cache() | |
| print(f" [SAM] VRAM freed. Accepted {accepted}/{len(pos_pts)} masks") | |
| else: | |
| print(f" [SAM] Accepted {accepted}/{len(pos_pts)} masks") | |
| if accepted == 0: | |
| return rooms_flood, [] | |
| return sam_mask, sam_room_masks | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # BUILD ANNOTATED IMAGE (BOTTLENECK 11 FIX) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def build_annotated_image(img_bgr, rooms, selected_ids=None): | |
| """ | |
| BOTTLENECK 11 FIX β accumulate ALL room fills into a single overlay | |
| array, then call cv2.addWeighted ONCE instead of per-room. | |
| Border drawing and text labels remain per-room (unavoidable). | |
| """ | |
| vis = img_bgr.copy() | |
| overlay = img_bgr.copy() | |
| # ββ single-pass fill accumulation βββββββββββββββββββββββββββββββββββββ | |
| for i, room in enumerate(rooms): | |
| cnt = room.get("contour") | |
| if cnt is None: continue | |
| color = ROOM_COLORS[i % len(ROOM_COLORS)] | |
| bgr = (color[2], color[1], color[0]) | |
| cv2.drawContours(overlay, [cnt], -1, bgr, -1) | |
| # single blend for ALL fills | |
| vis = cv2.addWeighted(overlay, 0.35, vis, 0.65, 0) | |
| # ββ per-room: border + text βββββββββββββββββββββββββββββββββββββββββββ | |
| for i, room in enumerate(rooms): | |
| cnt = room.get("contour") | |
| if cnt is None: continue | |
| color = ROOM_COLORS[i % len(ROOM_COLORS)] | |
| bgr = (color[2], color[1], color[0]) | |
| is_sel = selected_ids and room["id"] in selected_ids | |
| cv2.drawContours(vis, [cnt], -1, (0,255,255) if is_sel else bgr, | |
| 4 if is_sel else 2) | |
| M = cv2.moments(cnt) | |
| cx = int(M["m10"]/M["m00"]) if M["m00"] else 0 | |
| cy = int(M["m01"]/M["m00"]) if M["m00"] else 0 | |
| label = room.get("label", f"Room {room['id']}") | |
| area = room.get("area_m2", 0.0) | |
| fs = 0.55; th = 1 | |
| (tw1, th1), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, fs, th) | |
| (tw2, th2), _ = cv2.getTextSize(f"{area:.1f} mΒ²", cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, th) | |
| bx2 = cx - max(tw1,tw2)//2 - 4; by2 = cy - th1 - th2 - 12 | |
| bw2 = max(tw1,tw2)+8; bh2 = th1+th2+16 | |
| sub = vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] | |
| if sub.size > 0: | |
| vis[max(0,by2):max(0,by2)+bh2, max(0,bx2):max(0,bx2)+bw2] = \ | |
| cv2.addWeighted(sub, 0.3, np.ones_like(sub)*255, 0.7, 0) | |
| cv2.putText(vis, label, (cx-tw1//2, cy-th2-6), | |
| cv2.FONT_HERSHEY_SIMPLEX, fs, (20,20,20), th+1, cv2.LINE_AA) | |
| cv2.putText(vis, f"{area:.1f} mΒ²", (cx-tw2//2, cy+th2+2), | |
| cv2.FONT_HERSHEY_SIMPLEX, fs-0.1, (20,20,20), th, cv2.LINE_AA) | |
| return vis | |
| def export_to_excel(rooms): | |
| wb = openpyxl.Workbook(); ws = wb.active; ws.title = "Room Analysis" | |
| headers = ["ID","Label","Area (px)","Area (mΒ²)","Centroid X","Centroid Y", | |
| "Bbox X","Bbox Y","Bbox W","Bbox H","SAM Score","Confidence"] | |
| hf = PatternFill("solid", fgColor="1F4E79"); hfont = Font(bold=True, color="FFFFFF", size=11) | |
| for col, h in enumerate(headers,1): | |
| cell=ws.cell(row=1,column=col,value=h) | |
| cell.fill=hf; cell.font=hfont; cell.alignment=Alignment(horizontal="center") | |
| alt = PatternFill("solid", fgColor="D6E4F0") | |
| for rn, room in enumerate(rooms, 2): | |
| cnt = room.get("contour") | |
| M = cv2.moments(cnt) if cnt is not None else {} | |
| cx = int(M["m10"]/M["m00"]) if M.get("m00") else 0 | |
| cy = int(M["m01"]/M["m00"]) if M.get("m00") else 0 | |
| bbox = cv2.boundingRect(cnt) if cnt is not None else (0,0,0,0) | |
| row_data=[room.get("id"), room.get("label","?"), | |
| round(room.get("area_px",0),1), round(room.get("area_m2",0.0),2), | |
| cx, cy, bbox[0], bbox[1], bbox[2], bbox[3], | |
| round(room.get("score",1.0),4), round(room.get("confidence",0.95),2)] | |
| fill = alt if rn%2==0 else None | |
| for col,val in enumerate(row_data,1): | |
| cell=ws.cell(row=rn,column=col,value=val) | |
| cell.alignment=Alignment(horizontal="center") | |
| if fill: cell.fill=fill | |
| for col in ws.columns: | |
| mx=max(len(str(c.value or "")) for c in col)+4 | |
| ws.column_dimensions[col[0].column_letter].width=min(mx,25) | |
| out = Path(tempfile.gettempdir()) / f"floorplan_rooms_{int(time.time())}.xlsx" | |
| wb.save(str(out)); return str(out) | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # STATE | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def init_state(): | |
| return {"img_orig":None,"img_cropped":None,"img_clean":None, | |
| "walls":None,"walls_base":None,"wall_cal":None, | |
| "user_lines":[],"draw_start":None,"walls_thickness":8, | |
| "rooms":[],"selected_ids":[],"annotated":None,"status":"Idle"} | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GRADIO CALLBACKS | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def cb_load_image(upload, state): | |
| if upload is None: | |
| return None, state, "Upload a floor-plan image to begin." | |
| try: | |
| if hasattr(upload,"name"): file_path=upload.name | |
| elif isinstance(upload,dict) and "name" in upload: file_path=upload["name"] | |
| elif isinstance(upload,str): file_path=upload | |
| else: | |
| img_bgr=cv2.imdecode(np.frombuffer(bytes(upload),dtype=np.uint8),cv2.IMREAD_COLOR) | |
| file_path=None | |
| if file_path is not None: img_bgr=cv2.imread(file_path) | |
| except Exception as e: | |
| return None, state, f"β Error reading upload: {e}" | |
| if img_bgr is None: return None, state, "β Could not decode image." | |
| state=init_state(); state["img_orig"]=img_bgr; state["status"]="Image loaded." | |
| return cv2.cvtColor(img_bgr,cv2.COLOR_BGR2RGB), state, f"β Loaded {img_bgr.shape[1]}Γ{img_bgr.shape[0]} px" | |
| def cb_preprocess(state): | |
| img=state.get("img_orig") | |
| if img is None: return None,None,state,"Load an image first." | |
| cropped = remove_title_block(img) | |
| img_clean = remove_colors(cropped) | |
| img_clean = detect_and_close_door_arcs(img_clean) | |
| img_stats = analyze_image_characteristics(cropped) | |
| walls, thick = extract_walls_adaptive(img_clean, img_stats) | |
| walls = remove_fixture_symbols(walls) | |
| walls, cal = reconstruct_walls(walls) | |
| walls = remove_dangling_lines(walls, cal) | |
| walls = close_large_door_gaps(walls, cal) | |
| state["img_cropped"]=cropped; state["img_clean"]=img_clean | |
| state["walls"]=walls.copy(); state["walls_base"]=walls.copy() | |
| state["walls_thickness"]=thick; state["wall_cal"]=cal | |
| walls_rgb = cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB) | |
| clean_rgb = cv2.cvtColor(img_clean,cv2.COLOR_BGR2RGB) | |
| msg=(f"β Pipeline done | strokeβ{cal.stroke_width}px bodyβ{thick}px " | |
| f"bridge=[{cal.bridge_min_gap},{cal.bridge_max_gap}] door={cal.door_gap}px " | |
| f"| GPU: torch={_TORCH_CUDA} cupy={_CUPY} cv2_cuda={_CV2_CUDA}") | |
| return clean_rgb, walls_rgb, state, msg | |
| def cb_add_door_line(evt: gr.SelectData, state): | |
| walls=state.get("walls") | |
| if walls is None: return None,state,"Run preprocessing first." | |
| x,y=int(evt.index[0]),int(evt.index[1]) | |
| if state["draw_start"] is None: | |
| state["draw_start"]=(x,y); msg=f"π Start ({x},{y}). Click end." | |
| else: | |
| x1,y1=state["draw_start"]; state["user_lines"].append((x1,y1,x,y)) | |
| state["draw_start"]=None | |
| walls_upd=apply_user_lines_to_walls(state["walls"],state["user_lines"],state["walls_thickness"]) | |
| state["walls"]=walls_upd | |
| vis=cv2.cvtColor(walls_upd,cv2.COLOR_GRAY2RGB) | |
| for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3) | |
| return vis,state,f"β Line drawn ({x1},{y1})β({x},{y}) Total:{len(state['user_lines'])}" | |
| vis=cv2.cvtColor(walls,cv2.COLOR_GRAY2RGB) | |
| for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3) | |
| if state["draw_start"]: cv2.circle(vis,state["draw_start"],6,(0,200,255),-1) | |
| return vis,state,msg | |
| def cb_undo_door_line(state): | |
| if not state["user_lines"]: return None,state,"No lines to undo." | |
| state["user_lines"].pop(); state["draw_start"]=None | |
| walls_base=state.get("walls_base") | |
| if walls_base is None: return None,state,"Re-run preprocessing." | |
| thick=state.get("walls_thickness",8) | |
| walls_upd=apply_user_lines_to_walls(walls_base,state["user_lines"],thick) | |
| state["walls"]=walls_upd | |
| vis=cv2.cvtColor(walls_upd,cv2.COLOR_GRAY2RGB) | |
| for lx1,ly1,lx2,ly2 in state["user_lines"]: cv2.line(vis,(lx1,ly1),(lx2,ly2),(255,80,80),3) | |
| return vis,state,f"β© Removed. Remaining:{len(state['user_lines'])}" | |
| def cb_run_sam(state): | |
| walls=state.get("walls"); img=state.get("img_cropped"); img_clean=state.get("img_clean") | |
| if walls is None or img is None: return None,None,state,"Run preprocessing first." | |
| img_rgb=cv2.cvtColor(img,cv2.COLOR_BGR2RGB) | |
| ckpt=download_sam_if_needed() | |
| sam_enabled=ckpt is not None and Path(ckpt).exists() | |
| if sam_enabled: | |
| rooms_mask,sam_room_masks=segment_with_sam(img_rgb,walls.copy(),ckpt) | |
| else: | |
| rooms_mask=segment_rooms_flood(walls.copy()); sam_room_masks=[] | |
| state["_sam_room_masks"]=sam_room_masks | |
| if not np.count_nonzero(rooms_mask): | |
| return None,None,state,"β rooms_mask empty." | |
| valid_mask,valid_rooms=filter_room_regions(rooms_mask,img.shape) | |
| if not valid_rooms: return None,None,state,"β No valid rooms." | |
| src=img_clean if img_clean is not None else img | |
| rooms=measure_and_label_rooms(src,valid_rooms,sam_room_masks) | |
| if not rooms: return None,None,state,"β No rooms after OCR." | |
| state["rooms"]=rooms; state["selected_ids"]=[] | |
| annotated=build_annotated_image(img,rooms); state["annotated"]=annotated | |
| table=[[r["id"],r["label"],f"{r['area_m2']} mΒ²",f"{r['score']:.2f}"] for r in rooms] | |
| return cv2.cvtColor(annotated,cv2.COLOR_BGR2RGB),table,state,f"β {len(rooms)} rooms detected." | |
| def cb_click_room(evt: gr.SelectData, state): | |
| annotated=state.get("annotated"); rooms=state.get("rooms",[]); img=state.get("img_cropped") | |
| if annotated is None or not rooms: return None,state,"Run SAM first." | |
| x,y=int(evt.index[0]),int(evt.index[1]); clicked_id=None | |
| for room in rooms: | |
| cnt=room.get("contour") | |
| if cnt is None: continue | |
| if cv2.pointPolygonTest(cnt,(float(x),float(y)),False)>=0: | |
| clicked_id=room["id"]; break | |
| if clicked_id is None: | |
| state["selected_ids"]=[]; msg="Clicked outside β selection cleared." | |
| else: | |
| sel=state["selected_ids"] | |
| if clicked_id in sel: sel.remove(clicked_id); msg=f"Room {clicked_id} deselected." | |
| else: sel.append(clicked_id); msg=f"Room {clicked_id} selected." | |
| state["selected_ids"]=sel | |
| new_ann=build_annotated_image(img,rooms,state["selected_ids"]); state["annotated"]=new_ann | |
| return cv2.cvtColor(new_ann,cv2.COLOR_BGR2RGB),state,msg | |
| def cb_remove_selected(state): | |
| sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped") | |
| if not sel: return None,None,state,"No rooms selected." | |
| removed=[r["label"] for r in rooms if r["id"] in sel] | |
| rooms=[r for r in rooms if r["id"] not in sel] | |
| for i,r in enumerate(rooms,1): r["id"]=i | |
| state["rooms"]=rooms; state["selected_ids"]=[] | |
| ann=build_annotated_image(img,rooms); state["annotated"]=ann | |
| table=[[r["id"],r["label"],f"{r['area_m2']} mΒ²",f"{r['score']:.2f}"] for r in rooms] | |
| return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"π Removed:{', '.join(removed)}" | |
| def cb_rename_selected(new_label, state): | |
| sel=state.get("selected_ids",[]); rooms=state.get("rooms",[]); img=state.get("img_cropped") | |
| if not sel: return None,None,state,"Select a room first." | |
| if not new_label.strip(): return None,None,state,"Enter a non-empty label." | |
| for r in rooms: | |
| if r["id"] in sel: r["label"]=new_label.strip().upper() | |
| state["rooms"]=rooms | |
| ann=build_annotated_image(img,rooms,sel); state["annotated"]=ann | |
| table=[[r["id"],r["label"],f"{r['area_m2']} mΒ²",f"{r['score']:.2f}"] for r in rooms] | |
| return cv2.cvtColor(ann,cv2.COLOR_BGR2RGB),table,state,f"β Renamed to '{new_label.strip().upper()}'" | |
| def cb_export_excel(state): | |
| rooms=state.get("rooms",[]) | |
| if not rooms: return None,"No rooms to export." | |
| path=export_to_excel(rooms) | |
| return path,f"β Exported {len(rooms)} rooms β {Path(path).name}" | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # GRADIO UI | |
| # ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| CSS = """ | |
| #title{text-align:center;font-size:1.8em;font-weight:700;color:#1F4E79} | |
| #subtitle{text-align:center;color:#555;margin-top:-8px;margin-bottom:16px} | |
| .step-card{border-left:4px solid #1F4E79!important;padding-left:10px!important} | |
| """ | |
| def _walls_to_rgb(s): | |
| w=s.get("walls") | |
| return None if w is None else cv2.cvtColor(w,cv2.COLOR_GRAY2RGB) | |
| with gr.Blocks(title="FloorPlan Analyser (GPU v2)") as app: | |
| state=gr.State(init_state()) | |
| gr.Markdown("# π’ Floor Plan Room Analyser β NVIDIA GPU Build v2", elem_id="title") | |
| gr.Markdown( | |
| f"EasyOCR gpu={'β ' if _TORCH_CUDA else 'β'} | " | |
| f"SAM encoder FP16={'β ' if _TORCH_CUDA else 'β'} | " | |
| f"CuPy={'β ' if _CUPY else 'β'} | " | |
| f"cucim={'β ' if _CUCIM else 'β'} | " | |
| f"cv2.cuda={'β ' if _CV2_CUDA else 'β'}", | |
| elem_id="subtitle", | |
| ) | |
| status_box=gr.Textbox(label="Status",interactive=False,value="Idle.") | |
| with gr.Row(): | |
| with gr.Column(scale=1,elem_classes="step-card"): | |
| gr.Markdown("### 1οΈβ£ Upload Floor Plan") | |
| upload_btn=gr.UploadButton("π Upload Image",file_types=["image"],size="sm") | |
| raw_preview=gr.Image(label="Loaded Image",height=320) | |
| with gr.Column(scale=1,elem_classes="step-card"): | |
| gr.Markdown("### 2οΈβ£ Pre-process") | |
| preprocess_btn=gr.Button("β Run Preprocessing",variant="primary") | |
| with gr.Tabs(): | |
| with gr.Tab("Clean Image"): clean_img=gr.Image(label="After color removal",height=300) | |
| with gr.Tab("Walls"): walls_img=gr.Image(label="Extracted walls",height=300) | |
| with gr.Row(): | |
| with gr.Column(elem_classes="step-card"): | |
| gr.Markdown("### 3οΈβ£ Draw Door-Closing Lines") | |
| undo_line_btn=gr.Button("β© Undo Last Line",size="sm") | |
| wall_draw_img=gr.Image(label="Wall mask",height=380,interactive=False) | |
| with gr.Row(): | |
| with gr.Column(scale=2,elem_classes="step-card"): | |
| gr.Markdown("### 4οΈβ£ SAM Segmentation + OCR") | |
| sam_btn=gr.Button("π€ Run SAM + OCR",variant="primary") | |
| ann_img=gr.Image(label="Annotated rooms",height=480,interactive=False) | |
| with gr.Column(scale=1,elem_classes="step-card"): | |
| gr.Markdown("### 5οΈβ£ Room Table & Actions") | |
| room_table=gr.Dataframe(headers=["ID","Label","Area","SAM Score"], | |
| datatype=["number","str","str","str"], | |
| interactive=False,label="Detected Rooms") | |
| with gr.Group(): | |
| rename_txt=gr.Textbox(placeholder="New labelβ¦",label="Rename Label") | |
| with gr.Row(): | |
| rename_btn=gr.Button("β Rename",size="sm") | |
| remove_btn=gr.Button("π Remove Selected",size="sm",variant="stop") | |
| gr.Markdown("---") | |
| export_btn=gr.Button("π Export to Excel",variant="secondary") | |
| excel_file=gr.File(label="Download Excel",visible=True) | |
| upload_btn.upload(cb_load_image,[upload_btn,state],[raw_preview,state,status_box]) | |
| preprocess_btn.click(cb_preprocess,[state],[clean_img,walls_img,state,status_box])\ | |
| .then(_walls_to_rgb,[state],[wall_draw_img]) | |
| wall_draw_img.select(cb_add_door_line,[state],[wall_draw_img,state,status_box]) | |
| undo_line_btn.click(cb_undo_door_line,[state],[wall_draw_img,state,status_box]) | |
| sam_btn.click(cb_run_sam,[state],[ann_img,room_table,state,status_box]) | |
| ann_img.select(cb_click_room,[state],[ann_img,state,status_box]) | |
| remove_btn.click(cb_remove_selected,[state],[ann_img,room_table,state,status_box]) | |
| rename_btn.click(cb_rename_selected,[rename_txt,state],[ann_img,room_table,state,status_box]) | |
| export_btn.click(cb_export_excel,[state],[excel_file,status_box]) | |
| if __name__ == "__main__": | |
| app.launch(share=False, debug=True, css=CSS) |