Harshith Reddy
Increase file size limits: upload 1 GB, processing 2 GB
6849f7a
import os
import numpy as np
import torch
import nibabel as nib
from monai import transforms
from scipy import ndimage
from scipy.ndimage import binary_closing, binary_opening, binary_fill_holes, median_filter
from skimage.morphology import ball
from datetime import datetime
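
# Pipeline overview: preprocess_nifti() validates and normalizes a NIfTI volume into a
# channel-first float32 tensor; refine_liver_mask() / refine_liver_mask_enhanced() clean
# up a binary prediction; calculate_liver_volume() and analyze_liver_morphology() derive
# metrics; check_volume_sanity() flags implausible volumes; generate_medical_report()
# assembles everything into a structured report dict.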
def validate_nifti(nifti_img):
shape = nifti_img.shape
if len(shape) < 3:
raise ValueError(f"Invalid NIfTI shape: {shape}. Expected at least 3 dimensions.")
if any(s <= 0 for s in shape):
raise ValueError(f"Invalid NIfTI shape: {shape}. All dimensions must be positive.")
if any(s > 2000 for s in shape):
raise ValueError(f"Volume too large: {shape}. Maximum dimension size is 2000.")
voxel_spacing = nifti_img.header.get_zooms()[:3] if len(nifti_img.header.get_zooms()) >= 3 else (1.0, 1.0, 1.0)
if any(sp <= 0 for sp in voxel_spacing):
raise ValueError(f"Invalid voxel spacing: {voxel_spacing}. All values must be positive.")
raw_data = nifti_img.get_fdata()
if np.isnan(raw_data).any():
raise ValueError("NIfTI contains NaN values")
if np.isinf(raw_data).any():
raise ValueError("NIfTI contains Inf values")
return True
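
# Usage sketch (synthetic in-memory image; a real caller would pass a loaded scan).
# This helper is illustrative and not part of the original pipeline.
def _example_validate_nifti():
    data = np.random.rand(64, 64, 64).astype(np.float32)  # minimal well-formed volume
    img = nib.Nifti1Image(data, affine=np.eye(4))
    return validate_nifti(img)  # True: shape, spacing, and value checks all pass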
def preprocess_nifti(file_path, device=None):
try:
print(f"Preprocessing file: {file_path}")
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
        file_size_bytes = os.path.getsize(file_path)
        file_size = file_size_bytes / (1024**2)
        file_size_kb = file_size_bytes / 1024
        if file_size_bytes == 0:
            raise ValueError("NIfTI file is empty")
        if file_size > 2048:
            raise ValueError(f"NIfTI file too large: {file_size:.1f} MB. Maximum processing size is 2 GB. For larger files, consider compression or resampling.")
if file_size_kb < 100:
print(f" ⚠ WARNING: File size is very small ({file_size_kb:.1f} KB). This may indicate:")
print(f" - Low resolution/compressed data (may lose texture and boundary cues)")
print(f" - Single slice upload (incomplete anatomy)")
print(f" - Data compression artifacts (may distort intensity gradients)")
print(f"Loading NIfTI file with nibabel...")
if file_size > 100:
nifti_img = nib.load(file_path, mmap=True)
else:
nifti_img = nib.load(file_path)
print(f"NIfTI shape: {nifti_img.shape}, dtype: {nifti_img.get_fdata().dtype}")
if len(nifti_img.shape) == 3:
if any(s < 10 for s in nifti_img.shape):
print(f" ⚠ WARNING: Very small dimension detected ({nifti_img.shape}). May be a single slice or cropped volume.")
if nifti_img.shape[2] < 20:
print(f" ⚠ WARNING: Only {nifti_img.shape[2]} slices detected. Model expects full 3D volumes for best results.")
validate_nifti(nifti_img)
voxel_spacing = nifti_img.header.get_zooms()[:3] if len(nifti_img.header.get_zooms()) >= 3 else (1.0, 1.0, 1.0)
if voxel_spacing == (1.0, 1.0, 1.0):
print(f" ⚠ WARNING: Voxel spacing is (1.0, 1.0, 1.0) - metadata may be missing or lost during conversion.")
print(f" This can cause incorrect volume calculations and scaling issues.")
affine = nifti_img.affine
affine_det = np.linalg.det(affine[:3, :3])
print(f" → Voxel spacing: {voxel_spacing}")
print(f" → Affine determinant: {affine_det:.6f}")
if abs(affine_det) < 0.1 or abs(affine_det) > 100:
print(f" ⚠ WARNING: Unusual affine determinant ({affine_det:.6f}). Spatial metadata may be corrupted.")
        # get_fdata() always returns a floating-point array, so its dtype is uninformative;
        # get_data_dtype() reports the dtype actually stored on disk.
        raw_data_dtype = nifti_img.get_data_dtype()
        if raw_data_dtype in (np.uint8, np.uint16):
            print(f" ⚠ WARNING: Input data type is {raw_data_dtype} (integer). Model expects float32.")
            print(f"   Integer data may indicate compression or conversion artifacts.")
            print(f"   Converting to float32, but quality may be reduced.")
raw_data = nifti_img.get_fdata(dtype=np.float32)
print(f" → Raw data stats: min={raw_data.min():.4f}, max={raw_data.max():.4f}, mean={raw_data.mean():.4f}, std={raw_data.std():.4f}")
if raw_data.max() - raw_data.min() < 1e-6:
raise ValueError(f"Input NIfTI file contains constant values (min=max={raw_data.min():.4f}). Cannot process.")
if raw_data.std() < 1e-3:
print(f" ⚠ WARNING: Very low data variance (std={raw_data.std():.4f}). Data may be corrupted or over-compressed.")
if raw_data.max() > 10000 or raw_data.min() < -1000:
print(f" ⚠ WARNING: Extreme intensity values detected (range: [{raw_data.min():.1f}, {raw_data.max():.1f}]).")
print(f" Data may not be properly normalized. Model expects normalized float32 tensors.")
        nonzero_mask = np.abs(raw_data) > 1e-6
nonzero_count = nonzero_mask.sum()
total_count = raw_data.size
nonzero_ratio = nonzero_count / total_count if total_count > 0 else 0.0
print(f" → Non-zero voxels: {nonzero_count:,} / {total_count:,} ({100*nonzero_ratio:.2f}%)")
is_prenormalized = (raw_data.max() <= 1.0 and raw_data.min() >= 0.0)
if is_prenormalized:
print(f" → Detected pre-normalized data (range [0, 1]). Using minimal preprocessing.")
use_enhanced_preprocessing = os.environ.get("USE_ENHANCED_PREPROCESSING", "false").lower() == "true"
if is_prenormalized:
transform = transforms.Compose([
transforms.LoadImaged(keys=["image"]),
transforms.EnsureChannelFirstD(keys=["image"], channel_dim="no_channel"),
transforms.ToTensord(keys=["image"])
])
elif use_enhanced_preprocessing:
try:
                transform = transforms.Compose([
                    transforms.LoadImaged(keys=["image"]),
                    # EnsureChannelFirst must precede the spatial transforms:
                    # Orientationd and Spacingd expect channel-first arrays.
                    transforms.EnsureChannelFirstD(keys=["image"], channel_dim="no_channel"),
                    transforms.Orientationd(keys=["image"], axcodes="RAS"),
                    transforms.Spacingd(keys=["image"], pixdim=(1.5, 1.5, 3.0), mode="bilinear"),
                    transforms.ScaleIntensityRangePercentilesd(keys="image", lower=2, upper=98, b_min=0.0, b_max=1.0, clip=True),
                    transforms.NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
                    transforms.ToTensord(keys=["image"])
                ])
print(" → Using enhanced preprocessing (orientation + spacing + percentile scaling)")
except Exception as e:
print(f" ⚠ Warning: Could not create enhanced transform pipeline: {e}. Falling back to training-matched preprocessing...")
use_enhanced_preprocessing = False
if not is_prenormalized and not use_enhanced_preprocessing:
transform = transforms.Compose([
transforms.LoadImaged(keys=["image"]),
transforms.EnsureChannelFirstD(keys=["image"], channel_dim="no_channel"),
transforms.NormalizeIntensityd(keys="image", nonzero=True, channel_wise=True),
transforms.ToTensord(keys=["image"])
])
print(" → Using training-matched preprocessing (for optimal accuracy)")
data = {"image": file_path}
print("Applying transforms...")
try:
augmented = transform(data)
image_data = augmented["image"]
except Exception as e:
print(f" ⚠ Transform failed: {e}. Trying fallback preprocessing...")
try:
raw_data_norm = (raw_data - raw_data.min()) / (raw_data.max() - raw_data.min() + 1e-8)
if raw_data_norm.std() < 1e-6:
raise ValueError("Normalized data is still constant")
image_data = torch.from_numpy(raw_data_norm).float()
image_data = image_data.unsqueeze(0)
print(" → Used fallback normalization (min-max scaling)")
except Exception as e2:
raise ValueError(f"Both standard and fallback preprocessing failed: {e2}")
if not isinstance(image_data, torch.Tensor):
image_data = torch.from_numpy(np.array(image_data))
if image_data.dtype != torch.float32:
image_data = image_data.float()
        img_np = image_data.detach().cpu().numpy()
vmin, vmax = float(img_np.min()), float(img_np.max())
if vmax - vmin < 1e-6:
print(f" ⚠ WARNING: Preprocessing produced near-constant image (min={vmin:.6f}, max={vmax:.6f}). Trying alternative preprocessing...")
try:
if nonzero_ratio > 0.01:
nonzero_mean = raw_data[nonzero_mask].mean()
nonzero_std = raw_data[nonzero_mask].std() + 1e-8
raw_data_norm = np.zeros_like(raw_data)
raw_data_norm[nonzero_mask] = (raw_data[nonzero_mask] - nonzero_mean) / nonzero_std
raw_data_norm = (raw_data_norm - raw_data_norm.min()) / (raw_data_norm.max() - raw_data_norm.min() + 1e-8)
else:
raw_data_norm = (raw_data - raw_data.min()) / (raw_data.max() - raw_data.min() + 1e-8)
if raw_data_norm.std() < 1e-6:
raise ValueError("Alternative normalization also produced constant data")
image_data = torch.from_numpy(raw_data_norm).float()
image_data = image_data.unsqueeze(0)
img_np = image_data.numpy()
vmin, vmax = float(img_np.min()), float(img_np.max())
print(f" → Alternative preprocessing successful: min={vmin:.4f}, max={vmax:.4f}, mean={img_np.mean():.4f}, std={img_np.std():.4f}")
except Exception as e3:
raise ValueError(f"Preprocessing produced near-constant image: min={vmin:.6f}, max={vmax:.6f}. Alternative preprocessing also failed: {e3}")
print(f" → After transforms: min={vmin:.4f}, max={vmax:.4f}, mean={img_np.mean():.4f}, std={img_np.std():.4f}")
if device is not None and device.type == 'cuda':
if image_data.is_pinned():
image_data = image_data.to(device, non_blocking=True)
else:
image_data = image_data.pin_memory().to(device, non_blocking=True)
if len(image_data.shape) >= 4:
try:
if hasattr(torch, "channels_last_3d"):
image_data = image_data.contiguous(memory_format=torch.channels_last_3d)
if image_data.is_contiguous(memory_format=torch.channels_last_3d):
print(f" → Using channels-last 3D memory layout (optimized for GPU)")
            except Exception:
                pass
print(f"Preprocessed shape: {image_data.shape}, dtype: {image_data.dtype}, device: {image_data.device if hasattr(image_data, 'device') else 'CPU'}")
if image_data.numel() == 0:
raise ValueError("Preprocessed image is empty")
return image_data
except Exception as e:
error_msg = f"Preprocessing error: {e}"
print(f"✗ {error_msg}")
import traceback
traceback.print_exc()
raise ValueError(f"Failed to preprocess NIfTI file: {e}") from e
def refine_liver_mask_enhanced(mask, voxel_spacing, pred_probabilities, threshold, modality):
original_shape = mask.shape
original_sum = mask.sum()
    # Collapse leading singleton (batch/channel) dims to a 3D volume. Copy so the
    # in-place spatial-prior edits below never mutate the caller's array through a view.
    mask_3d = mask
    while mask_3d.ndim > 3 and mask_3d.shape[0] == 1:
        mask_3d = mask_3d[0]
    if mask_3d.ndim != 3:
        raise ValueError(f"Expected a 3D mask (optionally with leading singleton dims), got shape {original_shape}")
    mask_3d = mask_3d.copy()
if mask_3d.dtype != np.uint8:
mask_3d = (mask_3d > 0.5).astype(np.uint8)
if mask_3d.sum() == 0:
return np.zeros(original_shape, dtype=np.uint8), {
"original_voxels": 0, "refined_voxels": 0, "removed_voxels": 0,
"connected_components_before": 0, "connected_components_after": 0,
"volume_change_ml": 0.0, "volume_change_percent": 0.0,
"guards_ok": False
}, 0.0
H, W, D = mask_3d.shape
guards_ok = True
print(f" NOTE: Spatial priors assume RAS orientation (Right-Anterior-Superior).")
print(f" Input should be reoriented to RAS using nib.as_closest_canonical() before processing.")
print(f" If orientation is unknown, spatial priors may remove valid liver tissue.")
top_remove = max(1, int(0.15 * D))
mask_3d[:, :, :top_remove] = 0
if top_remove > 0:
print(f" Spatial prior: Removed top {top_remove} slices (15% - diaphragm protection, assumes Superior axis)")
right_trim = max(0, int(0.30 * W))
mask_3d[:, W-right_trim:, :] = 0
if right_trim > 0:
print(f" Spatial prior: Removed right {right_trim} pixels (30% - stomach protection, assumes Right axis)")
left_trim = max(0, int(0.15 * W))
mask_3d[:, :left_trim, :] = 0
if left_trim > 0:
print(f" Spatial prior: Removed left {left_trim} pixels (15% - spleen protection, assumes Left axis)")
bottom_remove = max(1, int(0.10 * D))
mask_3d[:, :, -bottom_remove:] = 0
if bottom_remove > 0:
print(f" Spatial prior: Removed bottom {bottom_remove} slices (10% - lower abdomen protection, assumes Inferior axis)")
if D > 2:
bottom_slices = mask_3d[:, :, -2:]
if bottom_slices.sum() > 0:
mask_3d[:, :, -2:] = 0
print(f" Bottom-cap trim: Removed bottom 2 slices (diaphragm protection)")
guards_ok = False
labels_before, num_components_before = ndimage.label(mask_3d)
if num_components_before == 0:
print(f" QC FAIL: No components after spatial priors. Attempting auto-rethreshold...")
guards_ok = False
        if hasattr(pred_probabilities, 'shape') and len(pred_probabilities.shape) >= 3:
            # Reduce (N, C, H, W, D) or (C, H, W, D) predictions to a 3D volume.
            if len(pred_probabilities.shape) == 5:
                pred_3d = pred_probabilities[0, 0]
            elif len(pred_probabilities.shape) == 4:
                pred_3d = pred_probabilities[0]
            else:
                pred_3d = pred_probabilities
top_remove = max(1, int(0.15 * D))
right_trim = max(0, int(0.30 * W))
left_trim = max(0, int(0.15 * W))
bottom_remove = max(1, int(0.10 * D))
for retry_threshold in [0.70, 0.65, 0.60, 0.55, 0.50]:
mask_retry = (pred_3d > retry_threshold).astype(np.uint8)
mask_retry[:, :, :top_remove] = 0
mask_retry[:, W-right_trim:, :] = 0
mask_retry[:, :left_trim, :] = 0
mask_retry[:, :, -bottom_remove:] = 0
if mask_retry.sum() > 1000:
mask_3d = mask_retry
print(f" Auto-rethreshold: Found mask at threshold {retry_threshold:.3f}")
break
else:
return np.zeros(original_shape, dtype=np.uint8), {
"original_voxels": original_sum, "refined_voxels": 0, "removed_voxels": int(original_sum),
"connected_components_before": 0, "connected_components_after": 0,
"volume_change_ml": 0.0, "volume_change_percent": -100.0,
"guards_ok": False
}, 0.0
labels_before, num_components_before = ndimage.label(mask_3d)
component_sizes = ndimage.sum(mask_3d, labels_before, range(1, num_components_before + 1))
largest_label = component_sizes.argmax() + 1
mask_3d = (labels_before == largest_label).astype(np.uint8)
print(f" Kept largest connected component ({component_sizes.max():,} voxels)")
coords = np.where(mask_3d > 0)
if len(coords[0]) > 0:
z_span = (coords[2].max() - coords[2].min() + 1) / D if D > 0 else 0
if z_span < 0.25:
print(f" QC FAIL: Z-span only {z_span*100:.1f}% (<25%). Attempting iterative rethreshold...")
guards_ok = False
            if hasattr(pred_probabilities, 'shape') and len(pred_probabilities.shape) >= 3:
                # Same 3D reduction as above: (N, C, H, W, D) or (C, H, W, D) inputs.
                if len(pred_probabilities.shape) == 5:
                    pred_3d = pred_probabilities[0, 0]
                elif len(pred_probabilities.shape) == 4:
                    pred_3d = pred_probabilities[0]
                else:
                    pred_3d = pred_probabilities
best_mask = mask_3d
best_z_span = z_span
top_remove = max(1, int(0.12 * D))
right_trim = max(0, int(0.25 * W))
left_trim = max(0, int(0.10 * W))
for retry_threshold in [0.65, 0.60, 0.55, 0.50, 0.45, 0.40, 0.35]:
mask_retry = (pred_3d > retry_threshold).astype(np.uint8)
mask_retry[:, :, :top_remove] = 0
mask_retry[:, W-right_trim:, :] = 0
mask_retry[:, :left_trim, :] = 0
if mask_retry.sum() < 1000:
continue
labels_retry, _ = ndimage.label(mask_retry)
if labels_retry.max() > 0:
comp_sizes_retry = ndimage.sum(mask_retry, labels_retry, range(1, labels_retry.max() + 1))
largest_retry = comp_sizes_retry.argmax() + 1
mask_retry = (labels_retry == largest_retry).astype(np.uint8)
coords_retry = np.where(mask_retry > 0)
if len(coords_retry[0]) > 0:
z_span_retry = (coords_retry[2].max() - coords_retry[2].min() + 1) / D
if z_span_retry >= 0.25:
mask_3d = mask_retry
print(f" Auto-rethreshold SUCCESS: threshold={retry_threshold:.3f}, z-span={z_span_retry*100:.1f}%")
break
elif z_span_retry > best_z_span:
best_mask = mask_retry
best_z_span = z_span_retry
else:
if best_z_span > z_span:
mask_3d = best_mask
print(f" Auto-rethreshold: Using best z-span={best_z_span*100:.1f}% (still <25%)")
else:
print(f" Auto-rethreshold FAILED: No threshold yielded z-span >= 25%")
labels_before_morph, _ = ndimage.label(mask_3d)
if labels_before_morph.max() > 0:
component_sizes_before_morph = ndimage.sum(mask_3d, labels_before_morph, range(1, labels_before_morph.max() + 1))
if len(component_sizes_before_morph) > 0:
largest_label_before_morph = component_sizes_before_morph.argmax() + 1
mask_3d = (labels_before_morph == largest_label_before_morph).astype(np.uint8)
print(f" Kept largest component before morphology")
try:
mask_3d = mask_3d.astype(bool)
structure = ball(2)
mask_3d = binary_closing(mask_3d, structure=structure)
mask_3d = mask_3d.astype(np.uint8)
print(f" Applied binary closing (ball radius=2)")
except Exception as e:
print(f" Binary closing failed: {e}")
try:
mask_3d = mask_3d.astype(bool)
mask_3d = binary_fill_holes(mask_3d)
mask_3d = mask_3d.astype(np.uint8)
print(f" Filled holes")
except Exception as e:
print(f" Hole filling failed: {e}")
try:
mask_3d = median_filter(mask_3d, size=3)
print(f" Applied 3D median filter (size=3)")
except Exception as e:
print(f" Median filter failed: {e}")
labels_after_morph, _ = ndimage.label(mask_3d)
if labels_after_morph.max() > 0:
component_sizes_morph = ndimage.sum(mask_3d, labels_after_morph, range(1, labels_after_morph.max() + 1))
if len(component_sizes_morph) > 0:
largest_label_morph = component_sizes_morph.argmax() + 1
mask_3d = (labels_after_morph == largest_label_morph).astype(np.uint8)
print(f" Re-kept largest component after morphology")
labels_after, num_components_after = ndimage.label(mask_3d)
refined_sum = mask_3d.sum()
removed_voxels = int(np.int64(original_sum) - np.int64(refined_sum))
voxel_volume = voxel_spacing[0] * voxel_spacing[1] * voxel_spacing[2]
volume_change_ml = (removed_voxels * voxel_volume) / 1000.0
volume_change_percent = (removed_voxels / float(original_sum) * 100.0) if original_sum > 0 else 0.0
volume_ml = (refined_sum * voxel_volume) / 1000.0
coords_final = np.where(mask_3d > 0)
if len(coords_final[0]) > 0:
z_span_final = (coords_final[2].max() - coords_final[2].min() + 1) / D if D > 0 else 0
        # np.where on a 3D array always yields 3 index arrays, and the non-empty
        # check above guarantees these means are well-defined.
        x_centroid = np.mean(coords_final[1])
        y_centroid = np.mean(coords_final[0])
if volume_ml < 800 or volume_ml > 2500:
print(f" QC FAIL: Volume {volume_ml:.1f}ml outside normal range [800-2500ml]")
guards_ok = False
if z_span_final < 0.20:
print(f" QC FAIL: Z-span {z_span_final*100:.1f}% too small (<20%)")
guards_ok = False
liver_x_min = 0.15 * W
liver_x_max = 0.55 * W
if x_centroid < liver_x_min or x_centroid > liver_x_max:
print(f" QC FAIL: x-centroid {x_centroid:.1f} outside expected liver band [15%-55% of width]")
guards_ok = False
liver_y_min = 0.25 * H
liver_y_max = 0.75 * H
if y_centroid < liver_y_min or y_centroid > liver_y_max:
print(f" QC FAIL: y-centroid {y_centroid:.1f} outside expected liver band [25%-75% of height]")
guards_ok = False
if volume_ml < 800:
print(f" QC WARNING: Volume {volume_ml:.1f}ml suspiciously low - may be wrong organ")
guards_ok = False
if volume_change_percent > 80:
print(f" QC FAIL: Refinement removed {volume_change_percent:.1f}% - too aggressive")
guards_ok = False
    # Restore the original leading singleton dims; the spatial dims are unchanged,
    # so a reshape is sufficient and cannot silently mis-broadcast.
    mask_3d = mask_3d.astype(np.uint8).reshape(original_shape)
print(f" Refinement complete: {original_sum:,} -> {refined_sum:,} voxels ({removed_voxels:,} removed, {volume_change_percent:.2f}%)")
print(f" Connected components: {num_components_before} -> {num_components_after}")
confidence_score = calculate_confidence_score(mask_3d, pred_probabilities, threshold, num_components_after, volume_change_percent, guards_ok, voxel_spacing)
metrics = {
"original_voxels": int(original_sum),
"refined_voxels": int(refined_sum),
"removed_voxels": removed_voxels,
"connected_components_before": int(num_components_before),
"connected_components_after": int(num_components_after),
"volume_change_ml": float(volume_change_ml),
"volume_change_percent": float(volume_change_percent),
"guards_ok": guards_ok
}
return mask_3d, metrics, confidence_score
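
# Minimal sketch of the enhanced refinement on synthetic data (hypothetical shapes; the
# spatial priors assume RAS orientation, so real masks should be reoriented first). The
# 20-voxel cube survives the priors but, at roughly 54 ml, intentionally fails the
# volume QC, so guards_ok comes back False and the confidence score is penalized.
def _example_refine_enhanced():
    probs = np.zeros((1, 1, 64, 64, 64), dtype=np.float32)
    probs[0, 0, 20:40, 20:40, 20:40] = 0.9  # one solid high-confidence blob
    mask = (probs > 0.5).astype(np.uint8)
    refined, metrics, confidence = refine_liver_mask_enhanced(
        mask, voxel_spacing=(1.5, 1.5, 3.0), pred_probabilities=probs,
        threshold=0.5, modality="ct")
    return refined.shape, metrics["guards_ok"], confidence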
def calculate_confidence_score(mask, pred_probabilities, threshold, num_components, volume_change_percent, guards_ok=True, voxel_spacing=(1.0, 1.0, 1.0)):
if mask.sum() == 0:
return 0.0
    # Reduce mask and probabilities to 3D, assuming any leading dims are singleton
    # batch/channel axes, i.e. (N, C, H, W, D) or (C, H, W, D).
    mask_3d = mask
    while mask_3d.ndim > 3 and mask_3d.shape[0] == 1:
        mask_3d = mask_3d[0]
    pred_3d = pred_probabilities
    while pred_3d.ndim > 3 and pred_3d.shape[0] == 1:
        pred_3d = pred_3d[0]
mask_indices = mask_3d > 0
if mask_indices.sum() == 0:
return 0.0
avg_p = float(np.clip(pred_3d[mask_indices].mean(), 0.0, 1.0))
comp_pen = 1.0 if num_components == 1 else max(0.5, 1.0 - 0.1 * (num_components - 1))
vol_pen = 1.0 if abs(volume_change_percent) < 50 else 0.7
    guard_pen = 1.0 if guards_ok else 0.5
volume_ml = (mask_3d.sum() * (voxel_spacing[0] * voxel_spacing[1] * voxel_spacing[2])) / 1000.0
if volume_ml < 800:
volume_penalty = 0.5
elif volume_ml < 1000:
volume_penalty = 0.7
elif volume_ml < 1200:
volume_penalty = 0.9
else:
volume_penalty = 1.0
confidence = 100 * avg_p * comp_pen * vol_pen * guard_pen * volume_penalty
confidence = float(np.clip(confidence, 0, 100))
return confidence
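
# Worked example (hypothetical numbers): avg probability 0.9 over the mask, 2 components
# -> comp_pen = 0.9, |volume change| < 50% -> vol_pen = 1.0, guards_ok -> guard_pen = 1.0,
# volume >= 1200 ml -> volume_penalty = 1.0; confidence = 100 * 0.9 * 0.9 = 81.0.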
def refine_liver_mask(mask, voxel_spacing=(1.0, 1.0, 1.0), enable_smoothing=True, min_component_size=None):
    """
    Refine a liver segmentation mask: remove fragmentation, smooth boundaries, and keep a single connected component.

    Args:
        mask: Binary numpy array of shape (H, W, D), optionally with leading singleton dims, e.g. (1, H, W, D) or (1, 1, H, W, D)
        voxel_spacing: Per-axis voxel spacing in mm (same axis order as the mask)
        enable_smoothing: Whether to apply median filter smoothing (default: True)
        min_component_size: Minimum size in voxels for connected components to keep (None = keep only the largest)

    Returns:
        refined_mask: Refined binary mask (same shape as input)
        metrics: Dictionary with refinement statistics
    """
original_shape = mask.shape
original_sum = mask.sum()
    # Collapse leading singleton dims to a 3D volume (mirrors refine_liver_mask_enhanced).
    while mask.ndim > 3 and mask.shape[0] == 1:
        mask = mask[0]
if mask.dtype != np.uint8:
mask = (mask > 0.5).astype(np.uint8)
if mask.sum() == 0:
print(" ⚠ Empty mask - no refinement possible")
return np.zeros(original_shape, dtype=np.uint8), {
"original_voxels": 0,
"refined_voxels": 0,
"removed_voxels": 0,
"connected_components_before": 0,
"connected_components_after": 0,
"volume_change_ml": 0.0,
"volume_change_percent": 0.0
}
labels_before, num_components_before = ndimage.label(mask)
if num_components_before == 0:
print(" ⚠ No connected components found")
return np.zeros(original_shape, dtype=np.uint8), {
"original_voxels": original_sum,
"refined_voxels": 0,
"removed_voxels": int(original_sum),
"connected_components_before": 0,
"connected_components_after": 0,
"volume_change_ml": 0.0,
"volume_change_percent": -100.0
}
component_sizes = ndimage.sum(mask, labels_before, range(1, num_components_before + 1))
if min_component_size is None:
largest_label = component_sizes.argmax() + 1
mask = (labels_before == largest_label).astype(np.uint8)
print(f" → Kept largest connected component ({component_sizes.max():,} voxels)")
else:
valid_labels = np.where(component_sizes >= min_component_size)[0] + 1
if len(valid_labels) == 0:
largest_label = component_sizes.argmax() + 1
mask = (labels_before == largest_label).astype(np.uint8)
print(f" → No components >= {min_component_size} voxels, kept largest ({component_sizes.max():,} voxels)")
else:
mask = np.isin(labels_before, valid_labels).astype(np.uint8)
print(f" → Kept {len(valid_labels)} component(s) >= {min_component_size} voxels")
after_cc = mask.sum()
try:
structure = ball(3)
mask = binary_closing(mask, structure=structure)
print(f" → Applied binary closing (ball radius=3)")
except Exception as e:
print(f" ⚠ Binary closing failed: {e}")
try:
mask = binary_fill_holes(mask)
print(f" → Filled holes")
except Exception as e:
print(f" ⚠ Hole filling failed: {e}")
try:
structure = ball(2)
mask = binary_opening(mask, structure=structure)
print(f" → Applied binary opening (ball radius=2)")
except Exception as e:
print(f" ⚠ Binary opening failed: {e}")
if enable_smoothing:
try:
mask = median_filter(mask, size=3)
print(f" → Applied 3D median filter (size=3)")
except Exception as e:
print(f" ⚠ Median filter failed: {e}")
labels_after, num_components_after = ndimage.label(mask)
refined_sum = mask.sum()
removed_voxels = int(original_sum - refined_sum)
voxel_volume = voxel_spacing[0] * voxel_spacing[1] * voxel_spacing[2]
volume_change_ml = (removed_voxels * voxel_volume) / 1000.0
volume_change_percent = (removed_voxels / original_sum * 100.0) if original_sum > 0 else 0.0
    # Restore the original leading singleton dims via reshape (spatial dims unchanged).
    mask = mask.astype(np.uint8).reshape(original_shape)
print(f" ✓ Refinement complete: {original_sum:,}{refined_sum:,} voxels ({removed_voxels:,} removed, {volume_change_percent:.2f}%)")
print(f" → Connected components: {num_components_before}{num_components_after}")
metrics = {
"original_voxels": int(original_sum),
"refined_voxels": int(refined_sum),
"removed_voxels": removed_voxels,
"connected_components_before": int(num_components_before),
"connected_components_after": int(num_components_after),
"volume_change_ml": float(volume_change_ml),
"volume_change_percent": float(volume_change_percent)
}
return mask, metrics
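
# Minimal sketch on synthetic data: one large blob plus a tiny satellite; with the
# default min_component_size=None only the largest component survives refinement.
def _example_refine_liver_mask():
    mask = np.zeros((64, 64, 64), dtype=np.uint8)
    mask[10:40, 10:40, 10:40] = 1  # main component (27,000 voxels)
    mask[50:52, 50:52, 50:52] = 1  # 8-voxel satellite, removed by refinement
    refined, metrics = refine_liver_mask(mask, voxel_spacing=(1.0, 1.0, 1.0))
    return metrics["connected_components_before"], metrics["connected_components_after"]  # (2, 1)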
def calculate_liver_volume(pred_binary, voxel_spacing=(1.0, 1.0, 1.0)):
voxel_volume = voxel_spacing[0] * voxel_spacing[1] * voxel_spacing[2]
liver_voxels = pred_binary.sum()
volume_ml = liver_voxels * voxel_volume / 1000.0
return volume_ml
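
# Worked example: 300,000 liver voxels at (1.5, 1.5, 3.0) mm spacing give a voxel volume
# of 6.75 mm^3, so volume_ml = 300,000 * 6.75 / 1000 = 2,025 ml.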
def analyze_liver_morphology(pred_binary):
if len(pred_binary.shape) == 4:
mask_3d = pred_binary[0]
elif len(pred_binary.shape) == 5:
mask_3d = pred_binary[0, 0]
else:
mask_3d = pred_binary
labeled_mask, num_features = ndimage.label(mask_3d)
if num_features == 0:
return {"connected_components": 0, "largest_component_ratio": 0.0, "fragmentation": "high"}
    # Count voxels per label in one pass; drop the background bin (label 0).
    component_sizes = np.bincount(labeled_mask.ravel())[1:]
largest_component = max(component_sizes)
total_liver = pred_binary.sum()
largest_ratio = largest_component / total_liver if total_liver > 0 else 0.0
if largest_ratio > 0.95:
fragmentation = "low"
elif largest_ratio > 0.80:
fragmentation = "moderate"
else:
fragmentation = "high"
return {
"connected_components": int(num_features),
"largest_component_ratio": float(largest_ratio),
"fragmentation": fragmentation
}
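
# Sketch: two equal 4x4x4 blobs give largest_component_ratio = 0.5, which is classified
# as "high" fragmentation (ratio <= 0.80).
def _example_morphology():
    mask = np.zeros((32, 32, 32), dtype=np.uint8)
    mask[2:6, 2:6, 2:6] = 1
    mask[20:24, 20:24, 20:24] = 1
    return analyze_liver_morphology(mask)  # 2 components, ratio 0.5, "high"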
def check_volume_sanity(volume_ml):
normal_range = (float(os.getenv("LIVER_VOL_LOW", "1200")), float(os.getenv("LIVER_VOL_HIGH", "1800")))
if volume_ml < normal_range[0] * 0.5:
return "CRITICAL", f"Volume ({volume_ml:.1f} ml) is extremely low (<50% of normal). Please visually inspect overlay for segmentation errors."
elif volume_ml < normal_range[0]:
return "WARNING", f"Volume ({volume_ml:.1f} ml) is below normal range. Please visually inspect overlay."
elif volume_ml > normal_range[1] * 1.5:
return "CRITICAL", f"Volume ({volume_ml:.1f} ml) is extremely high (>150% of normal). Please visually inspect overlay for segmentation errors."
elif volume_ml > normal_range[1]:
return "WARNING", f"Volume ({volume_ml:.1f} ml) is above normal range. Please visually inspect overlay."
return "OK", None
def generate_medical_report(statistics, volume_ml, morphology, modality, confidence_score=0.0):
liver_percentage = statistics["liver_percentage"]
volume_shape = statistics["volume_shape"]
liver_voxels = statistics.get("liver_voxels", 0)
total_voxels = statistics.get("total_voxels", 0)
normal_liver_volume_range = (1200, 1800)
normal_liver_percentage_range = (2.0, 3.5)
findings = []
recommendations = []
clinical_notes = []
quality_assessment = []
if liver_voxels == 0:
severity = "failure"
status = "FAILURE"
findings.append("**SEGMENTATION FAILURE:** No liver tissue detected (0 voxels segmented).")
recommendations.append("**CRITICAL:** Segmentation failed completely. Possible causes:")
recommendations.append(" • Input quality issues (low resolution, compression, missing metadata)")
recommendations.append(" • Threshold too high for prediction distribution")
recommendations.append(" • Model mismatch with input modality or preprocessing")
recommendations.append(" • Please check input file quality and try again, or contact support.")
clinical_notes.append("The automated segmentation system failed to identify any liver tissue. This indicates a technical failure rather than an anatomical finding.")
quality_assessment.append("**Segmentation Failure:** No voxels were segmented. Manual review and re-processing required.")
impression_parts = ["Automated liver segmentation FAILED. No liver tissue was detected."]
impression_parts.append("This is a technical failure requiring investigation of input quality and model compatibility.")
else:
severity = "normal"
status = "NORMAL"
num_components = morphology.get("connected_components", 1)
largest_ratio = morphology.get("largest_component_ratio", 1.0)
if num_components > 1 and largest_ratio < 0.9:
severity = "critical"
status = "CRITICAL"
findings.append(f"**CRITICAL: Fragmented Segmentation:** {num_components} disconnected components detected. Largest component is only {largest_ratio*100:.1f}% of total volume.")
recommendations.append("**URGENT:** Segmentation shows severe fragmentation. Manual correction required.")
clinical_notes.append("The segmentation contains multiple disconnected regions, indicating possible segmentation artifacts or severe anatomical abnormalities.")
elif volume_ml < normal_liver_volume_range[0] * 0.5 or volume_ml > normal_liver_volume_range[1] * 1.5:
if confidence_score < 50:
severity = "critical"
status = "CRITICAL"
else:
severity = "moderate"
status = "WARNING"
elif volume_ml < normal_liver_volume_range[0] or volume_ml > normal_liver_volume_range[1]:
severity = "moderate"
status = "WARNING"
volume_sanity_status, volume_sanity_msg = check_volume_sanity(volume_ml)
if volume_sanity_status == "CRITICAL":
if severity != "critical":
severity = "critical"
status = "CRITICAL"
findings.append(f"**CRITICAL FINDING:** {volume_sanity_msg}")
recommendations.append("**URGENT:** Visual inspection and manual review required. Segmentation may contain significant errors that could affect clinical interpretation.")
clinical_notes.append("The automated segmentation has produced results that fall outside expected physiological ranges. This may indicate technical issues with the segmentation algorithm or unusual patient anatomy.")
elif volume_sanity_status == "WARNING":
if severity == "normal":
severity = "moderate"
status = "WARNING"
findings.append(f"**WARNING:** {volume_sanity_msg}")
recommendations.append("Visual inspection recommended to verify segmentation accuracy and ensure clinical validity.")
clinical_notes.append("The segmentation results are outside the typical range but may still be clinically valid depending on patient-specific factors.")
clinical_notes.append("Note: Normal liver volume range (1200-1800 ml) is for average adult body size. Pediatric patients or extreme body sizes may have different normal ranges.")
if volume_ml < normal_liver_volume_range[0]:
findings.append(f"**Liver Volume Assessment:** Measured liver volume is **{volume_ml:.1f} ml**, which is below the normal reference range of {normal_liver_volume_range[0]}-{normal_liver_volume_range[1]} ml.")
clinical_notes.append(f"This represents approximately **{((normal_liver_volume_range[0] - volume_ml) / normal_liver_volume_range[0] * 100):.1f}% reduction** compared to the lower limit of normal. Possible etiologies include:")
clinical_notes.append(" • Chronic liver disease with parenchymal loss")
clinical_notes.append(" • Post-surgical resection")
clinical_notes.append(" • Cirrhosis with volume loss")
clinical_notes.append(" • Age-related atrophy")
recommendations.append("Consider follow-up imaging to monitor liver volume changes over time. Correlation with clinical history and liver function tests is recommended.")
if severity == "normal":
severity = "mild" if volume_ml > normal_liver_volume_range[0] * 0.7 else "moderate"
elif volume_ml > normal_liver_volume_range[1]:
findings.append(f"**Liver Volume Assessment:** Measured liver volume is **{volume_ml:.1f} ml**, which exceeds the normal reference range of {normal_liver_volume_range[0]}-{normal_liver_volume_range[1]} ml.")
clinical_notes.append(f"This represents approximately **{((volume_ml - normal_liver_volume_range[1]) / normal_liver_volume_range[1] * 100):.1f}% increase** compared to the upper limit of normal, consistent with hepatomegaly. Potential causes include:")
clinical_notes.append(" • Fatty liver disease (steatosis)")
clinical_notes.append(" • Congestive hepatopathy")
clinical_notes.append(" • Inflammatory conditions")
clinical_notes.append(" • Storage diseases")
clinical_notes.append(" • Neoplastic processes")
recommendations.append("Further clinical evaluation recommended to identify underlying etiology. Consider correlation with laboratory findings, clinical history, and additional imaging studies.")
if severity == "normal":
severity = "mild" if volume_ml < normal_liver_volume_range[1] * 1.3 else "moderate"
else:
findings.append(f"**Liver Volume Assessment:** Measured liver volume is **{volume_ml:.1f} ml**, which falls within the normal reference range of {normal_liver_volume_range[0]}-{normal_liver_volume_range[1]} ml.")
clinical_notes.append("The liver volume is within expected physiological parameters for an adult patient.")
if morphology["connected_components"] > 1:
if morphology["largest_component_ratio"] < 0.9:
if severity != "critical":
severity = "critical"
status = "CRITICAL"
findings.append(f"**CRITICAL: Fragmented Segmentation:** The liver segmentation identified **{morphology['connected_components']} separate connected components**. The largest component represents only **{morphology['largest_component_ratio']*100:.1f}%** of the total segmented volume.")
quality_assessment.append("**Severe Fragmentation Detected:** Multiple disconnected regions suggest possible segmentation artifacts or severe anatomical variations.")
recommendations.append("**URGENT:** Manual review and correction required. Fragmentation indicates potential segmentation errors.")
elif morphology["largest_component_ratio"] < 0.95:
if severity == "normal":
severity = "moderate"
status = "WARNING"
findings.append(f"**Segmentation Quality:** The liver segmentation identified **{morphology['connected_components']} separate connected components**. The largest component represents **{morphology['largest_component_ratio']*100:.1f}%** of the total segmented volume.")
quality_assessment.append("**Moderate Fragmentation Detected:** Multiple disconnected regions suggest possible segmentation artifacts or anatomical variations.")
quality_assessment.append("Post-processing filters (largest-component selection, hole-filling, morphological operations) have been applied to optimize the segmentation.")
recommendations.append("Review the segmentation overlay carefully. The presence of multiple components may indicate:")
recommendations.append(" • Segmentation artifacts requiring manual correction")
recommendations.append(" • Anatomical variants (e.g., accessory liver lobes)")
recommendations.append(" • Pathological processes causing liver fragmentation")
else:
findings.append(f"**Segmentation Quality:** The liver segmentation shows **{morphology['connected_components']} components**, with the largest component comprising **{morphology['largest_component_ratio']*100:.1f}%** of the total volume, indicating good segmentation continuity.")
quality_assessment.append("The segmentation demonstrates good connectivity with a dominant main component.")
else:
quality_assessment.append("**Excellent Segmentation Quality:** Single connected component indicates robust segmentation with good anatomical continuity.")
if morphology["fragmentation"] == "high":
findings.append(f"**High Fragmentation Detected:** The liver segmentation demonstrates high morphological fragmentation, which may reflect irregular liver surface or segmentation challenges.")
quality_assessment.append("High fragmentation suggests the liver may have irregular borders or that the segmentation encountered challenging anatomical features.")
recommendations.append("Manual review and potential refinement of the segmentation may be beneficial for optimal clinical interpretation.")
if severity == "normal":
severity = "mild"
elif morphology["fragmentation"] == "moderate":
quality_assessment.append("Moderate fragmentation observed, which is acceptable for clinical use but may benefit from review.")
else:
quality_assessment.append("Low fragmentation indicates smooth, well-defined liver boundaries.")
if liver_percentage < normal_liver_percentage_range[0]:
findings.append(f"**Spatial Distribution:** The liver occupies **{liver_percentage:.2f}%** of the total scan volume, which is below the typical range of {normal_liver_percentage_range[0]}-{normal_liver_percentage_range[1]}%.")
clinical_notes.append("This may reflect a smaller liver relative to the field of view, or indicate that the scan includes a larger portion of the abdomen.")
elif liver_percentage > normal_liver_percentage_range[1]:
findings.append(f"**Spatial Distribution:** The liver occupies **{liver_percentage:.2f}%** of the total scan volume, which is above the typical range.")
clinical_notes.append("This may indicate an enlarged liver or a scan field of view focused on the upper abdomen.")
else:
findings.append(f"**Spatial Distribution:** The liver occupies **{liver_percentage:.2f}%** of the scan volume, within the expected range.")
if total_voxels > 0:
voxel_density = liver_voxels / total_voxels * 100
quality_assessment.append(f"**Segmentation Coverage:** {liver_voxels:,} voxels segmented out of {total_voxels:,} total voxels ({voxel_density:.2f}% coverage).")
if volume_shape:
quality_assessment.append(f"**Image Dimensions:** {volume_shape[0]} × {volume_shape[1]} × {volume_shape[2]} voxels")
impression_parts = []
if severity == "normal":
impression_parts.append("Automated liver segmentation completed successfully using the SRMA-Mamba deep learning model.")
impression_parts.append("The segmentation demonstrates good quality with measurements within expected physiological ranges.")
elif severity == "mild":
impression_parts.append("Automated liver segmentation completed with minor findings.")
impression_parts.append("The segmentation is generally acceptable but requires clinical correlation and visual review.")
elif severity == "moderate":
impression_parts.append("Automated liver segmentation completed with notable findings requiring attention.")
impression_parts.append("Visual inspection and clinical correlation are recommended to ensure accuracy.")
elif severity == "critical":
impression_parts.append("Automated liver segmentation completed with critical findings.")
impression_parts.append("Immediate visual inspection and manual review are strongly recommended.")
impression_parts.append(f"**{len(findings)} key finding(s)** identified during automated analysis.")
report = {
"patient_id": "N/A",
"study_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"modality": modality.upper(),
"status": status,
"findings": findings,
"clinical_notes": clinical_notes,
"quality_assessment": quality_assessment,
"measurements": {
"liver_volume_ml": round(volume_ml, 2),
"liver_volume_liters": round(volume_ml / 1000.0, 3),
"liver_percentage": round(liver_percentage, 2),
"liver_voxels": int(liver_voxels),
"total_voxels": int(total_voxels),
"volume_shape": volume_shape,
"morphology": morphology,
"confidence_score": round(confidence_score, 1)
},
"impression": " ".join(impression_parts) if liver_voxels > 0 else impression_parts[0] if impression_parts else "Segmentation failed.",
"recommendations": recommendations,
"severity": severity,
"methodology": "SRMA-Mamba: State Space Model for Medical Image Segmentation using Mamba architecture with sliding window inference",
"disclaimer": "**IMPORTANT:** This is an automated analysis generated by artificial intelligence. Results should be reviewed and validated by a qualified radiologist or physician. This report is not intended for diagnostic use without appropriate clinical correlation and professional medical interpretation."
}
return report
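
# End-to-end sketch (hypothetical local path; illustration only, not a CLI entry point).
# The random probabilities stand in for real model output, so the report will flag QC
# failures; the point is only to show how the pieces compose.
if __name__ == "__main__":
    example_path = "example_scan.nii.gz"  # replace with a real NIfTI file
    if os.path.exists(example_path):
        tensor = preprocess_nifti(example_path)
        probs = torch.sigmoid(torch.randn(1, 1, *tensor.shape[-3:])).numpy()
        mask = (probs > 0.5).astype(np.uint8)
        refined, metrics, confidence = refine_liver_mask_enhanced(
            mask, (1.0, 1.0, 1.0), probs, threshold=0.5, modality="ct")
        volume = calculate_liver_volume(refined, (1.0, 1.0, 1.0))
        morphology = analyze_liver_morphology(refined)
        stats = {"liver_percentage": 100.0 * refined.sum() / refined.size,
                 "volume_shape": tuple(refined.shape[-3:]),
                 "liver_voxels": int(refined.sum()),
                 "total_voxels": int(refined.size)}
        report = generate_medical_report(stats, volume, morphology, "ct", confidence)
        print(report["status"], report["impression"])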