from __future__ import annotations |
from datetime import datetime, date, timedelta |
from dateutil.relativedelta import * |
from pandas import DataFrame, Timestamp, to_datetime |
from .utils import get_agency_metadata_values |
def _get_first_week_start(dates: list[date], week_start: int | str | "weekday" = MO): |
"""Get the start date of the first week from a list of dates. |
Pass "week_start" to select a different start date for each week (defaults to Monday). |
""" |
if week_start in (MO, TU, WE, TH, FR, SA, SU): |
pass |
elif isinstance(week_start, str): |
weekdays = { |
"monday": MO, |
"tuesday": TU, |
"wednesday": WE, |
"thursday": TH, |
"friday": FR, |
"saturday": SA, |
"sunday": SU, |
} |
week_start = weekdays.get(week_start.lower(), MO) |
elif isinstance(week_start, int): |
weekdays = { |
0: MO, |
1: TU, |
2: WE, |
3: TH, |
4: FR, |
5: SA, |
6: SU, |
} |
week_start = weekdays.get(week_start, MO) |
else: |
raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.") |
first_day = next(d for d in dates) |
return first_day + relativedelta(weekday=week_start(-1)) |
def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None): |
"""Get the index and start date for each week. |
Args: |
first_week_start (date | Timestamp): Start date of the first week in the data. |
end_date (date | None, optional): End date for data. If None is passed (the default), the end date is `date.today()`. |
Returns: |
list[tuple]: List of tuples containing the week number and the start date. |
""" |
if end_date is None: |
end_date = date.today() |
try: |
week_start_dates = [first_week_start.date()] |
except AttributeError as err: |
week_start_dates = [first_week_start] |
while week_start_dates[-1] < end_date: |
next_start_date = week_start_dates[-1] + relativedelta(weeks=1) |
week_start_dates.append(next_start_date) |
week_start_dates = [day for day in week_start_dates if day <= end_date] |
week_start_dates = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in week_start_dates] |
return [(idx, w) for idx, w in enumerate(week_start_dates)] |
def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]: |
"""Takes a list, array, or other iterable of `datetime.date` values and returns a list of tuples containing (week_number, week_start_date) pairs. |
Pass keyword arg "week_start" - ranging from 0 (Monday) to 6 (Sunday) - to choose a different start date than Monday for the week. |
""" |
first_week_start = _get_first_week_start(dates, **kwargs) |
weeks = _get_week_start_dates(first_week_start, end_date=end_date) |
results = [] |
for d in dates: |
if isinstance(d, Timestamp): |
d = d.date() |
week_gen = ((idx, start_date) for idx, start_date in weeks if (start_date <= d < (start_date + timedelta(weeks=1)))) |
results.append(next(week_gen, (0, first_week_start))) |
return results |
def add_week_info_to_data(df: DataFrame, date_column: str = "publication_date", new_columns: tuple[str] = ("week_number", "week_of")): |
"""Add week number and week start date to input data. |
Args: |
df (DataFrame): Input data. |
date_column (str, optional): Name of column containing publication dates. Defaults to "publication_date". |
new_columns (tuple[str], optional): New column names. Defaults to ("week_number", "week_start"). |
Returns: |
DataFrame: Data containing week information. |
""" |
df_c = df.copy() |
data = df_c[date_column].to_list() |
if len(data) > 0: |
week_numbers, week_starts = list(zip(*_get_weeks(data))) |
df_c.loc[:, new_columns[0]] = week_numbers |
df_c.loc[:, new_columns[1]] = to_datetime(week_starts) |
return df_c |
def _pad_missing_weeks(timeframe_list: list[date], **kwargs): |
"""Pad dataframe with weeks missing from retrieved data (i.e., weeks without qualifying rule data). |
""" |
first_week_start = _get_first_week_start(timeframe_list) |
return _get_week_start_dates(first_week_start, **kwargs) |
def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None): |
"""Pad dataframe with days missing from retrieved data (i.e., days without qualifying rule data). |
""" |
start_date = min(timeframe_list) |
if end_date is None: |
end_date = date.today() |
return [ |
start_date + relativedelta(days=n) |
for n in range((end_date - start_date).days + 1) |
if (start_date + relativedelta(days=n)).weekday() in range(0, 5) |
] |
def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs): |
"""Add missing dates (either weeks or days) to the dataset. |
Args: |
df (DataFrame): Input data. |
pad_column (str): Date column to pad. |
how (str): Whether to pad by "days" or "weeks". |
fill_padded_values (dict | None, optional): Dictionary of columns and values to fill for padded observations (e.g., {"column": 0}). Defaults to None. |
Raises: |
ValueError: Must pass 'days' or 'weeks' to parameter 'how'. |
Returns: |
DataFrame: Padded data. |
""" |
df_copy = df.copy() |
timeframe_list = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in df_copy[pad_column].to_list()] |
df_copy = df_copy.astype({pad_column: "object"}) |
df_copy.loc[:, pad_column] = timeframe_list |
if len(timeframe_list) > 0: |
if how == "days": |
week_numbers = None |
padded_timeframes = _pad_missing_days(timeframe_list, **kwargs) |
elif how == "weeks": |
week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs)) |
else: |
raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.") |
df_merge = DataFrame({pad_column: padded_timeframes}) |
pad_cols = [pad_column] |
if week_numbers is not None: |
df_merge.loc[:, "week_number"] = week_numbers |
pad_cols.append("week_number") |
df_copy = df_copy.merge(df_merge, on=pad_cols, how="outer", indicator=True) |
if fill_padded_values is not None: |
for col, val in fill_padded_values.items(): |
bool_ = df_copy["_merge"] == "right_only" |
df_copy.loc[bool_, col] = val |
return df_copy.drop(columns=["_merge"], errors="ignore") |
def groupby_agency( |
df: DataFrame, |
group_col: str = "parent_slug", |
value_col: str = "document_number", |
aggfunc: str = "count", |
significant: bool = True, |
metadata: dict | None = None, |
metadata_value: str = "acronym", |
): |
"""Group data by agencies and aggregate the values. |
Args: |
df (DataFrame): Input data. |
group_col (str, optional): Column to group by. Defaults to "parent_slug". |
value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number". |
aggfunc (str, optional): Aggregation function. Defaults to "count". |
significant (bool, optional): Whether to include significance data in values. Defaults to True. |
metadata (dict | None, optional): Agency metadata. Defaults to None. |
metadata_value (str, optional): Metadata value to add to output data. Defaults to "acronym". |
Returns: |
DataFrame: Grouped and aggregated data. |
""" |
aggfunc_dict = {value_col: aggfunc, } |
if significant: |
aggfunc_dict.update({ |
"3f1_significant": "sum", |
"other_significant": "sum", |
}) |
df_ex = df.explode(group_col, ignore_index=True) |
grouped = df_ex.groupby( |
by=group_col |
).agg( |
aggfunc_dict |
).reset_index() |
grouped = grouped.sort_values(value_col, ascending=False).rename( |
columns={ |
group_col: "agency", |
value_col: "rules", |
}, errors="ignore" |
) |
if metadata is not None: |
grouped.loc[:, metadata_value] = get_agency_metadata_values( |
grouped, |
agency_column="agency", |
metadata=metadata, |
metadata_value=metadata_value |
) |
cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"] |
grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]] |
return grouped |
def groupby_date( |
df: DataFrame, |
group_col: str | tuple | list = ("publication_year", "publication_month", ), |
value_col: str = "document_number", |
aggfunc: str = "count", |
significant: bool = True |
): |
"""Group data by a given date frequency and aggregate the values. |
Args: |
df (DataFrame): Input data. |
group_col (str | tuple | list, optional): Columns to group by. Defaults to ("publication_year", "publication_month", ). |
value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number". |
aggfunc (str, optional): Aggregation function. Defaults to "count". |
significant (bool, optional): Whether to include significance data in values. Defaults to True. |
Raises: |
TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`. |
Returns: |
DataFrame: Grouped and aggregated data. |
""" |
if isinstance(group_col, str): |
group_col = [group_col] |
elif isinstance(group_col, (list, tuple)): |
group_col = list(group_col) |
else: |
raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.") |
aggfunc_dict = {value_col: aggfunc, } |
if significant: |
aggfunc_dict.update({ |
"3f1_significant": "sum", |
"other_significant": "sum", |
}) |
grouped = df.groupby( |
by=group_col |
).agg( |
aggfunc_dict |
).reset_index() |
grouped = grouped.rename(columns={ |
value_col: "rules", |
}, errors="ignore") |
return grouped |
if __name__ == "__main__": |
from datetime import date, timedelta |
from pandas import to_datetime |
TODAY = date.today() |
WEEKS_AGO = TODAY - timedelta(weeks=10) |
dates = [(WEEKS_AGO - timedelta(days=r)) for r in range(21) if (r % 3 != 0)][::-1] + [(TODAY - timedelta(days=r)) for r in range(21)][::-1] |
df = DataFrame({"dates": dates, "values": [idx for idx, _ in enumerate(dates)]}) |
df_a = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0}) |
print(df_a.head(10)) |
df = add_week_info_to_data(df, date_column="dates") |
print(df.head(10)) |
grouped = groupby_date(df, group_col=("week_number", "week_of"), value_col="values", significant=False) |
print(grouped) |
df_b = pad_missing_dates(grouped, "week_of", how="weeks", fill_padded_values={"rules": 0}) |
print(df_b) |