cra-window-rules / modules /get_rules_in_window.py
Mark Febrizio
Update get_rules_in_window.py
e36a0a8
raw
history blame
11 kB
from datetime import date
from pathlib import Path
from fr_toolbelt.api_requests import get_documents_by_date
from fr_toolbelt.preprocessing import process_documents, AgencyMetadata
from numpy import array
from pandas import DataFrame, to_datetime
from plotnine import (
ggplot,
aes,
geom_col,
labs,
coord_flip,
scale_x_discrete,
theme_light,
)
try:
from search_columns import search_columns, SearchError
from significant import get_significant_info
except ModuleNotFoundError:
from .search_columns import search_columns, SearchError
from .significant import get_significant_info
METADATA, _ = AgencyMetadata().get_agency_metadata()
START_DATE = "2024-03-01"
GET_SIGNIFICANT = True if date.fromisoformat(START_DATE) >= date(2023, 4, 6) else False
class DataAvailabilityError(Exception):
pass
def get_date_range(start_date: str):
start_year = date.fromisoformat(start_date).year
end_year = start_year + 1
date_range = {
"start": start_date,
"end": f"{end_year}-01-31",
"transition_year": end_year,
}
return date_range
def get_rules(date_range: dict) -> list[dict]:
results, _ = get_documents_by_date(
start_date=date_range.get("start"),
end_date=date_range.get("end"),
document_types=("RULE", )
)
return results
def format_documents(documents: list[dict]):
"""Format Federal Register documents to generate count by presidential year.
Args:
documents (list[dict]): List of documents.
Returns:
DataFrame: Pandas DataFrame with formatted data.
"""
# process agency info in documents
documents = process_documents(
documents,
which=("agencies", "presidents"),
return_values_as_str=False
)
# create dataframe
df = DataFrame(documents)
# convert publication date to datetime format
df.loc[:, "publication_dt"] = to_datetime(df["publication_date"])
df.loc[:, "publication_date"] = df.apply(lambda x: x["publication_dt"].date(), axis=1)
df.loc[:, "publication_year"] = df.apply(lambda x: x["publication_dt"].year, axis=1)
df.loc[:, "publication_month"] = df.apply(lambda x: x["publication_dt"].month, axis=1)
df.loc[:, "publication_day"] = df.apply(lambda x: x["publication_dt"].day, axis=1)
# return dataframe
return df
def filter_new_admin_rules(
df: DataFrame,
transition_year: int,
date_col: str = "publication_date",
):
admin_transitions = {
2001: "george-w-bush",
2009: "barack-obama",
2017: "donald-trump",
2021: "joe-biden",
}
bool_date = array(df[date_col] >= date(transition_year, 1, 20))
bool_prez = array(df["president_id"] == admin_transitions.get(transition_year))
bool_ = bool_date & bool_prez
return df.loc[~bool_]
def filter_corrections(df: DataFrame):
"""Filter out corrections from Federal Register documents.
Identifies corrections using `corrrection_of` field and regex searches of `document_number`, `title`, and `action` fields.
Args:
df (DataFrame): Federal Register data.
Returns:
tuple: DataFrame with corrections removed, DataFrame of corrections
"""
# get original column names
cols = df.columns.tolist()
# filter out corrections
# 1. Using correction fields
bool_na = array(df["correction_of"].isna())
# 2. Searching other fields
search_1 = search_columns(df, [r"^[crxz][\d]{1,2}-(?:[\w]{2,4}-)?[\d]+"], ["document_number"],
return_column="indicator1")
search_2 = search_columns(df, [r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"], ["title", "action"],
return_column="indicator2")
bool_search = array(search_1["indicator1"] == 1) | array(search_2["indicator2"] == 1)
# separate corrections from non-corrections
df_no_corrections = df.loc[(bool_na & ~bool_search), cols] # remove flagged documents
df_corrections = df.loc[(~bool_na | bool_search), cols]
# return filtered results
if len(df) == len(df_no_corrections) + len(df_corrections):
return df_no_corrections, df_corrections
else:
raise SearchError(f"{len(df)} != {len(df_no_corrections)} + {len(df_corrections)}")
def get_significant_rules(df, start_date):
process_columns = ("significant", "3f1_significant", )
if date.fromisoformat(start_date) < date(2023, 4, 6):
raise DataAvailabilityError("This program does not calculate significant rule counts prior to Executive Order 14094 of April 6, 2023.")
else:
document_numbers = df.loc[:, "document_number"].to_list()
df, last_updated = get_significant_info(df, start_date, document_numbers)
for col in process_columns:
bool_na = df[col].isna()
df.loc[bool_na, col] = "0"
df.loc[:, col] = df[col].replace(".", "0").astype("int64")
bool_3f1 = df["3f1_significant"] == 1
bool_sig = df["significant"] == 1
df.loc[:, "3f1_significant"] = 0
df.loc[bool_3f1, "3f1_significant"] = 1
df.loc[:, "other_significant"] = 0
df.loc[(bool_sig & ~bool_3f1), "other_significant"] = 1
return df, last_updated
def get_agency_metadata_values(
df: DataFrame,
agency_column: str,
metadata: dict,
metadata_value: str,
):
if metadata_value == "acronym":
metadata_value = "short_name"
return df.loc[:, agency_column].apply(
lambda x: metadata.get(x, {}).get(metadata_value)
)
def groupby_agency(
df: DataFrame,
group_col: str = "parent_slug",
value_col: str = "document_number",
aggfunc: str = "count",
significant: bool = True,
metadata: dict | None = None,
metadata_value: str = "acronym",
):
aggfunc_dict = {value_col: aggfunc, }
if significant:
aggfunc_dict.update({
"3f1_significant": "sum",
"other_significant": "sum",
})
df_ex = df.explode(group_col, ignore_index=True)
grouped = df_ex.groupby(
by=group_col
).agg(
aggfunc_dict
).reset_index()
grouped = grouped.sort_values(value_col, ascending=False).rename(
columns={
group_col: "agency",
value_col: "rules",
}, errors="ignore"
)
if metadata is not None:
grouped.loc[:, metadata_value] = get_agency_metadata_values(
grouped,
agency_column="agency",
metadata=metadata,
metadata_value=metadata_value
)
cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
return grouped
def groupby_ym(
df: DataFrame,
group_col: tuple | list = ("publication_year", "publication_month", ),
value_col: str = "document_number",
aggfunc: str = "count",
significant: bool = True
):
aggfunc_dict = {value_col: aggfunc, }
if significant:
aggfunc_dict.update({
"3f1_significant": "sum",
"other_significant": "sum",
})
grouped = df.groupby(
by=list(group_col)
).agg(
aggfunc_dict
).reset_index()
grouped = grouped.rename(columns={
value_col: "rules",
}, errors="ignore")
return grouped
def save_csv(path: Path, df_all: DataFrame, df_agency: DataFrame, df_ym: DataFrame, transition_year: int):
files = (
f"rules_{transition_year - 1}_{transition_year}.csv",
f"rules_by_agency_{transition_year - 1}_{transition_year}.csv",
f"rules_by_month_{transition_year - 1}_{transition_year}.csv"
)
dataframes = (df_all, df_agency, df_ym)
for data, file in zip(dataframes, files):
data.to_csv(path / file, index=False)
def plot_agency(df, group_col = "acronym", value_col = "rules"):
order_list = df.loc[:, group_col].to_list()[::-1]
plot = (
ggplot(
df,
aes(x=group_col, y=value_col),
)
+ geom_col()
+ coord_flip()
+ scale_x_discrete(limits=order_list)
+ labs(y="", x="", title="Number of Rules Published by Agency")
+ theme_light()
)
return plot
def plot_month(df, group_cols = ("publication_year", "publication_month"), value_col = "rules"):
df.loc[:, "ym"] = df[group_cols[0]].astype(str) + "-" + df[group_cols[1]].astype(str).str.pad(2, fillchar="0")
order_list = df.loc[:, "ym"].to_list()
plot = (
ggplot(
df,
aes(x="ym", y=value_col),
)
+ geom_col()
+ scale_x_discrete(limits=order_list)
+ labs(y="", x="", title="Number of Rules Published by Month")
+ theme_light()
)
return plot
def get_rules_in_window(start_date: str, get_significant: bool = True):
date_range = get_date_range(start_date)
transition_year = date_range.get("transition_year")
results = get_rules(date_range)
df = format_documents(results)
df, _ = filter_corrections(df)
df = filter_new_admin_rules(df, transition_year)
if get_significant:
df, last_updated = get_significant_rules(df, start_date)
else:
last_updated = date.today()
return df, last_updated
def get_list_agencies(start_date, agency_column: str = "agency", metadata: dict | None = None, significant: bool = True):
df, _ = get_rules_in_window(start_date, get_significant=significant)
df_agency = groupby_agency(df, metadata=metadata, significant=significant)
print(df_agency.columns)
return sorted(list(set(df_agency.loc[df_agency[agency_column].notna(), agency_column].to_list())))
def main(start_date, save_data: bool = True, path: Path | None = None, metadata: dict | None = None, significant: bool = True):
if date.fromisoformat(start_date) < date(2023, 4, 6):
significant = False
date_range = get_date_range(start_date)
transition_year = date_range.get("transition_year")
df, _ = get_rules_in_window(start_date, get_significant=significant)
df_agency = groupby_agency(df, metadata=metadata, significant=significant)
df_ym = groupby_ym(df, significant=significant)
if save_data:
if path is None:
path = Path(__file__).parent
save_csv(path, df, df_agency, df_ym, transition_year)
return df, df_agency, df_ym
DF, LAST_UPDATED = get_rules_in_window(START_DATE, get_significant=GET_SIGNIFICANT)
AGENCIES = get_list_agencies(START_DATE, metadata=METADATA, significant=GET_SIGNIFICANT)
if __name__ == "__main__":
print(DF.columns)
print(LAST_UPDATED)