import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
import streamlit as st
# Mahesh start
f_df = pd.read_csv('newborn_health_monitoring_with_risk.csv')
# drop apgar_score to remove its direct influence on the risk label
f_df.drop(columns=['apgar_score'], inplace=True)
# drop rows with null values
f_df.dropna(inplace=True)
# reset index
f_df.reset_index(inplace=True,drop=True)
f_df['risk_level'] = f_df['risk_level'].map({'At Risk':1,'Healthy':0})
risk_level = f_df['risk_level']
f_df.drop(columns=['risk_level'],inplace=True)
data_for_analysis = f_df.select_dtypes(include=['number']).copy()
data_for_analysis.dropna(inplace=True)
data_for_analysis.reset_index(inplace=True, drop=True)
# scale the data so features with large numeric ranges do not dominate the algorithm
from sklearn.preprocessing import MinMaxScaler
scaler_minmax = MinMaxScaler()
data_minmax_scaled = scaler_minmax.fit_transform(data_for_analysis)
data_minmax_df = pd.DataFrame(data_minmax_scaled, columns=data_for_analysis.columns)
print("\nMinMax Scaling: Range = [0,1]")
print(f"birth_weight_kg - Min: {data_minmax_df['birth_weight_kg'].min():.3f}, Max: {data_minmax_df['birth_weight_kg'].max():.3f}")
# init PCA and feed data into it
from sklearn.decomposition import PCA
pca = PCA()
pca_results = pca.fit_transform(data_minmax_df)
pca_with_cluster = None
# Examine explained variance
explained_variance_ratio = pca.explained_variance_ratio_
cumulative_variance = np.cumsum(explained_variance_ratio)
print("Explained Variance by Component:")
for i in range(min(10, len(explained_variance_ratio))):
    print(f"PC{i+1}: {explained_variance_ratio[i]:.3f} ({explained_variance_ratio[i]*100:.1f}%)")
print(f"\nFirst 3 components explain {cumulative_variance[2]*100:.1f}% of total variance")
print(f"First 5 components explain {cumulative_variance[4]*100:.1f}% of total variance")
# Mahesh end
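# Optional sanity check (a sketch, not part of the original pipeline): the mean
# squared reconstruction error from the first 3 PCs should mirror the
# ~(1 - cumulative_variance[2]) share of variance left unexplained.
recon_3 = pca_results[:, :3] @ pca.components_[:3, :] + pca.mean_
mse_3 = float(np.mean((data_minmax_df.values - recon_3) ** 2))
print(f"Reconstruction MSE using 3 PCs: {mse_3:.5f} "
      f"(unexplained variance share: {1 - cumulative_variance[2]:.3f})")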
st.set_page_config(page_title="PCA Variance Explorer", layout="wide")
# --- Expect these to be defined upstream in your notebook/session ---
# explained_variance_ratio: np.ndarray, shape (n_components,)
# cumulative_variance: np.ndarray, shape (n_components,) -> optional; computed below if missing
if "explained_variance_ratio" not in globals():
    st.error("Please ensure `explained_variance_ratio` is defined in the session before running this app.")
    st.stop()
# If cumulative_variance isn't defined, compute it safely:
if "cumulative_variance" not in globals() or cumulative_variance is None:
    cumulative_variance = np.cumsum(explained_variance_ratio)
pca_df = pd.DataFrame(pca_results[:, :3], columns=['PC1', 'PC2', 'PC3'])
# Attach key clinical fields for profiling (keep only those that exist)
attach_cols = [
'age_days', 'jaundice_level_mg_dl', 'feeding_frequency_per_day',
'stool_count', 'urine_output_count', 'weight_kg', 'length_cm',
'head_circumference_cm', 'oxygen_saturation', 'temperature_c',
'heart_rate_bpm', 'respiratory_rate_bpm'
]
attach_cols = [c for c in attach_cols if c in data_for_analysis.columns]
pca_df = pd.concat([pca_df, data_for_analysis[attach_cols]], axis=1)
from scipy.stats import pointbiserialr
st.set_page_config(page_title="Newborn EDA", layout="wide")
# --- Tabs ---
tabs = st.tabs(["Exploratory Data Analysis", "Variance Analysis", "risk_level vs. PCA spaces", "Cluster Visualization", "Summary"])
# =============== TAB 0: Exploratory Data Analysis ===============
with tabs[0]:
st.subheader("Exploratory Data Analysis")
# --- Load data ---
df = f_df
# --- Basic dataset info ---
st.markdown("### Dataset overview")
c1, c2, c3, c4 = st.columns(4)
with c1:
st.metric("Rows", len(df))
with c2:
st.metric("Columns", df.shape[1])
with c3:
num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
st.metric("Numeric columns", len(num_cols))
with c4:
cat_cols = [c for c in df.columns if c not in num_cols]
st.metric("Non-numeric columns", len(cat_cols))
st.markdown("#### Peek at data")
st.dataframe(df.head(10), use_container_width=True)
# --- Baseline risk rate (overall and for the filtered range) ---
st.markdown("### Baseline risk rate")
overall_rate = risk_level.mean()
c1, c2 = st.columns(2)
c1.metric("Overall risk rate", f"{overall_rate:.2%}")
st.divider()
# --- Groupby health and risk, bar plot of average differences across columns ---
st.markdown("### Groupby health & risk: feature differences")
column_selector = st.selectbox(options=data_for_analysis.columns, label="Select column to analyze")
value_slider_range = st.slider(min_value=float(data_for_analysis[column_selector].min()), max_value=float(data_for_analysis[column_selector].max()), value=(float(data_for_analysis[column_selector].min()), float(data_for_analysis[column_selector].max())), step=0.1, label="Select value range to filter")
filtered_df = data_for_analysis[(data_for_analysis[column_selector] >= value_slider_range[0]) & (data_for_analysis[column_selector] <= value_slider_range[1])]
# using filtered df to calculate current average risk level
current_risk_level = risk_level[filtered_df.index]
current_risk_rate = current_risk_level.mean()
c2.metric(f"Risk rate for {column_selector} in [{value_slider_range[0]}, {value_slider_range[1]}]", f"{current_risk_rate:.2%}")
st.markdown(f"**Note:** Current risk rate is based on filtering `{column_selector}` in the range [{value_slider_range[0]}, {value_slider_range[1]}].")
# bar chart: mean of each numeric column in the filtered rows, grouped by risk level,
# with the 0/1 codes relabeled as Healthy / At Risk
avg_by_risk = filtered_df.groupby(current_risk_level).mean().T
avg_by_risk.columns = ['Healthy' if c == 0 else 'At Risk' for c in avg_by_risk.columns]
st.bar_chart(avg_by_risk, use_container_width=True)
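# Hedged addition (a sketch, not in the original app): screen each numeric
# feature for association with the binary risk label, using the point-biserial
# correlation imported above. Assumes risk_level is 0/1 and row-aligned with
# data_for_analysis.
pb_rows = []
for col in data_for_analysis.columns:
    r_pb, p_pb = pointbiserialr(risk_level, data_for_analysis[col])
    pb_rows.append((col, r_pb, p_pb))
pb_df = pd.DataFrame(pb_rows, columns=['feature', 'point_biserial_r', 'p_value'])
st.markdown("### Feature association with risk (point-biserial)")
st.dataframe(pb_df.sort_values('point_biserial_r', key=np.abs, ascending=False).round(4), use_container_width=True)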
with tabs[1]:
st.subheader("Explained & Cumulative Variance")
# Slider controls the target cumulative variance percentage (horizontal line)
pct = st.slider(
"Target cumulative variance (%)",
min_value=50,
max_value=99,
value=80,
step=1,
help="Move the slider to set the target cumulative variance line."
)
thr = pct / 100.0
# How many components are needed to reach the threshold?
# np.searchsorted finds the first index where cumulative_variance >= thr;
# +1 converts the zero-based index to a component count (clamped to the valid range)
n_components_needed = int(min(np.searchsorted(cumulative_variance, thr) + 1, len(cumulative_variance)))
# Limit the view to the first 15 components
k = int(min(15, len(explained_variance_ratio)))
x_axis = range(1, k + 1)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
# Scree plot
ax1.plot(
x_axis,
explained_variance_ratio[:k],
'bo-',
linewidth=2,
markersize=8
)
ax1.set_xlabel('Principal Component')
ax1.set_ylabel('Explained Variance Ratio')
ax1.set_title('Scree Plot: Variance Explained by Each Component')
ax1.grid(True, alpha=0.3)
# Cumulative variance with dynamic threshold line
ax2.plot(
x_axis,
cumulative_variance[:k],
'ro-',
linewidth=2,
markersize=8
)
ax2.axhline(y=thr, color='gray', linestyle='--', alpha=0.8, label=f'{pct}% Variance')
ax2.set_xlabel('Number of Components')
ax2.set_ylabel('Cumulative Explained Variance')
ax2.set_title('Cumulative Variance Explained')
ax2.legend()
ax2.grid(True, alpha=0.3)
st.pyplot(fig)
st.markdown(
f"**Components needed to reach {pct}% variance:** `{n_components_needed}` "
f"(cumulative variance at that component: "
f"{cumulative_variance[n_components_needed-1]:.3f})"
)
st.subheader("Principal Component Loadings & Projection")
# Safety checks
missing = [name for name in ["pca", "explained_variance_ratio", "pca_results", "data_for_analysis"]
           if name not in globals()]
if missing:
    st.error(f"Missing variables in session: {', '.join(missing)}. Please run the PCA step first.")
    st.stop()
# Build components_df (first up to 5 PCs)
n_avail_pcs = min(5, pca.components_.shape[0])
components_df = pd.DataFrame(
pca.components_[:n_avail_pcs].T,
columns=[f'PC{i + 1}' for i in range(n_avail_pcs)],
index=data_for_analysis.columns
)
st.markdown("**Principal Component Loadings (each variable’s contribution to each PC):**")
st.dataframe(components_df.round(3), use_container_width=True)
# Controls
st.markdown("**Display options**")
left, mid, right = st.columns([2, 1, 1])
with left:
top_k = st.slider(
"Top variables per PC by |loading|",
min_value=5,
max_value=min(25, components_df.shape[0]),
value=min(12, components_df.shape[0]),
help="Controls how many strongest-loading variables to show per PC."
)
with mid:
show_labels = st.checkbox("Label points in PC1–PC2 scatter", value=False)
with right:
max_labels = st.number_input(
"Max labels (if enabled)",
min_value=5, max_value=300, value=50, step=5,
help="Upper bound on how many points get text labels."
)
# Prepare figure (higher DPI for sharpness; constrained layout to reduce overlap)
fig, axes = plt.subplots(
    2, 2,
    figsize=(10, 9),
    dpi=150,
    constrained_layout=True
)
# Helper to draw one PC’s horizontal bar chart
def draw_pc_bar(ax, pc_idx):
    pc_name = f"PC{pc_idx + 1}"
    if pc_name not in components_df.columns:
        ax.axis("off")
        return
    series = components_df[pc_name]
    # pick top-k by |loading|
    series = series.reindex(series.abs().sort_values(ascending=False).head(top_k).index)
    series = series.sort_values()  # small->large for tidy barh
    bars = ax.barh(range(len(series)), series.values, linewidth=0.6)
    ax.set_yticks(range(len(series)))
    ax.set_yticklabels(series.index, fontsize=8)
    ax.invert_yaxis()  # largest at top
    var_pct = explained_variance_ratio[pc_idx] * 100 if pc_idx < len(explained_variance_ratio) else np.nan
    ax.set_title(f'{pc_name} Loadings (Explains {var_pct:.1f}% of variance)')
    ax.axvline(x=0, linestyle='-', alpha=0.35, linewidth=0.8)
    ax.margins(y=0.02)
    ax.grid(True, alpha=0.25, linewidth=0.5)
    # Subtle edges for crispness
    for b in bars:
        b.set_edgecolor('black')
        b.set_linewidth(0.4)
        b.set_alpha(0.9)
# PC1, PC2, PC3 loadings (only if available)
draw_pc_bar(axes[0, 0], 0)
draw_pc_bar(axes[0, 1], 1)
draw_pc_bar(axes[1, 0], 2)
# PC1 vs PC2 scatter
ax = axes[1, 1]
if pca_results.shape[1] >= 2:
# leaner markers for a less "vivid" look
ax.scatter(
pca_results[:, 0],
pca_results[:, 1],
s=14, alpha=0.7, linewidths=0.3
)
ax.set_xlabel('PC1')
ax.set_ylabel('PC2')
ax.set_title('Samples in PC1–PC2 Space')
ax.grid(True, alpha=0.25, linewidth=0.5)
# optional labels (limit collisions by labeling the most "extreme" points)
if show_labels:
try:
# priority by radial distance from origin in PC1-PC2 plane
xy = pca_results[:, :2]
dist = np.sqrt((xy ** 2).sum(axis=1))
order = np.argsort(-dist)[: int(max_labels)]
labels = getattr(data_for_analysis, "index", pd.RangeIndex(len(pca_results)))
# alternate slight offsets to reduce collisions
offsets = [(3, 2), (-3, -2), (4, -2), (-4, 2)]
for i, idx in enumerate(order):
dx, dy = offsets[i % len(offsets)]
ax.annotate(
str(labels[idx]),
(xy[idx, 0], xy[idx, 1]),
xytext=(dx, dy),
textcoords="offset points",
fontsize=7,
alpha=0.9,
ha="left", va="bottom"
)
except Exception:
pass
else:
ax.axis("off")
st.warning("pca_results has fewer than 2 components; cannot draw PC1–PC2 scatter.")
# constrained_layout (set above) already handles spacing; a tight_layout pass would conflict with it
st.pyplot(fig, clear_figure=True, use_container_width=True)
with tabs[2]:
from matplotlib.colors import ListedColormap
st.subheader("Validation: risk_level vs. PCA spaces")
# --- Safety checks ---
missing = [name for name in ["pca_df", "risk_level"] if name not in globals()]
if missing:
    st.error(f"Missing variables in session: {', '.join(missing)}. "
             "Please ensure PCA results (`pca_df`) and `risk_level` are defined.")
    st.stop()
# Align a copy and attach risk_level JUST FOR VALIDATION
_pca_df = pca_df.copy()
try:
_pca_df["risk_level"] = pd.Series(risk_level, index=_pca_df.index).values
except Exception as e:
st.error(f"Failed to align `risk_level` with `pca_df`: {e}")
st.stop()
# Basic sanity + info
st.markdown("**risk_level value counts** (for validation only):")
st.write(_pca_df["risk_level"].value_counts(dropna=False))
classes = sorted(_pca_df["risk_level"].dropna().unique())
is_binary = (len(classes) == 2)
baseline_prevalence = None
if is_binary:
try:
baseline_prevalence = float((_pca_df["risk_level"] == 1).mean())
except Exception:
pass
c1, c2, c3 = st.columns(3)
with c1:
st.metric("Samples", len(_pca_df))
with c2:
st.metric("Distinct risk classes", len(classes))
with c3:
st.metric("Baseline risk prevalence", f"{baseline_prevalence:.2%}" if baseline_prevalence is not None else "—")
st.info("`risk_level` is reused **only** for validation/visualization. Do **not** feed it into model training.")
# --- Controls ---
st.markdown("**Display options**")
cc1, cc2, cc3, cc4 = st.columns([1,1,1,1])
with cc1:
pt_size = st.slider("Scatter point size", 6, 24, 12, step=2)
with cc2:
pt_alpha = st.slider("Scatter alpha", 0.2, 1.0, 0.7, step=0.1)
with cc3:
grid_size = st.slider("Hexbin gridsize", 10, 80, 40, step=2)
with cc4:
vmax_override = st.checkbox("Clamp hexbin to [0,1]", value=True)
# -------------------- Side-by-side: Scatter & Hexbin --------------------
st.markdown("### PC1 × PC2 — Scatter vs. Hexbin (side-by-side)")
col_scatter, col_hex = st.columns(2, gap="medium")
# Data
xvals = _pca_df["PC1"].values
yvals = _pca_df["PC2"].values
cvals = _pca_df["risk_level"].values
# Scatter (left)
with col_scatter:
fig1, ax1 = plt.subplots(figsize=(6, 5), dpi=150, constrained_layout=True)
if is_binary:
cmap = ListedColormap(["#7aa2ff", "#ff7a7a"])
sc = ax1.scatter(
xvals, yvals,
c=(cvals > 0).astype(int),
s=pt_size, alpha=pt_alpha, cmap=cmap, linewidths=0.3
)
cbar = fig1.colorbar(sc, ax=ax1, ticks=[0, 1])
cbar.ax.set_yticklabels(["Non-risk (0)", "Risk (1)"])
else:
sc = ax1.scatter(xvals, yvals, c=cvals, s=pt_size, alpha=pt_alpha, linewidths=0.3)
cbar = fig1.colorbar(sc, ax=ax1)
cbar.set_label("risk_level")
ax1.axvline(0, lw=0.8, ls='--', alpha=0.6)
ax1.axhline(0, lw=0.8, ls='--', alpha=0.6)
ax1.set_xlabel("PC1")
ax1.set_ylabel("PC2")
ax1.set_title("PC1 × PC2 Colored by risk_level")
ax1.grid(True, alpha=0.25, linewidth=0.5)
st.pyplot(fig1, use_container_width=True, clear_figure=True)
# Hexbin (right)
with col_hex:
df_hex = _pca_df.dropna(subset=["PC1", "PC2", "risk_level"]).copy()
x = df_hex["PC1"].values
y = df_hex["PC2"].values
z = df_hex["risk_level"].values
fig2, ax2 = plt.subplots(figsize=(6, 5), dpi=150, constrained_layout=True)
hb = ax2.hexbin(
x, y,
C=z,
reduce_C_function=np.mean,
gridsize=grid_size,
mincnt=1,
cmap="viridis",
vmin=0 if vmax_override else None,
vmax=1 if vmax_override else None,
)
cbar2 = fig2.colorbar(hb, ax=ax2)
cbar2.set_label("High-risk prevalence")
ax2.axvline(0, lw=0.8, ls='--', color='k', alpha=0.4)
ax2.axhline(0, lw=0.8, ls='--', color='k', alpha=0.4)
ax2.set_xlabel("PC1")
ax2.set_ylabel("PC2")
ax2.set_title("Risk prevalence in PC1 × PC2 (hexbin mean of 0/1)")
ax2.grid(True, alpha=0.2, linewidth=0.4)
st.pyplot(fig2, use_container_width=True, clear_figure=True)
# -------------------- Boxplots of PCs by risk_level --------------------
st.markdown("### Distribution of PCs by `risk_level` (boxplots)")
pc_candidates = [c for c in _pca_df.columns if c.startswith("PC")]
default_pcs = [c for c in ["PC1", "PC2", "PC3"] if c in pc_candidates]
sel_pcs = st.multiselect(
"Select PCs to plot",
options=pc_candidates,
default=default_pcs if default_pcs else pc_candidates[:3]
)
if len(sel_pcs) == 0:
st.info("Select at least one PC to plot.")
else:
fig3, axes3 = plt.subplots(
1, len(sel_pcs),
figsize=(4.5 * len(sel_pcs), 4.2),
dpi=150,
constrained_layout=True
)
axes_list = [axes3] if len(sel_pcs) == 1 else list(axes3)
for ax, pc in zip(axes_list, sel_pcs):
try:
_pca_df.boxplot(column=pc, by="risk_level", ax=ax)
ax.set_title(f"{pc} by risk_level")
ax.set_xlabel("risk_level")
ax.set_ylabel(pc)
ax.grid(True, alpha=0.25, linewidth=0.5)
except Exception as e:
ax.axis("off")
ax.set_title(f"Failed to plot {pc}: {e}")
try:
    fig3.suptitle("")  # clear the automatic pandas-boxplot suptitle
except Exception:
    pass
st.pyplot(fig3, use_container_width=True, clear_figure=True)
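# Hedged addition (a sketch, not in the original app): a cross-validated
# logistic regression on the first PCs gives a single separability number
# (AUC) to back up the visual validation above. Assumes scikit-learn is
# available and both risk classes are present after alignment.
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import cross_val_predict
pc_val_cols = [c for c in ["PC1", "PC2", "PC3"] if c in _pca_df.columns]
y_val = (_pca_df["risk_level"] > 0).astype(int).values
if pc_val_cols and len(np.unique(y_val)) == 2:
    proba = cross_val_predict(LogisticRegression(max_iter=1000),
                              _pca_df[pc_val_cols].values, y_val,
                              cv=5, method="predict_proba")[:, 1]
    st.metric(f"Cross-validated AUC ({' + '.join(pc_val_cols)} vs risk)",
              f"{roc_auc_score(y_val, proba):.3f}")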
# =============== TAB 3: PCA 3D Visualization (side-by-side + comparison coloring) ===============
with tabs[3]:
import plotly.express as px
import plotly.graph_objects as go
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans, AgglomerativeClustering
from sklearn.metrics import silhouette_score, silhouette_samples
st.subheader("PCA 2D + 3D Visualization (risk-free clustering)")
# -------- Safety checks --------
# --- Optional columns detection for comparison modes ---
def detect_col(df, candidates):
    for c in candidates:
        if c in df.columns:
            return c
    return None
health_col = detect_col(pca_df, ["health", "health_status", "status", "healthgroup", "health_group"])
# risk_level may live outside pca_df; we’ll try both
risk_series = None
try:
risk_series = pd.Series(risk_level, index=pca_df.index)
except Exception:
# maybe it's already inside pca_df
if "risk_level" in pca_df.columns:
risk_series = pca_df["risk_level"]
# -------- Clustering controls --------
st.markdown("**Clustering setup**")
cc1, cc2, cc3 = st.columns(3)
with cc1:
cum_target = st.slider("Target cumulative variance for PCs", 0.60, 0.99, 0.85, 0.01,
help="Use PCs until cumulative explained variance ≥ this value (capped at 6 PCs).")
with cc2:
k_min, k_max = st.select_slider("K range (min,max)", options=list(range(2, 11)), value=(2, 8))
with cc3:
st.write("") # spacer
st.caption("Both KMeans and Ward are evaluated; best silhouette is selected.")
# --- Choose PCs (cap at 6) ---
try:
evr = np.asarray(explained_variance_ratio)
cum = evr.cumsum()
n_pc = min(int(np.searchsorted(cum, cum_target) + 1),
min(6, pca_df.filter(like='PC').shape[1]))
except Exception:
n_pc = min(3, pca_df.filter(like='PC').shape[1])
pc_cols = [f'PC{i}' for i in range(1, n_pc + 1)]
st.caption(f"Using **{n_pc} PCs**: {pc_cols}")
# --- Scale and select best clustering by silhouette ---
X = StandardScaler().fit_transform(pca_df[pc_cols].values)
rows = []
best = {'method': None, 'K': None, 'sil': -1, 'labels': None, 'model': None}
for K in range(k_min, k_max + 1):
# KMeans
km = KMeans(n_clusters=K, n_init=50, random_state=42)
lab_km = km.fit_predict(X)
sil_km = silhouette_score(X, lab_km)
rows.append(['kmeans', K, sil_km])
if sil_km > best['sil']:
best.update({'method': 'kmeans', 'K': K, 'sil': sil_km, 'labels': lab_km, 'model': km})
# Ward
ag = AgglomerativeClustering(n_clusters=K, linkage='ward')
lab_wd = ag.fit_predict(X)
sil_wd = silhouette_score(X, lab_wd)
rows.append(['ward', K, sil_wd])
if sil_wd > best['sil']:
best.update({'method': 'ward', 'K': K, 'sil': sil_wd, 'labels': lab_wd, 'model': ag})
sel_df = pd.DataFrame(rows, columns=['method', 'K', 'silhouette']).pivot(
index='K', columns='method', values='silhouette'
)
st.markdown("**Silhouette by K (higher is better)**")
st.dataframe(sel_df.round(3), use_container_width=True)
st.success(
f"Selected → **{best['method'].upper()}** with **K={best['K']}** "
f"(silhouette = **{best['sil']:.3f}**) on **{n_pc} PCs**"
)
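# Quick visual companion (a minimal sketch): line-plot the silhouette table
# above so the best (method, K) pair stands out at a glance.
st.line_chart(sel_df)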
# Work on a copy with cluster labels
df_vis = pca_df.copy()
df_vis['cluster'] = best['labels']
pca_with_cluster = df_vis.copy()
# expose df_vis globally for downstream tabs
globals()['_df_vis'] = df_vis
# --- Quality and profiling (expanders) ---
sil_vals = silhouette_samples(X, df_vis['cluster'].values)
sil_summary = (
pd.DataFrame({'cluster': df_vis['cluster'], 'sil': sil_vals})
.groupby('cluster')['sil'].agg(['count', 'mean', 'median', 'min', 'max'])
.round(3)
)
with st.expander("Silhouette summary by cluster"):
st.dataframe(sil_summary, use_container_width=True)
clinical_cols = [
'age_days','jaundice_level_mg_dl','feeding_frequency_per_day','stool_count','urine_output_count',
'weight_kg','length_cm','head_circumference_cm','temperature_c','heart_rate_bpm','respiratory_rate_bpm'
]
clinical_cols = [c for c in clinical_cols if c in df_vis.columns]
if clinical_cols:
prof = df_vis.groupby('cluster')[clinical_cols + pc_cols].median().round(2)
with st.expander("Cluster median profiles (clinical + PCs)"):
st.dataframe(prof, use_container_width=True)
# -------- Coloring mode controls --------
st.markdown("**Color by**")
col_left, col_right = st.columns([1, 2])
with col_left:
color_mode = st.radio(
"Select",
["Cluster", "Risk vs Non-risk"],
index=0,
help="Comparison modes use red/black only."
)
with col_right:
point_size = st.slider("Point size", 3, 12, 5)
point_alpha = st.slider("Point alpha", 0.2, 1.0, 0.85, 0.05)
# Build color arrays for comparison modes (red/black only)
# red = 'positive' class; black = others
red_black = np.array(["black", "red"])
# Helper: get risk binary (0/1) if available
risk_bin = None
if color_mode == "Risk vs Non-risk":
if risk_series is None:
st.warning("`risk_level` not found; falling back to cluster colors.")
color_mode = "Cluster"
else:
# Try to coerce to 0/1
r = risk_series.copy()
if not np.issubdtype(r.dtype, np.number):
mapping = {
"yes": 1, "y": 1, "true": 1, "t": 1, "risk": 1, "at_risk": 1, "high": 1, "1": 1,
"no": 0, "n": 0, "false": 0, "f": 0, "non-risk": 0, "low": 0, "0": 0
}
r = r.astype(str).str.lower().map(mapping).fillna(0).astype(int)
else:
r = (r > 0).astype(int)
risk_bin = r.reindex(df_vis.index).astype(int)
# -------- Side-by-side plots --------
p2d, p3d = st.columns(2, gap="large")
# === 2D matplotlib (PC1 vs PC2) ===
with p2d:
st.markdown("**PC1 vs PC2 (2D)**")
fig2d, ax2d = plt.subplots(figsize=(5.6, 5.0), dpi=160, constrained_layout=True)
if color_mode == "Cluster":
sc = ax2d.scatter(df_vis['PC1'], df_vis['PC2'],
c=df_vis['cluster'], s=point_size*6, alpha=point_alpha, cmap='tab10', linewidths=0.3)
cbar = fig2d.colorbar(sc, ax=ax2d)
cbar.set_label("cluster")
elif color_mode == "Risk vs Non-risk":
colors = red_black[risk_bin.values]
ax2d.scatter(df_vis['PC1'], df_vis['PC2'],
c=colors, s=point_size*6, alpha=point_alpha, linewidths=0.3)
# legend proxies
ax2d.scatter([], [], c="red", label="Risk (1)")
ax2d.scatter([], [], c="black", label="Non-risk (0)")
ax2d.legend(loc="best", frameon=False)
ax2d.axvline(0, ls='--', lw=0.8, color='grey'); ax2d.axhline(0, ls='--', lw=0.8, color='grey')
ax2d.set_xlabel('PC1'); ax2d.set_ylabel('PC2')
ax2d.grid(True, alpha=0.25, linewidth=0.5)
st.pyplot(fig2d, use_container_width=True, clear_figure=True)
# === 3D Plotly (PC1, PC2, PC3) ===
with p3d:
st.markdown("**PC1–PC3 (3D)**")
if all(col in df_vis.columns for col in ['PC1', 'PC2', 'PC3']):
df3d = df_vis.copy()
hover_cols = [
'age_days','jaundice_level_mg_dl','feeding_frequency_per_day','stool_count','urine_output_count',
'weight_kg','length_cm','head_circumference_cm','temperature_c','heart_rate_bpm','respiratory_rate_bpm'
]
hover_cols = [c for c in hover_cols if c in df3d.columns]
if color_mode == "Cluster":
fig = px.scatter_3d(
df3d, x='PC1', y='PC2', z='PC3',
color=df3d['cluster'].astype(str),
hover_data=hover_cols,
opacity=point_alpha,
title=f'Clusters in PCA space — {best["method"].upper()} (K={best["K"]})'
)
fig.update_traces(marker=dict(size=point_size))
# optional centroids for KMeans only
try:
if best['method'] == 'kmeans' and len(pc_cols) >= 3:
scaler_pc = StandardScaler().fit(df_vis[pc_cols].values)
centers_pc = scaler_pc.inverse_transform(best['model'].cluster_centers_)
# pad if fewer than 3 PCs used
if centers_pc.shape[1] < 3:
pad = np.zeros((centers_pc.shape[0], 3 - centers_pc.shape[1]))
centers_pc = np.hstack([centers_pc, pad])
fig.add_trace(go.Scatter3d(
x=centers_pc[:, 0], y=centers_pc[:, 1], z=centers_pc[:, 2],
mode='markers+text',
marker=dict(size=max(point_size+3, 6), color='black'),
text=[f'C{i}' for i in range(centers_pc.shape[0])],
textposition='top center',
name='Centroids'
))
except Exception:
pass
elif color_mode == "Risk vs Non-risk":
if risk_bin is None:
st.warning("`risk_level` not found; cannot show risk comparison.")
st.stop()
color_list = red_black[risk_bin.values]
fig = go.Figure(data=[go.Scatter3d(
x=df3d['PC1'], y=df3d['PC2'], z=df3d['PC3'],
mode='markers',
marker=dict(size=point_size, opacity=point_alpha, color=color_list),
text=None,
hovertemplate="PC1: %{x:.3f}
PC2: %{y:.3f}
PC3: %{z:.3f}"
)])
fig.update_layout(
title="Risk vs Non-risk (red/black)",
showlegend=False
)
fig.update_layout(
legend_title_text='cluster' if color_mode == "Cluster" else None,
scene=dict(xaxis_title='PC1', yaxis_title='PC2', zaxis_title='PC3'),
margin=dict(l=0, r=0, t=40, b=0),
height=520
)
st.plotly_chart(fig, use_container_width=True)
else:
st.warning("Need PC1, PC2, and PC3 to render the 3D scatter.")
# =============== TAB 4: Summary & Decision Support ===============
with tabs[4]:
st.subheader("Summary of Findings & Decision Support")
# ---- Safety / inputs ----
if pca_with_cluster is None or 'cluster' not in pca_with_cluster.columns:
    st.error("Missing `pca_with_cluster` with `cluster` labels. Please run clustering first (Tab 3).")
    st.stop()
# risk_level may be external or already in pca_df
risk_series = None
try:
    risk_series = pd.Series(risk_level, index=pca_with_cluster.index)
except Exception:
    if "risk_level" in pca_with_cluster.columns:
        risk_series = pca_with_cluster["risk_level"]
if risk_series is None:
    st.warning("`risk_level` not found. Summary will exclude risk rates.")
    # Minimal info: show cluster sizes only, then stop this tab's flow
    st.dataframe(
        pca_with_cluster['cluster'].value_counts().rename_axis('cluster')
        .to_frame('count').sort_index(),
        use_container_width=True
    )
    st.stop()
# ---- Normalize risk to 0/1 (just in case) ----
r = risk_series.copy()
if not np.issubdtype(r.dtype, np.number):
    mapping = {
        "yes": 1, "y": 1, "true": 1, "t": 1, "risk": 1, "at_risk": 1, "high": 1, "1": 1,
        "no": 0, "n": 0, "false": 0, "f": 0, "non-risk": 0, "low": 0, "0": 0
    }
    r = r.astype(str).str.lower().map(mapping).fillna(0).astype(int)
else:
    r = (r > 0).astype(int)
r = r.reindex(pca_with_cluster.index)
df = pca_with_cluster.copy()
df["risk_bin"] = r
# ---- KPIs ----
total_n = len(df)
baseline = df["risk_bin"].mean() if total_n else 0.0
k_clusters = int(df["cluster"].nunique())
c1, c2, c3 = st.columns(3)
c1.metric("Total samples", f"{total_n:,}")
c2.metric("Baseline risk rate", f"{baseline:.2%}")
c3.metric("Number of clusters", f"{k_clusters}")
# ---- Risk by cluster ----
grp = df.groupby("cluster").agg(
count=("risk_bin", "size"),
risk_rate=("risk_bin", "mean")
).sort_index()
grp["expected_at_risk"] = (grp["risk_rate"] * grp["count"]).round(1)
# Lift vs baseline (handle baseline 0)
grp["lift_vs_baseline"] = np.where(
baseline > 0,
grp["risk_rate"] / baseline,
np.nan
)
# Priority tiers (tweak thresholds if you like)
def assign_priority(lift):
    if np.isnan(lift):
        return "Unknown"
    if lift >= 2.0:
        return "Critical"
    if lift >= 1.5:
        return "High"
    if lift >= 1.2:
        return "Medium"
    return "Routine"
grp["priority"] = grp["lift_vs_baseline"].apply(assign_priority)
# Rank table for quick decisions
summary = grp.sort_values(["priority", "lift_vs_baseline", "risk_rate"], ascending=[True, False, False])
st.markdown("### Cluster Risk Summary")
st.dataframe(
summary.style.format({
"risk_rate": "{:.2%}",
"lift_vs_baseline": "{:.2f}"
}),
use_container_width=True
)
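# Hedged addition (a sketch, not in the original app): a chi-square test of the
# cluster x risk contingency table checks whether risk prevalence differs
# across clusters more than chance alone would allow.
from scipy.stats import chi2_contingency
risk_by_cluster = pd.crosstab(df["cluster"], df["risk_bin"])
chi2_stat, chi2_p, chi2_dof, _ = chi2_contingency(risk_by_cluster)
st.caption(f"Chi-square test (cluster vs risk): chi2 = {chi2_stat:.1f}, "
           f"dof = {chi2_dof}, p = {chi2_p:.3g}")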
# ---- Callouts / insights ----
st.markdown("### Insights")
hi = grp.sort_values("risk_rate", ascending=False).head(1)
hi_cluster = hi.index[0]
hi_rate = float(hi["risk_rate"].iloc[0])
hi_lift = float(hi["lift_vs_baseline"].iloc[0])
bullets = []
bullets.append(f"- **Baseline risk rate** is **{baseline:.2%}** across **{total_n:,}** samples and **{k_clusters}** clusters.")
bullets.append(f"- **Highest-risk cluster** is **Cluster {hi_cluster}** at **{hi_rate:.2%}** (lift **{hi_lift:.2f}×**).")
# “twice the baseline” trigger
twice = grp[grp["lift_vs_baseline"] >= 2.0].index.tolist()
if twice:
bullets.append(f"- ⚠️ Cluster(s) **{', '.join(map(str, twice))}** show **≥2×** the baseline risk — prioritize immediate monitoring.")
# Which clusters are High / Critical
focus = summary[summary["priority"].isin(["Critical", "High"])].index.tolist()
if focus:
bullets.append(f"- **Focus tiers**: {', '.join(map(lambda x: 'Cluster '+str(x), focus))} require elevated attention.")
st.markdown("\n".join(bullets))
# ---- Monitoring plan (data-driven) ----
st.markdown("### Monitoring Priority Plan")
st.caption("Suggested action levels derived from risk lift vs. baseline (configurable thresholds).")
plan = summary.reset_index()  # the 'cluster' index becomes a regular column
plan["monitoring_level"] = plan["priority"].map({
"Critical": "Level 1: Intensive (real-time alerts, frequent checks)",
"High": "Level 2: Enhanced (daily checks, targeted interventions)",
"Medium": "Level 3: Standard+ (periodic checks, watchlist)",
"Routine": "Level 4: Routine (baseline monitoring)",
"Unknown": "Assess data quality (missing baseline)"
})
plan_display = plan[["cluster", "count", "risk_rate", "lift_vs_baseline", "priority", "monitoring_level"]]
st.dataframe(
plan_display.style.format({"risk_rate":"{:.2%}", "lift_vs_baseline":"{:.2f}"}),
use_container_width=True
)
# ---- Quick decision numbers ----
st.markdown("### Quick Numbers for Decisions")
q1, q2, q3 = st.columns(3)
top2 = grp.sort_values("risk_rate", ascending=False).head(2)
total_expected = grp["expected_at_risk"].sum()
q1.metric("Expected at-risk (sum)", f"{int(round(total_expected))}")
q2.metric("Top-1 cluster expected at-risk", f"{int(round(float(top2['expected_at_risk'].iloc[0])))}")
if len(top2) > 1:
q3.metric("Top-2 cumulative expected at-risk", f"{int(round(top2['expected_at_risk'].sum()))}")
else:
q3.metric("Top-2 cumulative expected at-risk", "—")
# ---- Optional download ----
st.download_button(
"Download cluster summary (CSV)",
data=summary.reset_index().to_csv(index=False),
file_name="cluster_risk_summary.csv",
mime="text/csv"
)
# ---- Narrative (brief, editable) ----
st.markdown("### One-paragraph Executive Summary")
st.write(
f"Using PCA features and silhouette-selected clustering, we observe a baseline risk of **{baseline:.2%}**. "
f"**Cluster {hi_cluster}** exhibits the highest risk (**{hi_rate:.2%}**, **{hi_lift:.2f}×** baseline). "
f"Clusters classified as **Critical/High** should receive prioritized monitoring and interventions, while "
f"Medium/Routine tiers remain on standard follow-up. This stratification supports resource allocation "
f"and early alerts for newborns most likely to be at risk."
)