File size: 11,681 Bytes
58bb4c7 393578a 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 393578a 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 fe4f734 58bb4c7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
from __future__ import annotations
from datetime import datetime, date, timedelta
from dateutil.relativedelta import *
from pandas import DataFrame, Timestamp, to_datetime
from .utils import get_agency_metadata_values
def _get_first_week_start(dates: list[date], week_start: int | str | weekday = MO):
    """Get the start date of the first week from a list of dates.

    Args:
        dates (list[date]): Input dates; the first element anchors the first week.
        week_start (int | str | weekday, optional): Day the week starts on. Accepts a
            dateutil weekday constant (e.g., MO), a day name (e.g., "monday"), or an
            int from 0 (Monday) to 6 (Sunday). Unrecognized strings or ints fall back
            to Monday. Defaults to MO.

    Raises:
        TypeError: If 'week_start' is not a `str`, `int`, or dateutil weekday instance.
        ValueError: If 'dates' is empty.

    Returns:
        date: Start of the week containing the first input date.
    """
    if week_start in (MO, TU, WE, TH, FR, SA, SU):
        pass  # already a dateutil weekday constant; use as-is
    elif isinstance(week_start, str):
        name_map = {
            "monday": MO,
            "tuesday": TU,
            "wednesday": WE,
            "thursday": TH,
            "friday": FR,
            "saturday": SA,
            "sunday": SU,
        }
        # unrecognized names silently fall back to Monday
        week_start = name_map.get(week_start.lower(), MO)
    elif isinstance(week_start, int):
        # 0 = Monday ... 6 = Sunday; out-of-range ints fall back to Monday
        index_map = dict(enumerate((MO, TU, WE, TH, FR, SA, SU)))
        week_start = index_map.get(week_start, MO)
    else:
        raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.")
    # works for lists, arrays, and generators alike; empty input gets a clear error
    # instead of a bare StopIteration
    try:
        first_day = next(iter(dates))
    except StopIteration as err:
        raise ValueError("Parameter 'dates' must contain at least one date.") from err
    # weekday(-1) snaps backward to the most recent occurrence of `week_start`
    # (or `first_day` itself when it already falls on that weekday)
    return first_day + relativedelta(weekday=week_start(-1))
def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None):
"""Get the index and start date for each week.
Args:
first_week_start (date | Timestamp): Start date of the first week in the data.
end_date (date | None, optional): End date for data. If None is passed (the default), the end date is `date.today()`.
Returns:
list[tuple]: List of tuples containing the week number and the start date.
"""
if end_date is None:
end_date = date.today()
try:
week_start_dates = [first_week_start.date()]
except AttributeError as err:
week_start_dates = [first_week_start]
while week_start_dates[-1] < end_date:
next_start_date = week_start_dates[-1] + relativedelta(weeks=1)
week_start_dates.append(next_start_date)
week_start_dates = [day for day in week_start_dates if day <= end_date]
week_start_dates = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in week_start_dates]
return [(idx, w) for idx, w in enumerate(week_start_dates)]
def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]:
    """Map each input date to its (week_number, week_start_date) pair.

    Takes a list, array, or other iterable of `datetime.date` values and returns a
    list of tuples containing (week_number, week_start_date) pairs.
    Pass keyword arg "week_start" - ranging from 0 (Monday) to 6 (Sunday) - to choose
    a different start date than Monday for the week.

    Args:
        dates (list[date]): Input dates (`date` or `Timestamp` values).
        end_date (date | None, optional): End date for the generated weeks; None means today.
        **kwargs: Forwarded to `_get_first_week_start` (e.g., "week_start").

    Returns:
        list[tuple]: One (week_number, week_start_date) tuple per input date.
    """
    # get the start date for the first week
    first_week_start = _get_first_week_start(dates, **kwargs)
    # get start date for each week in the input values
    weeks = _get_week_start_dates(first_week_start, end_date=end_date)
    one_week = timedelta(weeks=1)  # hoisted loop invariant
    # iterate over inputs, append tuple of week number and start date for each week
    results = []
    for d in dates:
        if isinstance(d, Timestamp):
            d = d.date()
        # first week whose [start, start + 1 week) interval contains d;
        # fall back to week 0 when no interval matches (e.g., d past end_date)
        week_gen = ((idx, start_date) for idx, start_date in weeks if start_date <= d < start_date + one_week)
        results.append(next(week_gen, (0, first_week_start)))
    return results
def add_week_info_to_data(df: DataFrame, date_column: str = "publication_date", new_columns: tuple[str, str] = ("week_number", "week_of")):
    """Add week number and week start date to input data.

    Args:
        df (DataFrame): Input data.
        date_column (str, optional): Name of column containing publication dates. Defaults to "publication_date".
        new_columns (tuple[str, str], optional): Names for the (week number, week start) columns. Defaults to ("week_number", "week_of").

    Returns:
        DataFrame: Copy of the data containing week information (unchanged copy when input is empty).
    """
    df_c = df.copy()
    data = df_c[date_column].to_list()
    # empty input: return the copy untouched (no week columns added)
    if len(data) > 0:
        week_numbers, week_starts = list(zip(*_get_weeks(data)))
        df_c.loc[:, new_columns[0]] = week_numbers
        # store week starts as datetime64 for downstream date handling
        df_c.loc[:, new_columns[1]] = to_datetime(week_starts)
    return df_c
def _pad_missing_weeks(timeframe_list: list[date], **kwargs):
    """Pad dataframe with weeks missing from retrieved data (i.e., weeks without qualifying rule data).

    Args:
        timeframe_list (list[date]): Dates present in the retrieved data.
        **kwargs: Forwarded to `_get_week_start_dates` (e.g., "end_date").

    Returns:
        list[tuple]: (week_number, week_start_date) pairs covering the whole timeframe.
    """
    # anchor on the week containing the first date, then enumerate every
    # week start from there through the end of the timeframe
    return _get_week_start_dates(_get_first_week_start(timeframe_list), **kwargs)
def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None):
"""Pad dataframe with days missing from retrieved data (i.e., days without qualifying rule data).
"""
start_date = min(timeframe_list)
if end_date is None:
end_date = date.today()
# create list of weekdays from start to end dates
# remember that range() objects are exclusive of the stop
return [
start_date + relativedelta(days=n)
for n in range((end_date - start_date).days + 1)
if (start_date + relativedelta(days=n)).weekday() in range(0, 5)
]
def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs):
    """Add missing dates (either weeks or days) to the dataset.

    Args:
        df (DataFrame): Input data.
        pad_column (str): Date column to pad.
        how (str): Whether to pad by "days" or "weeks".
        fill_padded_values (dict | None, optional): Dictionary of columns and values to fill for padded observations (e.g., {"column": 0}). Defaults to None.
        **kwargs: Forwarded to the padding helper (e.g., "end_date").

    Raises:
        ValueError: Must pass 'days' or 'weeks' to parameter 'how'.

    Returns:
        DataFrame: Padded data.
    """
    # validate up front so a bad 'how' fails even when the input frame is empty
    if how not in ("days", "weeks"):
        raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.")
    df_copy = df.copy()
    timeframe_list = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in df_copy[pad_column].to_list()]
    # store plain `date` objects so merging against the padded dates compares equal
    df_copy = df_copy.astype({pad_column: "object"})
    df_copy.loc[:, pad_column] = timeframe_list
    # pad dates if dataframe isn't empty
    if len(timeframe_list) > 0:
        # choose which time frequency needs padding
        if how == "days":
            week_numbers = None
            padded_timeframes = _pad_missing_days(timeframe_list, **kwargs)
        else:  # how == "weeks"
            week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs))
        # incorporate extended dates into dataframe; the merge indicator column
        # "_merge" flags rows that exist only in the padded date range
        df_merge = DataFrame({pad_column: padded_timeframes})
        pad_cols = [pad_column]
        if week_numbers is not None:
            df_merge.loc[:, "week_number"] = week_numbers
            pad_cols.append("week_number")
        df_copy = df_copy.merge(df_merge, on=pad_cols, how="outer", indicator=True)
        if fill_padded_values is not None:
            # rows added purely by padding (no original observation)
            padded_rows = df_copy["_merge"] == "right_only"
            for col, val in fill_padded_values.items():
                df_copy.loc[padded_rows, col] = val
    return df_copy.drop(columns=["_merge"], errors="ignore")
def groupby_agency(
df: DataFrame,
group_col: str = "parent_slug",
value_col: str = "document_number",
aggfunc: str = "count",
significant: bool = True,
metadata: dict | None = None,
metadata_value: str = "acronym",
):
"""Group data by agencies and aggregate the values.
Args:
df (DataFrame): Input data.
group_col (str, optional): Column to group by. Defaults to "parent_slug".
value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number".
aggfunc (str, optional): Aggregation function. Defaults to "count".
significant (bool, optional): Whether to include significance data in values. Defaults to True.
metadata (dict | None, optional): Agency metadata. Defaults to None.
metadata_value (str, optional): Metadata value to add to output data. Defaults to "acronym".
Returns:
DataFrame: Grouped and aggregated data.
"""
aggfunc_dict = {value_col: aggfunc, }
if significant:
aggfunc_dict.update({
"3f1_significant": "sum",
"other_significant": "sum",
})
df_ex = df.explode(group_col, ignore_index=True)
grouped = df_ex.groupby(
by=group_col
).agg(
aggfunc_dict
).reset_index()
grouped = grouped.sort_values(value_col, ascending=False).rename(
columns={
group_col: "agency",
value_col: "rules",
}, errors="ignore"
)
if metadata is not None:
grouped.loc[:, metadata_value] = get_agency_metadata_values(
grouped,
agency_column="agency",
metadata=metadata,
metadata_value=metadata_value
)
cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
return grouped
def groupby_date(
df: DataFrame,
group_col: str | tuple | list = ("publication_year", "publication_month", ),
value_col: str = "document_number",
aggfunc: str = "count",
significant: bool = True
):
"""Group data by a given date frequency and aggregate the values.
Args:
df (DataFrame): Input data.
group_col (str | tuple | list, optional): Columns to group by. Defaults to ("publication_year", "publication_month", ).
value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number".
aggfunc (str, optional): Aggregation function. Defaults to "count".
significant (bool, optional): Whether to include significance data in values. Defaults to True.
Raises:
TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`.
Returns:
DataFrame: Grouped and aggregated data.
"""
if isinstance(group_col, str):
group_col = [group_col]
elif isinstance(group_col, (list, tuple)):
group_col = list(group_col)
else:
raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.")
aggfunc_dict = {value_col: aggfunc, }
if significant:
aggfunc_dict.update({
"3f1_significant": "sum",
"other_significant": "sum",
})
grouped = df.groupby(
by=group_col
).agg(
aggfunc_dict
).reset_index()
grouped = grouped.rename(columns={
value_col: "rules",
}, errors="ignore")
return grouped
if __name__ == "__main__":
    # Smoke test: build two clusters of dates separated by a 10-week gap, with
    # some days missing from the older cluster, then exercise the padding and
    # grouping helpers. (date, timedelta, DataFrame, and to_datetime are already
    # imported at module top; the previous local re-imports were redundant and
    # the local to_datetime was unused.)
    TODAY = date.today()
    WEEKS_AGO = TODAY - timedelta(weeks=10)
    dates = (
        [(WEEKS_AGO - timedelta(days=r)) for r in range(21) if (r % 3 != 0)][::-1]
        + [(TODAY - timedelta(days=r)) for r in range(21)][::-1]
    )
    df = DataFrame({"dates": dates, "values": [idx for idx, _ in enumerate(dates)]})
    df_a = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0})
    print(df_a.head(10))
    df = add_week_info_to_data(df, date_column="dates")
    print(df.head(10))
    grouped = groupby_date(df, group_col=("week_number", "week_of"), value_col="values", significant=False)
    print(grouped)
    df_b = pad_missing_dates(grouped, "week_of", how="weeks", fill_padded_values={"rules": 0})
    print(df_b)
|