MF / app.py
Parthiban97's picture
Update app.py
a9d9774 verified
from __future__ import annotations
import io
import sys
import tempfile
import time
import traceback
from contextlib import redirect_stderr, redirect_stdout
from datetime import datetime
from pathlib import Path
from typing import Any
import streamlit as st
from src.csv_enrichment import (
TARGET_COLUMNS,
EnrichmentConfig,
enrich_csv,
)
from src.data_engine import run_data_engine
# ── Session logging ───────────────────────────────────────────────────────────
def _init_session_log() -> Path:
if "session_log_path" not in st.session_state:
log_dir = Path("logs") / "streamlit_sessions"
log_dir.mkdir(parents=True, exist_ok=True)
stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
log_path = log_dir / f"session_{stamp}.log"
log_path.write_text(
f"[{datetime.now().isoformat()}] session_started\n",
encoding="utf-8",
)
st.session_state["session_log_path"] = str(log_path)
return Path(st.session_state["session_log_path"])
def _log_session_event(message: str) -> None:
try:
log_path = _init_session_log()
with log_path.open("a", encoding="utf-8") as f:
f.write(f"[{datetime.now().isoformat()}] {message}\n")
except Exception:
pass
def _log_session_block(title: str, content: str) -> None:
try:
log_path = _init_session_log()
with log_path.open("a", encoding="utf-8") as f:
f.write(f"[{datetime.now().isoformat()}] --- {title} (start) ---\n")
f.write((content.rstrip() + "\n") if content.strip() else "(no output)\n")
f.write(f"[{datetime.now().isoformat()}] --- {title} (end) ---\n")
except Exception:
pass
# ── Captured output runner ────────────────────────────────────────────────────
def _run_with_captured_output(func: Any, *args: Any, **kwargs: Any) -> tuple[Any, str]:
"""Run function, mirror prints to terminal, capture for UI display."""
class _TeeCapture(io.TextIOBase):
def __init__(self, mirror: Any, on_write: Any = None) -> None:
self._mirror = mirror
self._buffer = io.StringIO()
self._on_write = on_write
def write(self, s: str) -> int:
text = str(s)
self._buffer.write(text)
try:
self._mirror.write(text)
self._mirror.flush()
except Exception:
pass
if self._on_write is not None:
try:
self._on_write(text)
except Exception:
pass
return len(text)
def flush(self) -> None:
try:
self._mirror.flush()
except Exception:
pass
def getvalue(self) -> str:
return self._buffer.getvalue()
live_callback = kwargs.pop("live_callback", None)
out_tee = _TeeCapture(sys.__stdout__, live_callback)
err_tee = _TeeCapture(sys.__stderr__, live_callback)
with redirect_stdout(out_tee), redirect_stderr(err_tee):
result = func(*args, **kwargs)
return result, out_tee.getvalue() + err_tee.getvalue()
# ── CSS ───────────────────────────────────────────────────────────────────────
def _inject_custom_css() -> None:
st.markdown(
"""
<style>
:root {
--mf-primary: #4A90E2;
--mf-accent: #22c55e;
--mf-bg: #0f0f0f;
--mf-bg-secondary: #1a1a1a;
--mf-surface: #1a1a1a;
--mf-text: #e5e5e5;
--mf-text-muted: #a0a0a0;
--mf-border: #333333;
}
.mf-shell { max-width: 1100px; margin: 0 auto; padding: 0 0 3rem 0; }
.mf-hero {
padding: 1.9rem 2.1rem 1.5rem 2.1rem;
border-radius: 18px;
background: var(--mf-bg-secondary);
border: 1px solid var(--mf-border);
}
.mf-kicker {
letter-spacing: .16em; font-size: 0.75rem;
text-transform: uppercase; color: var(--mf-primary); margin-bottom: 0.5rem;
}
.mf-title {
font-size: 2.2rem; font-weight: 650;
line-height: 1.1; color: var(--mf-text); margin-bottom: 0.75rem;
}
.mf-subtitle { max-width: 40rem; font-size: 0.95rem; color: var(--mf-text-muted); }
.mf-panel {
margin-top: 1.75rem; padding: 1.5rem 1.75rem 1.75rem 1.75rem;
border-radius: 20px; background: var(--mf-surface);
border: 1px solid var(--mf-border);
}
.mf-helper { font-size: 0.8rem; color: var(--mf-text-muted); margin-bottom: 0.9rem; }
.mf-steps { font-size: 0.78rem; color: var(--mf-text-muted); margin-top: 0.3rem; }
.mf-steps li { margin-bottom: 0.1rem; }
.mf-metrics { display: flex; flex-wrap: wrap; gap: 0.75rem; margin-top: 1.25rem; }
.mf-metric {
flex: 0 0 auto; min-width: 140px; padding: 0.6rem 0.8rem;
border-radius: 0.9rem; border: 1px solid var(--mf-border);
background: var(--mf-bg-secondary);
}
.mf-metric-label {
font-size: 0.72rem; text-transform: uppercase;
letter-spacing: 0.09em; color: var(--mf-text-muted); margin-bottom: 0.2rem;
}
.mf-metric-value { font-size: 1.05rem; font-weight: 600; color: var(--mf-accent); }
.mf-timing {
margin-top: 1rem; padding: 0.75rem 1rem;
border-radius: 0.75rem; border: 1px solid var(--mf-border);
background: var(--mf-bg-secondary); font-size: 0.8rem;
color: var(--mf-text-muted);
}
.mf-download-label {
font-size: 0.8rem; color: var(--mf-text-muted);
margin-top: 1.4rem; margin-bottom: 0.35rem;
}
.stFileUploader div[data-testid="stFileUploaderDropzone"] {
border-radius: 0.9rem; border-color: var(--mf-border);
background: var(--mf-bg-secondary);
}
.stButton > button[kind="primary"], .stDownloadButton > button {
border-radius: 0.5rem; border: none;
background: var(--mf-primary) !important;
color: white !important; font-weight: 600;
}
.stApp, [data-testid="stAppViewContainer"] { background-color: var(--mf-bg); }
.block-container { padding-top: 1.5rem; }
@media (max-width: 768px) {
.mf-hero { padding: 1.4rem 1.3rem 1.2rem 1.3rem; }
.mf-title { font-size: 1.6rem; }
}
</style>
""",
unsafe_allow_html=True,
)
# ── Main ──────────────────────────────────────────────────────────────────────
def main() -> None:
st.set_page_config(
page_title="MF Scoring Engine Β· Advisor Demo",
page_icon="πŸ“ˆ",
layout="centered",
)
_inject_custom_css()
_init_session_log()
_log_session_event("app_rendered")
st.markdown('<div class="mf-shell">', unsafe_allow_html=True)
st.markdown(
"""
<section class="mf-hero">
<div class="mf-kicker">Advisor tool</div>
<div class="mf-title">Score your mutual fund list in Excel.</div>
<p class="mf-subtitle">
Upload your mutual fund CSV. The app runs enrichment (NAV engine β†’ web fallback β†’ median),
scores every fund, and gives you a ready-to-share Excel workbook.
</p>
</section>
""",
unsafe_allow_html=True,
)
st.markdown('<section class="mf-panel">', unsafe_allow_html=True)
tab_run, tab_about = st.tabs(["Run analysis", "How scoring works"])
with tab_run:
st.markdown("### Upload CSV & generate workbook")
st.markdown(
"""
<p class="mf-helper">
Upload your standard fund universe CSV
(<code>Fund</code>, <code>Benchmark Type</code>, CAGR columns, etc.).<br>
<strong>P/E and P/B are computed from AMFI monthly holdings (active funds) or NSE index API (index funds)</strong> β€”
all risk metrics (Alpha, Sharpe, Sortino, etc.) are computed directly from NAV history.
</p>
""",
unsafe_allow_html=True,
)
uploaded_file = st.file_uploader(
"Step 1 Β· Upload fund universe CSV",
type=["csv"],
help="Same CSV you feed into the offline data engine.",
)
if uploaded_file is not None:
st.caption(
f"Selected: **{uploaded_file.name}** Β· "
f"{(len(uploaded_file.getbuffer()) / 1024):.1f} KB"
)
_log_session_event(
f"uploaded_file name={uploaded_file.name} "
f"size_kb={(len(uploaded_file.getbuffer())/1024):.1f}"
)
st.info(
"Pipeline: **Scheme code resolution β†’ NAV engine (parallel) "
"β†’ PE/PB via AMFI holdings + NSE API β†’ category median fallback β†’ scoring engine**"
)
st.markdown(
"""
<ul class="mf-steps">
<li>1 β€” Upload your latest CSV export.</li>
<li>2 β€” Click <strong>Run analysis</strong> and watch live logs.</li>
<li>3 β€” Download the scored Excel when complete.</li>
</ul>
""",
unsafe_allow_html=True,
)
run_clicked = st.button(
"Step 2 Β· Run analysis",
type="primary",
use_container_width=True,
disabled=uploaded_file is None,
)
# ── State carried across rerun ─────────────────────────────────────
generated_bytes: io.BytesIO | None = None
generated_filename: str | None = None
funds_count: int | None = None
categories_count: int | None = None
enrichment_summary: str | None = None
timing_html: str | None = None
if run_clicked:
_log_session_event("run_analysis_clicked")
if uploaded_file is None:
st.warning("Please upload a CSV file first.")
_log_session_event("run_aborted_no_upload")
else:
base_stem = Path(uploaded_file.name).stem
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
input_stem = f"{base_stem}_{stamp}"
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
tmp.write(uploaded_file.getbuffer())
input_path = Path(tmp.name)
out_dir = Path("output")
out_dir.mkdir(exist_ok=True)
generated_path = out_dir / f"fund_analysis_{input_stem}.xlsx"
t_total_start = time.perf_counter()
try:
with st.status("Processing…", expanded=True) as status:
live_lines: list[str] = []
live_box = st.empty()
# Noise patterns to suppress from the live log box
_SUPPRESS = (
"missing ScriptRunContext",
"FutureWarning",
"Passing literal json",
"To read from a literal string",
"return pd.read_json",
)
def _live_sink(chunk: str) -> None:
clean = chunk.replace("\r", "")
new = [
ln for ln in clean.split("\n")
if ln.strip()
and not any(s in ln for s in _SUPPRESS)
]
if not new:
return
live_lines.extend(new)
if len(live_lines) > 50:
del live_lines[:-50]
live_box.code("\n".join(live_lines), language="text")
# ── Phase 1: Enrichment ────────────────────────────
st.write("**1/2 Enrichment** β€” scheme codes β†’ NAV engine β†’ PE/PB β†’ medians…")
t_enrich_start = time.perf_counter()
enrichment, enrich_output = _run_with_captured_output(
enrich_csv,
str(input_path),
config=EnrichmentConfig(
enabled=True,
max_cells=None,
resolve_scheme_codes=True,
enable_nav_engine=True,
impute_unresolved=True,
),
live_callback=_live_sink,
)
t_enrich_end = time.perf_counter()
enrich_secs = t_enrich_end - t_enrich_start
_log_session_block("enrichment_output", enrich_output)
_log_session_event(
f"enrichment_done "
f"checked={enrichment.examined_cells} "
f"nav={enrichment.nav_cells} "
f"imputed={enrichment.imputed_cells} "
f"skipped={enrichment.skipped_cells} "
f"codes={enrichment.resolved_codes} "
f"secs={enrich_secs:.1f}"
)
st.write(
f" βœ… Enrichment done in **{enrich_secs:.0f}s** β€” "
f"checked {enrichment.examined_cells} cells, "
f"NAV filled {enrichment.nav_cells}, "
f"imputed {enrichment.imputed_cells}"
)
pipeline_input_path = Path(enrichment.enriched_csv_path)
# ── Phase 2: Scoring + Excel ───────────────────────
st.write("**2/2 Scoring engine** β€” computing scores, ranking, generating Excel…")
t_engine_start = time.perf_counter()
funds, engine_output = _run_with_captured_output(
run_data_engine,
csv_path=str(pipeline_input_path),
output_path=str(generated_path),
use_comprehensive_scoring=True,
live_callback=_live_sink,
)
t_engine_end = time.perf_counter()
engine_secs = t_engine_end - t_engine_start
total_secs = time.perf_counter() - t_total_start
_log_session_block("engine_output", engine_output)
_log_session_event(
f"engine_done funds={len(funds)} "
f"secs={engine_secs:.1f} total={total_secs:.1f}"
)
st.write(
f" βœ… Scoring done in **{engine_secs:.0f}s** β€” "
f"{len(funds)} funds scored"
)
status.update(
label=f"βœ… Complete β€” {total_secs:.0f}s total",
state="complete",
expanded=False,
)
except Exception as exc:
err_text = "".join(traceback.format_exception(exc))
_log_session_block("run_failure", err_text)
_log_session_event(f"run_failed error={exc}")
st.error("Run failed. See terminal for traceback.")
st.code(err_text, language="text")
return
# ── Summary ────────────────────────────────────────────────
if enrichment.errors:
st.warning("Enrichment completed with warnings β€” check scratchpad for details.")
if enrichment.scratchpad_path:
st.caption(f"Scratchpad: `{enrichment.scratchpad_path}`")
enrichment_summary = (
f"Enrichment: {enrichment.examined_cells} cells checked β€” "
f"NAV filled {enrichment.nav_cells}, "
f"imputed {enrichment.imputed_cells}, "
f"skipped {enrichment.skipped_cells}."
)
timing_html = (
f'<div class="mf-timing">'
f'⏱ Enrichment: <strong>{enrich_secs:.0f}s</strong> &nbsp;|&nbsp; '
f'Scoring: <strong>{engine_secs:.0f}s</strong> &nbsp;|&nbsp; '
f'Total: <strong>{total_secs:.0f}s ({total_secs/60:.1f} min)</strong>'
f"{'&nbsp; 🎯 Under 3 min!' if total_secs < 180 else ''}"
f'</div>'
)
with generated_path.open("rb") as f:
generated_bytes = io.BytesIO(f.read())
generated_filename = generated_path.name
funds_count = len(funds)
categories_count = len({f.category for f in funds})
st.success("Step 3 Β· Excel ready β€” download below.")
if enrichment_summary:
st.info(enrichment_summary)
# ── Download area (persists after rerun) ──────────────────────────
if generated_bytes and generated_filename:
if timing_html:
st.markdown(timing_html, unsafe_allow_html=True)
st.markdown(
"""
<div class="mf-metrics">
<div class="mf-metric">
<div class="mf-metric-label">Schemes scored</div>
<div class="mf-metric-value">{funds_count}</div>
</div>
<div class="mf-metric">
<div class="mf-metric-label">Categories</div>
<div class="mf-metric-value">{categories_count}</div>
</div>
<div class="mf-metric">
<div class="mf-metric-label">Output format</div>
<div class="mf-metric-value">Excel (.xlsx)</div>
</div>
</div>
""".format(
funds_count=funds_count or 0,
categories_count=categories_count or 0,
),
unsafe_allow_html=True,
)
st.markdown(
'<div class="mf-download-label">Download the scored workbook:</div>',
unsafe_allow_html=True,
)
st.download_button(
label="⬇️ Download processed Excel",
data=generated_bytes.getvalue(),
file_name=generated_filename,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
use_container_width=True,
)
with tab_about:
st.markdown("### What the pipeline does")
st.markdown(
"""
| Phase | What happens |
|---|---|
| **0 β€” Scheme resolution** | Parallel fuzzy-match of missing AMFI scheme codes (8 threads) |
| **1 β€” NAV engine** | Trailing 3Y risk metrics computed from mfapi NAV history (12 threads) |
| **2 β€” PE/PB engine** | Active funds: AMFI monthly holdings weighted PE/PB (same as Groww). Index funds: NSE index API |
| **3 β€” Median impute** | Category median fills remaining gaps for β‰₯3Y funds. Young funds (<3Y) marked NA |
| **4 β€” Scoring** | Top/Bottom 10 per category, 10-point weighted model |
| **5 β€” Excel export** | Conditional formatting, quartile bands, benchmark rows |
**Cache**: NAV history is cached in Neon (production) or SQLite (local) with a 7-day TTL.
Second runs are near-instant for cached funds.
"""
)
st.markdown("</section>", unsafe_allow_html=True)
st.markdown("</div>", unsafe_allow_html=True)
if __name__ == "__main__":
main()