| | import os |
| | import csv |
| | import zipfile |
| | import subprocess |
| | import uuid |
| | import shutil |
| | from pathlib import Path |
| | from typing import Tuple, Dict, List, Optional |
| |
|
| | import gradio as gr |
| | import SimpleITK as sitk |
| | from huggingface_hub import hf_hub_download |
| |
|
| | import spaces |
| |
|
| |
|
| | |
@spaces.GPU
def _init_gpu():
    """Touch the CUDA runtime inside a GPU-decorated function.

    Exists so that a ``@spaces.GPU`` function is present and invoked at
    startup. The return value is always truthy, whether or not CUDA is
    actually available.
    """
    import torch

    # `or True` keeps the result truthy even when no CUDA device is visible.
    return torch.cuda.is_available() or True
| |
|
| |
|
| | |
| | |
| | |
# ---------------------------------------------------------------------------
# Application configuration
# ---------------------------------------------------------------------------
APP_NAME = "Multiple Sclerosis Lesion Tracker"
# When True, a failed CUDA inference run is retried once on the CPU.
ALLOW_CPU_FALLBACK = True

# Names of the per-job output sub-directories.
STRIPPED_DIR = "stripped"
REGISTERED_DIR = "registered"
SEG_DIR = "seg_flames"
DIFF_SEG_DIR = "seg_flames/diff_seg"


def _env_path(variable: str, default: Path) -> Path:
    """Return the resolved path stored in env ``variable`` (``default`` if unset)."""
    return Path(os.environ.get(variable, default)).resolve()


# Root directories; each one can be overridden through its environment variable.
_CACHE_ROOT = Path.home() / ".cache" / "msapp"
JOBS_ROOT = _env_path("MSAPP_JOBS_ROOT", Path.cwd() / "jobs")
MODEL_ROOT = _env_path("MSAPP_MODEL_ROOT", _CACHE_ROOT / "flames_model")
BIN_ROOT = _env_path("MSAPP_BIN_DIR", _CACHE_ROOT / "bin")

# FLAMeS weights on the Hugging Face Hub and the nnUNet settings used to run them.
HF_REPO = "FrancescoLR/FLAMeS-model"
HF_ZIP_NAME = "Dataset004_WML.zip"
NNUNET_DS_ID = "004"
NNUNET_CFG = "3d_fullres"
NNUNET_TR = "nnUNetTrainer_8000epochs"

# Optional SynthStrip model file. When MSAPP_SYNTHSTRIP_MODEL is unset or blank
# this is Path(""), which stringifies to "." (it is never None).
_synthstrip_env = os.getenv("MSAPP_SYNTHSTRIP_MODEL", "").strip()
SYNTHSTRIP_MODEL = Path(_synthstrip_env).expanduser() if _synthstrip_env else Path("")
| |
|
| |
|
| | |
| | |
| | |
def ensure_dirs() -> None:
    """Create the jobs, model and binary root directories if they do not exist."""
    for root in (JOBS_ROOT, MODEL_ROOT, BIN_ROOT):
        root.mkdir(parents=True, exist_ok=True)
| |
|
| | |
# Call the GPU probe once at import time. This is best-effort: any exception
# it raises is deliberately ignored so the module still imports.
try:
    _init_gpu()
except Exception:
    pass
| |
|
| |
|
def is_zip(path: Path) -> bool:
    """Return True when the final extension of ``path`` is ``.zip`` (any letter case)."""
    extension = path.suffix
    return extension.lower() == ".zip"
| |
|
| |
|
def get_dcm2niix_bin() -> Optional[str]:
    """Locate the dcm2niix executable.

    The DCM2NIIX_BIN environment variable wins when it points at an existing
    path (returned resolved); otherwise the executable is looked up on PATH.

    Returns:
        The executable path as a string, or None when it cannot be found.
    """
    configured = os.getenv("DCM2NIIX_BIN")
    if configured:
        candidate = Path(configured)
        if candidate.exists():
            return str(candidate.resolve())
    return shutil.which("dcm2niix")
| |
|
| |
|
| | |
def ensure_synthstrip_available() -> None:
    """
    Verify that NiPreps SynthStrip can be used in one of two ways:
      - the Nipype wrapper module imports (preferred), or
      - a 'nipreps-synthstrip' executable is found on PATH.

    Raises:
        RuntimeError: if neither of the two is available.
    """
    import importlib

    try:
        importlib.import_module("nipreps.synthstrip.wrappers.nipype")
    except Exception:
        # Any import-time failure counts as "wrapper not usable".
        wrapper_importable = False
    else:
        wrapper_importable = True

    if wrapper_importable or shutil.which("nipreps-synthstrip") is not None:
        return

    raise RuntimeError(
        "SynthStrip is not available. Install one of:\n"
        " • Python/Nipype: pip install 'nipreps-synthstrip[nipype]'\n"
        " • CLI only: pip install nipreps-synthstrip\n"
        "and ensure 'nipreps-synthstrip' is on PATH."
    )
| |
|
| |
|
def skull_strip_synthstrip(in_nii: Path, out_dir: Path, prefix: str) -> Tuple[Path, Path]:
    """
    Skull-strip ``in_nii`` with NiPreps SynthStrip.

    The Nipype interface is tried first. If it cannot be imported, fails, or
    produces no outputs, the 'nipreps-synthstrip' command line is used
    instead; that path needs an explicit model file.

    Args:
        in_nii: input NIfTI image.
        out_dir: directory that receives the outputs (created if missing).
        prefix: file-name prefix; outputs are '<prefix>_stripped.nii.gz' and
            '<prefix>_mask.nii.gz'.

    Returns:
        (stripped_path, mask_path)

    Raises:
        RuntimeError: if a configured model file does not exist, if the CLI
            fallback is needed but no model is configured, or if the CLI
            fails or writes no outputs.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    stripped = out_dir / f"{prefix}_stripped.nii.gz"
    mask = out_dir / f"{prefix}_mask.nii.gz"

    # An unset MSAPP_SYNTHSTRIP_MODEL leaves SYNTHSTRIP_MODEL as Path(""), which
    # is truthy and stringifies to ".". Treat that (and None) as "no model" so
    # the current directory is never passed on as a model file.
    model_path = None
    configured = "" if SYNTHSTRIP_MODEL is None else str(SYNTHSTRIP_MODEL).strip()
    if configured not in ("", "."):
        if not Path(configured).exists():
            raise RuntimeError(f"SynthStrip model not found at: {SYNTHSTRIP_MODEL}")
        model_path = configured

    # --- Attempt 1: Nipype interface -------------------------------------
    nipype_error = None
    try:
        from nipreps.synthstrip.wrappers.nipype import SynthStrip

        inputs = dict(in_file=str(in_nii), out_file=str(stripped), out_mask=str(mask))
        if model_path:
            inputs["model"] = model_path
        try:
            node = SynthStrip(no_csf=True, **inputs)
        except Exception:
            # NOTE(review): presumably raised when this interface version has
            # no `no_csf` input — confirm. Retry without the optional flag.
            node = SynthStrip(**inputs)
        res = node.run()

        # The interface may have written somewhere else; copy the reported
        # outputs to the expected locations when they are missing.
        for expected, attr in ((stripped, "out_file"), (mask, "out_mask")):
            if expected.exists():
                continue
            reported = getattr(res.outputs, attr, None)
            if reported and Path(reported).exists():
                shutil.copy2(reported, expected)

        if stripped.exists() and mask.exists():
            return stripped, mask
        nipype_error = RuntimeError("SynthStrip (nipype) finished but outputs not found.")
    except Exception as exc:
        # Best-effort: remember why Nipype was unusable and fall back to the CLI.
        nipype_error = exc

    # --- Attempt 2: command-line fallback --------------------------------
    if not model_path:
        message = (
            "Your SynthStrip CLI requires a model. Set MSAPP_SYNTHSTRIP_MODEL to the .pt file, e.g.\n"
            "export MSAPP_SYNTHSTRIP_MODEL='/home/karlo/Desktop/app_flames_10/synthstrip_model/synthstrip.nocsf.1.pt'"
        )
        if nipype_error is not None:
            message += f"\n(Nipype interface was not usable: {nipype_error})"
        raise RuntimeError(message) from nipype_error

    cmd = [
        shutil.which("nipreps-synthstrip") or "nipreps-synthstrip",
        "-i", str(in_nii),
        "-o", str(stripped),
        "-m", str(mask),
        "--model", model_path,
    ]
    run = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if run.returncode != 0:
        raise RuntimeError(f"SynthStrip (CLI) failed:\n{run.stdout}")
    if not stripped.exists() or not mask.exists():
        raise RuntimeError("SynthStrip (CLI) finished but outputs not found.")
    return stripped, mask
| |
|
| |
|
def ensure_flames_model() -> None:
    """
    Ensure FLAMeS weights (Dataset004_WML) exist under MODEL_ROOT.
    nnUNet expects: MODEL_ROOT / 'Dataset004_WML' / ...

    Nothing is done when that directory already exists. Otherwise the
    weights archive is downloaded from the Hugging Face Hub (cached under
    MODEL_ROOT) and unpacked into MODEL_ROOT, overwriting existing files.
    """
    ensure_dirs()
    ds_dir = MODEL_ROOT / "Dataset004_WML"
    if ds_dir.exists():
        return
    zip_path = hf_hub_download(repo_id=HF_REPO, filename=HF_ZIP_NAME, cache_dir=str(MODEL_ROOT))
    # Unpack with the standard library so no external `unzip` binary is needed.
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(MODEL_ROOT)
| |
|
| |
|
def stage_upload(src_path: Path, incoming_dir: Path, name_hint: str) -> Path:
    """
    Copy an uploaded file into ``incoming_dir`` under a predictable name.

    The copy is named ``name_hint`` plus the source's last extension, except
    that a '.nii.gz' double extension (any letter case) is kept whole and
    normalised to lower case.

    Returns:
        Path of the staged copy.
    """
    incoming_dir.mkdir(parents=True, exist_ok=True)

    last_two = "".join(src_path.suffixes[-2:]).lower()
    extension = ".nii.gz" if last_two == ".nii.gz" else src_path.suffix

    staged = incoming_dir / f"{name_hint}{extension}"
    shutil.copy2(src_path, staged)
    return staged
| |
|
| |
|
def convert_dicom_zip_to_nifti(zip_path: Path, out_dir: Path) -> Path:
    """
    Unpack a DICOM zip and convert it to NIfTI with dcm2niix.

    The archive is extracted to ``out_dir / "dicom_extracted"`` and dcm2niix
    writes gzipped NIfTI files to ``out_dir / "nifti"``.

    Returns:
        The largest '*.nii.gz' (by file size) found in the NIfTI directory.

    Raises:
        RuntimeError: if dcm2niix cannot be located, exits non-zero, or
            produces no NIfTI file.
    """
    dicom_dir = out_dir / "dicom_extracted"
    dicom_dir.mkdir(parents=True, exist_ok=True)
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(dicom_dir)

    nifti_dir = out_dir / "nifti"
    nifti_dir.mkdir(parents=True, exist_ok=True)

    executable = get_dcm2niix_bin()
    if not executable:
        raise RuntimeError(
            "dcm2niix not found. Set DCM2NIIX_BIN to its absolute path "
            "(e.g., /home/karlo/anaconda3/envs/nnunet-env/bin/dcm2niix) "
            "or launch the app from the activated env."
        )

    command = [
        executable,
        "-d", "9",
        "-z", "y",
        "-f", "series_%p_%s",
        "-o", str(nifti_dir),
        str(dicom_dir),
    ]
    proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    if proc.returncode != 0:
        raise RuntimeError(f"dcm2niix failed:\n{proc.stdout}")

    # Several series may have been converted; keep the biggest file.
    largest = max(nifti_dir.glob("*.nii.gz"), key=lambda item: item.stat().st_size, default=None)
    if largest is None:
        raise RuntimeError("No NIfTI produced by dcm2niix.")
    return largest
| |
|
| |
|
def read_float(path: Path) -> sitk.Image:
    """Read the image at ``path`` and return it cast to 32-bit float pixels."""
    image = sitk.ReadImage(str(path))
    return sitk.Cast(image, sitk.sitkFloat32)
| |
|
| |
|
def register_rigid_affine(prev_stripped: Path, new_stripped: Path, reg_dir: Path) -> Tuple[Path, sitk.Transform, sitk.Image]:
    """
    Register the new scan onto the previous one (rigid stage, then affine stage).

    The previous scan is the fixed image and the new scan is the moving image.
    The moving image is resampled onto the fixed grid with the final affine
    transform and written to ``reg_dir / "new_in_prev_space.nii.gz"``.

    Returns:
        (registered_path, affine_tx, fixed_img)
    """
    reg_dir.mkdir(parents=True, exist_ok=True)
    fixed = read_float(prev_stripped)
    moving = read_float(new_stripped)

    def _make_method(learning_rate: float, iterations: int) -> sitk.ImageRegistrationMethod:
        # Both stages share the metric, sampling, interpolator and pyramid;
        # only the optimizer's learning rate and iteration count differ.
        method = sitk.ImageRegistrationMethod()
        method.SetMetricAsMattesMutualInformation(50)
        method.SetMetricSamplingStrategy(method.RANDOM)
        method.SetMetricSamplingPercentage(0.1)
        method.SetInterpolator(sitk.sitkLinear)
        method.SetOptimizerAsRegularStepGradientDescent(
            learningRate=learning_rate,
            minStep=1e-4,
            numberOfIterations=iterations,
            relaxationFactor=0.5,
        )
        method.SetOptimizerScalesFromPhysicalShift()
        method.SetShrinkFactorsPerLevel([4, 2, 1])
        method.SetSmoothingSigmasPerLevel([2, 1, 0])
        method.SmoothingSigmasAreSpecifiedInPhysicalUnitsOn()
        return method

    # ---- Stage 1: rigid, initialised from the image geometry ----
    rigid_stage = _make_method(learning_rate=2.0, iterations=200)
    geometry_init = sitk.CenteredTransformInitializer(
        fixed,
        moving,
        sitk.VersorRigid3DTransform(),
        sitk.CenteredTransformInitializerFilter.GEOMETRY,
    )
    rigid_stage.SetInitialTransform(geometry_init, inPlace=False)
    rigid_tx = rigid_stage.Execute(fixed, moving)

    # The result may come back wrapped in a composite and/or as a generic
    # transform; unwrap it and convert it to a VersorRigid3DTransform.
    is_composite = rigid_tx.GetName() == "CompositeTransform"
    rigid_only = rigid_tx.GetNthTransform(0) if is_composite else rigid_tx
    if rigid_only.GetName() != "VersorRigid3DTransform":
        rigid_only = sitk.VersorRigid3DTransform(rigid_only)

    # ---- Stage 2: affine, seeded with the rigid result ----
    affine_stage = _make_method(learning_rate=1.0, iterations=150)
    affine_seed = sitk.AffineTransform(3)
    affine_seed.SetMatrix(rigid_only.GetMatrix())
    affine_seed.SetTranslation(rigid_only.GetTranslation())
    affine_seed.SetCenter(rigid_only.GetCenter())
    affine_stage.SetInitialTransform(affine_seed, inPlace=False)
    affine_tx = affine_stage.Execute(fixed, moving)

    # ---- Resample the moving image onto the fixed grid and save it ----
    resampler = sitk.ResampleImageFilter()
    resampler.SetReferenceImage(fixed)
    resampler.SetInterpolator(sitk.sitkLinear)
    resampler.SetTransform(affine_tx)
    registered_path = reg_dir / "new_in_prev_space.nii.gz"
    sitk.WriteImage(resampler.Execute(moving), str(registered_path))
    return registered_path, affine_tx, fixed
| |
|
| |
|
@spaces.GPU(duration=300)
def run_flames_single(input_nii: Path, out_mask_path: Path, device: str = "cuda") -> Path:
    """
    Segment one NIfTI with FLAMeS (nnUNetv2_predict) and store the mask.

    The input is copied to a temporary folder as 'image_0000.nii.gz',
    nnUNetv2_predict is run with nnUNet_results pointing at MODEL_ROOT, and
    the resulting 'image.nii.gz' is moved to ``out_mask_path``. If the run on
    "cuda" fails and ALLOW_CPU_FALLBACK is set, it is retried once on "cpu".

    Returns:
        ``out_mask_path``

    Raises:
        RuntimeError: if prediction fails or no output file is produced.
    """
    import tempfile

    # Open and immediately close the input: raises early if it is unreadable.
    with Path(input_nii).open("rb"):
        pass

    with tempfile.TemporaryDirectory() as in_dir, tempfile.TemporaryDirectory() as out_dir:
        shutil.copy2(input_nii, Path(in_dir) / "image_0000.nii.gz")

        env = os.environ.copy()
        env["nnUNet_results"] = str(MODEL_ROOT)

        def _predict(target_device: str):
            command = [
                "nnUNetv2_predict",
                "-i", in_dir, "-o", out_dir,
                "-d", NNUNET_DS_ID,
                "-c", NNUNET_CFG,
                "-tr", NNUNET_TR,
                "-device", target_device,
            ]
            return subprocess.run(
                command, env=env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
            )

        run = _predict(device)
        if run.returncode != 0 and device == "cuda" and ALLOW_CPU_FALLBACK:
            run = _predict("cpu")
        if run.returncode != 0:
            raise RuntimeError(f"nnUNetv2_predict failed:\n{run.stdout}")

        pred = Path(out_dir) / "image.nii.gz"
        if not pred.exists():
            raise RuntimeError("nnUNet did not produce image.nii.gz")

        out_mask_path.parent.mkdir(parents=True, exist_ok=True)
        shutil.move(pred, out_mask_path)
    return out_mask_path
| |
|
| |
|
def build_diff_maps(prev_mask: Path, new_mask_in_prev_space: Path, out_root: Path,
                    dilate_prev_radius_vox: int = 1,
                    min_lesion_vol_ml: float = 0.01,
                    edge_buffer_vox_for_resolved: int = 1,
                    use_26_connectivity: bool = False):
    """
    Compare two lesion masks and write difference maps plus CSV summaries.

    Both masks are binarised (values >= 1 become 1). The new mask is resampled
    (nearest neighbour) onto the previous mask's grid when their geometry differs.

    Outputs written under ``out_root / DIFF_SEG_DIR``:
      - new lesions:      new AND NOT (previous dilated by ``dilate_prev_radius_vox``)
      - resolved lesions: previous AND NOT (new dilated by ``edge_buffer_vox_for_resolved``)
      - xor:              previous XOR new
      - stable:           previous AND new
      - combined label map: 1 = stable, 2 = new, 3 = resolved
      - one CSV per new/resolved mask with per-component voxel count, volume and centroid

    New and resolved masks drop connected components smaller than
    ``min_lesion_vol_ml``; ``use_26_connectivity`` switches the component
    filter to fully-connected mode.

    Returns:
        dict mapping output keys to their paths.
    """
    out_dir = out_root / DIFF_SEG_DIR
    out_dir.mkdir(parents=True, exist_ok=True)

    def _load_binary(mask_file: Path):
        image = sitk.ReadImage(str(mask_file))
        return sitk.Cast(sitk.BinaryThreshold(image, 1, 1_000_000, 1, 0), sitk.sitkUInt8)

    def _geometry(image):
        return (image.GetSize(), image.GetSpacing(), image.GetOrigin(), tuple(image.GetDirection()))

    prev_bin = _load_binary(prev_mask)
    new_bin = _load_binary(new_mask_in_prev_space)

    # Bring the new mask onto the previous mask's grid if they do not match exactly.
    if _geometry(prev_bin) != _geometry(new_bin):
        new_bin = sitk.Resample(new_bin, prev_bin, sitk.Transform(3, sitk.sitkIdentity),
                                sitk.sitkNearestNeighbor, 0, sitk.sitkUInt8)

    # Dilated copies used only as exclusion zones.
    prev_exclusion = prev_bin
    if dilate_prev_radius_vox > 0:
        prev_exclusion = sitk.BinaryDilate(prev_bin, [dilate_prev_radius_vox] * 3)

    new_exclusion = new_bin
    if edge_buffer_vox_for_resolved > 0:
        new_exclusion = sitk.BinaryDilate(new_bin, [edge_buffer_vox_for_resolved] * 3)

    stable = sitk.And(prev_bin, new_bin)
    new_raw = sitk.And(new_bin, sitk.BinaryNot(prev_exclusion))
    resolved_raw = sitk.And(prev_bin, sitk.BinaryNot(new_exclusion))
    xor_mask = sitk.Xor(prev_bin, new_bin)

    # Volume of one voxel: spacing product divided by 1000.
    dx, dy, dz = prev_bin.GetSpacing()
    voxel_ml = (dx * dy * dz) / 1000.0

    def _label_components(binary):
        labeller = sitk.ConnectedComponentImageFilter()
        labeller.SetFullyConnected(bool(use_26_connectivity))
        return labeller.Execute(binary)

    def _drop_small(binary):
        if min_lesion_vol_ml <= 0:
            return sitk.Cast(binary > 0, sitk.sitkUInt8)
        labelled = _label_components(binary)
        shape = sitk.LabelShapeStatisticsImageFilter()
        shape.Execute(labelled)
        kept = sitk.Image(labelled.GetSize(), sitk.sitkUInt8)
        kept.CopyInformation(labelled)
        for label in shape.GetLabels():
            if shape.GetNumberOfPixels(label) * voxel_ml >= min_lesion_vol_ml:
                kept = sitk.Or(kept, sitk.Equal(labelled, label))
        return sitk.Cast(kept > 0, sitk.sitkUInt8)

    new_f = _drop_small(new_raw)
    resolved_f = _drop_small(resolved_raw)

    paths = {
        "new_only": out_dir / "new_lesions_mask.nii.gz",
        "resolved_only": out_dir / "resolved_lesions_mask.nii.gz",
        "xor": out_dir / "xor_diff_mask.nii.gz",
        "stable": out_dir / "stable_overlap_mask.nii.gz",
        "combined": out_dir / "combined_label_map.nii.gz",
        "new_csv": out_dir / "new_lesions_summary.csv",
        "resolved_csv": out_dir / "resolved_lesions_summary.csv",
    }

    for key, image in (("new_only", new_f), ("resolved_only", resolved_f),
                       ("xor", xor_mask), ("stable", stable)):
        sitk.WriteImage(image, str(paths[key]))

    # Combined label map: each layer is scaled by its label value and summed.
    combined = sitk.Image(prev_bin.GetSize(), sitk.sitkUInt8)
    combined.CopyInformation(prev_bin)
    for label_value, layer in ((1, stable), (2, new_f), (3, resolved_f)):
        combined = sitk.Add(combined, sitk.Multiply(layer, label_value))
    sitk.WriteImage(combined, str(paths["combined"]))

    def _write_summary(binary, csv_path: Path):
        # Components are relabelled by size before their statistics are listed.
        relabelled = sitk.RelabelComponent(_label_components(binary), sortByObjectSize=True)
        shape = sitk.LabelShapeStatisticsImageFilter()
        shape.Execute(relabelled)
        with open(csv_path, "w", newline="") as handle:
            writer = csv.writer(handle)
            writer.writerow(["lesion_id","voxel_count","volume_mL","centroid_x_mm","centroid_y_mm","centroid_z_mm"])
            for label in shape.GetLabels():
                voxels = shape.GetNumberOfPixels(label)
                volume = voxels * voxel_ml
                cx, cy, cz = shape.GetCentroid(label)
                writer.writerow([int(label), int(voxels), float(volume), float(cx), float(cy), float(cz)])

    _write_summary(new_f, paths["new_csv"])
    _write_summary(resolved_f, paths["resolved_csv"])
    return paths
| |
|
| |
|
| | |
def count_lesions_in_mask(mask_path: Path,
                          min_lesion_vol_ml: float = 0.0,
                          use_26_connectivity: bool = False) -> int:
    """
    Count the connected components of a lesion mask.

    The mask is binarised first (values >= 1 become foreground). A component
    is counted when its volume (voxel count times voxel volume) is at least
    ``min_lesion_vol_ml``. ``use_26_connectivity`` switches the component
    filter to fully-connected mode.
    """
    mask = sitk.ReadImage(str(mask_path))
    binary = sitk.Cast(sitk.BinaryThreshold(mask, 1, 1_000_000, 1, 0), sitk.sitkUInt8)

    dx, dy, dz = binary.GetSpacing()
    voxel_ml = (dx * dy * dz) / 1000.0

    labeller = sitk.ConnectedComponentImageFilter()
    labeller.SetFullyConnected(bool(use_26_connectivity))
    labelled = labeller.Execute(binary)

    shape = sitk.LabelShapeStatisticsImageFilter()
    shape.Execute(labelled)

    component_sizes = (shape.GetNumberOfPixels(label) for label in shape.GetLabels())
    return sum(1 for voxels in component_sizes if voxels * voxel_ml >= min_lesion_vol_ml)
| |
|
| |
|
def package_selected(job_dir: Path,
                     prev_stripped: Path,
                     registered_new: Path,
                     diff_paths: Dict[str, Path],
                     prev_mask_flames: Optional[Path] = None,
                     new_mask_flames: Optional[Path] = None,
                     zip_name: str = "outputs.zip") -> Path:
    """
    Zip key deliverables (optionally include FLAMeS masks).

    Args:
        job_dir: job directory; the archive is written here and entries are
            stored relative to it.
        prev_stripped: skull-stripped previous scan.
        registered_new: new scan registered to the previous scan.
        diff_paths: must provide the keys "new_only", "resolved_only", "xor",
            "stable" and "combined".
        prev_mask_flames: optional mask to include.
        new_mask_flames: optional mask to include.
        zip_name: archive file name inside ``job_dir``.

    Returns:
        Path of the created archive.

    Raises:
        RuntimeError: if any file to be packaged does not exist. This is
            checked before the archive is opened, so no partial zip is left.
    """
    zpath = job_dir / zip_name
    to_add = [
        prev_stripped,
        registered_new,
        diff_paths["new_only"],
        diff_paths["resolved_only"],
        diff_paths["xor"],
        diff_paths["stable"],
        diff_paths["combined"],
    ]
    to_add.extend(p for p in (prev_mask_flames, new_mask_flames) if p is not None)

    # Validate everything up front so a failure never leaves a truncated zip.
    missing = [p for p in to_add if not p.exists()]
    if missing:
        raise RuntimeError(f"Expected output missing: {missing[0]}")

    with zipfile.ZipFile(zpath, "w", zipfile.ZIP_DEFLATED) as zf:
        for p in to_add:
            zf.write(p, p.relative_to(job_dir).as_posix())
    return zpath
| |
|
| |
|
| | |
| | |
| | |
def _redact_paths(s: str) -> str:
    """Replace the jobs/model/bin roots and the home directory in ``s`` with "[redacted]"."""
    sensitive_roots = [str(root) for root in (JOBS_ROOT, MODEL_ROOT, BIN_ROOT, Path.home())]
    for root in sensitive_roots:
        if not root:
            continue
        s = s.replace(root, "[redacted]")
    return s
| |
|
| |
|
@spaces.GPU(duration=300)
def run_pipeline(file1, file2, dilate_prev_radius_vox=1, min_lesion_vol_ml=0.01):
    """
    Run the full lesion-tracking pipeline on two uploads.

    file1: previous (baseline) FLAIR (.nii/.nii.gz or DICOM .zip)
    file2: new (follow-up) FLAIR (.nii/.nii.gz or DICOM .zip)
    dilate_prev_radius_vox: dilation radius applied to the previous mask
        before new lesions are derived.
    min_lesion_vol_ml: minimum component volume kept in the difference maps
        and in the total lesion count.

    Steps: stage uploads -> DICOM conversion (zip inputs only) -> skull
    stripping -> registration of new onto previous -> FLAMeS segmentation of
    both -> difference maps -> lesion counts -> zip packaging.

    Each upload may be a path (str / os.PathLike) or an object exposing the
    path as ``.name``.

    Returns: (status_html, outputs_zip_path_or_None, report_html_update).
    Any exception, including setup failures, is reported through
    ``status_html`` with known root paths redacted and the text HTML-escaped.
    """
    import html  # local import: only used to escape error text for the HTML status box

    if file1 is None or file2 is None:
        return (
            "<div>⚠️ Please upload both the previous and the new scan.</div>",
            None,
            gr.update(value="", visible=False),
        )

    def _upload_path(upload) -> Path:
        # Accept both a plain path and a file wrapper that carries it in `.name`.
        if isinstance(upload, (str, os.PathLike)):
            return Path(upload)
        return Path(upload.name)

    try:
        # Setup runs inside the try so its failures use the same redacted error path.
        ensure_dirs()
        ensure_synthstrip_available()
        ensure_flames_model()

        job_dir = (JOBS_ROOT / f"msjob_{uuid.uuid4().hex}").resolve()
        job_dir.mkdir(parents=True, exist_ok=True)
        incoming_dir = job_dir / "incoming"
        incoming_dir.mkdir(parents=True, exist_ok=True)

        # Stage the uploads inside the job directory.
        prev_staged = stage_upload(_upload_path(file1), incoming_dir, "prev_upload")
        new_staged = stage_upload(_upload_path(file2), incoming_dir, "new_upload")

        # Convert DICOM zips. Each scan gets its own working directory:
        # sharing one would mix both series, and the "largest NIfTI" pick
        # could then return the same scan for both inputs.
        prev_nifti = (
            convert_dicom_zip_to_nifti(prev_staged, job_dir / "prev_dicom")
            if is_zip(prev_staged) else prev_staged
        )
        new_nifti = (
            convert_dicom_zip_to_nifti(new_staged, job_dir / "new_dicom")
            if is_zip(new_staged) else new_staged
        )

        # Skull stripping.
        stripped_dir = job_dir / STRIPPED_DIR
        prev_stripped, _ = skull_strip_synthstrip(prev_nifti, stripped_dir, "prev")
        new_stripped, _ = skull_strip_synthstrip(new_nifti, stripped_dir, "new")

        # Registration of the new scan into the previous scan's space.
        reg_dir = job_dir / REGISTERED_DIR
        registered_path, _, _ = register_rigid_affine(prev_stripped, new_stripped, reg_dir)
        registered_path = registered_path.resolve()

        # Lesion segmentation of both scans (both in previous-scan space).
        seg_dir = job_dir / SEG_DIR
        seg_dir.mkdir(parents=True, exist_ok=True)
        prev_mask_flames = seg_dir / "prev_flames_mask.nii.gz"
        new_mask_flames = seg_dir / "new_in_prev_space_flames_mask.nii.gz"
        run_flames_single(prev_stripped, prev_mask_flames, device="cuda")
        run_flames_single(registered_path, new_mask_flames, device="cuda")

        # Difference maps.
        diff_paths = build_diff_maps(
            prev_mask_flames, new_mask_flames, job_dir,
            dilate_prev_radius_vox=int(dilate_prev_radius_vox),
            min_lesion_vol_ml=float(min_lesion_vol_ml)
        )

        # Counts for the textual report. The new-lesion mask was already
        # volume-filtered by build_diff_maps, so no extra filter is applied to it.
        new_lesions_count = count_lesions_in_mask(
            diff_paths["new_only"],
            min_lesion_vol_ml=0.0,
            use_26_connectivity=False
        )
        total_lesions_new_scan = count_lesions_in_mask(
            new_mask_flames,
            min_lesion_vol_ml=float(min_lesion_vol_ml),
            use_26_connectivity=False
        )

        # Package the deliverables.
        outputs_zip = package_selected(
            job_dir,
            prev_stripped,
            registered_path,
            diff_paths,
            prev_mask_flames=prev_mask_flames,
            new_mask_flames=new_mask_flames
        )

        if not outputs_zip.exists():
            raise RuntimeError("Packaging failed: outputs.zip not found.")

        # The note carries class "footnote" so the "#report .footnote" CSS rule applies.
        report_html = f"""
<div>
<h3>Textual report</h3>
<ul>
<li><strong>New lesions:</strong> {new_lesions_count}</li>
<li><strong>Total lesions in new scan:</strong> {total_lesions_new_scan}</li>
</ul>
<div class="footnote">
Counts use the current minimum lesion volume filter of {float(min_lesion_vol_ml):.3f} mL (6-connectivity).
</div>
</div>
""".strip()

        status_html = "✅ Done. Download the results below to inspect them in your preferred viewer."
        return (
            status_html,
            str(outputs_zip),
            gr.update(value=report_html, visible=True),
        )

    except Exception as e:
        # Redact known roots, then escape: the message may contain tool output
        # and file names, and is rendered as HTML.
        msg = html.escape(_redact_paths(str(e)))
        status_html = f"⚠️ Error: {msg}"
        return (
            status_html,
            None,
            gr.update(value="", visible=False),
        )
| |
|
| |
|
| | |
| | |
| | |
# ---------------------------------------------------------------------------
# Gradio user interface.
# Components are declared in the order they appear in the code below:
# title, overview, instructions, inputs, options, run button, outputs, references.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title=APP_NAME,
    analytics_enabled=False,
    theme=gr.themes.Default(),
    # Custom CSS; the #ids match the elem_id values used below and the
    # .info-section class matches the HTML panels.
    css="""
/* ----- Title styling ----- */
#title {
text-align: center;
margin-top: 1.5rem;
margin-bottom: 2rem;
font-size: clamp(2rem, 3vw, 2.8rem);
font-weight: 700;
letter-spacing: -0.4px;
background: linear-gradient(90deg, #4cafef, #7affd6);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
}

/* ----- Run pipeline button ----- */
#run_btn {
display: block;
margin: 28px auto 16px auto;
background: linear-gradient(90deg, #4cafef, #7affd6);
color: #000 !important;
font-weight: 600 !important;
font-size: 18px !important;
border: none !important;
border-radius: 10px !important;
padding: 12px 28px !important;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.15);
transition: all 0.25s ease;
text-transform: none !important;
}

#run_btn:hover {
background: linear-gradient(90deg, #3da0df, #64eec8);
transform: translateY(-1px);
box-shadow: 0 6px 14px rgba(0, 0, 0, 0.18);
color: #000
}

/* ----- Status box ----- */
#status_box {
overflow: visible !important;
height: auto !important;
max-height: none !important;
min-height: 60px;
padding: 14px 18px;
border-radius: 10px;
border: 1px solid var(--border-color-primary);
background: var(--block-background-fill);
font-size: 16px;
line-height: 1.45;
}

/* ----- Info & reference sections ----- */
.info-section {
font-size: 18px;
line-height: 1.7;
max-width: 1000px;
margin: 0 auto 28px auto;
padding: 24px 30px;
border-radius: 14px;
background-color: var(--block-background-fill);
border: 1px solid var(--border-color-primary);
box-shadow: 0 4px 14px rgba(0,0,0,0.06);
}

.info-section h3 {
margin-top: 0;
margin-bottom: 12px;
font-size: 22px;
font-weight: 600;
color: #4cafef;
letter-spacing: -0.3px;
}

.info-section p, .info-section li {
color: var(--body-text-color);
}

.info-section ul {
margin-top: 6px;
margin-bottom: 6px;
padding-left: 24px;
list-style-type: disc;
}

.info-section code {
background: var(--background-secondary);
padding: 2px 5px;
border-radius: 5px;
font-size: 90%;
}

.info-section a {
color: #4cafef;
text-decoration: none;
}

.info-section a:hover {
text-decoration: underline;
}

/* ----- Textual report styling ----- */
#report {
font-size: 20px !important;
line-height: 1.8 !important;
color: var(--body_text_color, var(--body-text-color));
background: var(--block-background-fill);
border: 1px solid var(--border-color-primary);
border-radius: 16px;
padding: 28px 34px;
max-width: 1000px;
margin: 0 auto 34px auto;
box-shadow: 0 6px 18px rgba(0,0,0,0.08);
}

#report h3 {
margin-top: 0;
margin-bottom: 16px;
font-size: 21px;
font-weight: 700;
color: #4cafef;
letter-spacing: -0.3px;
text-align: left;
}

#report ul {
margin: 10px 0 0 22px;
padding: 0;
list-style-type: disc;
}

#report li {
margin-bottom: 10px;
font-size: 20px;
line-height: 1.8;
}

#report strong {
color: #4cafef;
font-weight: 600;
font-size: 20px;
}

#report .footnote {
font-size: 16px;
color: #999;
margin-top: 14px;
}
"""
) as demo:

    # Page title (styled through #title).
    gr.Markdown(f"# {APP_NAME}", elem_id="title")

    # Overview panel: what the tool does and which inputs it accepts.
    gr.HTML(
        """
<div class="info-section">
<h3>Overview</h3>
<p>This tool detects changes in <strong>multiple sclerosis (MS) lesions</strong> between two brain MRI scans.</p>

<p>Input sequence must be <strong>isotropic 3D FLAIR</strong> in
<code>.nii/.nii.gz</code> (NIfTI) or DICOM (<code>.zip</code>) format. <br>
If DICOM is provided, images are automatically converted to NIfTI using
<em>dcm2niix</em>.</p>

<p>Processing includes skull stripping with <em>NiPreps SynthStrip</em> package,
rigid/affine co-registration of the two scans with SimpleITK,
and lesion segmentation using <em>FLAMeS</em> deep learning model.</p>

<p>Lesion difference masks between the two scans are then calculated and made available for download.</p>

<p><strong>Note: This application is a <em>research preview</em>.
For clinical reporting, all results should be reviewed and validated by a qualified radiologist.</strong></p>
</div>
"""
    )

    # Usage instructions and explanation of the advanced options.
    gr.HTML(
        """
<div class="info-section">
<h3>How to use</h3>
<ul>
<li>Upload <em>previous (baseline)</em> and <em>new (follow-up)</em>
isotropic 3D FLAIR scans (<code>.nii/.nii.gz</code> or DICOM <code>.zip</code>).</li>
<li>Click <strong>Run pipeline</strong>. Processing time takes approximately 3 minutes on current hardware.</li>
<li>After processing, download the ZIP file and open the NIfTI outputs in your preferred neuroimaging viewer
(e.g. ITK-SNAP, FSLeyes, 3D Slicer) to inspect the lesions and overlays.</li>
</ul>

<p style="margin-top:16px;"><strong>Advanced options:</strong></p>
<ul>
<li><em>Dilate previous mask (voxels):</em> Expands the baseline lesion mask slightly
to avoid missing small changes when comparing scans.</li>
<li><em>Min lesion volume (mL):</em> Ignores very tiny spots below this volume,
so that noise is not counted as lesions.</li>
</ul>
</div>
"""
    )

    # Inputs: the two scans side by side.
    with gr.Row():
        prev_in = gr.File(label="Previous (baseline) FLAIR (.nii/.nii.gz or DICOM .zip)")
        new_in = gr.File(label="New (follow-up) FLAIR (.nii/.nii.gz or DICOM .zip)")

    # Tunable parameters passed to run_pipeline (collapsed by default).
    with gr.Accordion("Advanced options", open=False):
        dil = gr.Slider(0, 3, value=1, step=1, label="Dilate previous mask (voxels)")
        minvol = gr.Slider(0.0, 0.2, value=0.01, step=0.005, label="Min lesion volume (mL)")

    # Trigger button (styled through #run_btn).
    run_btn = gr.Button("Run pipeline", elem_id="run_btn")

    # Outputs: status message, downloadable archive, and the textual report
    # (the report is created hidden; run_pipeline returns an update for it).
    status = gr.HTML(label="Status", elem_id="status_box")
    out_zip = gr.File(label="Download outputs (ZIP)")
    report = gr.HTML(visible=False, label="Textual report", elem_id="report")

    # Wire the button: input order matches run_pipeline's parameters and
    # output order matches its returned 3-tuple.
    run_btn.click(
        fn=run_pipeline,
        inputs=[prev_in, new_in, dil, minvol],
        outputs=[status, out_zip, report]
    )

    # Literature references for the tools used by the pipeline.
    gr.HTML(
        """
<div class="info-section">
<h3>References</h3>
<ol>
<li>
Li X, Morgan PS, Ashburner J, Smith J, Rorden C (2016).
<em>The first step for neuroimaging data analysis: DICOM to NIfTI conversion.</em>
<strong>J Neurosci Methods</strong> 264:47–56.
<a href="https://doi.org/10.1016/j.jneumeth.2016.03.001" target="_blank">📄 DOI: 10.1016/j.jneumeth.2016.03.001</a>
</li>

<li>
Hoopes A, Mora JS, Dalca AV, Fischl B*, Hoffmann M* (2022).
<em>SynthStrip: Skull-Stripping for Any Brain Image.</em>
<strong>NeuroImage</strong> 260:119474.
<a href="https://doi.org/10.1016/j.neuroimage.2022.119474" target="_blank">📄 DOI: 10.1016/j.neuroimage.2022.119474</a>
</li>

<li>
Dereskewicz E, La Rosa F, Dos Santos Silva J, et al. (2025).
<em>FLAMeS: A Robust Deep Learning Model for Automated Multiple Sclerosis Lesion Segmentation.</em>
<strong>medRxiv</strong>.
<a href="https://doi.org/10.1101/2025.05.19.25327707" target="_blank">📄 DOI: 10.1101/2025.05.19.25327707</a>
</li>
</ol>
</div>
"""
    )
| |
|
| |
|
if __name__ == "__main__":
    # Listen on all interfaces on port 7860, let Gradio serve files from the
    # jobs directory (the output archives), and show errors in the UI.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "allowed_paths": [str(JOBS_ROOT)],
        "show_error": True,
    }
    demo.launch(**launch_options)