|
from __future__ import annotations |
|
|
|
from datetime import datetime, date, timedelta |
|
|
|
from dateutil.relativedelta import * |
|
from pandas import DataFrame, Timestamp, to_datetime |
|
|
|
from .utils import get_agency_metadata_values |
|
|
|
|
|
def _get_first_week_start(dates: list[date], week_start: int | str | "weekday" = MO):
    """Get the start date of the week containing the earliest date in `dates`.

    The result is the most recent `week_start` weekday on or before the earliest date
    (the earliest date itself when it already falls on that weekday).

    Args:
        dates (list[date]): Non-empty collection of comparable date-like values (`date`, `datetime`, or `Timestamp`). The values do not need to be sorted.
        week_start (int | str | weekday, optional): First day of the week, given as a dateutil weekday constant (`MO` ... `SU`), a case-insensitive English day name, or an integer from 0 (Monday) to 6 (Sunday). Unrecognized names or integers fall back to Monday. Defaults to `MO`.

    Raises:
        TypeError: Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.
        ValueError: Parameter 'dates' is empty.

    Returns:
        The earliest value in `dates` shifted back to the chosen weekday.
    """
    # Position in this tuple matches `date.weekday()` (Monday == 0).
    weekday_cycle = (MO, TU, WE, TH, FR, SA, SU)

    if week_start in weekday_cycle:
        pass
    elif isinstance(week_start, str):
        day_names = ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
        week_start = dict(zip(day_names, weekday_cycle)).get(week_start.lower(), MO)
    elif isinstance(week_start, int):
        week_start = weekday_cycle[week_start] if 0 <= week_start <= 6 else MO
    else:
        raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.")

    candidates = list(dates)
    if not candidates:
        raise ValueError("Parameter 'dates' must contain at least one date.")

    # Anchor on the earliest date rather than the first element so unsorted input
    # still yields the true first week. Comparing on the calendar date lets `date`
    # and `datetime`/`Timestamp` values coexist; the original element is returned.
    first_day = min(candidates, key=lambda d: d.date() if isinstance(d, datetime) else d)

    # weekday(-1) means "the last such weekday on or before the given date".
    return first_day + relativedelta(weekday=week_start(-1))
|
|
|
|
|
def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None):
    """Get the index and start date for each week.

    Weeks are spaced exactly seven days apart, beginning at `first_week_start`, and
    include every week whose start date is on or before `end_date`.

    Args:
        first_week_start (date | Timestamp): Start date of the first week in the data. Values exposing a `.date()` method (e.g., `Timestamp`, `datetime`) are reduced to their calendar date.
        end_date (date | None, optional): End date for data. If None is passed (the default), the end date is `date.today()`. `datetime` and `Timestamp` values are reduced to their calendar date.

    Returns:
        list[tuple]: List of tuples containing the week number (starting at 0) and the start date as a `datetime.date`. Empty when `first_week_start` is after `end_date`.
    """
    if end_date is None:
        end_date = date.today()
    elif isinstance(end_date, (Timestamp, datetime)):
        # A plain `date` cannot be compared with a `datetime`, so work on calendar dates only.
        end_date = end_date.date()

    try:
        first_day = first_week_start.date()
    except AttributeError:
        # Already a plain `date` (no `.date()` method).
        first_day = first_week_start

    # Number of week starts on or before end_date; floor division makes this <= 0
    # (hence an empty range) whenever the first week begins after end_date.
    n_weeks = (end_date - first_day).days // 7 + 1
    return [(idx, first_day + timedelta(weeks=idx)) for idx in range(n_weeks)]
|
|
|
|
|
def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]:
    """Map each date to the (week_number, week_start_date) pair of the week that contains it.

    Takes a list, array, or other re-iterable collection of `datetime.date` values
    (`datetime` and `Timestamp` values are reduced to their calendar date).

    Args:
        dates (list[date]): Non-empty collection of dates to classify.
        end_date (date | None, optional): Last date for which weeks are generated; passed to `_get_week_start_dates`. Defaults to None.
        **kwargs: Forwarded to `_get_first_week_start`. Pass "week_start" - ranging from 0 (Monday) to 6 (Sunday) - to choose a different start date than Monday for the week.

    Returns:
        list[tuple]: One (week_number, week_start_date) pair per input date, in input order. A date that falls in none of the generated weeks (e.g., later than the last week starting on or before `end_date`) is assigned week 0 and the first week's start date.
    """
    first_week_start = _get_first_week_start(dates, **kwargs)
    weeks = _get_week_start_dates(first_week_start, end_date=end_date)

    # Keep the fallback the same type as the generated week starts (plain dates).
    if isinstance(first_week_start, (Timestamp, datetime)):
        first_week_start = first_week_start.date()
    fallback = (0, first_week_start)
    one_week = timedelta(weeks=1)

    results = []
    for d in dates:
        # Week starts are plain dates, which cannot be compared with datetimes.
        if isinstance(d, (Timestamp, datetime)):
            d = d.date()
        matching_weeks = ((idx, start) for idx, start in weeks if start <= d < start + one_week)
        results.append(next(matching_weeks, fallback))
    return results
|
|
|
|
|
def add_week_info_to_data(df: DataFrame, date_column: str = "publication_date", new_columns: tuple[str] = ("week_number", "week_of")):
    """Return a copy of the input data with week number and week start date columns.

    The input frame is not modified. When the date column is empty, the copy is
    returned without the new columns.

    Args:
        df (DataFrame): Input data.
        date_column (str, optional): Name of column containing the dates to classify. Defaults to "publication_date".
        new_columns (tuple[str], optional): Names for the week number column and the week start column, in that order. Defaults to ("week_number", "week_of").

    Returns:
        DataFrame: Copy of the data containing week information.
    """
    result = df.copy()
    date_values = result[date_column].to_list()

    # Nothing to classify: hand back the untouched copy.
    if len(date_values) == 0:
        return result

    # Split the (week_number, week_start) pairs into two parallel sequences.
    week_pairs = _get_weeks(date_values)
    week_numbers, week_starts = zip(*week_pairs)

    result.loc[:, new_columns[0]] = week_numbers
    result.loc[:, new_columns[1]] = to_datetime(week_starts)
    return result
|
|
|
|
|
def _pad_missing_weeks(timeframe_list: list[date], **kwargs):
    """List every week covered by the data, including weeks without qualifying rule data.

    Args:
        timeframe_list (list[date]): Dates present in the retrieved data; used to locate the first (Monday-based) week via `_get_first_week_start`.
        **kwargs: Forwarded to `_get_week_start_dates` (e.g., `end_date`).

    Returns:
        list[tuple]: (week_number, week_start_date) pairs as produced by `_get_week_start_dates`.
    """
    return _get_week_start_dates(_get_first_week_start(timeframe_list), **kwargs)
|
|
|
|
|
def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None):
    """List every weekday (Monday through Friday) from the earliest date in the data through `end_date`.

    Used to pad a dataframe with days missing from retrieved data (i.e., days
    without qualifying rule data). Saturdays and Sundays are never included.

    Args:
        timeframe_list (list[date]): Non-empty collection of dates present in the data; the minimum is the first candidate day.
        end_date (date | None, optional): Last candidate day (inclusive). If None is passed (the default), the end date is `date.today()`.

    Raises:
        ValueError: `timeframe_list` is empty (raised by `min`).

    Returns:
        list: Weekdays in ascending order, of the same type as the earliest date. Empty when the earliest date is after `end_date`.
    """
    start_date = min(timeframe_list)
    if end_date is None:
        end_date = date.today()

    # A plain `date` and a `datetime`/`Timestamp` cannot be subtracted from each other,
    # so when the two bounds are of mixed kinds, measure the span on calendar dates.
    start_is_datetime = isinstance(start_date, datetime)
    end_is_datetime = isinstance(end_date, datetime)
    if start_is_datetime and not end_is_datetime:
        span = end_date - start_date.date()
    elif end_is_datetime and not start_is_datetime:
        span = end_date.date() - start_date
    else:
        span = end_date - start_date

    candidate_days = (start_date + timedelta(days=n) for n in range(span.days + 1))
    # weekday() runs from 0 (Monday) to 6 (Sunday); keep Monday-Friday only.
    return [day for day in candidate_days if day.weekday() < 5]
|
|
|
|
|
def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs):
    """Add rows for dates (either days or weeks) that are missing from the dataset.

    The input frame is not modified. In the returned copy, `pad_column` has object
    dtype and holds `datetime.date` values. When `how` is "weeks", the padded frame
    is merged on both `pad_column` and "week_number", so the input is expected to
    already contain a "week_number" column.

    Args:
        df (DataFrame): Input data.
        pad_column (str): Date column to pad.
        how (str): Whether to pad by "days" or "weeks".
        fill_padded_values (dict | None, optional): Dictionary of columns and values to fill for padded observations (e.g., {"column": 0}). Defaults to None.
        **kwargs: Forwarded to `_pad_missing_days` or `_pad_missing_weeks` (e.g., `end_date`).

    Raises:
        ValueError: Must pass 'days' or 'weeks' to parameter 'how'. Only checked when the data is not empty.

    Returns:
        DataFrame: Padded data.
    """
    padded = df.copy()

    # Normalize the date column to plain `date` objects stored as object dtype.
    timeframe_list = [
        value.date() if isinstance(value, (Timestamp, datetime)) else value
        for value in padded[pad_column].to_list()
    ]
    padded = padded.astype({pad_column: "object"})
    padded.loc[:, pad_column] = timeframe_list

    if len(timeframe_list) > 0:
        if how not in ("days", "weeks"):
            raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.")

        # Build the complete timeline the data should cover.
        if how == "weeks":
            week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs))
        else:
            week_numbers = None
            padded_timeframes = _pad_missing_days(timeframe_list, **kwargs)

        timeline = DataFrame({pad_column: padded_timeframes})
        merge_keys = [pad_column]
        if week_numbers is not None:
            timeline.loc[:, "week_number"] = week_numbers
            merge_keys.append("week_number")

        # The merge indicator marks rows that exist only in the timeline, i.e., the padded rows.
        padded = padded.merge(timeline, on=merge_keys, how="outer", indicator=True)

        if fill_padded_values is not None:
            is_padded_row = padded["_merge"] == "right_only"
            for col, val in fill_padded_values.items():
                padded.loc[is_padded_row, col] = val

    # "_merge" only exists when a merge happened, hence errors="ignore".
    return padded.drop(columns=["_merge"], errors="ignore")
|
|
|
|
|
def groupby_agency(
    df: DataFrame,
    group_col: str = "parent_slug",
    value_col: str = "document_number",
    aggfunc: str = "count",
    significant: bool = True,
    metadata: dict | None = None,
    metadata_value: str = "acronym",
):
    """Aggregate values per agency, ordered from largest to smallest aggregate.

    List-like entries in `group_col` are exploded first, so a row tagged with
    several agencies contributes to each of them.

    Args:
        df (DataFrame): Input data.
        group_col (str, optional): Column to group by; renamed to "agency" in the output. Defaults to "parent_slug".
        value_col (str, optional): Column to aggregate; renamed to "rules" in the output. Defaults to "document_number".
        aggfunc (str, optional): Aggregation function applied to `value_col`. Defaults to "count".
        significant (bool, optional): Whether to also sum the "3f1_significant" and "other_significant" columns. Defaults to True.
        metadata (dict | None, optional): Agency metadata. When given, a `metadata_value` column is added via `get_agency_metadata_values`. Defaults to None.
        metadata_value (str, optional): Metadata value to add to output data. Defaults to "acronym".

    Returns:
        DataFrame: Grouped and aggregated data with columns, where present, in the order "agency", `metadata_value`, "rules", "3f1_significant", "other_significant".
    """
    aggregations = {value_col: aggfunc}
    if significant:
        aggregations["3f1_significant"] = "sum"
        aggregations["other_significant"] = "sum"

    # One row per (record, agency) pair before grouping.
    exploded = df.explode(group_col, ignore_index=True)
    totals = exploded.groupby(by=group_col).agg(aggregations).reset_index()

    ranked = totals.sort_values(value_col, ascending=False)
    grouped = ranked.rename(columns={group_col: "agency", value_col: "rules"}, errors="ignore")

    if metadata is not None:
        grouped.loc[:, metadata_value] = get_agency_metadata_values(
            grouped,
            agency_column="agency",
            metadata=metadata,
            metadata_value=metadata_value,
        )

    # Keep only the known output columns, in a fixed order, skipping any that are absent.
    preferred_order = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
    present = [name for name in preferred_order if name in grouped.columns]
    return grouped.loc[:, present]
|
|
|
|
|
def groupby_date(
    df: DataFrame,
    group_col: str | tuple | list = ("publication_year", "publication_month", ),
    value_col: str = "document_number",
    aggfunc: str = "count",
    significant: bool = True
):
    """Aggregate values for each period defined by one or more date columns.

    Args:
        df (DataFrame): Input data.
        group_col (str | tuple | list, optional): Column or columns to group by. Defaults to ("publication_year", "publication_month", ).
        value_col (str, optional): Column to aggregate; renamed to "rules" in the output. Defaults to "document_number".
        aggfunc (str, optional): Aggregation function applied to `value_col`. Defaults to "count".
        significant (bool, optional): Whether to also sum the "3f1_significant" and "other_significant" columns. Defaults to True.

    Raises:
        TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`.

    Returns:
        DataFrame: Grouped and aggregated data, with the grouping keys restored as ordinary columns.
    """
    if not isinstance(group_col, (str, list, tuple)):
        raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.")

    # Always group on a list so single and multiple keys take the same path.
    group_keys = [group_col] if isinstance(group_col, str) else list(group_col)

    aggregations = {value_col: aggfunc}
    if significant:
        aggregations["3f1_significant"] = "sum"
        aggregations["other_significant"] = "sum"

    totals = df.groupby(by=group_keys).agg(aggregations).reset_index()
    return totals.rename(columns={value_col: "rules"}, errors="ignore")
|
|
|
|
|
if __name__ == "__main__":
    # Ad-hoc demonstration of the padding and grouping helpers on synthetic data.

    from datetime import date, timedelta

    from pandas import to_datetime

    TODAY = date.today()
    WEEKS_AGO = TODAY - timedelta(weeks=10)

    # Older stretch: the 21 days ending ten weeks ago, dropping every third day to leave gaps.
    older_dates = [WEEKS_AGO - timedelta(days=r) for r in reversed(range(21)) if r % 3 != 0]
    # Recent stretch: the last 21 days through today, without gaps.
    recent_dates = [TODAY - timedelta(days=r) for r in reversed(range(21))]
    dates = older_dates + recent_dates
    df = DataFrame({"dates": dates, "values": list(range(len(dates)))})

    # Pad the daily series; padded rows get a value of 0.
    df_a = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0})
    print(df_a.head(10))

    # Tag every row with its week number and week start date.
    df = add_week_info_to_data(df, date_column="dates")
    print(df.head(10))

    # Count rows per week.
    grouped = groupby_date(df, group_col=("week_number", "week_of"), value_col="values", significant=False)
    print(grouped)

    # Pad the weekly series; weeks without data get a rule count of 0.
    df_b = pad_missing_dates(grouped, "week_of", how="weeks", fill_padded_values={"rules": 0})
    print(df_b)
|
|