from __future__ import annotations from datetime import datetime, date, timedelta from dateutil.relativedelta import * from pandas import DataFrame, Timestamp, to_datetime from .utils import get_agency_metadata_values def _get_first_week_start(dates: list[date], week_start: int | str | "weekday" = MO): """Get the start date of the first week from a list of dates. Pass "week_start" to select a different start date for each week (defaults to Monday). """ if week_start in (MO, TU, WE, TH, FR, SA, SU): pass elif isinstance(week_start, str): weekdays = { "monday": MO, "tuesday": TU, "wednesday": WE, "thursday": TH, "friday": FR, "saturday": SA, "sunday": SU, } week_start = weekdays.get(week_start.lower(), MO) elif isinstance(week_start, int): weekdays = { 0: MO, 1: TU, 2: WE, 3: TH, 4: FR, 5: SA, 6: SU, } week_start = weekdays.get(week_start, MO) else: raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.") first_day = next(d for d in dates) return first_day + relativedelta(weekday=week_start(-1)) def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None): """Get the index and start date for each week. Args: first_week_start (date | Timestamp): Start date of the first week in the data. end_date (date | None, optional): End date for data. If None is passed (the default), the end date is `date.today()`. Returns: list[tuple]: List of tuples containing the week number and the start date. """ if end_date is None: end_date = date.today() try: week_start_dates = [first_week_start.date()] except AttributeError as err: week_start_dates = [first_week_start] while week_start_dates[-1] < end_date: next_start_date = week_start_dates[-1] + relativedelta(weeks=1) week_start_dates.append(next_start_date) week_start_dates = [day for day in week_start_dates if day <= end_date] week_start_dates = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in week_start_dates] return [(idx, w) for idx, w in enumerate(week_start_dates)] def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]: """Takes a list, array, or other iterable of `datetime.date` values and returns a list of tuples containing (week_number, week_start_date) pairs. Pass keyword arg "week_start" - ranging from 0 (Monday) to 6 (Sunday) - to choose a different start date than Monday for the week. """ # get the start date for the first week first_week_start = _get_first_week_start(dates, **kwargs) # get start date for each week in the input values weeks = _get_week_start_dates(first_week_start, end_date=end_date) # iterate over inputs, append tuple of week number and start date for each week results = [] for d in dates: if isinstance(d, Timestamp): d = d.date() week_gen = ((idx, start_date) for idx, start_date in weeks if (start_date <= d < (start_date + timedelta(weeks=1)))) results.append(next(week_gen, (0, first_week_start))) return results def add_week_info_to_data(df: DataFrame, date_column: str = "publication_date", new_columns: tuple[str] = ("week_number", "week_of")): """Add week number and week start date to input data. Args: df (DataFrame): Input data. date_column (str, optional): Name of column containing publication dates. Defaults to "publication_date". new_columns (tuple[str], optional): New column names. Defaults to ("week_number", "week_start"). Returns: DataFrame: Data containing week information. """ df_c = df.copy() data = df_c[date_column].to_list() if len(data) > 0: week_numbers, week_starts = list(zip(*_get_weeks(data))) df_c.loc[:, new_columns[0]] = week_numbers df_c.loc[:, new_columns[1]] = to_datetime(week_starts) return df_c def _pad_missing_weeks(timeframe_list: list[date], **kwargs): """Pad dataframe with weeks missing from retrieved data (i.e., weeks without qualifying rule data). """ # get the start date for the first week first_week_start = _get_first_week_start(timeframe_list) # get start date for each week in the input values return _get_week_start_dates(first_week_start, **kwargs) def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None): """Pad dataframe with days missing from retrieved data (i.e., days without qualifying rule data). """ start_date = min(timeframe_list) if end_date is None: end_date = date.today() # create list of weekdays from start to end dates # remember that range() objects are exclusive of the stop return [ start_date + relativedelta(days=n) for n in range((end_date - start_date).days + 1) if (start_date + relativedelta(days=n)).weekday() in range(0, 5) ] def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs): """Add missing dates (either weeks or days) to the dataset. Args: df (DataFrame): Input data. pad_column (str): Date column to pad. how (str): Whether to pad by "days" or "weeks". fill_padded_values (dict | None, optional): Dictionary of columns and values to fill for padded observations (e.g., {"column": 0}). Defaults to None. Raises: ValueError: Must pass 'days' or 'weeks' to parameter 'how'. Returns: DataFrame: Padded data. """ df_copy = df.copy() timeframe_list = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in df_copy[pad_column].to_list()] df_copy = df_copy.astype({pad_column: "object"}) df_copy.loc[:, pad_column] = timeframe_list # pad dates if dataframe isn't empty if len(timeframe_list) > 0: # choose which time frequency needs padding if how == "days": week_numbers = None padded_timeframes = _pad_missing_days(timeframe_list, **kwargs) elif how == "weeks": week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs)) else: raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.") # incorporate extended dates into dataframe df_merge = DataFrame({pad_column: padded_timeframes}) pad_cols = [pad_column] if week_numbers is not None: df_merge.loc[:, "week_number"] = week_numbers pad_cols.append("week_number") df_copy = df_copy.merge(df_merge, on=pad_cols, how="outer", indicator=True) if fill_padded_values is not None: for col, val in fill_padded_values.items(): bool_ = df_copy["_merge"] == "right_only" df_copy.loc[bool_, col] = val return df_copy.drop(columns=["_merge"], errors="ignore") def groupby_agency( df: DataFrame, group_col: str = "parent_slug", value_col: str = "document_number", aggfunc: str = "count", significant: bool = True, metadata: dict | None = None, metadata_value: str = "acronym", ): """Group data by agencies and aggregate the values. Args: df (DataFrame): Input data. group_col (str, optional): Column to group by. Defaults to "parent_slug". value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number". aggfunc (str, optional): Aggregation function. Defaults to "count". significant (bool, optional): Whether to include significance data in values. Defaults to True. metadata (dict | None, optional): Agency metadata. Defaults to None. metadata_value (str, optional): Metadata value to add to output data. Defaults to "acronym". Returns: DataFrame: Grouped and aggregated data. """ aggfunc_dict = {value_col: aggfunc, } if significant: aggfunc_dict.update({ "3f1_significant": "sum", "other_significant": "sum", }) df_ex = df.explode(group_col, ignore_index=True) grouped = df_ex.groupby( by=group_col ).agg( aggfunc_dict ).reset_index() grouped = grouped.sort_values(value_col, ascending=False).rename( columns={ group_col: "agency", value_col: "rules", }, errors="ignore" ) if metadata is not None: grouped.loc[:, metadata_value] = get_agency_metadata_values( grouped, agency_column="agency", metadata=metadata, metadata_value=metadata_value ) cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"] grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]] return grouped def groupby_date( df: DataFrame, group_col: str | tuple | list = ("publication_year", "publication_month", ), value_col: str = "document_number", aggfunc: str = "count", significant: bool = True ): """Group data by a given date frequency and aggregate the values. Args: df (DataFrame): Input data. group_col (str | tuple | list, optional): Columns to group by. Defaults to ("publication_year", "publication_month", ). value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number". aggfunc (str, optional): Aggregation function. Defaults to "count". significant (bool, optional): Whether to include significance data in values. Defaults to True. Raises: TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`. Returns: DataFrame: Grouped and aggregated data. """ if isinstance(group_col, str): group_col = [group_col] elif isinstance(group_col, (list, tuple)): group_col = list(group_col) else: raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.") aggfunc_dict = {value_col: aggfunc, } if significant: aggfunc_dict.update({ "3f1_significant": "sum", "other_significant": "sum", }) grouped = df.groupby( by=group_col ).agg( aggfunc_dict ).reset_index() grouped = grouped.rename(columns={ value_col: "rules", }, errors="ignore") return grouped if __name__ == "__main__": from datetime import date, timedelta from pandas import to_datetime TODAY = date.today() WEEKS_AGO = TODAY - timedelta(weeks=10) dates = [(WEEKS_AGO - timedelta(days=r)) for r in range(21) if (r % 3 != 0)][::-1] + [(TODAY - timedelta(days=r)) for r in range(21)][::-1] df = DataFrame({"dates": dates, "values": [idx for idx, _ in enumerate(dates)]}) df_a = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0}) print(df_a.head(10)) df = add_week_info_to_data(df, date_column="dates") print(df.head(10)) grouped = groupby_date(df, group_col=("week_number", "week_of"), value_col="values", significant=False) print(grouped) df_b = pad_missing_dates(grouped, "week_of", how="weeks", fill_padded_values={"rules": 0}) print(df_b)