# Shiny Express dashboard listing final rules that fall inside the
# Congressional Review Act (CRA) lookback window.
#
# Layout (top to bottom): page header, sidebar filters, three summary value
# boxes, a tabbed card (rule detail / over time / by agency), a CSV download
# accordion, a notes accordion, and the footer. Reactive calculations that
# feed the outputs are defined at the bottom of the file; they are only
# resolved when an output first runs, so definition order does not matter.
#
# NOTE: documentation at module level is deliberately written as comments
# rather than bare strings so that nothing extra can be emitted into the page.
import asyncio
from datetime import date, datetime, time
from pathlib import Path

from faicons import icon_svg
from pandas import DataFrame, to_datetime
from plotnine import ggplot, labs
from shiny import reactive
from shiny.express import input, render, ui

from modules import (
    DF,
    LAST_UPDATED,
    START_DATE,
    WINDOW_OPEN_DATE,
    GET_SIGNIFICANT,
    METADATA,
    AGENCIES,
    groupby_agency,
    groupby_date,
    add_weeks_to_data,
    pad_missing_dates,
    plot_agency,
    plot_tf,
)

TITLE = "CRA Window Tracker - GW Regulatory Studies Center"

HEADER = "Rules in the Congressional Review Act (CRA) Window"

FOOTER = """
-----

© 2024 [GW Regulatory Studies Center](https://go.gwu.edu/regstudies). See our page on the [Congressional Review Act](https://regulatorystudies.columbian.gwu.edu/congressional-review-act) for more information.
"""

# Value used for every count column when a date/week with no rules is padded in.
_ZERO_FILL = {
    "rules": 0,
    "3f1_significant": 0,
    "other_significant": 0,
}

# Maps the "View" button state held in ``filter_value`` to the indicator
# column that must equal 1 for a rule to be kept. "all" has no entry.
_SIGNIFICANCE_COLUMNS = {
    "3f1": "3f1_significant",
    "other": "other_significant",
}


def _truncate_words(text: str, limit: int = 10, sep: str = " ") -> str:
    """Return *text* cut to its first *limit* ``sep``-separated words.

    Text with at most *limit* words is returned unchanged; longer text is
    shortened and suffixed with ``...``.
    """
    words = text.split(sep)
    if len(words) <= limit:
        return text
    return f"{sep.join(words[:limit])}..."


def _existing_columns(df: DataFrame, wanted) -> list:
    """Return the entries of *wanted* that are columns of *df*, in order."""
    return [col for col in wanted if col in df.columns]


ui.include_css(Path(__file__).parent / "www" / "style.css")

ui.tags.title(TITLE)

# NOTE(review): this fragment contains only plain text as seen here; confirm
# whether additional markup (e.g. an image tag for the logo) is expected.
sidebar_logo = ui.HTML(
    """
Regulatory Studies Center logo
"""
)

page_header = ui.HTML(
    f"""
{HEADER}
"""
)

# ----- PAGE LAYOUT ----- #

page_header

with ui.sidebar(open={"desktop": "open", "mobile": "closed"}):
    sidebar_logo

    ui.input_date(
        "start_date",
        "Select start of window",
        value=WINDOW_OPEN_DATE,
        min=START_DATE,
        max=date.today(),
    )

    ui.input_select(
        "menu_agency",
        "Select agencies",
        choices=["all"] + AGENCIES,
        selected="all",
        multiple=True,
    )

    ui.input_select(
        "frequency",
        "Select frequency",
        choices=["daily", "monthly", "weekly"],
        selected="daily",
    )

    # TODO: optional switch to stack significant rules in the plots.

with ui.layout_column_wrap():
    with ui.value_box(showcase=icon_svg("book")):
        "All final rules"

        @render.text
        def count_rules():
            """Number of rules matching the date and agency filters."""
            return f"{filtered_df()['document_number'].count()}"

        ui.input_action_button("filter_all", "View")

    with ui.value_box(showcase=icon_svg("book")):
        "Section 3(f)(1) Significant rules *"

        @render.text
        def count_3f1_significant():
            """Sum of the 3(f)(1) indicator, or a placeholder when disabled."""
            if not GET_SIGNIFICANT:
                return "Not available"
            return f"{filtered_df()['3f1_significant'].sum()}"

        ui.input_action_button("filter_3f1", "View")

    with ui.value_box(showcase=icon_svg("book")):
        "Other Significant rules *"

        @render.text
        def count_other_significant():
            """Sum of the other-significant indicator, or a placeholder."""
            if not GET_SIGNIFICANT:
                return "Not available"
            return f"{filtered_df()['other_significant'].sum()}"

        ui.input_action_button("filter_other", "View")

# The backslash is doubled so the string still contains a literal "\*"
# (an escaped asterisk for markdown) without relying on an invalid Python
# escape sequence.
ui.markdown(
    f"""
\\* *Executive Order 12866 significance data last updated **{LAST_UPDATED}***.
"""
)

with ui.navset_card_underline(title=""):

    with ui.nav_panel("Rules in detail"):

        @render.data_frame
        def table_rule_detail():
            """Grid of individual rules after all filters are applied."""
            source = filtered_sig()
            # ``assign`` returns a new frame, so the cached reactive result
            # is never modified.
            detail = source.assign(
                date=source["publication_date"].apply(lambda ts: f"{ts.date()}"),
                title=source["title"].apply(_truncate_words),
                agencies=source["parent_slug"].apply(lambda slugs: "; ".join(slugs)),
            )
            wanted = (
                "date",
                "title",
                "agencies",
                "3f1_significant",
                "other_significant",
            )
            return render.DataGrid(
                detail.loc[:, _existing_columns(detail, wanted)],
                width="100%",
            )

    with ui.nav_panel("Over time"):

        with ui.layout_columns():

            with ui.card(full_screen=True):

                @render.plot
                def plot_over_time(value_col: str = "rules"):
                    """Plot rule counts per period at the selected frequency.

                    Falls back to an empty plot carrying a message when the
                    data cannot produce a meaningful chart: the largest count
                    is below 2, or fewer than 2 periods have any rules.
                    """
                    grouped = get_grouped_data_over_time()
                    values = grouped.loc[:, value_col].to_numpy()
                    periods_with_rules = sum(1 for v in values if v > 0)
                    largest = max(values, default=0)
                    if largest < 2 or periods_with_rules < 2:
                        return ggplot() + labs(
                            title="Not enough data available to visualize."
                        )
                    return plot_tf(grouped, input.frequency())

            with ui.card(full_screen=True):

                @render.data_frame
                def table_over_time():
                    """Table of rule counts per period at the selected frequency."""
                    grouped = get_grouped_data_over_time()
                    # Render whichever date-like columns are present as text.
                    as_text = {
                        col: "str"
                        for col in ("publication_date", "week_of")
                        if col in grouped.columns
                    }
                    if as_text:
                        grouped = grouped.astype(as_text, errors="ignore")
                    grouped = grouped.rename(
                        columns={
                            "publication_year": "year",
                            "publication_month": "month",
                            "publication_date": "date",
                        },
                        errors="ignore",
                    )
                    wanted = (
                        "date",
                        "year",
                        "month",
                        "week_of",
                        "rules",
                        "3f1_significant",
                        "other_significant",
                    )
                    return render.DataTable(
                        grouped.loc[:, _existing_columns(grouped, wanted)]
                    )

    with ui.nav_panel("By agency"):

        with ui.layout_columns():

            with ui.card(full_screen=True):

                @render.plot
                def plot_by_agency():
                    """Plot the first ten rows of the per-agency grouping."""
                    # TODO: stacked bar chart variant for significant rules.
                    return plot_agency(grouped_df_agency().head(10))

            with ui.card(full_screen=True):

                @render.data_frame
                def table_by_agency():
                    """Table of rule counts per agency."""
                    grouped = grouped_df_agency()
                    wanted = (
                        "agency",
                        "acronym",
                        "rules",
                        "3f1_significant",
                        "other_significant",
                    )
                    return render.DataTable(
                        grouped.loc[:, _existing_columns(grouped, wanted)]
                    )

with ui.accordion(open=False):

    with ui.accordion_panel("Download Data"):

        @render.download(
            label="Download data as CSV",
            filename=f"rules_in_cra_window_accessed_{date.today()}.csv",
        )
        async def download(
            output_cols: tuple | list = (
                "document_number",
                "citation",
                "publication_date",
                "title",
                "type",
                "action",
                "abstract",
                "docket_ids",
                "json_url",
                "html_url",
                "agencies",
                "independent_reg_agency",
                "parent_slug",
                "subagency_slug",
                "president_id",
                "significant",
                "3f1_significant",
                "other_significant",
            )
        ):
            """Yield the date/agency-filtered rules as CSV text.

            Only the requested columns that actually exist in the data are
            exported, matching how the on-page tables select their columns,
            so a missing optional column no longer aborts the download.
            """
            # Short pause kept from the original implementation.
            await asyncio.sleep(0.25)
            data = filtered_df()
            yield data.loc[:, _existing_columns(data, output_cols)].to_csv(index=False)

with ui.accordion(open=False):

    with ui.accordion_panel("Notes"):

        ui.markdown(
            """
Rule data retrieved from the [Federal Register API](https://www.federalregister.gov/developers/documentation/api/v1).
The window for the CRA lookback period is [estimated](https://www.huntonak.com/the-nickel-report/federal-agencies-face-looming-congressional-review-act-deadline) to open on May 23, 2024.
"""
        )

ui.markdown(FOOTER)


# ----- REACTIVE CALCULATIONS ----- #


@reactive.calc
def filtered_df():
    """Return ``DF`` restricted by the sidebar's start date and agencies."""
    frame = DF
    start = input.start_date()

    # filter dates; if the column cannot be compared with a plain date,
    # retry with a datetime at midnight of the same day
    try:
        frame = frame.loc[frame["publication_date"] >= start]
    except TypeError:
        midnight = datetime.combine(start, time(0, 0))
        frame = frame.loc[frame["publication_date"] >= midnight]

    # filter agencies: keep a rule when any selected value is contained in
    # its "parent_slug" entry
    chosen = input.menu_agency()
    if chosen is not None and "all" not in chosen:
        keep = [
            any(selected in slugs for selected in chosen)
            for slugs in frame["parent_slug"]
        ]
        frame = frame.loc[keep]

    return frame


@reactive.calc
def filtered_sig():
    """Apply the significance filter chosen through the "View" buttons.

    Returns the output of ``filtered_df`` unchanged for "all", for any
    unrecognised state, or when the relevant indicator column is absent.
    """
    frame = filtered_df()
    column = _SIGNIFICANCE_COLUMNS.get(filter_value.get())
    if column is None or column not in frame.columns:
        return frame
    return frame.loc[frame[column] == 1]


@reactive.calc
def grouped_df_month():
    """Group the filtered rules using ``groupby_date``'s default grouping."""
    return groupby_date(filtered_sig(), significant=GET_SIGNIFICANT)


@reactive.calc
def grouped_df_day():
    """Group the filtered rules by publication date, zero-filling gaps."""
    date_col = "publication_date"
    grouped = groupby_date(
        filtered_sig(),
        group_col=date_col,
        significant=GET_SIGNIFICANT,
    )
    return pad_missing_dates(
        grouped,
        date_col,
        "days",
        fill_padded_values=dict(_ZERO_FILL),
    )


@reactive.calc
def grouped_df_week():
    """Group the filtered rules by week, zero-filling gaps.

    If grouping or padding raises ``KeyError``, an empty frame with the
    expected columns is returned instead.
    """
    with_weeks = add_weeks_to_data(filtered_sig())
    try:
        grouped = groupby_date(
            with_weeks,
            group_col=("week_number", "week_of"),
            significant=GET_SIGNIFICANT,
        )
        grouped = pad_missing_dates(
            grouped,
            "week_of",
            how="weeks",
            fill_padded_values=dict(_ZERO_FILL),
        )
    except KeyError:
        grouped = DataFrame(
            columns=[
                "week_number",
                "week_of",
                "rules",
                "3f1_significant",
                "other_significant",
            ]
        )
    return grouped


@reactive.calc
def grouped_df_agency():
    """Group the filtered rules by agency via ``groupby_agency``."""
    return groupby_agency(
        filtered_sig(),
        metadata=METADATA,
        significant=GET_SIGNIFICANT,
    )


@reactive.calc
def get_grouped_data_over_time():
    """Return the grouping that matches the selected frequency.

    Raises:
        ValueError: if the frequency input is not "daily", "monthly" or
            "weekly".
    """
    by_frequency = {
        "daily": grouped_df_day,
        "monthly": grouped_df_month,
        "weekly": grouped_df_week,
    }
    try:
        calc = by_frequency[input.frequency()]
    except KeyError:
        raise ValueError(
            "Only 'daily', 'monthly', or 'weekly' are valid inputs."
        ) from None
    return calc()


# ----- REACTIVE VALUES ----- #


# Which significance filter is active: "all", "3f1" or "other".
filter_value = reactive.value("all")

# Each "View" button only needs to store its filter name: ``filtered_sig``
# reads ``filter_value`` and is therefore recomputed automatically.


@reactive.effect
@reactive.event(input.filter_all)
def _():
    filter_value.set("all")


@reactive.effect
@reactive.event(input.filter_3f1)
def _():
    filter_value.set("3f1")


@reactive.effect
@reactive.event(input.filter_other)
def _():
    filter_value.set("other")