""" Code to run streamlit dashboard """ import glob from typing import Optional import os import time import datetime import streamlit as st import pandas as pd import plotly.graph_objects as go from ffi_reports.generate_reports import generate_finance_reports from db_utils import generate_db class ReportsHelper: DEFAULT_DB_PATH = "./data/generated_txns.db" AVAILABLE_PRODUCTS = [ "ProdA", "ProdB" ] DEFAULT_GENERAL_REPORTS = [ "management_summary" ] DEFAULT_PRODUCT_REPORTS = [ "summary_report", "exception_report_missing_transactions", "exception_report_missing_recon", "exception_report_amount_mismatch" ] def __init__(self, reports_dir: Optional[str] = "./data/reports"): """ Initialize the ReportsHelper object which helps find the reports. Args: reports_dir (str, optional): The directory where the reports are stored. """ self.reports_dir = reports_dir self.product_reports = ReportsHelper.DEFAULT_PRODUCT_REPORTS self.general_reports = ReportsHelper.DEFAULT_GENERAL_REPORTS self.available_products = ReportsHelper.AVAILABLE_PRODUCTS self.db_path = ReportsHelper.DEFAULT_DB_PATH if not os.path.exists(self.db_path): load_str = f"Generating database at: {self.db_path} " \ f"since running for the first time. This may take a while..." print(load_str) with st.spinner(load_str): generate_db.main(1000, 1000000) else: print(f"Database found at: {self.db_path}") def get_report_filepath(self, report_type: str, product_name: Optional[str] = None, reports_dir: Optional[str] = "./data/reports") -> str: """ Get the filepath for a report. Args: report_type (str): The type of report to get the filepath for. One of: - management_summary - summary_report - exception_report_missing_transactions - exception_report_missing_recon - exception_report_amount_mismatch product_name (str): The name of the product to get the report for. Required for product reports. reports_dir (str, optional): The directory where the reports are stored. Returns: """ report_type = report_type.lower() if report_type in self.product_reports and product_name is None: raise ValueError("'product_name' must be provided for product reports.") if report_type == 'management_summary': management_summaries = glob.glob(os.path.join(reports_dir, "management_summary_*.csv")) if len(management_summaries) == 0: report_file_name = report_type + ".csv" else: report_file_name = os.path.basename(max(management_summaries, key=os.path.getctime)) elif report_type in self.product_reports: report_file_name = f"{product_name}_{report_type}.csv" else: raise ValueError(f"Invalid 'report_type': {report_type}") return os.path.join(reports_dir, report_file_name) def mock_progress_bar(max_seconds=15, max_steps=100): progress_bar = st.progress(0) seconds_per_step = max_seconds / max_steps for i in range(100): # Perform some work here time.sleep(seconds_per_step) progress_bar.progress(i + 1) # Streamlit app def run_app(): # Start screen to select the start date and end date st.set_page_config(layout="wide") # Set the title and page layout for the report dashboard st.title("Report Dashboard") st.sidebar.title(" :gear:️ Options") st.sidebar.markdown("## Select Date Range") start_date = st.sidebar.date_input("Start Date", datetime.date(2022, 1, 1)) end_date = st.sidebar.date_input("End Date", datetime.date.today()) # Button to generate reports if st.sidebar.button("(Re)generate Reports"): # Generate reports based on start and end date with st.spinner("Generating reports..."): start_date_str = start_date.strftime('%Y-%m-%d') end_date_str = end_date.strftime('%Y-%m-%d') generate_finance_reports(start_date_str, end_date_str) st.success("Reports generated successfully!") st.sidebar.markdown("## Choose the report type and product") reports_helper = ReportsHelper() # Get the report type and product selection from the sidebar report_type_formatted = st.sidebar.selectbox("Report Type", reports_helper.general_reports + reports_helper.product_reports) report_type = report_type_formatted.lower().replace(" ", "_") product_name = None if report_type in reports_helper.product_reports: product_name = st.sidebar.selectbox("Product", reports_helper.available_products) # Display the original table (csv) on the left side of the screen col1, col2 = st.columns([1, 1]) with col1: st.subheader("Table Explorer") filepath = os.path.abspath(reports_helper.get_report_filepath(report_type, product_name)) if not os.path.exists(filepath): st.write(f"Report not found: {filepath}") else: df = pd.read_csv(reports_helper.get_report_filepath(report_type, product_name), decimal=".") st.write(df) with col2: if not os.path.exists(filepath): st.write(f"Report not found: {filepath}") else: # Calculate the sum of all number columns number_columns = df.select_dtypes(include=["int", "float"]).columns number_columns = [col for col in number_columns if not col.lower().endswith("_id") and not col.lower().endswith("_type")] sum_values = df[number_columns].sum() if report_type == "management_summary": # Display the number columns as a line chart over the dates st.subheader("Line Chart") line_columns = st.multiselect("Select columns for Line Chart", number_columns, default=number_columns) if line_columns: line_fig = go.Figure() for column in line_columns: line_fig.add_trace(go.Line(x=df["Date"], y=df[column], name=column)) line_fig.update_layout(hovermode="x unified") # Set the hover mode st.plotly_chart(line_fig) # Display the sum of number columns as a bar chart st.subheader("Totals") fig = go.Figure() fig.add_trace(go.Bar(x=sum_values.index, y=sum_values.values, name="", showlegend=False)) fig.update_layout(xaxis_title="Column", yaxis_title="Sum") st.plotly_chart(fig) if __name__ == "__main__": run_app()