|
from datetime import date |
|
from pathlib import Path |
|
|
|
from fr_toolbelt.api_requests import get_documents_by_date |
|
from fr_toolbelt.preprocessing import process_documents, AgencyMetadata |
|
from numpy import array |
|
from pandas import DataFrame, to_datetime |
|
from plotnine import ( |
|
ggplot, |
|
aes, |
|
geom_col, |
|
labs, |
|
coord_flip, |
|
scale_x_discrete, |
|
theme_light, |
|
) |
|
|
|
try: |
|
from search_columns import search_columns, SearchError |
|
from significant import get_significant_info |
|
except ModuleNotFoundError: |
|
from .search_columns import search_columns, SearchError |
|
from .significant import get_significant_info |
|
|
|
|
|
# Agency metadata lookup; the second element returned by get_agency_metadata()
# is not used in this module.
METADATA, _ = AgencyMetadata().get_agency_metadata()

# First publication date (ISO format) of the window of rules to retrieve.
START_DATE = "2024-03-01"

# Significance data are only requested when the window starts on or after
# April 6, 2023, the same cutoff enforced by get_significant_rules().
GET_SIGNIFICANT = date.fromisoformat(START_DATE) >= date(2023, 4, 6)
|
|
|
|
|
class DataAvailabilityError(Exception):
    """Raised when the requested data are not available for the given start date."""
|
|
|
|
|
def get_date_range(start_date: str):
    """Build the retrieval window that starts at `start_date`.

    Args:
        start_date (str): First date of the window in ISO format (YYYY-MM-DD).

    Returns:
        dict: Keys "start" (the input string), "end" (January 31 of the year
            after the start year), and "transition_year" (start year plus one).
    """
    transition_year = date.fromisoformat(start_date).year + 1
    return {
        "start": start_date,
        "end": f"{transition_year}-01-31",
        "transition_year": transition_year,
    }
|
|
|
|
|
def get_rules(date_range: dict) -> list[dict]:
    """Retrieve documents of type RULE published within a date range.

    Args:
        date_range (dict): Mapping with "start" and "end" dates, such as the
            one produced by `get_date_range`.

    Returns:
        list[dict]: First element of the pair returned by
            `get_documents_by_date`; the second element is discarded.
    """
    window = {
        "start_date": date_range.get("start"),
        "end_date": date_range.get("end"),
    }
    documents, _ = get_documents_by_date(document_types=("RULE", ), **window)
    return documents
|
|
|
|
|
def format_documents(documents: list[dict]):
    """Process Federal Register documents and add publication date columns.

    The documents are run through `process_documents` with
    `which=("agencies", "presidents")`, loaded into a DataFrame, and extended
    with a parsed timestamp column plus its date, year, month, and day.

    Args:
        documents (list[dict]): List of documents. Each document must carry a
            "publication_date" value that `pandas.to_datetime` can parse.

    Returns:
        DataFrame: Pandas DataFrame with formatted data. "publication_dt" holds
            the parsed timestamps, "publication_date" is overwritten with
            `datetime.date` objects, and "publication_year",
            "publication_month", and "publication_day" hold integers.
    """
    documents = process_documents(
        documents,
        which=("agencies", "presidents"),
        return_values_as_str=False
    )

    df = DataFrame(documents)

    # Parse the dates once and derive every component with the vectorized
    # `.dt` accessor instead of a Python-level function call per row.
    publication_dt = to_datetime(df["publication_date"])
    df.loc[:, "publication_dt"] = publication_dt
    df.loc[:, "publication_date"] = publication_dt.dt.date

    # Cast to int64 so the dtype does not depend on the pandas version
    # (the `.dt` component accessors return int32 in recent releases).
    df.loc[:, "publication_year"] = publication_dt.dt.year.astype("int64")
    df.loc[:, "publication_month"] = publication_dt.dt.month.astype("int64")
    df.loc[:, "publication_day"] = publication_dt.dt.day.astype("int64")

    return df
|
|
|
|
|
def filter_new_admin_rules(
    df: DataFrame,
    transition_year: int,
    date_col: str = "publication_date",
):
    """Drop rules issued by the incoming administration in a transition year.

    A row is removed only when it is dated on or after January 20 of
    `transition_year` AND its "president_id" matches the president who took
    office that year. If `transition_year` is not a listed transition year,
    no row matches and every row is kept.

    Args:
        df (DataFrame): Rules with a "president_id" column and a date column
            whose values can be compared with `datetime.date`.
        transition_year (int): Year in which the new administration took office.
        date_col (str, optional): Name of the date column.
            Defaults to "publication_date".

    Returns:
        DataFrame: Rows of `df` that were not issued by the incoming administration.
    """
    # Incoming president for each inauguration year. The 2025 entry is needed
    # for windows that start in 2024; without it nothing is filtered for them.
    admin_transitions = {
        2001: "george-w-bush",
        2009: "barack-obama",
        2017: "donald-trump",
        2021: "joe-biden",
        2025: "donald-trump",
    }
    incoming_president = admin_transitions.get(transition_year)

    bool_date = array(df[date_col] >= date(transition_year, 1, 20))
    bool_prez = array(df["president_id"] == incoming_president)
    return df.loc[~(bool_date & bool_prez)]
|
|
|
|
|
def filter_corrections(df: DataFrame):
    """Split Federal Register documents into non-corrections and corrections.

    A document counts as a correction when its `correction_of` field is
    populated, or when a regex search flags its `document_number`, or its
    `title` or `action` fields.

    Args:
        df (DataFrame): Federal Register data.

    Returns:
        tuple: DataFrame with corrections removed, DataFrame of corrections

    Raises:
        SearchError: If the two outputs together do not have as many rows as the input.
    """
    # Captured before the searches so both outputs keep exactly these columns.
    # NOTE(review): presumably guards against search_columns adding indicator
    # columns to `df` itself -- confirm against its implementation.
    original_columns = df.columns.tolist()

    # documents that do not point at another document they correct
    not_flagged_by_field = array(df["correction_of"].isna())

    # regex-based detection on the document number and on the title/action text
    number_hits = search_columns(
        df,
        [r"^[crxz][\d]{1,2}-(?:[\w]{2,4}-)?[\d]+"],
        ["document_number"],
        return_column="indicator1",
    )
    text_hits = search_columns(
        df,
        [r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"],
        ["title", "action"],
        return_column="indicator2",
    )
    flagged_by_search = array(number_hits["indicator1"] == 1) | array(text_hits["indicator2"] == 1)

    is_correction = ~not_flagged_by_field | flagged_by_search
    df_no_corrections = df.loc[~is_correction, original_columns]
    df_corrections = df.loc[is_correction, original_columns]

    # sanity check: the two outputs must partition the input
    if len(df) != len(df_no_corrections) + len(df_corrections):
        raise SearchError(f"{len(df)} != {len(df_no_corrections)} + {len(df_corrections)}")
    return df_no_corrections, df_corrections
|
|
|
|
|
def get_significant_rules(df, start_date):
    """Attach significance indicators to the rules.

    Args:
        df (DataFrame): Rules with a "document_number" column.
        start_date (str): Start of the window in ISO format (YYYY-MM-DD).

    Returns:
        tuple: DataFrame returned by `get_significant_info` with integer-valued
            "significant" and "3f1_significant" columns and a new
            "other_significant" column, followed by the `last_updated` value
            returned by `get_significant_info`.

    Raises:
        DataAvailabilityError: If `start_date` is earlier than April 6, 2023.
    """
    if date.fromisoformat(start_date) < date(2023, 4, 6):
        raise DataAvailabilityError("This program does not calculate significant rule counts prior to Executive Order 14094 of April 6, 2023.")

    numbers = df.loc[:, "document_number"].to_list()
    df, last_updated = get_significant_info(df, start_date, numbers)

    # Missing values and "." placeholders both become 0 before the integer cast.
    for indicator in ("significant", "3f1_significant", ):
        df.loc[df[indicator].isna(), indicator] = "0"
        df.loc[:, indicator] = df[indicator].replace(".", "0").astype("int64")

    is_3f1 = df["3f1_significant"] == 1
    is_significant = df["significant"] == 1

    # rewrite the 3(f)(1) column as a clean 0/1 indicator
    df.loc[:, "3f1_significant"] = 0
    df.loc[is_3f1, "3f1_significant"] = 1

    # significant rules that are not 3(f)(1) significant
    df.loc[:, "other_significant"] = 0
    df.loc[(is_significant & ~is_3f1), "other_significant"] = 1

    return df, last_updated
|
|
|
|
|
def get_agency_metadata_values(
    df: DataFrame,
    agency_column: str,
    metadata: dict,
    metadata_value: str,
):
    """Look up one metadata field for every agency in a column.

    Args:
        df (DataFrame): Data containing `agency_column`.
        agency_column (str): Column whose values are keys of `metadata`.
        metadata (dict): Mapping of agency key to a dict of metadata fields.
        metadata_value (str): Field to retrieve; "acronym" is treated as an
            alias for "short_name".

    Returns:
        Series: The requested field for each row, or None where the agency or
            the field is missing from `metadata`.
    """
    field = "short_name" if metadata_value == "acronym" else metadata_value

    def lookup(agency):
        return metadata.get(agency, {}).get(field)

    return df.loc[:, agency_column].apply(lookup)
|
|
|
|
|
def groupby_agency( |
|
df: DataFrame, |
|
group_col: str = "parent_slug", |
|
value_col: str = "document_number", |
|
aggfunc: str = "count", |
|
significant: bool = True, |
|
metadata: dict | None = None, |
|
metadata_value: str = "acronym", |
|
): |
|
aggfunc_dict = {value_col: aggfunc, } |
|
if significant: |
|
aggfunc_dict.update({ |
|
"3f1_significant": "sum", |
|
"other_significant": "sum", |
|
}) |
|
df_ex = df.explode(group_col, ignore_index=True) |
|
grouped = df_ex.groupby( |
|
by=group_col |
|
).agg( |
|
aggfunc_dict |
|
).reset_index() |
|
grouped = grouped.sort_values(value_col, ascending=False).rename( |
|
columns={ |
|
group_col: "agency", |
|
value_col: "rules", |
|
}, errors="ignore" |
|
) |
|
if metadata is not None: |
|
grouped.loc[:, metadata_value] = get_agency_metadata_values( |
|
grouped, |
|
agency_column="agency", |
|
metadata=metadata, |
|
metadata_value=metadata_value |
|
) |
|
cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"] |
|
grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]] |
|
return grouped |
|
|
|
|
|
def groupby_ym( |
|
df: DataFrame, |
|
group_col: tuple | list = ("publication_year", "publication_month", ), |
|
value_col: str = "document_number", |
|
aggfunc: str = "count", |
|
significant: bool = True |
|
): |
|
aggfunc_dict = {value_col: aggfunc, } |
|
if significant: |
|
aggfunc_dict.update({ |
|
"3f1_significant": "sum", |
|
"other_significant": "sum", |
|
}) |
|
grouped = df.groupby( |
|
by=list(group_col) |
|
).agg( |
|
aggfunc_dict |
|
).reset_index() |
|
grouped = grouped.rename(columns={ |
|
value_col: "rules", |
|
}, errors="ignore") |
|
return grouped |
|
|
|
|
|
def save_csv(path: Path, df_all: DataFrame, df_agency: DataFrame, df_ym: DataFrame, transition_year: int):
    """Write the three result tables as CSV files in `path`.

    File names carry the year before the transition and the transition year,
    e.g. "rules_2024_2025.csv". The index is not written.

    Args:
        path (Path): Directory that receives the files.
        df_all (DataFrame): Rule-level data.
        df_agency (DataFrame): Rules aggregated by agency.
        df_ym (DataFrame): Rules aggregated by year and month.
        transition_year (int): Transition year used in the file names.
    """
    outputs = (
        (df_all, f"rules_{transition_year - 1}_{transition_year}.csv"),
        (df_agency, f"rules_by_agency_{transition_year - 1}_{transition_year}.csv"),
        (df_ym, f"rules_by_month_{transition_year - 1}_{transition_year}.csv"),
    )
    for table, file_name in outputs:
        table.to_csv(path / file_name, index=False)
|
|
|
|
|
def plot_agency(df, group_col="acronym", value_col="rules"):
    """Build a flipped bar chart of the number of rules by agency.

    Args:
        df (DataFrame): Agency-level data.
        group_col (str, optional): Column with the agency labels. Defaults to "acronym".
        value_col (str, optional): Column with the bar values. Defaults to "rules".

    Returns:
        ggplot: The assembled plot object.
    """
    # Axis categories follow the row order of `df`, reversed.
    # NOTE(review): presumably so the first row ends up at the top once the
    # coordinates are flipped -- confirm against the rendered chart.
    bar_order = list(reversed(df.loc[:, group_col].to_list()))

    layers = (
        geom_col(),
        coord_flip(),
        scale_x_discrete(limits=bar_order),
        labs(y="", x="", title="Number of Rules Published by Agency"),
        theme_light(),
    )

    chart = ggplot(df, aes(x=group_col, y=value_col))
    for layer in layers:
        chart = chart + layer
    return chart
|
|
|
|
|
def plot_month(df, group_cols = ("publication_year", "publication_month"), value_col = "rules"):
    """Build a bar chart of the number of rules by year-month.

    The year and month columns are combined into a "YYYY-MM" label that is
    used as the x axis, with categories kept in the row order of `df`.
    The label column is added to a copy, so the caller's DataFrame is left
    unchanged.

    Args:
        df (DataFrame): Month-level data.
        group_cols (tuple, optional): Year column followed by month column.
            Defaults to ("publication_year", "publication_month").
        value_col (str, optional): Column with the bar values. Defaults to "rules".

    Returns:
        ggplot: The assembled plot object.
    """
    year = df[group_cols[0]].astype(str)
    month = df[group_cols[1]].astype(str).str.pad(2, fillchar="0")

    # assign() returns a new DataFrame rather than writing into the argument
    df = df.assign(ym=year + "-" + month)
    order_list = df.loc[:, "ym"].to_list()

    plot = (
        ggplot(
            df,
            aes(x="ym", y=value_col),
        )
        + geom_col()
        + scale_x_discrete(limits=order_list)
        + labs(y="", x="", title="Number of Rules Published by Month")
        + theme_light()
    )
    return plot
|
|
|
|
|
def get_rules_in_window(start_date: str, get_significant: bool = True):
    """Retrieve and clean the rules published in the window starting at `start_date`.

    Steps: build the date range, fetch the rules, format them, drop
    corrections, drop rules of the incoming administration, and optionally
    attach significance indicators.

    Args:
        start_date (str): Start of the window in ISO format (YYYY-MM-DD).
        get_significant (bool, optional): Attach significance indicators.
            Defaults to True.

    Returns:
        tuple: DataFrame of rules, and the `last_updated` value from
            `get_significant_rules` (today's date when `get_significant` is False).
    """
    window = get_date_range(start_date)

    rules = format_documents(get_rules(window))
    rules, _ = filter_corrections(rules)
    rules = filter_new_admin_rules(rules, window.get("transition_year"))

    if not get_significant:
        return rules, date.today()

    rules, last_updated = get_significant_rules(rules, start_date)
    return rules, last_updated
|
|
|
|
|
def get_list_agencies(start_date, agency_column: str = "agency", metadata: dict | None = None, significant: bool = True):
    """List the distinct agencies that published rules in the window.

    Args:
        start_date (str): Start of the window in ISO format (YYYY-MM-DD).
        agency_column (str, optional): Column of the agency-level table to read.
            Defaults to "agency".
        metadata (dict | None, optional): Agency metadata passed to
            `groupby_agency`. Defaults to None.
        significant (bool, optional): Whether significance data are retrieved
            and aggregated. Defaults to True.

    Returns:
        list: Sorted unique non-missing values of `agency_column`.
    """
    df, _ = get_rules_in_window(start_date, get_significant=significant)
    df_agency = groupby_agency(df, metadata=metadata, significant=significant)

    # The leftover debug print of the column index was removed: it wrote to
    # stdout on every call, including when the module is imported.
    agencies = df_agency.loc[df_agency[agency_column].notna(), agency_column]
    return sorted(set(agencies.to_list()))
|
|
|
|
|
def main(start_date, save_data: bool = True, path: Path | None = None, metadata: dict | None = None, significant: bool = True):
    """Retrieve the rules for a window, aggregate them, and optionally save CSV files.

    Args:
        start_date (str): Start of the window in ISO format (YYYY-MM-DD).
        save_data (bool, optional): Write the three tables to CSV. Defaults to True.
        path (Path | None, optional): Output directory; the directory of this
            file is used when None. Defaults to None.
        metadata (dict | None, optional): Agency metadata passed to
            `groupby_agency`. Defaults to None.
        significant (bool, optional): Include significance data; forced to
            False when `start_date` is earlier than April 6, 2023. Defaults to True.

    Returns:
        tuple: Rule-level DataFrame, agency-level DataFrame, year-month DataFrame.
    """
    # significance data are not requested for windows starting before the cutoff
    starts_before_cutoff = date.fromisoformat(start_date) < date(2023, 4, 6)
    if starts_before_cutoff:
        significant = False

    transition_year = get_date_range(start_date).get("transition_year")
    df, _ = get_rules_in_window(start_date, get_significant=significant)

    df_agency = groupby_agency(df, metadata=metadata, significant=significant)
    df_ym = groupby_ym(df, significant=significant)

    if save_data:
        output_dir = Path(__file__).parent if path is None else path
        save_csv(output_dir, df, df_agency, df_ym, transition_year)

    return df, df_agency, df_ym
|
|
|
|
|
# Module-level data, loaded when the module is imported.
# Rule-level data for the default window and the date the data were last updated.
DF, LAST_UPDATED = get_rules_in_window(
    start_date=START_DATE,
    get_significant=GET_SIGNIFICANT,
)

# Sorted list of agencies with rules in the default window.
# Note: get_list_agencies() retrieves the rules again through get_rules_in_window().
AGENCIES = get_list_agencies(
    start_date=START_DATE,
    metadata=METADATA,
    significant=GET_SIGNIFICANT,
)
|
|
|
|
|
if __name__ == "__main__":
    # quick manual check of the data loaded above
    for item in (DF.columns, LAST_UPDATED):
        print(item)
|
|