import os import textwrap import nbformat from nbformat import v4 as nbf import logging import json from datetime import datetime, timezone import pandas as pd logger = logging.getLogger(__name__) class Notebook: def __init__(self, data): self.nb = nbf.new_notebook() self.nb.cells = [] self.data = data self.output_path = "tmp/notebook.ipynb" self.imports_added = False def build_notebook(self): success = 0 metrics = self.data logger.info(f"Start building notebook with {len(metrics)} metrics") for i, metric in enumerate(metrics): logger.info(f"Processing metric {i+1} of {len(metrics)}") metric_result = json.loads(metric.get("body")) table_data = metric_result.get("table_data") query_config = metric_result.get("sql_config") chart_data = metric_result.get("chart_data", {}) chart_config = metric_result.get("chart_config", {}) chart_type = metric_result.get("chart_config", {}).get("type", {}) if not self.nb.cells: self._add_title() self._add_metric_header(config=chart_config, index=i) self._add_query_details(query_config=query_config) if not self.imports_added: self._import_libs() self.imports_added = True self._add_table(raw_table=table_data) if chart_data and chart_config: self._chart_code(chart_type=chart_type, chart_data=chart_data, chart_config=chart_config) def export_notebook(self): self.build_notebook() logger.info(f"Exporting notebook to {self.output_path}") os.makedirs(os.path.dirname(self.output_path), exist_ok=True) with open(self.output_path, 'w') as f: nbformat.write(self.nb, f) logger.info(f"Notebook exported successfully to {self.output_path}") return self.output_path def _import_libs(self): self.nb.cells.append(nbf.new_markdown_cell("### 📦 Imports")) self.nb.cells.append(nbf.new_code_cell("import pandas as pd\nimport plotly.express as px")) def _add_title(self): self.nb.cells.append(nbf.new_markdown_cell("# Metrics Dashboard")) def _add_metric_header(self, config, index): self.nb.cells.append(nbf.new_markdown_cell(f"## 📈 Metric {index+1} {config.get('title', '')}\n**Timestamp:** {datetime.now(timezone.utc).isoformat()}")) def _chart_code(self, chart_type, chart_data, chart_config): df_code = f"df = pd.DataFrame({chart_data.get(chart_type).get('data')})" chart_dispatch = { "bar": f""" fig = px.bar(df, x='x', y='y', title='{chart_config['title']}', labels={{'y': '{chart_config['y_axis_label']}', 'x': '{chart_config['x_axis_label']}'}}, color='x', template='plotly_white') """, "line": f""" fig = px.line(df, x='x', y='y', title='{chart_config['title']}', labels={{'y': '{chart_config['y_axis_label']}', 'x': '{chart_config['x_axis_label']}'}}, template='plotly_white') """, "pie": f""" fig = px.pie(df, values='y', names='x', title='{chart_config['title']}', labels={{'y': '{chart_config['y_axis_label']}'}}, template='plotly_white', hole=0.3) fig.update_traces(textposition='inside', textfont_size=12) """, "hist": f""" df['bin_center'] = (df['bin_start'] + df['bin_end']) / 2 fig = px.bar(df, x='bin_center', y='frequency', title='{chart_config['title']}', labels={{'frequency': '{chart_config['y_axis_label']}', 'bin_center': '{chart_config['x_axis_label']}'}}, template='plotly_white') """ } chart_code = f"{df_code}\n{textwrap.dedent(chart_dispatch.get(chart_type))}\nfig.update_layout(showlegend=False)\nfig.show()" self.nb.cells.append(nbf.new_markdown_cell(f"### 📊 Chart")) self.nb.cells.append(nbf.new_code_cell(chart_code)) def _add_query_details(self, query_config): sql_clean = query_config['sql_query'].replace("\\n", "\n").strip() self.nb.cells.append(nbf.new_markdown_cell(f"**User Query:** {query_config['text_query']}\n")) self.nb.cells.append(nbf.new_markdown_cell(f"### 🔍 Query Details\n**SQL:**\n```sql\n{sql_clean}\n```\n**Explanation:** {query_config['explanation']}")) def _add_table(self, raw_table): table_data = pd.DataFrame(raw_table).fillna("NAN").to_dict(orient="records") raw_code = f'raw_table = pd.DataFrame({table_data})\nraw_table.head()' self.nb.cells.append(nbf.new_markdown_cell("### Table")) self.nb.cells.append(nbf.new_code_cell(raw_code))