import asyncio
from datetime import datetime, date, time
from pathlib import Path
from faicons import icon_svg
from pandas import DataFrame, to_datetime
from plotnine import ggplot, labs
from modules import (
DF,
LAST_UPDATED,
START_DATE,
WINDOW_OPEN_DATE,
GET_SIGNIFICANT,
METADATA,
AGENCIES,
groupby_agency,
groupby_date,
add_weeks_to_data,
pad_missing_dates,
plot_agency,
plot_tf,
)
from shiny import reactive
from shiny.express import input, render, ui
# Text handed to ui.tags.title() for the page title.
TITLE = "CRA Window Tracker - GW Regulatory Studies Center"

# Page heading text.
# NOTE(review): not referenced by any visible statement in this file - confirm
# where (or whether) it is rendered.
HEADER = "Rules in the Congressional Review Act (CRA) Window"

# Markdown footer (horizontal rule plus attribution) rendered via ui.markdown().
# Plain string literals: the text contains no placeholders to interpolate.
FOOTER = (
    "\n"
    "-----\n"
    "© 2024 [GW Regulatory Studies Center](https://go.gwu.edu/regstudies). See our page on the [Congressional Review Act](https://regulatorystudies.columbian.gwu.edu/congressional-review-act) for more information.\n"
)
# Attach the stylesheet found at www/style.css next to this file.
ui.include_css( Path(__file__).parent.joinpath("www") / "style.css")
# Emit a <title> tag carrying the TITLE text.
ui.tags.title(TITLE)
# Raw-HTML fragment placed as the first item of the sidebar.
# NOTE(review): the literal below holds only a newline - confirm whether the
# intended markup was lost.
sidebar_logo = ui.HTML(
f"""
"""
)
# Raw-HTML fragment placed ahead of the sidebar and main content.
# NOTE(review): the literal below holds only a newline and does not interpolate
# HEADER - confirm whether the intended markup was lost.
page_header = ui.HTML(
f"""
"""
)
# Bare expression statement; presumably relies on Shiny Express displaying the
# value of top-level expressions - TODO confirm.
page_header
# Sidebar with the three inputs read by the reactive calculations:
# input.start_date, input.menu_agency and input.frequency.
with ui.sidebar(open={"desktop": "open", "mobile": "closed"}):
    sidebar_logo

    # Lower bound on publication date; selectable range is START_DATE..today.
    ui.input_date(
        "start_date",
        "Select start of window",
        value=WINDOW_OPEN_DATE,
        min=START_DATE,
        max=date.today(),
    )

    # Multi-select of agencies; the extra "all" entry is the default choice.
    ui.input_select(
        "menu_agency",
        "Select agencies",
        choices=["all"] + AGENCIES,
        selected="all",
        multiple=True,
    )

    # Aggregation period used by the "Over time" outputs.
    ui.input_select(
        "frequency",
        "Select frequency",
        choices=["daily", "monthly", "weekly"],
        selected="daily",
    )

    # NOTE: a "Stack significant rules in plots" switch (input id "switch") is
    # currently disabled:
    # ui.input_switch("switch", "Stack significant rules in plots", False)
# Summary value boxes. Each box shows a count computed from filtered_df() and a
# "View" button (ids: filter_all, filter_3f1, filter_other).
with ui.layout_column_wrap():

    with ui.value_box(showcase=icon_svg("book")):
        "All final rules"

        @render.text
        def count_rules():
            """Return the count of non-null document numbers in the filtered data."""
            total = filtered_df()["document_number"].count()
            return f"{total}"

        ui.input_action_button("filter_all", "View")

    with ui.value_box(showcase=icon_svg("book")):
        "Section 3(f)(1) Significant rules *"

        @render.text
        def count_3f1_significant():
            """Return the sum of the `3f1_significant` column, or a placeholder.

            The placeholder "Not available" is returned when GET_SIGNIFICANT
            is falsy.
            """
            if not GET_SIGNIFICANT:
                return "Not available"
            flagged = filtered_df()["3f1_significant"].sum()
            return f"{flagged}"

        ui.input_action_button("filter_3f1", "View")

    with ui.value_box(showcase=icon_svg("book")):
        "Other Significant rules *"

        @render.text
        def count_other_significant():
            """Return the sum of the `other_significant` column, or a placeholder.

            The placeholder "Not available" is returned when GET_SIGNIFICANT
            is falsy.
            """
            if not GET_SIGNIFICANT:
                return "Not available"
            flagged = filtered_df()["other_significant"].sum()
            return f"{flagged}"

        ui.input_action_button("filter_other", "View")
# Footnote for the asterisks on the significance value-box titles; reports
# when the significance data was last refreshed (LAST_UPDATED).
# The backslash is doubled so Python emits a literal "\*" (a Markdown-escaped
# asterisk). A single "\*" is an invalid Python escape sequence and triggers a
# warning at compile time; the resulting runtime text is the same.
ui.markdown(
    f"""
\\* *Executive Order 12866 significance data last updated **{LAST_UPDATED}***.
"""
)
# Tabbed main content. Every tab reads the significance-filtered data
# (directly via filtered_sig() or through one of the grouped_* calculations).
with ui.navset_card_underline(title=""):

    with ui.nav_panel("Rules in detail"):

        @render.data_frame
        def table_rule_detail():
            """Return a DataGrid with one row per rule in the filtered data.

            Adds three display columns to a copy of the data: `date` (the date
            part of `publication_date`), `title` (cut to its first 10
            space-separated words, with "..." appended when cut) and `agencies`
            (the `parent_slug` items joined with "; ").
            """
            separator, max_words = " ", 10

            def shorten(text):
                # Keep titles of up to `max_words` words untouched.
                words = text.split(separator)
                if len(words) < (max_words + 1):
                    return text
                return f"{separator.join(words[:max_words])}..."

            detail = filtered_sig().copy()
            detail.loc[:, "date"] = detail.loc[:, "publication_date"].apply(
                lambda stamp: f"{stamp.date()}"
            )
            detail.loc[:, "title"] = detail["title"].apply(shorten)
            detail.loc[:, "agencies"] = detail["parent_slug"].apply(
                lambda slugs: "; ".join(slugs)
            )

            wanted = (
                "date",
                "title",
                "agencies",
                "3f1_significant",
                "other_significant",
            )
            # Only show the columns that actually exist in the data.
            visible = [name for name in wanted if name in detail.columns]
            return render.DataGrid(detail.loc[:, visible], width="100%")

    with ui.nav_panel("Over time"):
        with ui.layout_columns():

            with ui.card(full_screen=True):

                @render.plot
                def plot_over_time(value_col: str = "rules"):
                    """Plot the grouped counts over time, or a placeholder plot.

                    The placeholder (an empty plot titled "Not enough data
                    available to visualize.") is returned when the largest
                    value in `value_col` is below 2 or fewer than two values
                    are positive.
                    """
                    series = get_grouped_data_over_time()
                    counts = series.loc[:, value_col].to_numpy()
                    positive_periods = sum(1 for item in counts if item > 0)
                    peak = max(counts, default=0)

                    if (peak < 2) or (positive_periods < 2):
                        return ggplot() + labs(
                            title="Not enough data available to visualize."
                        )
                    return plot_tf(series, input.frequency())

            with ui.card(full_screen=True):

                @render.data_frame
                def table_over_time():
                    """Return a DataTable of the grouped counts over time.

                    Date-like columns are cast to strings, the `publication_*`
                    columns are renamed to short labels, and only the known
                    display columns present in the data are shown.
                    """
                    series = get_grouped_data_over_time()

                    # Cast whichever date columns are present to strings.
                    as_text = {
                        name: "str"
                        for name in ("publication_date", "week_of")
                        if name in series.columns
                    }
                    if as_text:
                        series = series.astype(as_text, errors="ignore")

                    short_names = {
                        "publication_year": "year",
                        "publication_month": "month",
                        "publication_date": "date",
                    }
                    series = series.rename(columns=short_names, errors="ignore")

                    wanted = (
                        "date",
                        "year",
                        "month",
                        "week_of",
                        "rules",
                        "3f1_significant",
                        "other_significant",
                    )
                    visible = [name for name in wanted if name in series.columns]
                    return render.DataTable(series.loc[:, visible])

    with ui.nav_panel("By agency"):
        with ui.layout_columns():

            with ui.card(full_screen=True):

                @render.plot
                def plot_by_agency():
                    """Plot the first 10 rows of the per-agency grouping."""
                    # NOTE: a stacked-bar variant keyed on input.switch() is a
                    # placeholder that has not been implemented.
                    leading = grouped_df_agency().head(10)
                    return plot_agency(leading)

            with ui.card(full_screen=True):

                @render.data_frame
                def table_by_agency():
                    """Return a DataTable of the per-agency grouping.

                    Only the known display columns present in the data are shown.
                    """
                    per_agency = grouped_df_agency()
                    wanted = (
                        "agency",
                        "acronym",
                        "rules",
                        "3f1_significant",
                        "other_significant",
                    )
                    visible = [name for name in wanted if name in per_agency.columns]
                    return render.DataTable(per_agency.loc[:, visible])
# Collapsed accordion offering the filtered data as a CSV download.
with ui.accordion(open=False):
    with ui.accordion_panel("Download Data"):

        @render.download(
            label="Download data as CSV",
            filename=f"rules_in_cra_window_accessed_{date.today()}.csv",
        )
        async def download(
            output_cols: tuple | list = (
                "document_number",
                "citation",
                "publication_date",
                "title",
                "type",
                "action",
                "abstract",
                "docket_ids",
                "json_url",
                "html_url",
                "agencies",
                "independent_reg_agency",
                "parent_slug",
                "subagency_slug",
                "president_id",
                "significant",
                "3f1_significant",
                "other_significant",
            ),
        ):
            """Yield the date/agency-filtered data as CSV text (no index column).

            Args:
                output_cols: Column names to export, in output order. Names
                    that are not present in the data are skipped, matching how
                    the on-page tables choose their columns; this avoids a
                    KeyError when, for example, the significance columns are
                    absent.

            Yields:
                A single string holding the whole CSV document.
            """
            # NOTE(review): short pause kept from the original; its purpose is
            # not evident from this file - confirm whether it is still needed.
            await asyncio.sleep(0.25)
            data = filtered_df()
            # A list (not a tuple) keeps the selection an unambiguous
            # multi-column lookup.
            present = [name for name in output_cols if name in data.columns]
            yield data.loc[:, present].to_csv(index=False)
# Collapsed accordion with data-source notes.
with ui.accordion(open=False):
    with ui.accordion_panel("Notes"):
        # Plain string literals: the text contains no placeholders.
        ui.markdown(
            "\n"
            "Rule data retrieved from the [Federal Register API](https://www.federalregister.gov/developers/documentation/api/v1).\n"
            "The window for the CRA lookback period is [estimated](https://www.huntonak.com/the-nickel-report/federal-agencies-face-looming-congressional-review-act-deadline) to open on May 23, 2024.\n"
        )
# Render the FOOTER markdown (horizontal rule plus attribution).
ui.markdown(FOOTER)
# ----- REACTIVE CALCULATIONS ----- #
@reactive.calc
def filtered_df():
    """Return the rows of DF matching the selected start date and agencies.

    Reads `input.start_date()` and `input.menu_agency()` once each.

    Returns:
        The rows of DF whose `publication_date` is on or after the selected
        start date and, unless the agency selection is None or contains
        "all", whose `parent_slug` contains at least one selected agency.
    """
    start = input.start_date()
    selected_agencies = input.menu_agency()

    # filter dates
    published = DF["publication_date"]
    try:
        filt_df = DF.loc[published >= start]
    except TypeError:
        # The column could not be compared with a plain date; retry with a
        # datetime at midnight of the same day.
        filt_df = DF.loc[published >= datetime.combine(start, time(0, 0))]

    # filter agencies
    if (selected_agencies is not None) and ("all" not in selected_agencies):
        keep = [
            any(choice in slugs for choice in selected_agencies)
            for slugs in filt_df["parent_slug"]
        ]
        filt_df = filt_df.loc[keep]

    return filt_df
@reactive.calc
def filtered_sig():
    """Narrow filtered_df() according to the current significance filter.

    `filter_value` of "3f1" keeps rows where `3f1_significant == 1`, "other"
    keeps rows where `other_significant == 1`; any other value (including
    "all") returns the data unchanged.
    """
    rules = filtered_df()
    # filter significance
    flag_columns = {"3f1": "3f1_significant", "other": "other_significant"}
    flag = flag_columns.get(filter_value.get())
    if flag is None:
        return rules
    return rules.loc[rules[flag] == 1]
@reactive.calc
def grouped_df_month():
    """Group the significance-filtered rules with groupby_date's default grouping.

    Used for the "monthly" frequency option.
    """
    return groupby_date(filtered_sig(), significant=GET_SIGNIFICANT)
@reactive.calc
def grouped_df_day():
    """Group the significance-filtered rules by `publication_date`.

    The grouped result is passed through pad_missing_dates in "days" mode,
    with the three count columns set to 0 on padded rows.
    """
    day_col = "publication_date"
    zero_fill = dict.fromkeys(("rules", "3f1_significant", "other_significant"), 0)

    daily = groupby_date(filtered_sig(), group_col=day_col, significant=GET_SIGNIFICANT)
    return pad_missing_dates(daily, day_col, "days", fill_padded_values=zero_fill)
@reactive.calc
def grouped_df_week():
    """Group the significance-filtered rules by week.

    The data is first passed through add_weeks_to_data, then grouped on
    (`week_number`, `week_of`) and padded in "weeks" mode with the count
    columns set to 0 on padded rows. If grouping or padding raises KeyError,
    an empty frame with the expected columns is returned instead.
    """
    count_cols = ("rules", "3f1_significant", "other_significant")
    with_weeks = add_weeks_to_data(filtered_sig())
    try:
        weekly = groupby_date(
            with_weeks,
            group_col=("week_number", "week_of"),
            significant=GET_SIGNIFICANT,
        )
        weekly = pad_missing_dates(
            weekly,
            "week_of",
            how="weeks",
            fill_padded_values={name: 0 for name in count_cols},
        )
    except KeyError:
        # Best-effort fallback: hand downstream outputs an empty, well-formed frame.
        weekly = DataFrame(columns=["week_number", "week_of", *count_cols])
    return weekly
@reactive.calc
def grouped_df_agency():
    """Group the significance-filtered rules by agency via groupby_agency."""
    return groupby_agency(
        filtered_sig(),
        metadata=METADATA,
        significant=GET_SIGNIFICANT,
    )
@reactive.calc
def get_grouped_data_over_time():
    """Return the grouped data matching the selected frequency.

    Raises:
        ValueError: If `input.frequency()` is not "daily", "monthly" or
            "weekly".
    """
    by_frequency = {
        "daily": grouped_df_day,
        "monthly": grouped_df_month,
        "weekly": grouped_df_week,
    }
    calc = by_frequency.get(input.frequency())
    if calc is None:
        raise ValueError("Only 'daily', 'monthly', or 'weekly' are valid inputs.")
    # Only the selected calculation is invoked.
    return calc()
# ----- REACTIVE VALUES ----- #
# Currently selected significance filter: "all", "3f1" or "other".
filter_value = reactive.value("all")


def _select_significance_filter(choice):
    """Store `choice` as the active significance filter."""
    filter_value.set(choice)
    # NOTE(review): the result is discarded; kept because the original effects
    # made this call - confirm whether it is needed.
    filtered_df()


# One effect per "View" button; each runs only when its button is clicked.
@reactive.effect
@reactive.event(input.filter_all)
def _():
    _select_significance_filter("all")


@reactive.effect
@reactive.event(input.filter_3f1)
def _():
    _select_significance_filter("3f1")


@reactive.effect
@reactive.event(input.filter_other)
def _():
    _select_significance_filter("other")