Update utils/cv_processing.py
Browse files- utils/cv_processing.py +106 -46
utils/cv_processing.py
CHANGED
|
@@ -101,6 +101,35 @@ class BackgroundReplacementError(Exception):
|
|
| 101 |
"""Custom exception for background replacement failures"""
|
| 102 |
pass
|
| 103 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 104 |
# ============================================================================
|
| 105 |
# MAIN SEGMENTATION FUNCTIONS
|
| 106 |
# ============================================================================
|
|
@@ -266,6 +295,7 @@ def segment_person_hq_original(image: np.ndarray, predictor: Any, fallback_enabl
|
|
| 266 |
return _fallback_segmentation(image)
|
| 267 |
else:
|
| 268 |
raise SegmentationError(f"Unexpected error: {e}")
|
|
|
|
| 269 |
# ============================================================================
|
| 270 |
# MASK REFINEMENT FUNCTIONS
|
| 271 |
# ============================================================================
|
|
@@ -379,11 +409,8 @@ def _matanyone_refine(image: np.ndarray, mask: np.ndarray, matanyone_processor:
|
|
| 379 |
objects = [1] # single object id
|
| 380 |
with torch.no_grad():
|
| 381 |
output_prob = matanyone_processor.step(img_tensor, mask_tensor, objects=objects)
|
| 382 |
-
# MatAnyOne returns output_prob as tensor
|
| 383 |
-
|
| 384 |
refined_mask_tensor = matanyone_processor.output_prob_to_mask(output_prob)
|
| 385 |
|
| 386 |
-
# Convert to numpy and to uint8
|
| 387 |
refined_mask = refined_mask_tensor.squeeze().detach().cpu().numpy()
|
| 388 |
if refined_mask.max() <= 1.0:
|
| 389 |
refined_mask = (refined_mask * 255).astype(np.uint8)
|
|
@@ -442,23 +469,56 @@ def replace_background_hq(frame: np.ndarray, mask: np.ndarray, background: np.nd
|
|
| 442 |
else:
|
| 443 |
raise BackgroundReplacementError(f"Unexpected error: {e}")
|
| 444 |
|
| 445 |
-
def create_professional_background(bg_config: Dict[str, Any], width: int, height: int) -> np.ndarray:
|
| 446 |
-
"""
|
| 447 |
-
|
| 448 |
-
|
| 449 |
-
|
| 450 |
-
|
| 451 |
-
|
| 452 |
-
|
| 453 |
-
|
| 454 |
-
|
| 455 |
-
|
| 456 |
-
|
| 457 |
-
|
| 458 |
-
|
| 459 |
-
|
| 460 |
-
|
| 461 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 462 |
# ============================================================================
|
| 463 |
# VALIDATION FUNCTION
|
| 464 |
# ============================================================================
|
|
@@ -730,6 +790,7 @@ def _auto_refine_mask_iteratively(image: np.ndarray, initial_mask: np.ndarray,
|
|
| 730 |
except Exception as e:
|
| 731 |
logger.warning(f"Iterative refinement failed: {e}")
|
| 732 |
return initial_mask
|
|
|
|
| 733 |
def _assess_mask_quality(mask: np.ndarray, image: np.ndarray) -> float:
|
| 734 |
"""Assess mask quality automatically"""
|
| 735 |
try:
|
|
@@ -863,35 +924,35 @@ def _process_mask(mask: np.ndarray) -> np.ndarray:
|
|
| 863 |
return fallback
|
| 864 |
|
| 865 |
def _validate_mask_quality(mask: np.ndarray, image_shape: Tuple[int, int]) -> bool:
|
| 866 |
-
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
| 867 |
try:
|
| 868 |
h, w = image_shape
|
| 869 |
-
|
| 870 |
-
|
| 871 |
-
|
| 872 |
area_ratio = mask_area / total_area
|
| 873 |
-
|
| 874 |
-
|
| 875 |
-
|
| 876 |
-
|
| 877 |
-
mask_binary = mask > 127
|
| 878 |
-
mask_center_y, mask_center_x = np.where(mask_binary)
|
| 879 |
-
|
| 880 |
-
if len(mask_center_y) == 0:
|
| 881 |
-
logger.warning("Empty mask")
|
| 882 |
return False
|
| 883 |
-
|
| 884 |
-
|
| 885 |
-
|
| 886 |
-
|
| 887 |
-
if center_y < h * 0.2 or center_y > h * 0.9:
|
| 888 |
-
logger.warning(f"Mask center too far from expected person location: y={center_y/h:.2f}")
|
| 889 |
return False
|
| 890 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
| 891 |
return True
|
| 892 |
-
|
| 893 |
except Exception as e:
|
| 894 |
-
logger.warning(f"Mask validation error: {e}")
|
| 895 |
return True
|
| 896 |
|
| 897 |
def _fallback_segmentation(image: np.ndarray) -> np.ndarray:
|
|
@@ -1064,6 +1125,7 @@ def _simple_compositing(frame: np.ndarray, mask: np.ndarray, background: np.ndar
|
|
| 1064 |
except Exception as e:
|
| 1065 |
logger.error(f"Simple compositing failed: {e}")
|
| 1066 |
return frame
|
|
|
|
| 1067 |
# ============================================================================
|
| 1068 |
# HELPER FUNCTIONS - BACKGROUND CREATION
|
| 1069 |
# ============================================================================
|
|
@@ -1121,12 +1183,10 @@ def _create_vertical_gradient(colors: list, width: int, height: int) -> np.ndarr
|
|
| 1121 |
def _create_horizontal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
| 1122 |
"""Create horizontal gradient using NumPy for performance"""
|
| 1123 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
| 1124 |
-
|
| 1125 |
for x in range(width):
|
| 1126 |
progress = x / width if width > 0 else 0
|
| 1127 |
color = _interpolate_color(colors, progress)
|
| 1128 |
gradient[:, x] = color
|
| 1129 |
-
|
| 1130 |
return gradient
|
| 1131 |
|
| 1132 |
def _create_diagonal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
|
@@ -1148,7 +1208,7 @@ def _create_radial_gradient(colors: list, width: int, height: int, soft: bool =
|
|
| 1148 |
max_distance = np.sqrt(center_x**2 + center_y**2)
|
| 1149 |
|
| 1150 |
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
| 1151 |
-
distances = np.sqrt((
|
| 1152 |
progress = distances / max_distance
|
| 1153 |
progress = np.clip(progress, 0, 1)
|
| 1154 |
|
|
@@ -1217,4 +1277,4 @@ def _apply_background_adjustments(background: np.ndarray, bg_config: Dict[str, A
|
|
| 1217 |
|
| 1218 |
except Exception as e:
|
| 1219 |
logger.warning(f"Background adjustment failed: {e}")
|
| 1220 |
-
return background
|
|
|
|
| 101 |
"""Custom exception for background replacement failures"""
|
| 102 |
pass
|
| 103 |
|
| 104 |
+
# ============================================================================
|
| 105 |
+
# BACKGROUND HELPERS (NEW)
|
| 106 |
+
# ============================================================================
|
| 107 |
+
|
| 108 |
+
def _fit_image_letterbox(img_rgb: np.ndarray, dst_w: int, dst_h: int, fill=(32, 32, 32)) -> np.ndarray:
|
| 109 |
+
"""
|
| 110 |
+
Fit an RGB image into (dst_h, dst_w) with letterboxing (no stretch), borders filled with `fill`.
|
| 111 |
+
"""
|
| 112 |
+
h, w = img_rgb.shape[:2]
|
| 113 |
+
if h == 0 or w == 0:
|
| 114 |
+
return np.full((dst_h, dst_w, 3), fill, dtype=np.uint8)
|
| 115 |
+
|
| 116 |
+
src_aspect = w / max(1, h)
|
| 117 |
+
dst_aspect = dst_w / max(1, dst_h)
|
| 118 |
+
|
| 119 |
+
if src_aspect > dst_aspect:
|
| 120 |
+
new_w = dst_w
|
| 121 |
+
new_h = int(round(dst_w / src_aspect))
|
| 122 |
+
else:
|
| 123 |
+
new_h = dst_h
|
| 124 |
+
new_w = int(round(dst_h * src_aspect))
|
| 125 |
+
|
| 126 |
+
resized = cv2.resize(img_rgb, (new_w, new_h), interpolation=cv2.INTER_AREA)
|
| 127 |
+
canvas = np.full((dst_h, dst_w, 3), fill, dtype=np.uint8)
|
| 128 |
+
y0 = (dst_h - new_h) // 2
|
| 129 |
+
x0 = (dst_w - new_w) // 2
|
| 130 |
+
canvas[y0:y0+new_h, x0:x0+new_w] = resized
|
| 131 |
+
return canvas
|
| 132 |
+
|
| 133 |
# ============================================================================
|
| 134 |
# MAIN SEGMENTATION FUNCTIONS
|
| 135 |
# ============================================================================
|
|
|
|
| 295 |
return _fallback_segmentation(image)
|
| 296 |
else:
|
| 297 |
raise SegmentationError(f"Unexpected error: {e}")
|
| 298 |
+
|
| 299 |
# ============================================================================
|
| 300 |
# MASK REFINEMENT FUNCTIONS
|
| 301 |
# ============================================================================
|
|
|
|
| 409 |
objects = [1] # single object id
|
| 410 |
with torch.no_grad():
|
| 411 |
output_prob = matanyone_processor.step(img_tensor, mask_tensor, objects=objects)
|
|
|
|
|
|
|
| 412 |
refined_mask_tensor = matanyone_processor.output_prob_to_mask(output_prob)
|
| 413 |
|
|
|
|
| 414 |
refined_mask = refined_mask_tensor.squeeze().detach().cpu().numpy()
|
| 415 |
if refined_mask.max() <= 1.0:
|
| 416 |
refined_mask = (refined_mask * 255).astype(np.uint8)
|
|
|
|
| 469 |
else:
|
| 470 |
raise BackgroundReplacementError(f"Unexpected error: {e}")
|
| 471 |
|
| 472 |
+
def create_professional_background(bg_config: Dict[str, Any] | str, width: int, height: int) -> np.ndarray:
|
| 473 |
+
"""
|
| 474 |
+
Central background builder.
|
| 475 |
+
- Accepts a style string OR a dict like:
|
| 476 |
+
{'background_choice': 'minimalist', 'custom_path': '/path/to/image.jpg'}
|
| 477 |
+
(also backwards compatible with older dicts that contained 'type'/'colors')
|
| 478 |
+
- If 'custom_path' exists, we load that image and letterbox-fit it.
|
| 479 |
+
- Returns RGB np.ndarray of shape (height, width, 3).
|
| 480 |
+
"""
|
| 481 |
+
# Normalize inputs
|
| 482 |
+
choice = "minimalist"
|
| 483 |
+
custom_path = None
|
| 484 |
+
|
| 485 |
+
if isinstance(bg_config, dict):
|
| 486 |
+
# new form
|
| 487 |
+
choice = bg_config.get("background_choice", bg_config.get("name", "minimalist"))
|
| 488 |
+
custom_path = bg_config.get("custom_path")
|
| 489 |
+
# Custom image takes precedence
|
| 490 |
+
if custom_path and os.path.exists(custom_path):
|
| 491 |
+
img_bgr = cv2.imread(custom_path, cv2.IMREAD_COLOR)
|
| 492 |
+
if img_bgr is not None:
|
| 493 |
+
img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
|
| 494 |
+
return _fit_image_letterbox(img_rgb, width, height, fill=(32, 32, 32))
|
| 495 |
+
else:
|
| 496 |
+
logger.warning(f"Failed to read custom background at {custom_path}. Falling back to style.")
|
| 497 |
+
# old form (has type/colors) – build from spec if present
|
| 498 |
+
if "type" in bg_config and "colors" in bg_config:
|
| 499 |
+
if bg_config["type"] == "color":
|
| 500 |
+
background = _create_solid_background(bg_config, width, height)
|
| 501 |
+
else:
|
| 502 |
+
background = _create_gradient_background_enhanced(bg_config, width, height)
|
| 503 |
+
return _apply_background_adjustments(background, bg_config)
|
| 504 |
+
|
| 505 |
+
elif isinstance(bg_config, str):
|
| 506 |
+
choice = bg_config
|
| 507 |
+
|
| 508 |
+
# No custom path → use our lightweight styles
|
| 509 |
+
choice = (choice or "minimalist").lower()
|
| 510 |
+
if choice not in PROFESSIONAL_BACKGROUNDS:
|
| 511 |
+
choice = "minimalist"
|
| 512 |
+
cfg = PROFESSIONAL_BACKGROUNDS[choice]
|
| 513 |
+
|
| 514 |
+
if cfg.get("type") == "color":
|
| 515 |
+
background = _create_solid_background(cfg, width, height)
|
| 516 |
+
else:
|
| 517 |
+
background = _create_gradient_background_enhanced(cfg, width, height)
|
| 518 |
+
|
| 519 |
+
background = _apply_background_adjustments(background, cfg)
|
| 520 |
+
return background
|
| 521 |
+
|
| 522 |
# ============================================================================
|
| 523 |
# VALIDATION FUNCTION
|
| 524 |
# ============================================================================
|
|
|
|
| 790 |
except Exception as e:
|
| 791 |
logger.warning(f"Iterative refinement failed: {e}")
|
| 792 |
return initial_mask
|
| 793 |
+
|
| 794 |
def _assess_mask_quality(mask: np.ndarray, image: np.ndarray) -> float:
|
| 795 |
"""Assess mask quality automatically"""
|
| 796 |
try:
|
|
|
|
| 924 |
return fallback
|
| 925 |
|
| 926 |
def _validate_mask_quality(mask: np.ndarray, image_shape: Tuple[int, int]) -> bool:
|
| 927 |
+
"""
|
| 928 |
+
Soft validator: only reject clearly broken masks.
|
| 929 |
+
- Accept area ratios roughly between 2% and 95%.
|
| 930 |
+
- Don't fail on center; just warn.
|
| 931 |
+
"""
|
| 932 |
try:
|
| 933 |
h, w = image_shape
|
| 934 |
+
total_area = max(1, h * w)
|
| 935 |
+
mask_area = int(np.sum(mask > 127))
|
|
|
|
| 936 |
area_ratio = mask_area / total_area
|
| 937 |
+
|
| 938 |
+
# Only reject extreme cases
|
| 939 |
+
if area_ratio < 0.02 or area_ratio > 0.95:
|
| 940 |
+
logger.warning(f"Suspicious mask area ratio (hard reject): {area_ratio:.3f}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 941 |
return False
|
| 942 |
+
|
| 943 |
+
ys, xs = np.where(mask > 127)
|
| 944 |
+
if len(ys) == 0:
|
| 945 |
+
logger.warning("Empty mask (hard reject)")
|
|
|
|
|
|
|
| 946 |
return False
|
| 947 |
+
|
| 948 |
+
cy, cx = float(np.mean(ys)) / h, float(np.mean(xs)) / w
|
| 949 |
+
if cy < 0.10 or cy > 0.98:
|
| 950 |
+
logger.warning(f"Mask center unusual (advisory): y={cy:.2f}")
|
| 951 |
+
|
| 952 |
return True
|
| 953 |
+
|
| 954 |
except Exception as e:
|
| 955 |
+
logger.warning(f"Mask validation error (allowing): {e}")
|
| 956 |
return True
|
| 957 |
|
| 958 |
def _fallback_segmentation(image: np.ndarray) -> np.ndarray:
|
|
|
|
| 1125 |
except Exception as e:
|
| 1126 |
logger.error(f"Simple compositing failed: {e}")
|
| 1127 |
return frame
|
| 1128 |
+
|
| 1129 |
# ============================================================================
|
| 1130 |
# HELPER FUNCTIONS - BACKGROUND CREATION
|
| 1131 |
# ============================================================================
|
|
|
|
| 1183 |
def _create_horizontal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
| 1184 |
"""Create horizontal gradient using NumPy for performance"""
|
| 1185 |
gradient = np.zeros((height, width, 3), dtype=np.uint8)
|
|
|
|
| 1186 |
for x in range(width):
|
| 1187 |
progress = x / width if width > 0 else 0
|
| 1188 |
color = _interpolate_color(colors, progress)
|
| 1189 |
gradient[:, x] = color
|
|
|
|
| 1190 |
return gradient
|
| 1191 |
|
| 1192 |
def _create_diagonal_gradient(colors: list, width: int, height: int) -> np.ndarray:
|
|
|
|
| 1208 |
max_distance = np.sqrt(center_x**2 + center_y**2)
|
| 1209 |
|
| 1210 |
y_coords, x_coords = np.mgrid[0:height, 0:width]
|
| 1211 |
+
distances = np.sqrt((x - center_x)**2 + (y - center_y)**2)
|
| 1212 |
progress = distances / max_distance
|
| 1213 |
progress = np.clip(progress, 0, 1)
|
| 1214 |
|
|
|
|
| 1277 |
|
| 1278 |
except Exception as e:
|
| 1279 |
logger.warning(f"Background adjustment failed: {e}")
|
| 1280 |
+
return background
|