SmartCBC / utils /analysis.py
KPrashanth's picture
Upload 15 files
2c2fc49 verified
# utils/analysis.py
from __future__ import annotations
import random
from datetime import datetime
from pathlib import Path
from typing import Dict, Tuple, Optional
import pandas as pd
# -------------------------------------------------
# GLOBALS & LABELS
# -------------------------------------------------
# IMPORTANT: This order must match the training/order used for your WBC classifier.
CLASS_NAMES = [
"neutrophil",
"eosinophil",
"basophil",
"lymphocyte",
"monocyte",
"immature_granulocyte",
"erythroblast",
"platelet",
]
# -------------------------------------------------
# CSV RANGE PARSING + REFERENCE HELPERS
# -------------------------------------------------
def _parse_range(txt: str | float | int | None) -> Tuple[Optional[float], Optional[float]]:
"""
Parse a 'low-high' textual range (e.g. '40-70') into (low, high).
Returns (None, None) if parsing fails.
"""
if txt is None or pd.isna(txt):
return (None, None)
s = str(txt).strip()
if not s:
return (None, None)
s = s.replace("approx.", "")
parts = [p.strip() for p in s.split("-") if p.strip()]
if len(parts) < 2:
return (None, None)
try:
low = float(parts[0])
high = float(parts[1])
return (low, high)
except ValueError:
return (None, None)
def load_reference(
age_group: str = "Adults (18-60y)",
gender: Optional[str] = None,
csv_path: str | Path = "",
) -> Dict:
"""
Load a single reference row from the WBC differential reference CSV,
filtered by age group and optionally gender.
Expected CSV columns (example):
- Age Group
- Gender
- Neutrophils % (Range)
- Lymphocytes % (Range)
- Monocytes % (Range)
- Eosinophils % (Range)
- Basophils % (Range)
- Immature Granulocytes %
- Infection Insights (High)
- Infection Insights (Low)
"""
if not csv_path:
raise ValueError("csv_path must be provided to load_reference().")
csv_path = Path(csv_path)
if not csv_path.exists():
raise FileNotFoundError(f"Reference CSV not found: {csv_path}")
df = pd.read_csv(csv_path)
sub = df[df["Age Group"].astype(str).str.strip() == age_group]
if sub.empty:
raise ValueError(f"Age group '{age_group}' not found in reference file.")
if gender:
g = gender.strip().upper()
sub2 = sub[sub["Gender"].astype(str).str.strip().str.upper() == g]
if not sub2.empty:
sub = sub2
return sub.iloc[0].to_dict()
# -------------------------------------------------
# AGE/GENDER HELPERS
# -------------------------------------------------
def map_age_to_group(age_years: float) -> str:
"""
Map a numeric age (in years) to an age-group label
as described in the WBC reference CSV.
"""
if age_years < 0.01: # ~0–3 days
return "Newborn (0-3d)"
if age_years < 0.1: # ~4–28 days
return "Infant (4-28d)"
if age_years < 2: # 1m–2y
return "Children (1m-2y)"
if age_years < 6:
return "Children (2-6y)"
if age_years < 12:
return "Children (6-12y)"
if age_years < 18:
return "Adolescents (12-18y)"
if age_years <= 60:
return "Adults (18-60y)"
return "Elderly (>60y)"
def pick_gender_for_group(
age_group: str,
csv_path: str | Path,
) -> Optional[str]:
"""
If gender is unknown, pick a valid gender for that age group
from the reference CSV. Returns 'M', 'F', or None.
"""
csv_path = Path(csv_path)
if not csv_path.exists():
return None
df = pd.read_csv(csv_path)
sub = df[df["Age Group"].astype(str).str.strip() == age_group]
if sub.empty:
return None
genders = (
sub["Gender"]
.dropna()
.astype(str)
.str.strip()
.unique()
.tolist()
)
if not genders:
return None
# If "M/F" is present, just pick randomly
if any("M/F" in g or "M / F" in g for g in genders):
return random.choice(["M", "F"])
return random.choice(genders)
# -------------------------------------------------
# DIFFERENTIAL & REPORT GENERATION
# -------------------------------------------------
def compute_differential_percentages(
wbc_subtypes: Dict[str, int],
) -> Dict[str, float]:
"""
Convert WBC subtype counts to percentages.
Returns a dict with percentages, keyed by subtype name.
"""
total = sum(wbc_subtypes.values()) if wbc_subtypes else 0
if total == 0:
return {k: 0.0 for k in wbc_subtypes.keys()}
return {
k: round((v / total) * 100.0, 1)
for k, v in wbc_subtypes.items()
}
def generate_report_from_ai(
ai_result: Dict,
age_group: str,
gender: Optional[str],
csv_path: str | Path,
) -> str:
"""
Generate a human-readable report text using:
- AI-derived results (coarse counts & WBC subtypes)
- Reference ranges from CSV for the given age group & gender.
ai_result expected keys:
- patient_id
- coarse_counts: {"WBC": int, "RBC": int, "Platelet": int}
- wbc_subtypes: {subtype_name: int}
- fovs_analyzed (optional)
- calibration (optional, dict with FOV area, constant)
- timestamp (optional)
"""
ref = load_reference(age_group=age_group, gender=gender, csv_path=csv_path)
patient_id = ai_result.get("patient_id", "UNKNOWN")
ts = ai_result.get("timestamp") or datetime.now().isoformat(timespec="seconds")
fovs = ai_result.get("fovs_analyzed", 0)
coarse = ai_result.get("coarse_counts", {}) or {}
subtypes = ai_result.get("wbc_subtypes", {}) or {}
calib = ai_result.get("calibration", {}) or {}
fov_area = calib.get("fov_area_mm2")
calib_const = calib.get("calibration_constant")
total_wbc = coarse.get("WBC", 0)
total_rbc = coarse.get("RBC", 0)
total_plt = coarse.get("Platelet", 0)
classified_total = sum(subtypes.values()) if subtypes else 0
def ai_pct(name: str) -> float:
if not classified_total:
return 0.0
return round((subtypes.get(name, 0) / classified_total) * 100.0, 1)
ai_neut = ai_pct("neutrophil")
ai_lymph = ai_pct("lymphocyte")
ai_mono = ai_pct("monocyte")
ai_eos = ai_pct("eosinophil")
ai_baso = ai_pct("basophil")
ai_ig = ai_pct("immature_granulocyte")
ai_ery = ai_pct("erythroblast")
# parse reference ranges from CSV columns
ref_neut_lo, ref_neut_hi = _parse_range(ref.get("Neutrophils % (Range)"))
ref_lymph_lo, ref_lymph_hi = _parse_range(ref.get("Lymphocytes % (Range)"))
ref_mono_lo, ref_mono_hi = _parse_range(ref.get("Monocytes % (Range)"))
ref_eos_lo, ref_eos_hi = _parse_range(ref.get("Eosinophils % (Range)"))
ref_baso_lo, ref_baso_hi = _parse_range(ref.get("Basophils % (Range)"))
ref_ig_txt = str(ref.get("Immature Granulocytes %", "")).lower()
# crude check — if "3" is mentioned, set 3% as an upper threshold
ref_ig_max = 3.0 if "3" in ref_ig_txt else None
high_note = ref.get("Infection Insights (High)", "")
low_note = ref.get("Infection Insights (Low)", "")
insights: list[str] = []
def check_range(label: str, ai_val: float, lo: Optional[float], hi: Optional[float]):
if lo is None or hi is None:
return
# Above reference range
if ai_val > hi and high_note:
insights.append(
f"- {label} {ai_val}% is above reference ({lo}-{hi}%). {high_note}"
)
# Below reference range
elif ai_val < lo and low_note:
insights.append(
f"- {label} {ai_val}% is below reference ({lo}-{hi}%). {low_note}"
)
check_range("Neutrophils", ai_neut, ref_neut_lo, ref_neut_hi)
check_range("Lymphocytes", ai_lymph, ref_lymph_lo, ref_lymph_hi)
check_range("Monocytes", ai_mono, ref_mono_lo, ref_mono_hi)
check_range("Eosinophils", ai_eos, ref_eos_lo, ref_eos_hi)
check_range("Basophils", ai_baso, ref_baso_lo, ref_baso_hi)
if ref_ig_max is not None and ai_ig > ref_ig_max:
insights.append(
f"- Immature granulocytes {ai_ig}% > allowed ({ref_ig_max}%), "
"suggesting left shift or active marrow response. Recommend manual review."
)
if ai_ery > 0:
insights.append(
f"- Erythroblasts detected ({ai_ery}%), unusual in normal peripheral blood → manual review recommended."
)
if classified_total < 100:
insights.append(
f"- Only {classified_total} WBCs classified; differential may be statistically unstable. "
"Consider reviewing more fields."
)
if not fov_area or not calib_const:
insights.append(
"- Absolute counts per µL are not reported (FOV area and calibration "
"constant not provided). Use results as qualitative screening."
)
# Assemble multi-line report
lines: list[str] = []
lines.append("AI-Assisted Peripheral Blood Smear Report (Prototype)")
lines.append("================================================================")
lines.append(f"Patient ID : {patient_id}")
lines.append(f"Date/Time : {ts}")
lines.append(f"Age Group (ref) : {age_group}")
if gender:
lines.append(f"Gender (ref) : {gender}")
if fovs:
lines.append(f"FOVs Analyzed : {fovs}")
lines.append("")
lines.append("1. Coarse Counts (sum over analyzed fields)")
lines.append(f" WBC : {total_wbc}")
lines.append(f" RBC : {total_rbc}")
lines.append(f" Platelets : {total_plt}")
lines.append("")
lines.append("2. AI Differential vs Reference")
lines.append(f" Neutrophils : {ai_neut}% (ref {ref.get('Neutrophils % (Range)')})")
lines.append(f" Lymphocytes : {ai_lymph}% (ref {ref.get('Lymphocytes % (Range)')})")
lines.append(f" Monocytes : {ai_mono}% (ref {ref.get('Monocytes % (Range)')})")
lines.append(f" Eosinophils : {ai_eos}% (ref {ref.get('Eosinophils % (Range)')})")
lines.append(f" Basophils : {ai_baso}% (ref {ref.get('Basophils % (Range)')})")
lines.append(f" Imm. granulocytes : {ai_ig}% (ref {ref.get('Immature Granulocytes %')})")
lines.append(f" Erythroblasts : {ai_ery}% (no reference range in file)")
lines.append("")
lines.append("3. Calibration")
lines.append(f" FOV area (mm²) : {fov_area if fov_area else 'not provided'}")
lines.append(f" Calibration constant : {calib_const if calib_const else 'not provided'}")
lines.append("")
lines.append("4. AI Insights")
if insights:
for msg in insights:
lines.append(f" {msg}")
else:
lines.append(" All AI-derived percentages fall within reference ranges for this age group.")
lines.append("")
lines.append(
"Method: YOLO-based detector (RBC/WBC/Platelet) + WBC subtype classifier "
"compared against age-/gender-specific reference ranges from CSV.\n"
"This is a research prototype and not a substitute for formal lab testing."
)
return "\n".join(lines)