# cra-window-rules/modules/grouping.py
from __future__ import annotations
from datetime import datetime, date, timedelta
from dateutil.relativedelta import relativedelta, weekday, MO, TU, WE, TH, FR, SA, SU
from pandas import DataFrame, Timestamp, to_datetime
from .utils import get_agency_metadata_values
def _get_first_week_start(dates: list[date], week_start: int | str | weekday = MO):
    """Get the start date of the first week from a list of dates.
    Pass "week_start" to select a different first day of the week (defaults to Monday).
    """
    if week_start in (MO, TU, WE, TH, FR, SA, SU):
        # already a dateutil weekday instance; no normalization needed
        pass
elif isinstance(week_start, str):
weekdays = {
"monday": MO,
"tuesday": TU,
"wednesday": WE,
"thursday": TH,
"friday": FR,
"saturday": SA,
"sunday": SU,
}
week_start = weekdays.get(week_start.lower(), MO)
elif isinstance(week_start, int):
weekdays = {
0: MO,
1: TU,
2: WE,
3: TH,
4: FR,
5: SA,
6: SU,
}
week_start = weekdays.get(week_start, MO)
else:
raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.")
    # roll back from the earliest input date to the most recent `week_start` day
    first_day = min(dates)
    return first_day + relativedelta(weekday=week_start(-1))
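# A minimal illustration with hypothetical dates: the result is the most recent
# occurrence of `week_start` on or before the earliest input date
# (2024-01-10 is a Wednesday, so the preceding Monday is 2024-01-08):
#   >>> _get_first_week_start([date(2024, 1, 10), date(2024, 1, 12)])
#   datetime.date(2024, 1, 8)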
def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None):
"""Get the index and start date for each week.
Args:
first_week_start (date | Timestamp): Start date of the first week in the data.
end_date (date | None, optional): End date for data. If None is passed (the default), the end date is `date.today()`.
Returns:
list[tuple]: List of tuples containing the week number and the start date.
"""
if end_date is None:
end_date = date.today()
    try:
        # Timestamp/datetime inputs expose .date(); plain `date` objects do not
        week_start_dates = [first_week_start.date()]
    except AttributeError:
        week_start_dates = [first_week_start]
while week_start_dates[-1] < end_date:
next_start_date = week_start_dates[-1] + relativedelta(weeks=1)
week_start_dates.append(next_start_date)
week_start_dates = [day for day in week_start_dates if day <= end_date]
week_start_dates = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in week_start_dates]
return [(idx, w) for idx, w in enumerate(week_start_dates)]
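# A minimal illustration with hypothetical dates: weeks are enumerated from the
# first week's start date through `end_date`, one tuple per week:
#   >>> _get_week_start_dates(date(2024, 1, 1), end_date=date(2024, 1, 15))
#   [(0, datetime.date(2024, 1, 1)), (1, datetime.date(2024, 1, 8)), (2, datetime.date(2024, 1, 15))]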
def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]:
"""Takes a list, array, or other iterable of `datetime.date` values and returns a list of tuples containing (week_number, week_start_date) pairs.
    Pass keyword arg "week_start" - ranging from 0 (Monday) to 6 (Sunday) - to choose a start day of the week other than Monday.
"""
# get the start date for the first week
first_week_start = _get_first_week_start(dates, **kwargs)
# get start date for each week in the input values
weeks = _get_week_start_dates(first_week_start, end_date=end_date)
# iterate over inputs, append tuple of week number and start date for each week
results = []
for d in dates:
if isinstance(d, Timestamp):
d = d.date()
week_gen = ((idx, start_date) for idx, start_date in weeks if (start_date <= d < (start_date + timedelta(weeks=1))))
results.append(next(week_gen, (0, first_week_start)))
return results
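# A minimal illustration with hypothetical dates: each input date maps to the
# (week_number, week_start_date) pair of the week containing it:
#   >>> _get_weeks([date(2024, 1, 2), date(2024, 1, 10)], end_date=date(2024, 1, 10))
#   [(0, datetime.date(2024, 1, 1)), (1, datetime.date(2024, 1, 8))]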
def add_week_info_to_data(df: DataFrame, date_column: str = "publication_date", new_columns: tuple[str, str] = ("week_number", "week_of")):
"""Add week number and week start date to input data.
Args:
df (DataFrame): Input data.
date_column (str, optional): Name of column containing publication dates. Defaults to "publication_date".
        new_columns (tuple[str, str], optional): New column names. Defaults to ("week_number", "week_of").
Returns:
DataFrame: Data containing week information.
"""
df_c = df.copy()
data = df_c[date_column].to_list()
if len(data) > 0:
week_numbers, week_starts = list(zip(*_get_weeks(data)))
df_c.loc[:, new_columns[0]] = week_numbers
df_c.loc[:, new_columns[1]] = to_datetime(week_starts)
return df_c
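# Usage sketch, assuming a frame with the default "publication_date" column:
# the call appends a "week_number" column (int) and a "week_of" column
# (datetime64) alongside the existing data:
#   df = add_week_info_to_data(df)
#   df[["publication_date", "week_number", "week_of"]]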
def _pad_missing_weeks(timeframe_list: list[date], **kwargs):
"""Pad dataframe with weeks missing from retrieved data (i.e., weeks without qualifying rule data).
"""
# get the start date for the first week
first_week_start = _get_first_week_start(timeframe_list)
# get start date for each week in the input values
return _get_week_start_dates(first_week_start, **kwargs)
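# A minimal sketch with hypothetical dates: weeks with no observations still
# appear in the padded output (here the week of 2024-01-08 has no input date):
#   >>> _pad_missing_weeks([date(2024, 1, 2), date(2024, 1, 16)], end_date=date(2024, 1, 16))
#   [(0, datetime.date(2024, 1, 1)), (1, datetime.date(2024, 1, 8)), (2, datetime.date(2024, 1, 15))]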
def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None):
"""Pad dataframe with days missing from retrieved data (i.e., days without qualifying rule data).
"""
start_date = min(timeframe_list)
if end_date is None:
end_date = date.today()
    # create the list of weekdays (Monday=0 through Friday=4) from start to end dates;
    # remember that range() objects are exclusive of the stop, hence the +1
    all_days = (start_date + timedelta(days=n) for n in range((end_date - start_date).days + 1))
    return [d for d in all_days if d.weekday() < 5]
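# A minimal illustration with hypothetical dates: weekends are dropped, so the
# span Friday 2024-01-05 through Tuesday 2024-01-09 pads to three weekdays:
#   >>> _pad_missing_days([date(2024, 1, 5), date(2024, 1, 9)], end_date=date(2024, 1, 9))
#   [datetime.date(2024, 1, 5), datetime.date(2024, 1, 8), datetime.date(2024, 1, 9)]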
def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs):
"""Add missing dates (either weeks or days) to the dataset.
Args:
df (DataFrame): Input data.
pad_column (str): Date column to pad.
how (str): Whether to pad by "days" or "weeks".
fill_padded_values (dict | None, optional): Dictionary of columns and values to fill for padded observations (e.g., {"column": 0}). Defaults to None.
Raises:
ValueError: Must pass 'days' or 'weeks' to parameter 'how'.
Returns:
DataFrame: Padded data.
"""
df_copy = df.copy()
timeframe_list = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in df_copy[pad_column].to_list()]
df_copy = df_copy.astype({pad_column: "object"})
df_copy.loc[:, pad_column] = timeframe_list
# pad dates if dataframe isn't empty
if len(timeframe_list) > 0:
# choose which time frequency needs padding
if how == "days":
week_numbers = None
padded_timeframes = _pad_missing_days(timeframe_list, **kwargs)
elif how == "weeks":
week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs))
else:
raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.")
# incorporate extended dates into dataframe
df_merge = DataFrame({pad_column: padded_timeframes})
pad_cols = [pad_column]
if week_numbers is not None:
df_merge.loc[:, "week_number"] = week_numbers
pad_cols.append("week_number")
df_copy = df_copy.merge(df_merge, on=pad_cols, how="outer", indicator=True)
    if fill_padded_values is not None:
        # rows tagged "right_only" exist only in the padded frame
        padded_rows = df_copy["_merge"] == "right_only"
        for col, val in fill_padded_values.items():
            df_copy.loc[padded_rows, col] = val
return df_copy.drop(columns=["_merge"], errors="ignore")
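# Usage sketch, assuming a daily frame with a "publication_date" column and a
# numeric "rules" column: weekdays absent from the data are appended with a
# zero count (see also the demo under __main__ below):
#   padded = pad_missing_dates(df, "publication_date", how="days", fill_padded_values={"rules": 0})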
def groupby_agency(
df: DataFrame,
group_col: str = "parent_slug",
value_col: str = "document_number",
aggfunc: str = "count",
significant: bool = True,
metadata: dict | None = None,
metadata_value: str = "acronym",
):
"""Group data by agencies and aggregate the values.
Args:
df (DataFrame): Input data.
group_col (str, optional): Column to group by. Defaults to "parent_slug".
value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number".
aggfunc (str, optional): Aggregation function. Defaults to "count".
significant (bool, optional): Whether to include significance data in values. Defaults to True.
metadata (dict | None, optional): Agency metadata. Defaults to None.
metadata_value (str, optional): Metadata value to add to output data. Defaults to "acronym".
Returns:
DataFrame: Grouped and aggregated data.
"""
aggfunc_dict = {value_col: aggfunc, }
if significant:
aggfunc_dict.update({
"3f1_significant": "sum",
"other_significant": "sum",
})
df_ex = df.explode(group_col, ignore_index=True)
grouped = df_ex.groupby(
by=group_col
).agg(
aggfunc_dict
).reset_index()
grouped = grouped.sort_values(value_col, ascending=False).rename(
columns={
group_col: "agency",
value_col: "rules",
}, errors="ignore"
)
if metadata is not None:
grouped.loc[:, metadata_value] = get_agency_metadata_values(
grouped,
agency_column="agency",
metadata=metadata,
metadata_value=metadata_value
)
cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
return grouped
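# Usage sketch with hypothetical documents: "parent_slug" holds a list of
# agency slugs per document, so documents with multiple parents count once per
# agency after the explode:
#   docs = DataFrame({
#       "document_number": ["2024-00001", "2024-00002"],
#       "parent_slug": [["agency-a"], ["agency-a", "agency-b"]],
#       "3f1_significant": [1, 0],
#       "other_significant": [0, 1],
#   })
#   groupby_agency(docs)  # agency-a: rules=2; agency-b: rules=1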
def groupby_date(
df: DataFrame,
group_col: str | tuple | list = ("publication_year", "publication_month", ),
value_col: str = "document_number",
aggfunc: str = "count",
significant: bool = True
):
"""Group data by a given date frequency and aggregate the values.
Args:
df (DataFrame): Input data.
group_col (str | tuple | list, optional): Columns to group by. Defaults to ("publication_year", "publication_month", ).
value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number".
aggfunc (str, optional): Aggregation function. Defaults to "count".
significant (bool, optional): Whether to include significance data in values. Defaults to True.
Raises:
TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`.
Returns:
DataFrame: Grouped and aggregated data.
"""
if isinstance(group_col, str):
group_col = [group_col]
elif isinstance(group_col, (list, tuple)):
group_col = list(group_col)
else:
raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.")
aggfunc_dict = {value_col: aggfunc, }
if significant:
aggfunc_dict.update({
"3f1_significant": "sum",
"other_significant": "sum",
})
grouped = df.groupby(
by=group_col
).agg(
aggfunc_dict
).reset_index()
grouped = grouped.rename(columns={
value_col: "rules",
}, errors="ignore")
return grouped
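# Usage sketch, assuming the frame carries the default grouping columns: counts
# documents per (publication_year, publication_month) pair and sums the two
# significance flags:
#   monthly = groupby_date(df)  # columns: publication_year, publication_month, rules, ...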
if __name__ == "__main__":
TODAY = date.today()
WEEKS_AGO = TODAY - timedelta(weeks=10)
    # about ten weeks of observations, with some earlier days deliberately missing
    dates = (
        [(WEEKS_AGO - timedelta(days=r)) for r in range(21) if (r % 3 != 0)][::-1]
        + [(TODAY - timedelta(days=r)) for r in range(21)][::-1]
    )
df = DataFrame({"dates": dates, "values": [idx for idx, _ in enumerate(dates)]})
df_a = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0})
print(df_a.head(10))
df = add_week_info_to_data(df, date_column="dates")
print(df.head(10))
grouped = groupby_date(df, group_col=("week_number", "week_of"), value_col="values", significant=False)
print(grouped)
df_b = pad_missing_dates(grouped, "week_of", how="weeks", fill_padded_values={"rules": 0})
print(df_b)