| import os |
| import re |
| import json |
| import time |
| import traceback |
| from pathlib import Path |
| from typing import Dict, Any, List, Optional, Tuple |
|
|
| import pandas as pd |
| import gradio as gr |
| import papermill as pm |
|
|
| |
| try: |
| from huggingface_hub import InferenceClient |
| except Exception: |
| InferenceClient = None |
|
|
| |
| |
| |
|
|
| BASE_DIR = Path(__file__).resolve().parent |
|
|
| NB1 = os.environ.get("NB1", "pythonanalysis.ipynb").strip() |
| NB2 = os.environ.get("NB2", "ranalysis.ipynb").strip() |
|
|
| RUNS_DIR = BASE_DIR / "runs" |
| ART_DIR = BASE_DIR / "artifacts" |
| PY_FIG_DIR = ART_DIR / "py" / "figures" |
| PY_TAB_DIR = ART_DIR / "py" / "tables" |
| R_FIG_DIR = ART_DIR / "r" / "figures" |
| R_TAB_DIR = ART_DIR / "r" / "tables" |
|
|
| PAPERMILL_TIMEOUT = int(os.environ.get("PAPERMILL_TIMEOUT", "1800")) |
| MAX_PREVIEW_ROWS = int(os.environ.get("MAX_FILE_PREVIEW_ROWS", "50")) |
| MAX_LOG_CHARS = int(os.environ.get("MAX_LOG_CHARS", "8000")) |
|
|
| HF_API_KEY = os.environ.get("HF_API_KEY", "").strip() |
| MODEL_NAME = os.environ.get("MODEL_NAME", "deepseek-ai/DeepSeek-R1").strip() |
| HF_PROVIDER = os.environ.get("HF_PROVIDER", "novita").strip() |
|
|
| LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None |
| llm_client = ( |
| InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY) |
| if LLM_ENABLED |
| else None |
| ) |
|
|
| |
| |
| |
|
|
| def ensure_dirs(): |
| for p in [RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR, R_FIG_DIR, R_TAB_DIR]: |
| p.mkdir(parents=True, exist_ok=True) |
|
|
| def stamp(): |
| return time.strftime("%Y%m%d-%H%M%S") |
|
|
| def tail(text: str, n: int = MAX_LOG_CHARS) -> str: |
| return (text or "")[-n:] |
|
|
| def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]: |
| if not dir_path.is_dir(): |
| return [] |
| return sorted(p.name for p in dir_path.iterdir() if p.is_file() and p.suffix.lower() in exts) |
|
|
| def _read_csv(path: Path) -> pd.DataFrame: |
| return pd.read_csv(path, nrows=MAX_PREVIEW_ROWS) |
|
|
| def _read_json(path: Path): |
| with path.open(encoding="utf-8") as f: |
| return json.load(f) |
|
|
| def artifacts_index() -> Dict[str, Any]: |
| return { |
| "python": { |
| "figures": _ls(PY_FIG_DIR, (".png", ".jpg", ".jpeg")), |
| "tables": _ls(PY_TAB_DIR, (".csv", ".json")), |
| }, |
| "r": { |
| "figures": _ls(R_FIG_DIR, (".png", ".jpg", ".jpeg")), |
| "tables": _ls(R_TAB_DIR, (".csv", ".json")), |
| }, |
| } |
|
|
| |
| |
| |
|
|
| def run_notebook(nb_name: str) -> str: |
| ensure_dirs() |
| nb_in = BASE_DIR / nb_name |
| if not nb_in.exists(): |
| return f"ERROR: {nb_name} not found." |
|
|
| nb_out = RUNS_DIR / f"run_{stamp()}_{nb_name}" |
|
|
| pm.execute_notebook( |
| input_path=str(nb_in), |
| output_path=str(nb_out), |
| cwd=str(BASE_DIR), |
| log_output=True, |
| progress_bar=False, |
| request_save_on_cell_execute=True, |
| execution_timeout=PAPERMILL_TIMEOUT, |
| ) |
|
|
| return f"Executed {nb_name}" |
|
|
|
|
| def run_pythonanalysis() -> str: |
| try: |
| log = run_notebook(NB1) |
| idx = artifacts_index() |
| figs = idx["python"]["figures"] |
| tabs = idx["python"]["tables"] |
| return ( |
| f"OK {log}\n\n" |
| f"Figures: {', '.join(figs) or '(none)'}\n" |
| f"Tables: {', '.join(tabs) or '(none)'}" |
| ) |
| except Exception as e: |
| return f"FAILED {e}\n\n{traceback.format_exc()[-2000:]}" |
|
|
| def run_ranalysis() -> str: |
| try: |
| log = run_notebook(NB2) |
| idx = artifacts_index() |
| figs = idx["r"]["figures"] |
| tabs = idx["r"]["tables"] |
| return ( |
| f"OK {log}\n\n" |
| f"Figures: {', '.join(figs) or '(none)'}\n" |
| f"Tables: {', '.join(tabs) or '(none)'}" |
| ) |
| except Exception as e: |
| return f"FAILED {e}\n\n{traceback.format_exc()[-2000:]}" |
|
|
|
|
| def run_full_pipeline() -> str: |
| logs = [] |
| logs.append("=" * 50) |
| logs.append("STEP 1/2: Python Analysis") |
| logs.append("=" * 50) |
| logs.append(run_pythonanalysis()) |
| logs.append("") |
| logs.append("=" * 50) |
| logs.append("STEP 2/2: R Analysis") |
| logs.append("=" * 50) |
| logs.append(run_ranalysis()) |
| return "\n".join(logs) |
|
|
|
|
| |
| |
| |
|
|
| def _load_all_figures() -> List[Tuple[str, str]]: |
| """Return list of (filepath, caption) for Gallery.""" |
| items = [] |
| for p in sorted(PY_FIG_DIR.glob("*.png")): |
| items.append((str(p), f"Python | {p.stem.replace('_', ' ').title()}")) |
| for p in sorted(R_FIG_DIR.glob("*.png")): |
| items.append((str(p), f"R | {p.stem.replace('_', ' ').title()}")) |
| return items |
|
|
|
|
| def _load_table_safe(path: Path) -> pd.DataFrame: |
| try: |
| if path.suffix == ".json": |
| obj = _read_json(path) |
| if isinstance(obj, dict): |
| return pd.DataFrame([obj]) |
| return pd.DataFrame(obj) |
| return _read_csv(path) |
| except Exception as e: |
| return pd.DataFrame([{"error": str(e)}]) |
|
|
|
|
| def refresh_gallery(): |
| """Called when user clicks Refresh on Gallery tab.""" |
| figures = _load_all_figures() |
| idx = artifacts_index() |
|
|
| |
| table_choices = [] |
| for scope in ("python", "r"): |
| for name in idx[scope]["tables"]: |
| table_choices.append(f"{scope}/{name}") |
|
|
| |
| default_df = pd.DataFrame() |
| if table_choices: |
| parts = table_choices[0].split("/", 1) |
| base = PY_TAB_DIR if parts[0] == "python" else R_TAB_DIR |
| default_df = _load_table_safe(base / parts[1]) |
|
|
| return ( |
| figures if figures else [], |
| gr.update(choices=table_choices, value=table_choices[0] if table_choices else None), |
| default_df, |
| ) |
|
|
|
|
| def on_table_select(choice: str): |
| if not choice or "/" not in choice: |
| return pd.DataFrame([{"hint": "Select a table above."}]) |
| scope, name = choice.split("/", 1) |
| base = {"python": PY_TAB_DIR, "r": R_TAB_DIR}.get(scope) |
| if not base: |
| return pd.DataFrame([{"error": f"Unknown scope: {scope}"}]) |
| path = base / name |
| if not path.exists(): |
| return pd.DataFrame([{"error": f"File not found: {path}"}]) |
| return _load_table_safe(path) |
|
|
|
|
| |
| |
| |
|
|
| def load_kpis() -> Dict[str, Any]: |
| for candidate in [PY_TAB_DIR / "kpis.json", PY_FIG_DIR / "kpis.json"]: |
| if candidate.exists(): |
| try: |
| return _read_json(candidate) |
| except Exception: |
| pass |
| return {} |
|
|
|
|
| |
| |
| |
|
|
| DASHBOARD_SYSTEM = """You are an AI dashboard assistant for a book-sales analytics app. |
| The user asks questions or requests about their data. You have access to pre-computed |
| artifacts from Python and R analysis pipelines. |
| |
| AVAILABLE ARTIFACTS (only reference ones that exist): |
| {artifacts_json} |
| |
| KPI SUMMARY: {kpis_json} |
| |
| YOUR JOB: |
| 1. Answer the user's question conversationally using the KPIs and your knowledge of the artifacts. |
| 2. At the END of your response, output a JSON block (fenced with ```json ... ```) that tells |
| the dashboard which artifact to display. The JSON must have this shape: |
| {{"show": "figure"|"table"|"none", "scope": "python"|"r", "filename": "..."}} |
| |
| - Use "show": "figure" to display a chart image. |
| - Use "show": "table" to display a CSV/JSON table. |
| - Use "show": "none" if no artifact is relevant. |
| |
| RULES: |
| - If the user asks about sales trends or forecasting by title, show sales_trends or arima figures. |
| - If the user asks about sentiment, show sentiment figure or sentiment_counts table. |
| - If the user asks about R regression, the R notebook focuses on forecasting, show accuracy_table.csv. |
| - If the user asks about forecast accuracy or model comparison, show accuracy_table.csv or forecast_compare.png. |
| - If the user asks about top sellers, show top_titles_by_units_sold.csv. |
| - If the user asks a general data question, pick the most relevant artifact. |
| - Keep your answer concise (2-4 sentences), then the JSON block. |
| """ |
|
|
| JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL) |
| FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL) |
|
|
|
|
| def _parse_display_directive(text: str) -> Dict[str, str]: |
| m = JSON_BLOCK_RE.search(text) |
| if m: |
| try: |
| return json.loads(m.group(1)) |
| except json.JSONDecodeError: |
| pass |
| m = FALLBACK_JSON_RE.search(text) |
| if m: |
| try: |
| return json.loads(m.group(0)) |
| except json.JSONDecodeError: |
| pass |
| return {"show": "none"} |
|
|
|
|
| def _clean_response(text: str) -> str: |
| """Strip the JSON directive block from the displayed response.""" |
| return JSON_BLOCK_RE.sub("", text).strip() |
|
|
|
|
| def ai_chat(user_msg: str, history: list): |
| """Chat function for the AI Dashboard tab.""" |
| if not user_msg or not user_msg.strip(): |
| return history, "", None, None |
|
|
| idx = artifacts_index() |
| kpis = load_kpis() |
|
|
| if not LLM_ENABLED: |
| reply, directive = _keyword_fallback(user_msg, idx, kpis) |
| else: |
| system = DASHBOARD_SYSTEM.format( |
| artifacts_json=json.dumps(idx, indent=2), |
| kpis_json=json.dumps(kpis, indent=2) if kpis else "(no KPIs yet, run the pipeline first)", |
| ) |
| msgs = [{"role": "system", "content": system}] |
| for entry in (history or [])[-6:]: |
| msgs.append(entry) |
| msgs.append({"role": "user", "content": user_msg}) |
|
|
| try: |
| r = llm_client.chat_completion( |
| model=MODEL_NAME, |
| messages=msgs, |
| temperature=0.3, |
| max_tokens=600, |
| stream=False, |
| ) |
| raw = ( |
| r["choices"][0]["message"]["content"] |
| if isinstance(r, dict) |
| else r.choices[0].message.content |
| ) |
| directive = _parse_display_directive(raw) |
| reply = _clean_response(raw) |
| except Exception as e: |
| reply = f"LLM error: {e}. Falling back to keyword matching." |
| reply_fb, directive = _keyword_fallback(user_msg, idx, kpis) |
| reply += "\n\n" + reply_fb |
|
|
| |
| fig_out = None |
| tab_out = None |
| show = directive.get("show", "none") |
| scope = directive.get("scope", "") |
| fname = directive.get("filename", "") |
|
|
| if show == "figure" and scope and fname: |
| base = {"python": PY_FIG_DIR, "r": R_FIG_DIR}.get(scope) |
| if base and (base / fname).exists(): |
| fig_out = str(base / fname) |
| else: |
| reply += f"\n\n*(Could not find figure: {scope}/{fname})*" |
|
|
| if show == "table" and scope and fname: |
| base = {"python": PY_TAB_DIR, "r": R_TAB_DIR}.get(scope) |
| if base and (base / fname).exists(): |
| tab_out = _load_table_safe(base / fname) |
| else: |
| reply += f"\n\n*(Could not find table: {scope}/{fname})*" |
|
|
| new_history = (history or []) + [ |
| {"role": "user", "content": user_msg}, |
| {"role": "assistant", "content": reply}, |
| ] |
|
|
| return new_history, "", fig_out, tab_out |
|
|
|
|
| def _keyword_fallback(msg: str, idx: Dict, kpis: Dict) -> Tuple[str, Dict]: |
| """Simple keyword matcher when LLM is unavailable.""" |
| msg_lower = msg.lower() |
|
|
| if not any(idx[s]["figures"] or idx[s]["tables"] for s in ("python", "r")): |
| return ( |
| "No artifacts found yet. Please run the pipeline first (Tab 1), " |
| "then come back here to explore the results.", |
| {"show": "none"}, |
| ) |
|
|
| kpi_text = "" |
| if kpis: |
| total = kpis.get("total_units_sold", 0) |
| kpi_text = ( |
| f"Quick summary: **{kpis.get('n_titles', '?')}** book titles across " |
| f"**{kpis.get('n_months', '?')}** months, with **{total:,.0f}** total units sold." |
| ) |
|
|
| if any(w in msg_lower for w in ["trend", "sales trend", "monthly sale"]): |
| return ( |
| f"Here are the sales trends for sampled titles. {kpi_text}", |
| {"show": "figure", "scope": "python", "filename": "sales_trends_sampled_titles.png"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["sentiment", "review", "positive", "negative"]): |
| return ( |
| f"Here is the sentiment distribution across sampled book titles. {kpi_text}", |
| {"show": "figure", "scope": "python", "filename": "sentiment_distribution_sampled_titles.png"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["arima", "forecast", "predict"]): |
| if "compar" in msg_lower or "ets" in msg_lower or "accuracy" in msg_lower: |
| if "forecast_compare.png" in idx.get("r", {}).get("figures", []): |
| return ( |
| "Here is the ARIMA+Fourier vs ETS forecast comparison from the R analysis.", |
| {"show": "figure", "scope": "r", "filename": "forecast_compare.png"}, |
| ) |
| return ( |
| f"Here are the ARIMA forecasts for sampled titles from the Python analysis. {kpi_text}", |
| {"show": "figure", "scope": "python", "filename": "arima_forecasts_sampled_titles.png"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["regression", "lm", "coefficient", "price effect", "rating effect"]): |
| return ( |
| "The R notebook focuses on forecasting rather than regression. " |
| "Here is the forecast accuracy comparison instead.", |
| {"show": "table", "scope": "r", "filename": "accuracy_table.csv"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["top", "best sell", "popular", "rank"]): |
| return ( |
| f"Here are the top-selling titles by units sold. {kpi_text}", |
| {"show": "table", "scope": "python", "filename": "top_titles_by_units_sold.csv"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["accuracy", "benchmark", "rmse", "mape"]): |
| return ( |
| "Here is the forecast accuracy comparison (ARIMA+Fourier vs ETS) from the R analysis.", |
| {"show": "table", "scope": "r", "filename": "accuracy_table.csv"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["r analysis", "r output", "r result"]): |
| if "forecast_compare.png" in idx.get("r", {}).get("figures", []): |
| return ( |
| "Here is the main R output: forecast model comparison plot.", |
| {"show": "figure", "scope": "r", "filename": "forecast_compare.png"}, |
| ) |
|
|
| if any(w in msg_lower for w in ["dashboard", "overview", "summary", "kpi"]): |
| return ( |
| f"Dashboard overview: {kpi_text}\n\nAsk me about sales trends, sentiment, forecasts, " |
| "forecast accuracy, or top sellers to see specific visualizations.", |
| {"show": "table", "scope": "python", "filename": "df_dashboard.csv"}, |
| ) |
|
|
| |
| return ( |
| f"I can show you various analyses. {kpi_text}\n\n" |
| "Try asking about: **sales trends**, **sentiment**, **ARIMA forecasts**, " |
| "**forecast accuracy**, **top sellers**, or **dashboard overview**.", |
| {"show": "none"}, |
| ) |
|
|
|
|
| |
| |
| |
|
|
| ensure_dirs() |
|
|
| def load_css() -> str: |
| css_path = BASE_DIR / "style.css" |
| return css_path.read_text(encoding="utf-8") if css_path.exists() else "" |
|
|
|
|
| with gr.Blocks(title="RX12 Workshop App") as demo: |
|
|
| gr.Markdown( |
| "# RX12 - Intro to Python and R - Workshop App\n" |
| "*The app to integrate the three notebooks in to get a functioning blueprint of the group project's final product*", |
| elem_id="escp_title", |
| ) |
|
|
| |
| |
| |
| with gr.Tab("Pipeline Runner"): |
| gr.Markdown( |
| ) |
|
|
| with gr.Row(): |
| with gr.Column(scale=1): |
| btn_nb1 = gr.Button( |
| "Step 1: Python Analysis", |
| variant="secondary", |
| ) |
| gr.Markdown( |
| ) |
| with gr.Column(scale=1): |
| btn_nb2 = gr.Button( |
| "Step 2: R Analysis", |
| variant="secondary", |
| ) |
| gr.Markdown( |
| ) |
|
|
| with gr.Row(): |
| btn_all = gr.Button( |
| "Run All 2 Steps", |
| variant="primary", |
| ) |
|
|
| run_log = gr.Textbox( |
| label="Execution Log", |
| lines=18, |
| max_lines=30, |
| interactive=False, |
| ) |
|
|
| btn_nb1.click(run_pythonanalysis, outputs=[run_log]) |
| btn_nb2.click(run_ranalysis, outputs=[run_log]) |
| btn_all.click(run_full_pipeline, outputs=[run_log]) |
|
|
| |
| |
| |
| with gr.Tab("Results Gallery"): |
| gr.Markdown( |
| "### All generated artifacts\n\n" |
| "After running the pipeline, click **Refresh** to load all figures and tables. " |
| "Figures are shown in the gallery; select a table from the dropdown to inspect it." |
| ) |
|
|
| refresh_btn = gr.Button("Refresh Gallery", variant="primary") |
|
|
| gr.Markdown("#### Figures") |
| gallery = gr.Gallery( |
| label="All Figures (Python + R)", |
| columns=2, |
| height=480, |
| object_fit="contain", |
| ) |
|
|
| gr.Markdown("#### Tables") |
| table_dropdown = gr.Dropdown( |
| label="Select a table to view", |
| choices=[], |
| interactive=True, |
| ) |
| table_display = gr.Dataframe( |
| label="Table Preview", |
| interactive=False, |
| ) |
|
|
| refresh_btn.click( |
| refresh_gallery, |
| outputs=[gallery, table_dropdown, table_display], |
| ) |
| table_dropdown.change( |
| on_table_select, |
| inputs=[table_dropdown], |
| outputs=[table_display], |
| ) |
|
|
| |
| |
| |
| with gr.Tab('"AI" Dashboard'): |
| gr.Markdown( |
| "### Ask questions, get visualisations\n\n" |
| "Describe what you want to see and the AI will pick the right chart or table. " |
| + ( |
| "*LLM is active.*" |
| if LLM_ENABLED |
| else "*No API key detected \u2014 using keyword matching. " |
| "Set `HF_API_KEY` in Space secrets for full LLM support.*" |
| ) |
| ) |
|
|
| with gr.Row(equal_height=True): |
| with gr.Column(scale=1): |
| chatbot = gr.Chatbot( |
| label="Conversation", |
| height=380, |
| ) |
| user_input = gr.Textbox( |
| label="Ask about your data", |
| placeholder="e.g. Show me sales trends / What drives revenue? / Compare forecast models", |
| lines=1, |
| ) |
| gr.Examples( |
| examples=[ |
| "Show me the sales trends", |
| "What does the sentiment look like?", |
| "Which titles sell the most?", |
| "Show the forecast accuracy comparison", |
| "Compare the ARIMA and ETS forecasts", |
| "Give me a dashboard overview", |
| ], |
| inputs=user_input, |
| ) |
|
|
| with gr.Column(scale=1): |
| ai_figure = gr.Image( |
| label="Visualisation", |
| height=350, |
| ) |
| ai_table = gr.Dataframe( |
| label="Data Table", |
| interactive=False, |
| ) |
|
|
| user_input.submit( |
| ai_chat, |
| inputs=[user_input, chatbot], |
| outputs=[chatbot, user_input, ai_figure, ai_table], |
| ) |
|
|
|
|
| PORT = int(os.environ.get("PORT", os.environ.get("GRADIO_SERVER_PORT", "7860"))) |
|
|
| demo.launch( |
| server_name="0.0.0.0", |
| server_port=PORT, |
| css=load_css(), |
| allowed_paths=[str(BASE_DIR)], |
| ) |
|
|