# ST_TOC — Total Organic Carbon Estimation Using AI
# Abbrev-only UI + model-order-safe predictions (bypass sklearn feature-name check)
import io, json, os, base64, math
from pathlib import Path
from datetime import datetime

import streamlit as st
import pandas as pd
import numpy as np
import joblib

# Matplotlib (preview + cross-plot)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter

import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error
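
# Assumed runtime dependencies (not pinned here): streamlit, pandas, numpy, joblib,
# matplotlib, plotly, scikit-learn, plus xlsxwriter or openpyxl for the Excel export
# (see _excel_engine below) and requests if MODEL_URL downloading is used.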
# =========================
# Constants / Defaults
# =========================
APP_NAME = "ST_TOC"
TAGLINE = "Total Organic Carbon Estimation Using AI"

# UI feature list (abbreviations only)
FEATURES = ["AHT90", "DT", "GR", "K", "RHOB", "TNPH", "Th", "Ur"]
TARGET = "TOC"
PRED_COL = "TOC_Pred"

MODELS_DIR = Path("models")
DEFAULT_MODEL = MODELS_DIR / "toc_rf.joblib"
MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"]

COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}
STRICT_VERSION_CHECK = False  # optional environment-mismatch banner

# ---- Plot sizing ----
CROSS_W = 350
CROSS_H = 350
TRACK_H = 1000
TRACK_W = 600
FONT_SZ = 13
BOLD_FONT = "Arial Black, Arial, sans-serif"

# =========================
# Page / CSS
# =========================
st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide")
st.markdown("""
<style>
.brand-logo { width: 200px; height: auto; object-fit: contain; }
.centered-container { display: flex; flex-direction: column; align-items: center; text-align: center; }
.st-message-box { background-color: #f0f2f6; color: #333; padding: 10px; border-radius: 10px; border: 1px solid #e6e9ef; }
.st-message-box.st-success { background-color: #d4edda; color: #155724; border-color: #c3e6cb; }
.st-message-box.st-warning { background-color: #fff3cd; color: #856404; border-color: #ffeeba; }
.st-message-box.st-error { background-color: #f8d7da; color: #721c24; border-color: #f5c6cb; }
.main .block-container { overflow: unset !important; }
div[data-testid="stVerticalBlock"] { overflow: unset !important; }
/* Sticky expander & tab header inside preview modal */
div[data-testid="stExpander"] > details > summary {
  position: sticky; top: 0; z-index: 10; background: #fff; border-bottom: 1px solid #eee;
}
div[data-testid="stExpander"] div[data-baseweb="tab-list"] {
  position: sticky; top: 42px; z-index: 9; background: #fff; padding-top: 6px;
}
</style>
""", unsafe_allow_html=True)

TABLE_CENTER_CSS = [
    dict(selector="th", props=[("text-align", "center")]),
    dict(selector="td", props=[("text-align", "center")]),
]

# =========================
# Password gate
# =========================
def inline_logo(path="logo.png") -> str:
    try:
        p = Path(path)
        if not p.exists():
            return ""
        return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}"
    except Exception:
        return ""

def add_password_gate() -> None:
    try:
        required = st.secrets.get("APP_PASSWORD", "")
    except Exception:
        required = os.environ.get("APP_PASSWORD", "")
    if not required:
        st.warning("Set APP_PASSWORD in Secrets (or environment) and restart.")
        st.stop()
    if st.session_state.get("auth_ok", False):
        return
    st.sidebar.markdown(f"""
<div class="centered-container">
  <img src="{inline_logo('logo.png')}" class="brand-logo">
  <div style='font-weight:800;font-size:1.2rem; margin-top: 10px;'>{APP_NAME}</div>
  <div style='color:#667085;'>Smart Thinking • Secure Access</div>
</div>
""", unsafe_allow_html=True)
    pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••")
    if st.sidebar.button("Unlock", type="primary"):
        if pwd == required:
            st.session_state.auth_ok = True
            st.rerun()
        else:
            st.error("Incorrect key.")
    st.stop()

add_password_gate()
# =========================
# Utilities
# =========================
def rmse(y_true, y_pred) -> float:
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))

def pearson_r(y_true, y_pred) -> float:
    a = np.asarray(y_true, dtype=float)
    p = np.asarray(y_pred, dtype=float)
    if a.size < 2: return float("nan")
    if np.all(a == a[0]) or np.all(p == p[0]): return float("nan")
    return float(np.corrcoef(a, p)[0, 1])

def mape(y_true, y_pred, eps: float = 1e-8) -> float:
    a = np.asarray(y_true, dtype=float)
    p = np.asarray(y_pred, dtype=float)
    denom = np.where(np.abs(a) < eps, np.nan, np.abs(a))
    pct = np.abs(a - p) / denom * 100.0
    val = np.nanmean(pct)
    return float(val) if np.isfinite(val) else float("nan")
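
# Quick sanity check for the metrics above (hypothetical values, not app data):
#   rmse([1, 2, 4], [1.1, 1.9, 4.2])      -> ~0.141
#   mape([1, 2, 4], [1.1, 1.9, 4.2])      -> ~6.67 (percent)
#   pearson_r([1, 2, 4], [1.1, 1.9, 4.2]) -> ~1.0 (near-perfect linear fit)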
def load_model(model_path: str):
    return joblib.load(model_path)

def parse_excel(data_bytes: bytes):
    bio = io.BytesIO(data_bytes)
    xl = pd.ExcelFile(bio)
    return {sh: xl.parse(sh) for sh in xl.sheet_names}

def read_book_bytes(b: bytes):
    return parse_excel(b) if b else {}

# ---------- Header normalization (to abbreviations for UI) ----------
def _strip_parens(name: str) -> str:
    s = str(name).strip()
    if "(" in s and s.endswith(")"):
        s = s.split("(", 1)[0].strip()
    return s

def _abbr(name: str) -> str:
    """Turn any variant into the canonical abbreviation used in UI FEATURES."""
    n = _strip_parens(name)
    n = n.replace(" ", "").replace("_", "").replace("-", "")
    alias = {
        "AC": "DT",
        "DTus/ft": "DT", "DTusft": "DT",
        "NPHI": "TNPH", "TNPHPercent": "TNPH", "TNPH%": "TNPH",
        "GammaRay": "GR", "GRAPI": "GR",
        "BulkDensity": "RHOB", "RHOBgcc": "RHOB",
        "Thorium": "Th", "TH": "Th",
        "U": "Ur", "UR": "Ur", "Uranium": "Ur",
        "KPercent": "K", "K%": "K", "Potassium": "K",
        "AHT_90": "AHT90", "AHT90AverageHydrocarbonTool90°Phase": "AHT90",
    }
    up = n.upper()
    if up in {"GR", "DT", "RHOB", "AHT90", "TNPH"}:
        return up
    if n.capitalize() in {"Th", "Ur"}:
        return n.capitalize()
    return alias.get(n, n)
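
# Illustrative mappings (hypothetical headers, behavior per the rules above):
#   _abbr("Gamma Ray (API)") -> "GR"       (parens stripped, spaces removed, alias hit)
#   _abbr("AC (us/ft)")      -> "DT"       (sonic alias)
#   _abbr("NPHI")            -> "TNPH"
#   _abbr("rhob")            -> "RHOB"     (case-insensitive canonical match)
#   _abbr("Porosity")        -> "Porosity" (unknown names pass through unchanged)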
def normalize_to_abbr(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    newcols = []
    for c in out.columns:
        ac = _abbr(c)
        if ac in FEATURES:
            newcols.append(ac)
        elif str(c).strip().lower() in {"toc", "toc (%)", "totalorganiccarbon"}:
            newcols.append(TARGET)
        elif "depth" in str(c).lower():
            newcols.append("Depth")
        else:
            newcols.append(str(c))
    out.columns = newcols
    return out
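
# Sketch of the effect on a frame (hypothetical columns):
#   ["DEPTH (m)", "Gamma Ray (API)", "TOC (%)", "Notes"]
#     -> ["Depth", "GR", "TOC", "Notes"]
# Feature columns collapse to FEATURES abbreviations, TOC variants to TARGET,
# anything containing "depth" to "Depth"; unrecognized columns are left as-is.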
# ---- Model feature order + X builder (returns NumPy to bypass name checks) ----
def _training_feature_order(model, fallback_features: list[str]) -> list[str]:
    names = list(getattr(model, "feature_names_in_", []))
    if names:
        return [str(n) for n in names]
    return list(fallback_features)

def _make_X(df_raw: pd.DataFrame, model, fallback_features: list[str]) -> np.ndarray:
    df_abbr = normalize_to_abbr(df_raw)
    colmap = {_abbr(c): c for c in df_abbr.columns}
    train_names = _training_feature_order(model, fallback_features)
    order_cols, missing = [], []
    for nm in train_names:
        ab = _abbr(nm)
        if ab in colmap:
            order_cols.append(colmap[ab])
        else:
            missing.append(nm)
    if missing:
        st.markdown(
            '<div class="st-message-box st-error">Missing required columns for prediction (by model training): '
            + ", ".join(missing) + '</div>', unsafe_allow_html=True
        )
        st.stop()
    X_df = df_abbr[order_cols].apply(pd.to_numeric, errors="coerce")
    # Plain float ndarray: sklearn skips its feature-name check for arrays.
    return X_df.to_numpy(dtype=float)

def ensure_required_features(df: pd.DataFrame, model, fallback_features: list[str]) -> bool:
    df_abbr = normalize_to_abbr(df)
    need = [_abbr(nm) for nm in _training_feature_order(model, fallback_features)]
    have = {_abbr(c) for c in df_abbr.columns}
    miss = [n for n in need if n not in have]
    if miss:
        st.error(f"Missing columns: {miss}\nFound: {sorted(have)}")
        return False
    return True

def safe_predict(model, df_raw: pd.DataFrame, fallback_features: list[str]) -> np.ndarray:
    # _make_X already returns a float ndarray in training-column order,
    # so predict() can be called directly without a defensive retry.
    X = _make_X(df_raw, model, fallback_features)
    return model.predict(X)
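
# Minimal usage sketch (hypothetical frame; column names may be raw mnemonics —
# safe_predict normalizes and reorders them to the model's training order):
#   df = pd.DataFrame({"Gamma Ray (API)": [80.0], "AC (us/ft)": [70.0], ...})
#   df["TOC_Pred"] = safe_predict(model, df, FEATURES)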
def find_sheet(book, names):
    low2orig = {k.lower(): k for k in book.keys()}
    for nm in names:
        if nm.lower() in low2orig:
            return low2orig[nm.lower()]
    return None

def _nice_tick0(xmin: float, step: float = 100) -> float:
    return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin
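
# Examples: _nice_tick0(123.4)           -> 100.0 (default step=100)
#           _nice_tick0(0.37, step=0.5)  -> 0.0
#           _nice_tick0(-0.2, step=0.5)  -> -0.5  (floor, not truncation)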
def df_centered_rounded(df: pd.DataFrame, hide_index=True):
    out = df.copy()
    numcols = out.select_dtypes(include=[np.number]).columns
    styler = (
        out.style
        .format({c: "{:.2f}" for c in numcols})
        .set_properties(**{"text-align": "center"})
        .set_table_styles(TABLE_CENTER_CSS)
    )
    st.dataframe(styler, use_container_width=True, hide_index=hide_index)

# ---- Excel writer engine (robust to missing xlsxwriter) ----
def _excel_engine() -> str | None:
    try:
        import xlsxwriter  # noqa: F401
        return "xlsxwriter"
    except Exception:
        try:
            import openpyxl  # noqa: F401
            return "openpyxl"
        except Exception:
            return None  # let pandas choose if possible
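
# Note: if neither engine is installed, pd.ExcelWriter(engine=None) falls back to
# pandas' own engine resolution and raises when no Excel writer is available,
# so the deployment requirements should include xlsxwriter or openpyxl.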
# =========================
# Cross plot (Matplotlib)
# =========================
def cross_plot_static(actual, pred):
    a = pd.Series(actual, dtype=float)
    p = pd.Series(pred, dtype=float)
    lo = float(min(a.min(), p.min()))
    hi = float(max(a.max(), p.max()))
    pad = 0.03 * (hi - lo if hi > lo else 1.0)
    lo2, hi2 = lo - pad, hi + pad
    ticks = np.linspace(lo2, hi2, 5)
    dpi = 110
    fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False)
    ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0)
    ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"])
    ax.set_xlim(lo2, hi2); ax.set_ylim(lo2, hi2)
    ax.set_xticks(ticks); ax.set_yticks(ticks)
    ax.set_aspect("equal", adjustable="box")
    fmt = FuncFormatter(lambda x, _: f"{x:,.1f}")
    ax.xaxis.set_major_formatter(fmt); ax.yaxis.set_major_formatter(fmt)
    ax.set_xlabel("Actual TOC (%)", fontweight="bold", fontsize=10, color="black")
    ax.set_ylabel("Predicted TOC (%)", fontweight="bold", fontsize=10, color="black")
    ax.tick_params(labelsize=6, colors="black")
    ax.grid(True, linestyle=":", alpha=0.3)
    for spine in ax.spines.values():
        spine.set_linewidth(1.1); spine.set_color("#444")
    fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98)
    return fig
# =========================
# Track plot (Plotly)
# =========================
def track_plot(df, include_actual=True):
    df = normalize_to_abbr(df)
    depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
    if depth_col is not None:
        y = pd.Series(df[depth_col]).astype(float); ylab = depth_col
    else:
        y = pd.Series(np.arange(1, len(df) + 1)); ylab = "Point Index"
    y_range = [float(y.max()), float(y.min())]  # reversed: depth increases downward
    x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float)
    if include_actual and TARGET in df.columns:
        x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True)
    x_lo, x_hi = float(x_series.min()), float(x_series.max())
    x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
    xmin, xmax = x_lo - x_pad, x_hi + x_pad
    tick0 = _nice_tick0(xmin, step=0.5)  # note: Plotly honors tick0 only when tickmode="linear"
    fig = go.Figure()
    if PRED_COL in df.columns:
        fig.add_trace(go.Scatter(
            x=df[PRED_COL], y=y, mode="lines",
            line=dict(color=COLORS["pred"], width=1.8),
            name=PRED_COL,
            hovertemplate=f"{PRED_COL}: " + "%{x:.2f}<br>" + ylab + ": %{y}<extra></extra>"
        ))
    if include_actual and TARGET in df.columns:
        fig.add_trace(go.Scatter(
            x=df[TARGET], y=y, mode="lines",
            line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
            name=f"{TARGET} (actual)",
            hovertemplate=f"{TARGET}: " + "%{x:.2f}<br>" + ylab + ": %{y}<extra></extra>"
        ))
    fig.update_layout(
        height=TRACK_H, width=TRACK_W, autosize=False,
        paper_bgcolor="#fff", plot_bgcolor="#fff",
        margin=dict(l=64, r=16, t=36, b=48), hovermode="closest",
        font=dict(size=FONT_SZ, color="#000"),
        legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom",
                    bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1),
        legend_title_text=""
    )
    fig.update_xaxes(
        title_text="TOC (%)",
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=12, family=BOLD_FONT, color="#000"),
        side="top", range=[xmin, xmax],
        ticks="outside", tickformat=",.2f", tickmode="auto", tick0=tick0,
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
    )
    fig.update_yaxes(
        title_text=ylab,
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        range=y_range, ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
    )
    return fig
# ---------- Preview tracks (Matplotlib) ----------
def preview_tracks(df: pd.DataFrame, cols: list[str]):
    df = normalize_to_abbr(df)
    cols = [c for c in cols if c in df.columns]
    n = len(cols)
    if n == 0:
        fig, ax = plt.subplots(figsize=(4, 2))
        ax.text(0.5, 0.5, "No selected columns", ha="center", va="center"); ax.axis("off")
        return fig
    depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
    if depth_col is not None:
        idx = pd.to_numeric(df[depth_col], errors="coerce")
        y_label = depth_col
    else:
        idx = pd.Series(np.arange(1, len(df) + 1))
        y_label = "Point Index"
    cmap = plt.get_cmap("tab20")
    col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)}
    fig, axes = plt.subplots(1, n, figsize=(2.3 * n, 7.0), sharey=True, dpi=100)
    if n == 1: axes = [axes]
    y_min, y_max = float(idx.min()), float(idx.max())
    for i, (ax, col) in enumerate(zip(axes, cols)):
        x = pd.to_numeric(df[col], errors="coerce")
        ax.plot(x, idx, '-', lw=1.8, color=col_colors[col])
        ax.set_xlabel(col)
        ax.xaxis.set_label_position('top'); ax.xaxis.tick_top()
        ax.set_ylim(y_max, y_min)
        ax.grid(True, linestyle=":", alpha=0.3)
        if i == 0:
            ax.set_ylabel(y_label)
        else:
            ax.tick_params(labelleft=False)
            ax.set_ylabel("")
    fig.tight_layout()
    return fig
# =========================
# Load model + meta
# =========================
def ensure_model() -> Path | None:
    for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]:
        if p.exists() and p.stat().st_size > 0:
            return p
    url = os.environ.get("MODEL_URL", "")
    if not url:
        return None
    try:
        import requests
        DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(DEFAULT_MODEL, "wb") as f:
                for chunk in r.iter_content(1 << 20):
                    if chunk:
                        f.write(chunk)
        return DEFAULT_MODEL
    except Exception:
        return None
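
# Deployment note (assumption): MODEL_URL should be a direct-download link to the
# joblib file (e.g. a release asset or presigned URL); an HTML landing page will
# download "successfully" here but fail later in joblib.load.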
mpath = ensure_model()
if not mpath:
    st.error("Model not found. Upload models/toc_rf.joblib (or set MODEL_URL).")
    st.stop()
try:
    model = load_model(str(mpath))
except Exception as e:
    st.error(f"Failed to load model: {e}")
    st.stop()

# Optional meta to override defaults
meta = {}
meta_candidates = [MODELS_DIR / "toc_meta.json", MODELS_DIR / "meta.json"]
meta_path = next((p for p in meta_candidates if p.exists()), None)
if meta_path:
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        FEATURES = meta.get("features", FEATURES)
        TARGET = meta.get("target", TARGET)
        PRED_COL = meta.get("pred_col", PRED_COL)
    except Exception as e:
        st.warning(f"Could not parse meta file ({meta_path.name}): {e}")

if STRICT_VERSION_CHECK and meta.get("versions"):
    import numpy as _np, sklearn as _skl
    mv = meta["versions"]; msg = []
    if mv.get("numpy") and mv["numpy"] != _np.__version__:
        msg.append(f"NumPy {mv['numpy']} expected, running {_np.__version__}")
    if mv.get("scikit_learn") and mv["scikit_learn"] != _skl.__version__:
        msg.append(f"scikit-learn {mv['scikit_learn']} expected, running {_skl.__version__}")
    if msg:
        st.warning("Environment mismatch: " + " | ".join(msg))
# =========================
# Session state
# =========================
st.session_state.setdefault("app_step", "intro")
st.session_state.setdefault("results", {})
st.session_state.setdefault("train_ranges", None)
st.session_state.setdefault("dev_file_name", "")
st.session_state.setdefault("dev_file_bytes", b"")
st.session_state.setdefault("dev_file_loaded", False)
st.session_state.setdefault("dev_preview", False)
st.session_state.setdefault("show_preview_modal", False)

# =========================
# Sidebar branding
# =========================
st.sidebar.markdown(f"""
<div class="centered-container">
  <img src="{inline_logo('logo.png')}" class="brand-logo">
  <div style='font-weight:800;font-size:1.2rem;'>{APP_NAME}</div>
  <div style='color:#667085;'>{TAGLINE}</div>
</div>
""", unsafe_allow_html=True)

def sticky_header(title, message):
    st.markdown(
        f"""
<style>
.sticky-container {{
    position: sticky; top: 0; background-color: white; z-index: 100;
    padding-top: 10px; padding-bottom: 10px; border-bottom: 1px solid #eee;
}}
</style>
<div class="sticky-container">
    <h3>{title}</h3>
    <p>{message}</p>
</div>
""",
        unsafe_allow_html=True
    )
# =========================
# INTRO
# =========================
if st.session_state.app_step == "intro":
    st.header("Welcome!")
    st.markdown("This software was developed by the *Smart Thinking AI-Solutions Team* to estimate **Total Organic Carbon (TOC)** from well-log data.")
    st.subheader("How It Works")
    st.markdown(
        "1) **Upload your data** to build the case and preview the model's performance. \n"
        "2) Click **Run Model** to compute metrics and plots. \n"
        "3) **Proceed to Validation** (with actual TOC) or **Proceed to Prediction** (no TOC)."
    )
    if st.button("Start Showcase", type="primary"):
        st.session_state.app_step = "dev"; st.rerun()
# =========================
# CASE BUILDING
# =========================
if st.session_state.app_step == "dev":
    st.sidebar.header("Case Building")
    up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx", "xls"])
    if up is not None:
        st.session_state.dev_file_bytes = up.getvalue()
        st.session_state.dev_file_name = up.name
        st.session_state.dev_file_loaded = True
        st.session_state.dev_preview = False
    if st.session_state.dev_file_loaded:
        tmp = read_book_bytes(st.session_state.dev_file_bytes)
        if tmp:
            df0 = next(iter(tmp.values()))
            st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded):
        st.session_state.show_preview_modal = True
        st.session_state.dev_preview = True
    run = st.sidebar.button("Run Model", type="primary", use_container_width=True)
    if st.sidebar.button("Proceed to Validation ▶", use_container_width=True):
        st.session_state.app_step = "validate"; st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"; st.rerun()
    if st.session_state.dev_file_loaded and st.session_state.dev_preview:
        sticky_header("Case Building", "Previewed ✓ — now click <strong>Run Model</strong>.")
    elif st.session_state.dev_file_loaded:
        sticky_header("Case Building", "📄 <strong>Preview uploaded data</strong> using the sidebar button, then click <strong>Run Model</strong>.")
    else:
        sticky_header("Case Building", "<strong>Upload your data to build a case, then run the model to review development performance.</strong>")
    if run and st.session_state.dev_file_bytes:
        book = read_book_bytes(st.session_state.dev_file_bytes)
        # find_sheet matches case-insensitively, so one casing per name suffices
        sh_train = find_sheet(book, ["Train", "Training", "training2"])
        sh_test = find_sheet(book, ["Test", "Testing", "testing2"])
        if sh_train is None or sh_test is None:
            st.markdown('<div class="st-message-box st-error">Workbook must include Train/Training/training2 and Test/Testing/testing2 sheets.</div>', unsafe_allow_html=True)
            st.stop()
        tr_raw = book[sh_train].copy()
        te_raw = book[sh_test].copy()
        if not (ensure_required_features(tr_raw, model, FEATURES) and ensure_required_features(te_raw, model, FEATURES)):
            st.stop()
        tr = normalize_to_abbr(tr_raw)
        te = normalize_to_abbr(te_raw)
        tr[PRED_COL] = safe_predict(model, tr_raw, FEATURES)
        te[PRED_COL] = safe_predict(model, te_raw, FEATURES)
        st.session_state.results["Train"] = tr; st.session_state.results["Test"] = te
        st.session_state.results["m_train"] = {
            "R": pearson_r(tr[TARGET], tr[PRED_COL]),
            "RMSE": rmse(tr[TARGET], tr[PRED_COL]),
            "MAPE": mape(tr[TARGET], tr[PRED_COL])
        }
        st.session_state.results["m_test"] = {
            "R": pearson_r(te[TARGET], te[PRED_COL]),
            "RMSE": rmse(te[TARGET], te[PRED_COL]),
            "MAPE": mape(te[TARGET], te[PRED_COL])
        }
        tr_min = tr[FEATURES].min().to_dict(); tr_max = tr[FEATURES].max().to_dict()
        st.session_state.train_ranges = {f: (float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
        st.markdown('<div class="st-message-box st-success">Case has been built and results are displayed below.</div>', unsafe_allow_html=True)
    def _dev_block(df, m):
        c1, c2, c3 = st.columns(3)
        c1.metric("R", f"{m['R']:.3f}")
        c2.metric("RMSE", f"{m['RMSE']:.2f}")
        c3.metric("MAPE (%)", f"{m['MAPE']:.2f}%")
        st.markdown("""
<div style='text-align: left; font-size: 0.8em; color: #6b7280; margin-top: -16px; margin-bottom: 8px;'>
<strong>R:</strong> Pearson Correlation Coefficient<br>
<strong>RMSE:</strong> Root Mean Square Error<br>
<strong>MAPE:</strong> Mean Absolute Percentage Error (percent of actual; rows with near-zero actuals are ignored).
</div>
""", unsafe_allow_html=True)
        col_track, col_cross = st.columns([2, 3], gap="large")
        with col_track:
            st.plotly_chart(track_plot(df, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
        with col_cross:
            st.pyplot(cross_plot_static(df[TARGET], df[PRED_COL]), use_container_width=False)

    if "Train" in st.session_state.results or "Test" in st.session_state.results:
        tab1, tab2 = st.tabs(["Training", "Testing"])
        if "Train" in st.session_state.results:
            with tab1:
                _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"])
        if "Test" in st.session_state.results:
            with tab2:
                _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"])
        st.divider()
        st.markdown("### Export to Excel")

        def _excel_safe_name(name: str) -> str:
            bad = '[]:*?/\\'
            safe = ''.join('_' if ch in bad else ch for ch in str(name))
            return safe[:31]
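
        # Example (hypothetical name): _excel_safe_name("Train/Test [v2]") -> "Train_Test _v2_";
        # longer names are truncated to Excel's 31-character sheet-name limit.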
        def _round_numeric(df: pd.DataFrame, ndigits: int = 2) -> pd.DataFrame:
            out = df.copy()
            for c in out.columns:
                if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]):
                    out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits)
            return out

        def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
            cols = [c for c in cols if c in df.columns]
            if not cols:
                return pd.DataFrame()
            tbl = (df[cols]
                   .agg(['min', 'max', 'mean', 'std'])
                   .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
                   .reset_index(names="Field"))
            return _round_numeric(tbl)

        def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
            if not ranges:
                return pd.DataFrame()
            df = pd.DataFrame(ranges).T.reset_index()
            df.columns = ["Feature", "Min", "Max"]
            return _round_numeric(df)

        def _available_sections() -> list[str]:
            res = st.session_state.get("results", {})
            sections = []
            if "Train" in res: sections += ["Training", "Training_Metrics", "Training_Summary"]
            if "Test" in res: sections += ["Testing", "Testing_Metrics", "Testing_Summary"]
            if "Validate" in res: sections += ["Validation", "Validation_Metrics", "Validation_Summary", "Validation_OOR"]
            if "PredictOnly" in res: sections += ["Prediction", "Prediction_Summary"]
            if st.session_state.get("train_ranges"): sections += ["Training_Ranges"]
            sections += ["Info"]
            return sections

        def build_export_workbook(selected: list[str] | None = None) -> tuple[bytes | None, str | None, list[str]]:
            res = st.session_state.get("results", {})
            if not res:
                return None, None, []
            wanted = selected or _available_sections()
            sheets: dict[str, pd.DataFrame] = {}
            order: list[str] = []
            if "Training" in wanted and "Train" in res:
                tr = _round_numeric(res["Train"]); sheets["Training"] = tr; order.append("Training")
                m = res.get("m_train", {})
                if m: sheets["Training_Metrics"] = _round_numeric(pd.DataFrame([m])); order.append("Training_Metrics")
                s = _summary_table(tr, FEATURES + [c for c in [TARGET, PRED_COL] if c in tr.columns])
                if not s.empty: sheets["Training_Summary"] = s; order.append("Training_Summary")
            if "Testing" in wanted and "Test" in res:
                te = _round_numeric(res["Test"]); sheets["Testing"] = te; order.append("Testing")
                m = res.get("m_test", {})
                if m: sheets["Testing_Metrics"] = _round_numeric(pd.DataFrame([m])); order.append("Testing_Metrics")
                s = _summary_table(te, FEATURES + [c for c in [TARGET, PRED_COL] if c in te.columns])
                if not s.empty: sheets["Testing_Summary"] = s; order.append("Testing_Summary")
            if "Validation" in wanted and "Validate" in res:
                va = _round_numeric(res["Validate"]); sheets["Validation"] = va; order.append("Validation")
                m = res.get("m_val", {})
                if m: sheets["Validation_Metrics"] = _round_numeric(pd.DataFrame([m])); order.append("Validation_Metrics")
                sv = res.get("sv_val", {})
                if sv: sheets["Validation_Summary"] = _round_numeric(pd.DataFrame([sv])); order.append("Validation_Summary")
                oor_tbl = res.get("oor_tbl")
                if isinstance(oor_tbl, pd.DataFrame) and not oor_tbl.empty:
                    sheets["Validation_OOR"] = _round_numeric(oor_tbl.reset_index(drop=True)); order.append("Validation_OOR")
            if "Prediction" in wanted and "PredictOnly" in res:
                pr = _round_numeric(res["PredictOnly"]); sheets["Prediction"] = pr; order.append("Prediction")
                sv = res.get("sv_pred", {})
                if sv: sheets["Prediction_Summary"] = _round_numeric(pd.DataFrame([sv])); order.append("Prediction_Summary")
            tr_ranges = st.session_state.get("train_ranges")
            if "Training_Ranges" in wanted and tr_ranges:
                rr = _train_ranges_df(tr_ranges)
                if not rr.empty: sheets["Training_Ranges"] = rr; order.append("Training_Ranges")
            info = pd.DataFrame([
                {"Key": "AppName", "Value": APP_NAME},
                {"Key": "Tagline", "Value": TAGLINE},
                {"Key": "Target", "Value": TARGET},
                {"Key": "PredColumn", "Value": PRED_COL},
                {"Key": "Features", "Value": ", ".join(FEATURES)},
                {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
            ])
            sheets["Info"] = info; order.append("Info")
            bio = io.BytesIO()
            engine = _excel_engine()
            with pd.ExcelWriter(bio, engine=engine) as writer:
                for name in order:
                    sheets[name].to_excel(writer, sheet_name=_excel_safe_name(name), index=False)
            bio.seek(0)
            fname = f"TOC_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
            return bio.getvalue(), fname, order
        options = _available_sections()
        selected_sheets = st.multiselect(
            "Sheets to include",
            options=options, default=[],
            placeholder="Choose option(s)",
            help="Pick the sheets you want to include in the Excel export.",
            key="sheets_dev",
        )
        if not selected_sheets:
            st.caption("Select one or more sheets above to enable the export.")
            st.download_button("⬇️ Export Excel", data=b"", file_name="TOC_Export.xlsx",
                               mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                               disabled=True, key="download_dev_disabled")
        else:
            data, fname, names = build_export_workbook(selected=selected_sheets)
            if names: st.caption("Will include: " + ", ".join(names))
            st.download_button("⬇️ Export Excel", data=(data or b""), file_name=(fname or "TOC_Export.xlsx"),
                               mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                               disabled=(data is None), key="download_dev")
# =========================
# VALIDATION (with actual TOC)
# =========================
if st.session_state.app_step == "validate":
    st.sidebar.header("Validate the Model")
    up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx", "xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        st.session_state.show_preview_modal = True
    go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"; st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"; st.rerun()
    sticky_header("Validate the Model", "Upload a dataset with the same <strong>features</strong> and <strong>TOC</strong> to evaluate performance.")
    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        name = find_sheet(book, ["Validation", "Validate", "validation2", "Val"]) or list(book.keys())[0]
        df_raw = book[name].copy()
        if not ensure_required_features(df_raw, model, FEATURES):
            st.stop()
        df = normalize_to_abbr(df_raw)
        df[PRED_COL] = safe_predict(model, df_raw, FEATURES)
        st.session_state.results["Validate"] = df
        ranges = st.session_state.train_ranges; oor_pct = 0.0; tbl = None
        if ranges:
            # Compute the per-feature violation mask once and reuse it below.
            feat_cols = [f for f in FEATURES if f in df.columns]
            viol = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in feat_cols})
            any_viol = viol.any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
            if any_viol.any():
                tbl = df.loc[any_viol, feat_cols].copy()
                for c in feat_cols:
                    if pd.api.types.is_numeric_dtype(tbl[c]):
                        tbl[c] = tbl[c].round(2)
                tbl["Violations"] = viol.loc[any_viol].apply(
                    lambda r: ", ".join([c for c, v in r.items() if v]), axis=1
                )
        st.session_state.results["m_val"] = {
            "R": pearson_r(df[TARGET], df[PRED_COL]),
            "RMSE": rmse(df[TARGET], df[PRED_COL]),
            "MAPE": mape(df[TARGET], df[PRED_COL])
        }
        st.session_state.results["sv_val"] = {"n": len(df), "pred_min": float(df[PRED_COL].min()), "pred_max": float(df[PRED_COL].max()), "oor": oor_pct}
        st.session_state.results["oor_tbl"] = tbl
    if "Validate" in st.session_state.results:
        m = st.session_state.results["m_val"]
        c1, c2, c3 = st.columns(3)
        c1.metric("R", f"{m['R']:.3f}")
        c2.metric("RMSE", f"{m['RMSE']:.2f}")
        c3.metric("MAPE (%)", f"{m['MAPE']:.2f}%")
        st.markdown("""
<div style='text-align: left; font-size: 0.8em; color: #6b7280; margin-top: -16px; margin-bottom: 8px;'>
<strong>R:</strong> Pearson Correlation Coefficient<br>
<strong>RMSE:</strong> Root Mean Square Error<br>
<strong>MAPE:</strong> Mean Absolute Percentage Error (percent of actual; rows with near-zero actuals are ignored).
</div>
""", unsafe_allow_html=True)
        col_track, col_cross = st.columns([2, 3], gap="large")
        with col_track:
            st.plotly_chart(track_plot(st.session_state.results["Validate"], include_actual=True),
                            use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
        with col_cross:
            st.pyplot(cross_plot_static(st.session_state.results["Validate"][TARGET],
                                        st.session_state.results["Validate"][PRED_COL]),
                      use_container_width=False)
        st.divider()
        st.markdown("### Export to Excel")

        def _export_val():
            res = st.session_state.get("results", {})
            sheets = {}
            sheets["Validation"] = res["Validate"]
            sheets["Validation_Metrics"] = pd.DataFrame([res.get("m_val", {})])
            if "sv_val" in res: sheets["Validation_Summary"] = pd.DataFrame([res["sv_val"]])
            if isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty:
                sheets["Validation_OOR"] = res["oor_tbl"].reset_index(drop=True)
            sheets["Info"] = pd.DataFrame([
                {"Key": "AppName", "Value": APP_NAME},
                {"Key": "Target", "Value": TARGET},
                {"Key": "PredColumn", "Value": PRED_COL},
                {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
            ])
            bio = io.BytesIO()
            engine = _excel_engine()
            with pd.ExcelWriter(bio, engine=engine) as writer:
                for k, v in sheets.items():
                    v.to_excel(writer, sheet_name=k[:31], index=False)
            bio.seek(0)
            return bio.getvalue(), f"TOC_Validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"

        data_x, fn_x = _export_val()
        st.download_button("⬇️ Export Excel", data=data_x, file_name=fn_x,
                           mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
        sv = st.session_state.results["sv_val"]
        if sv["oor"] > 0:
            st.markdown('<div class="st-message-box st-warning">Some inputs fall outside <strong>training min–max</strong> ranges.</div>', unsafe_allow_html=True)
        if st.session_state.results["oor_tbl"] is not None:
            st.write("*Out-of-range rows (vs. Training min–max):*")
            df_centered_rounded(st.session_state.results["oor_tbl"])
# =========================
# PREDICTION (no actual TOC)
# =========================
if st.session_state.app_step == "predict":
    st.sidebar.header("Prediction (No Actual TOC)")
    up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx", "xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        st.session_state.show_preview_modal = True
    go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"; st.rerun()
    sticky_header("Prediction", "Upload a dataset with the feature columns (no <strong>TOC</strong>).")
    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue()); name = list(book.keys())[0]
        df_raw = book[name].copy()
        if not ensure_required_features(df_raw, model, FEATURES):
            st.stop()
        df = normalize_to_abbr(df_raw)
        df[PRED_COL] = safe_predict(model, df_raw, FEATURES)
        st.session_state.results["PredictOnly"] = df
        ranges = st.session_state.train_ranges; oor_pct = 0.0
        if ranges:
            any_viol = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES if f in df.columns}).any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
        st.session_state.results["sv_pred"] = {
            "n": len(df),
            "pred_min": float(df[PRED_COL].min()),
            "pred_max": float(df[PRED_COL].max()),
            "pred_mean": float(df[PRED_COL].mean()),
            "pred_std": float(df[PRED_COL].std(ddof=0)),
            "oor": oor_pct
        }
    if "PredictOnly" in st.session_state.results:
        df = st.session_state.results["PredictOnly"]; sv = st.session_state.results["sv_pred"]
        col_left, col_right = st.columns([2, 3], gap="large")
        with col_left:
            table = pd.DataFrame({
                "Metric": ["# points", "Pred min", "Pred max", "Pred mean", "Pred std", "OOR %"],
                "Value": [sv["n"], round(sv["pred_min"], 2), round(sv["pred_max"], 2),
                          round(sv["pred_mean"], 2), round(sv["pred_std"], 2), f'{sv["oor"]:.1f}%']
            })
            st.markdown('<div class="st-message-box st-success">Predictions ready ✓</div>', unsafe_allow_html=True)
            df_centered_rounded(table, hide_index=True)
            st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.")
        with col_right:
            st.plotly_chart(track_plot(df, include_actual=False),
                            use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
        st.divider()

        def _export_pred():
            res = st.session_state.get("results", {})
            sheets = {"Prediction": res["PredictOnly"], "Prediction_Summary": pd.DataFrame([sv])}
            sheets["Info"] = pd.DataFrame([
                {"Key": "AppName", "Value": APP_NAME},
                {"Key": "Target", "Value": TARGET},
                {"Key": "PredColumn", "Value": PRED_COL},
                {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
            ])
            bio = io.BytesIO()
            engine = _excel_engine()
            with pd.ExcelWriter(bio, engine=engine) as writer:
                for k, v in sheets.items():
                    v.to_excel(writer, sheet_name=k[:31], index=False)
            bio.seek(0)
            return bio.getvalue(), f"TOC_Prediction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"

        data_x, fn_x = _export_pred()
        st.download_button("⬇️ Export Excel", data=data_x, file_name=fn_x,
                           mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
# =========================
# Preview modal
# =========================
if st.session_state.show_preview_modal:
    book_to_preview = {}
    if st.session_state.app_step == "dev":
        book_to_preview = read_book_bytes(st.session_state.dev_file_bytes)
    elif st.session_state.app_step in ["validate", "predict"] and 'up' in locals() and up is not None:
        book_to_preview = read_book_bytes(up.getvalue())
    with st.expander("Preview data", expanded=True):
        if not book_to_preview:
            st.markdown('<div class="st-message-box">No data loaded yet.</div>', unsafe_allow_html=True)
        else:
            names = list(book_to_preview.keys())
            tabs = st.tabs(names)
            for t, name in zip(tabs, names):
                with t:
                    df = normalize_to_abbr(book_to_preview[name])
                    t1, t2 = st.tabs(["Tracks", "Summary"])
                    with t1:
                        st.pyplot(preview_tracks(df, FEATURES), use_container_width=True)
                    with t2:
                        feat_present = [c for c in FEATURES if c in df.columns]
                        if not feat_present:
                            st.info("No feature columns found to summarize.")
                        else:
                            tbl = (
                                df[feat_present]
                                .agg(['min', 'max', 'mean', 'std'])
                                .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
                                .reset_index(names="Feature")
                            )
                            df_centered_rounded(tbl)
    st.session_state.show_preview_modal = False
# =========================
# Footer
# =========================
st.markdown("""
<br><br><br>
<hr>
<div style='text-align:center;color:#6b7280;font-size:1.0em;'>
© 2025 Smart Thinking AI-Solutions Team. All rights reserved.<br>
Website: <a href="https://smartthinking.com.sa" target="_blank" rel="noopener noreferrer">smartthinking.com.sa</a>
</div>
""", unsafe_allow_html=True)