"""Count and plot Federal Register final rules published in a presidential
transition-year window (March of year N through January 31 of year N+1),
optionally broken out by significance under Executive Order 14094.

NOTE(review): importing this module triggers network requests at the bottom
of the file (``DF``, ``LAST_UPDATED``, ``AGENCIES``) — confirm downstream
importers rely on these before refactoring them into ``main``.
"""

from datetime import date
from pathlib import Path

from fr_toolbelt.api_requests import get_documents_by_date
from fr_toolbelt.preprocessing import process_documents, AgencyMetadata
from numpy import array
from pandas import DataFrame, to_datetime
from plotnine import (
    ggplot,
    aes,
    geom_col,
    labs,
    coord_flip,
    scale_x_discrete,
    theme_light,
)

try:
    from search_columns import search_columns, SearchError
    from significant import get_significant_info
except ModuleNotFoundError:
    # Fall back to package-relative imports when run as part of a package.
    from .search_columns import search_columns, SearchError
    from .significant import get_significant_info


METADATA, _ = AgencyMetadata().get_agency_metadata()
START_DATE = "2024-03-01"
# Significance data are only defined from EO 14094 (April 6, 2023) onward.
GET_SIGNIFICANT = date.fromisoformat(START_DATE) >= date(2023, 4, 6)


class DataAvailabilityError(Exception):
    """Raised when requested data precede the available data window."""
    pass


def get_date_range(start_date: str) -> dict:
    """Build the retrieval window for a given start date.

    Args:
        start_date: ISO-format date (YYYY-MM-DD) opening the window.

    Returns:
        dict with keys "start" (the input), "end" (January 31 of the
        following year), and "transition_year" (the following year).
    """
    start_year = date.fromisoformat(start_date).year
    end_year = start_year + 1
    date_range = {
        "start": start_date,
        # Window runs through the end of January after the inauguration.
        "end": f"{end_year}-01-31",
        "transition_year": end_year,
    }
    return date_range


def get_rules(date_range: dict) -> list[dict]:
    """Retrieve final rules published within `date_range` from the FR API.

    Args:
        date_range: dict with "start" and "end" ISO date strings
            (as produced by `get_date_range`).

    Returns:
        List of Federal Register document dicts of type RULE.
    """
    results, _ = get_documents_by_date(
        start_date=date_range.get("start"),
        end_date=date_range.get("end"),
        document_types=("RULE", ),
    )
    return results


def format_documents(documents: list[dict]) -> DataFrame:
    """Format Federal Register documents to generate count by presidential year.

    Args:
        documents: List of documents.

    Returns:
        Pandas DataFrame with agency/president fields processed and
        publication date split into date/year/month/day columns.
    """
    # process agency info in documents
    documents = process_documents(
        documents,
        which=("agencies", "presidents"),
        return_values_as_str=False,
    )

    # create dataframe
    df = DataFrame(documents)

    # convert publication date to datetime format and derive components;
    # the vectorized .dt accessor replaces four row-wise apply() calls
    df.loc[:, "publication_dt"] = to_datetime(df["publication_date"])
    df.loc[:, "publication_date"] = df["publication_dt"].dt.date
    df.loc[:, "publication_year"] = df["publication_dt"].dt.year
    df.loc[:, "publication_month"] = df["publication_dt"].dt.month
    df.loc[:, "publication_day"] = df["publication_dt"].dt.day

    # return dataframe
    return df


def filter_new_admin_rules(
        df: DataFrame,
        transition_year: int,
        date_col: str = "publication_date",
    ) -> DataFrame:
    """Drop rules issued by the incoming administration after inauguration.

    Removes rows published on/after January 20 of `transition_year` whose
    president matches the administration that took office that year, so the
    window reflects only the outgoing administration's output.

    Args:
        df: Federal Register data with `date_col` and "president_id" columns.
        transition_year: Year of the presidential transition.
        date_col: Column holding `datetime.date` values.

    Returns:
        DataFrame with the incoming administration's rules removed.
    """
    # TODO(review): no entry for 2025 — for transition_year values missing
    # here .get() returns None, president match is all-False, and nothing is
    # filtered. Confirm whether 2025: "donald-trump" should be added.
    admin_transitions = {
        2001: "george-w-bush",
        2009: "barack-obama",
        2017: "donald-trump",
        2021: "joe-biden",
    }
    bool_date = array(df[date_col] >= date(transition_year, 1, 20))
    bool_prez = array(df["president_id"] == admin_transitions.get(transition_year))
    bool_ = bool_date & bool_prez
    return df.loc[~bool_]


def filter_corrections(df: DataFrame):
    """Filter out corrections from Federal Register documents.

    Identifies corrections using the `correction_of` field and regex searches
    of `document_number`, `title`, and `action` fields.

    Args:
        df: Federal Register data.

    Returns:
        tuple: DataFrame with corrections removed, DataFrame of corrections.

    Raises:
        SearchError: If the two partitions do not sum to the input length.
    """
    # get original column names
    cols = df.columns.tolist()

    # filter out corrections
    # 1. Using correction fields
    bool_na = array(df["correction_of"].isna())

    # 2. Searching other fields
    search_1 = search_columns(
        df,
        [r"^[crxz][\d]{1,2}-(?:[\w]{2,4}-)?[\d]+"],
        ["document_number"],
        return_column="indicator1",
    )
    search_2 = search_columns(
        df,
        [r"(?:;\scorrection\b)|(?:\bcorrecting\samend[\w]+\b)"],
        ["title", "action"],
        return_column="indicator2",
    )
    bool_search = array(search_1["indicator1"] == 1) | array(search_2["indicator2"] == 1)

    # separate corrections from non-corrections
    df_no_corrections = df.loc[(bool_na & ~bool_search), cols]  # remove flagged documents
    df_corrections = df.loc[(~bool_na | bool_search), cols]

    # return filtered results only if the partition is exhaustive
    if len(df) == len(df_no_corrections) + len(df_corrections):
        return df_no_corrections, df_corrections
    else:
        raise SearchError(f"{len(df)} != {len(df_no_corrections)} + {len(df_corrections)}")


def get_significant_rules(df: DataFrame, start_date: str):
    """Annotate rules with EO 14094 significance indicator columns.

    Args:
        df: Federal Register data with a "document_number" column.
        start_date: ISO-format date; must be on/after 2023-04-06.

    Returns:
        tuple: (DataFrame with integer "significant", "3f1_significant",
        and "other_significant" columns; date the source data were last
        updated).

    Raises:
        DataAvailabilityError: If `start_date` precedes EO 14094.
    """
    process_columns = ("significant", "3f1_significant", )
    if date.fromisoformat(start_date) < date(2023, 4, 6):
        raise DataAvailabilityError(
            "This program does not calculate significant rule counts prior to Executive Order 14094 of April 6, 2023."
        )
    else:
        document_numbers = df.loc[:, "document_number"].to_list()
        df, last_updated = get_significant_info(df, start_date, document_numbers)
        # normalize missing / "." placeholder values to integer 0
        for col in process_columns:
            bool_na = df[col].isna()
            df.loc[bool_na, col] = "0"
            df.loc[:, col] = df[col].replace(".", "0").astype("int64")
        bool_3f1 = df["3f1_significant"] == 1
        bool_sig = df["significant"] == 1
        # 3f1-significant and other-significant are mutually exclusive flags
        df.loc[:, "3f1_significant"] = 0
        df.loc[bool_3f1, "3f1_significant"] = 1
        df.loc[:, "other_significant"] = 0
        df.loc[(bool_sig & ~bool_3f1), "other_significant"] = 1
        return df, last_updated


def get_agency_metadata_values(
        df: DataFrame,
        agency_column: str,
        metadata: dict,
        metadata_value: str,
    ):
    """Map agency slugs in `agency_column` to a metadata field.

    Args:
        df: Data containing `agency_column`.
        agency_column: Column of agency slugs used as metadata keys.
        metadata: Mapping of slug -> metadata dict.
        metadata_value: Field to extract; "acronym" is aliased to the
            metadata's "short_name" key.

    Returns:
        Series of metadata values (None where the slug or field is missing).
    """
    if metadata_value == "acronym":
        metadata_value = "short_name"
    return df.loc[:, agency_column].apply(
        lambda x: metadata.get(x, {}).get(metadata_value)
    )


def groupby_agency(
        df: DataFrame,
        group_col: str = "parent_slug",
        value_col: str = "document_number",
        aggfunc: str = "count",
        significant: bool = True,
        metadata: dict | None = None,
        metadata_value: str = "acronym",
    ) -> DataFrame:
    """Aggregate rule counts by agency, optionally with significance sums.

    Args:
        df: Formatted Federal Register data.
        group_col: List-valued agency column to explode and group by.
        value_col: Column aggregated with `aggfunc` (renamed "rules").
        aggfunc: Aggregation for `value_col` (default "count").
        significant: If True, also sum the significance indicator columns.
        metadata: Optional slug -> metadata mapping for agency labels.
        metadata_value: Metadata field to attach (default "acronym").

    Returns:
        DataFrame sorted by rule count, descending.
    """
    aggfunc_dict = {value_col: aggfunc, }
    if significant:
        aggfunc_dict.update({
            "3f1_significant": "sum",
            "other_significant": "sum",
        })
    # each document may list multiple agencies; explode to count each
    df_ex = df.explode(group_col, ignore_index=True)
    grouped = df_ex.groupby(
        by=group_col
    ).agg(
        aggfunc_dict
    ).reset_index()
    grouped = grouped.sort_values(value_col, ascending=False).rename(
        columns={
            group_col: "agency",
            value_col: "rules",
        },
        errors="ignore",
    )
    if metadata is not None:
        grouped.loc[:, metadata_value] = get_agency_metadata_values(
            grouped,
            agency_column="agency",
            metadata=metadata,
            metadata_value=metadata_value,
        )
        cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
        grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
    return grouped


def groupby_ym(
        df: DataFrame,
        group_col: tuple | list = ("publication_year", "publication_month", ),
        value_col: str = "document_number",
        aggfunc: str = "count",
        significant: bool = True,
    ) -> DataFrame:
    """Aggregate rule counts by publication year and month.

    Args:
        df: Formatted Federal Register data.
        group_col: Columns to group by (default year and month).
        value_col: Column aggregated with `aggfunc` (renamed "rules").
        aggfunc: Aggregation for `value_col` (default "count").
        significant: If True, also sum the significance indicator columns.

    Returns:
        DataFrame of counts per (year, month).
    """
    aggfunc_dict = {value_col: aggfunc, }
    if significant:
        aggfunc_dict.update({
            "3f1_significant": "sum",
            "other_significant": "sum",
        })
    grouped = df.groupby(
        by=list(group_col)
    ).agg(
        aggfunc_dict
    ).reset_index()
    grouped = grouped.rename(columns={
        value_col: "rules",
    }, errors="ignore")
    return grouped


def save_csv(
        path: Path,
        df_all: DataFrame,
        df_agency: DataFrame,
        df_ym: DataFrame,
        transition_year: int,
    ) -> None:
    """Write the full, by-agency, and by-month datasets to CSV files.

    Args:
        path: Directory receiving the files.
        df_all: Full rule-level dataset.
        df_agency: Output of `groupby_agency`.
        df_ym: Output of `groupby_ym`.
        transition_year: Used to name the files.
    """
    files = (
        f"rules_{transition_year - 1}_{transition_year}.csv",
        f"rules_by_agency_{transition_year - 1}_{transition_year}.csv",
        f"rules_by_month_{transition_year - 1}_{transition_year}.csv",
    )
    dataframes = (df_all, df_agency, df_ym)
    for data, file in zip(dataframes, files):
        data.to_csv(path / file, index=False)


def plot_agency(df: DataFrame, group_col: str = "acronym", value_col: str = "rules"):
    """Horizontal bar chart of rule counts by agency.

    Args:
        df: Output of `groupby_agency` (sorted descending by count).
        group_col: Column of agency labels.
        value_col: Column of counts.

    Returns:
        plotnine ggplot object.
    """
    # reverse so the largest bar appears at the top after coord_flip
    order_list = df.loc[:, group_col].to_list()[::-1]
    plot = (
        ggplot(
            df,
            aes(x=group_col, y=value_col),
        )
        + geom_col()
        + coord_flip()
        + scale_x_discrete(limits=order_list)
        + labs(y="", x="", title="Number of Rules Published by Agency")
        + theme_light()
    )
    return plot


def plot_month(
        df: DataFrame,
        group_cols: tuple = ("publication_year", "publication_month"),
        value_col: str = "rules",
    ):
    """Bar chart of rule counts by publication month.

    NOTE(review): adds a "ym" column to the caller's DataFrame in place.

    Args:
        df: Output of `groupby_ym`.
        group_cols: (year column, month column) used to build "YYYY-MM" labels.
        value_col: Column of counts.

    Returns:
        plotnine ggplot object.
    """
    # build zero-padded "YYYY-MM" labels so they sort/display chronologically
    df.loc[:, "ym"] = (
        df[group_cols[0]].astype(str)
        + "-"
        + df[group_cols[1]].astype(str).str.pad(2, fillchar="0")
    )
    order_list = df.loc[:, "ym"].to_list()
    plot = (
        ggplot(
            df,
            aes(x="ym", y=value_col),
        )
        + geom_col()
        + scale_x_discrete(limits=order_list)
        + labs(y="", x="", title="Number of Rules Published by Month")
        + theme_light()
    )
    return plot


def get_rules_in_window(start_date: str, get_significant: bool = True):
    """Retrieve, format, and filter rules for the transition-year window.

    Args:
        start_date: ISO-format date opening the window.
        get_significant: If True, annotate with significance columns.

    Returns:
        tuple: (filtered DataFrame, date the significance data were last
        updated — today's date when `get_significant` is False).
    """
    date_range = get_date_range(start_date)
    transition_year = date_range.get("transition_year")
    results = get_rules(date_range)
    df = format_documents(results)
    df, _ = filter_corrections(df)
    df = filter_new_admin_rules(df, transition_year)
    if get_significant:
        df, last_updated = get_significant_rules(df, start_date)
    else:
        last_updated = date.today()
    return df, last_updated


def get_list_agencies(
        start_date,
        agency_column: str = "agency",
        metadata: dict | None = None,
        significant: bool = True,
    ) -> list:
    """Return a sorted, de-duplicated list of agencies with rules in the window.

    Args:
        start_date: ISO-format date opening the window.
        agency_column: Grouped column holding agency identifiers.
        metadata: Optional slug -> metadata mapping passed to `groupby_agency`.
        significant: Whether to include significance aggregation.

    Returns:
        Sorted list of unique non-null agency values.
    """
    df, _ = get_rules_in_window(start_date, get_significant=significant)
    df_agency = groupby_agency(df, metadata=metadata, significant=significant)
    return sorted(set(
        df_agency.loc[df_agency[agency_column].notna(), agency_column].to_list()
    ))


def main(
        start_date,
        save_data: bool = True,
        path: Path | None = None,
        metadata: dict | None = None,
        significant: bool = True,
    ):
    """Produce rule-count datasets for a transition-year window.

    Args:
        start_date: ISO-format date opening the window.
        save_data: If True, write CSVs via `save_csv`.
        path: Output directory (defaults to this file's directory).
        metadata: Optional slug -> metadata mapping for agency labels.
        significant: Whether to include significance columns; forced off
            for windows predating EO 14094.

    Returns:
        tuple: (full DataFrame, by-agency DataFrame, by-month DataFrame).
    """
    # significance data do not exist before EO 14094; override silently
    if date.fromisoformat(start_date) < date(2023, 4, 6):
        significant = False
    date_range = get_date_range(start_date)
    transition_year = date_range.get("transition_year")
    df, _ = get_rules_in_window(start_date, get_significant=significant)
    df_agency = groupby_agency(df, metadata=metadata, significant=significant)
    df_ym = groupby_ym(df, significant=significant)
    if save_data:
        if path is None:
            path = Path(__file__).parent
        save_csv(path, df, df_agency, df_ym, transition_year)
    return df, df_agency, df_ym


# Module-level data pulls (network I/O at import time); kept for importers
# that rely on DF / LAST_UPDATED / AGENCIES being available on import.
DF, LAST_UPDATED = get_rules_in_window(START_DATE, get_significant=GET_SIGNIFICANT)
AGENCIES = get_list_agencies(START_DATE, metadata=METADATA, significant=GET_SIGNIFICANT)


if __name__ == "__main__":
    print(DF.columns)
    print(LAST_UPDATED)