File size: 11,681 Bytes
58bb4c7
 
 
 
 
 
 
393578a
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
 
58bb4c7
 
 
 
 
 
 
 
fe4f734
 
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
58bb4c7
fe4f734
 
 
 
 
 
 
 
 
 
 
 
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
58bb4c7
 
fe4f734
 
 
 
 
 
 
58bb4c7
 
fe4f734
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
393578a
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
 
 
 
 
 
 
 
 
 
 
 
 
 
 
58bb4c7
 
 
 
 
fe4f734
58bb4c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fe4f734
58bb4c7
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
from __future__ import annotations

from datetime import datetime, date, timedelta

from dateutil.relativedelta import *
from pandas import DataFrame, Timestamp, to_datetime

from .utils import get_agency_metadata_values


def _get_first_week_start(dates: list[date], week_start: int | str | "weekday" = MO):
    """Get the start date of the first week from a list of dates. 
    Pass "week_start" to select a different start date for each week (defaults to Monday).
    """
    if week_start in (MO, TU, WE, TH, FR, SA, SU):
        pass    
    elif isinstance(week_start, str):
        weekdays = {
            "monday": MO, 
            "tuesday": TU,
            "wednesday": WE,
            "thursday": TH,
            "friday": FR,
            "saturday": SA,
            "sunday": SU, 
        }
        week_start = weekdays.get(week_start.lower(), MO)
    elif isinstance(week_start, int):
        weekdays = {
            0: MO, 
            1: TU,
            2: WE,
            3: TH,
            4: FR,
            5: SA,
            6: SU, 
        }
        week_start = weekdays.get(week_start, MO)
    else:
        raise TypeError("Parameter 'week_start' must be type `str`, `int`, or a dateutil weekday instance.")
    
    first_day = next(d for d in dates)
    return first_day + relativedelta(weekday=week_start(-1))


def _get_week_start_dates(first_week_start: date | Timestamp, end_date: date | None = None):
    """Get the index and start date for each week.

    Args:
        first_week_start (date | Timestamp): Start date of the first week in the data.
        end_date (date | None, optional): End date for data. If None is passed (the default), the end date is `date.today()`.

    Returns:
        list[tuple]: List of tuples containing the week number and the start date.
    """
    if end_date is None:
        end_date = date.today()
    try:
        week_start_dates = [first_week_start.date()]
    except AttributeError as err:
        week_start_dates = [first_week_start]
    while week_start_dates[-1] < end_date:
        next_start_date = week_start_dates[-1] + relativedelta(weeks=1)
        week_start_dates.append(next_start_date)
    week_start_dates = [day for day in week_start_dates if day <= end_date]
    week_start_dates = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in week_start_dates]
    return [(idx, w) for idx, w in enumerate(week_start_dates)]


def _get_weeks(dates: list[date], end_date: date | None = None, **kwargs) -> list[tuple]:
    """Map each input date to a (week_number, week_start_date) pair.

    Accepts a list, array, or other iterable of `datetime.date` values. Pass keyword
    arg "week_start" - ranging from 0 (Monday) to 6 (Sunday) - to choose a different
    start day than Monday for the week. Dates falling outside every computed week map
    to `(0, first_week_start)`.
    """
    # start date of the first week, then the (index, start) pair for every week
    first_week_start = _get_first_week_start(dates, **kwargs)
    weeks = _get_week_start_dates(first_week_start, end_date=end_date)

    one_week = timedelta(weeks=1)
    results = []
    for day in dates:
        day = day.date() if isinstance(day, Timestamp) else day
        # first week whose [start, start + 7 days) interval contains this date
        matches = ((num, start) for num, start in weeks if start <= day < start + one_week)
        results.append(next(matches, (0, first_week_start)))
    return results


def add_week_info_to_data(df: DataFrame, date_column: str = "publication_date", new_columns: tuple[str, str] = ("week_number", "week_of")):
    """Add week number and week start date to input data.

    Args:
        df (DataFrame): Input data.
        date_column (str, optional): Name of column containing publication dates. Defaults to "publication_date".
        new_columns (tuple[str, str], optional): Names for the (week number, week start) columns. Defaults to ("week_number", "week_of").

    Returns:
        DataFrame: Copy of the data containing week information (unchanged copy if input is empty).
    """
    df_c = df.copy()
    data = df_c[date_column].to_list()
    # leave empty frames untouched; zip(*...) would fail on an empty result
    if len(data) > 0:
        week_numbers, week_starts = zip(*_get_weeks(data))
        df_c.loc[:, new_columns[0]] = week_numbers
        df_c.loc[:, new_columns[1]] = to_datetime(week_starts)
    return df_c


def _pad_missing_weeks(timeframe_list: list[date], **kwargs):
    """Build the full (week_number, week_start) sequence spanning the data's timeframe,
    so weeks missing from retrieved data (i.e., weeks without qualifying rule data)
    can be padded in. Keyword args are forwarded to `_get_week_start_dates`.
    """
    start_of_first_week = _get_first_week_start(timeframe_list)
    return _get_week_start_dates(start_of_first_week, **kwargs)


def _pad_missing_days(timeframe_list: list[date], end_date: date | None = None):
    """Pad dataframe with days missing from retrieved data (i.e., days without qualifying rule data).
    """
    start_date = min(timeframe_list)
    if end_date is None:
        end_date = date.today()
    
    # create list of weekdays from start to end dates
    # remember that range() objects are exclusive of the stop
    return [
        start_date + relativedelta(days=n) 
        for n in range((end_date - start_date).days + 1) 
        if (start_date + relativedelta(days=n)).weekday() in range(0, 5)
        ]


def pad_missing_dates(df: DataFrame, pad_column: str, how: str, fill_padded_values: dict | None = None, **kwargs):
    """Add missing dates (either weeks or days) to the dataset.

    Args:
        df (DataFrame): Input data.
        pad_column (str): Date column to pad.
        how (str): Whether to pad by "days" or "weeks".
        fill_padded_values (dict | None, optional): Dictionary of columns and values to fill for padded observations (e.g., {"column": 0}). Defaults to None.
        **kwargs: Forwarded to `_pad_missing_days` or `_pad_missing_weeks` (e.g., `end_date`).

    Raises:
        ValueError: Must pass 'days' or 'weeks' to parameter 'how'.

    Returns:
        DataFrame: Padded data (unchanged copy, minus any "_merge" column, if input is empty).
    """
    df_copy = df.copy()
    # normalize Timestamp/datetime entries to plain `date` objects so the merge keys compare equal
    timeframe_list = [d.date() if isinstance(d, (Timestamp, datetime)) else d for d in df_copy[pad_column].to_list()]
    df_copy = df_copy.astype({pad_column: "object"})
    df_copy.loc[:, pad_column] = timeframe_list

    # pad dates if dataframe isn't empty
    if len(timeframe_list) > 0:
        
        # choose which time frequency needs padding
        if how == "days":
            week_numbers = None
            padded_timeframes = _pad_missing_days(timeframe_list, **kwargs)
        elif how == "weeks":
            week_numbers, padded_timeframes = zip(*_pad_missing_weeks(timeframe_list, **kwargs))
        else:
            raise ValueError("Must pass 'days' or 'weeks' to parameter 'how'.")

        # incorporate extended dates into dataframe:
        # outer merge adds a row for each date absent from the input;
        # indicator=True marks those added rows as "right_only" in "_merge"
        df_merge = DataFrame({pad_column: padded_timeframes})
        pad_cols = [pad_column]
        if week_numbers is not None:
            # weeks carry a week_number alongside the start date; merge on both keys
            df_merge.loc[:, "week_number"] = week_numbers
            pad_cols.append("week_number")
        df_copy = df_copy.merge(df_merge, on=pad_cols, how="outer", indicator=True)
        if fill_padded_values is not None:
            # fill requested columns only on the padded (right_only) rows
            for col, val in fill_padded_values.items():
                bool_ = df_copy["_merge"] == "right_only"
                df_copy.loc[bool_, col] = val

    # drop the merge indicator; errors="ignore" covers the empty-input path where it was never added
    return df_copy.drop(columns=["_merge"], errors="ignore")


def groupby_agency(
        df: DataFrame, 
        group_col: str = "parent_slug", 
        value_col: str = "document_number", 
        aggfunc: str = "count", 
        significant: bool = True,
        metadata: dict | None = None, 
        metadata_value: str = "acronym", 
    ):
    """Group data by agencies and aggregate the values.

    Args:
        df (DataFrame): Input data.
        group_col (str, optional): Column to group by. Defaults to "parent_slug".
        value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number".
        aggfunc (str, optional): Aggregation function. Defaults to "count".
        significant (bool, optional): Whether to include significance data in values. Defaults to True.
        metadata (dict | None, optional): Agency metadata. Defaults to None.
        metadata_value (str, optional): Metadata value to add to output data. Defaults to "acronym".

    Returns:
        DataFrame: Grouped and aggregated data.
    """    
    aggfunc_dict = {value_col: aggfunc, }
    if significant:
        aggfunc_dict.update({
            "3f1_significant": "sum", 
            "other_significant": "sum", 
            })
    df_ex = df.explode(group_col, ignore_index=True)
    grouped = df_ex.groupby(
        by=group_col
    ).agg(
        aggfunc_dict
        ).reset_index()
    grouped = grouped.sort_values(value_col, ascending=False).rename(
        columns={
            group_col: "agency", 
            value_col: "rules", 
            }, errors="ignore"
        )
    if metadata is not None:
        grouped.loc[:, metadata_value] = get_agency_metadata_values(
        grouped, 
        agency_column="agency", 
        metadata=metadata, 
        metadata_value=metadata_value
        )
        cols = ["agency", metadata_value, "rules", "3f1_significant", "other_significant"]
        grouped = grouped.loc[:, [c for c in cols if c in grouped.columns]]
    return grouped


def groupby_date(
        df: DataFrame, 
        group_col: str | tuple | list = ("publication_year", "publication_month", ),  
        value_col: str = "document_number", 
        aggfunc: str = "count", 
        significant: bool = True
    ):
    """Group data by a given date frequency and aggregate the values.

    Args:
        df (DataFrame): Input data.
        group_col (str | tuple | list, optional): Columns to group by. Defaults to ("publication_year", "publication_month", ).
        value_col (str, optional): Column for values for grouping and aggregation. Defaults to "document_number".
        aggfunc (str, optional): Aggregation function. Defaults to "count".
        significant (bool, optional): Whether to include significance data in values. Defaults to True.

    Raises:
        TypeError: Parameter 'group_col' must be type `str`, `list`, or `tuple`.

    Returns:
        DataFrame: Grouped and aggregated data.
    """    
    if isinstance(group_col, str):
        group_col = [group_col]
    elif isinstance(group_col, (list, tuple)):
        group_col = list(group_col)
    else:
        raise TypeError("Parameter 'group_col' must be type `str`, `list`, or `tuple`.")
    
    aggfunc_dict = {value_col: aggfunc, }
    if significant:
        aggfunc_dict.update({
            "3f1_significant": "sum", 
            "other_significant": "sum", 
            })
    grouped = df.groupby(
        by=group_col
    ).agg(
        aggfunc_dict
        ).reset_index()
    grouped = grouped.rename(columns={
        value_col: "rules", 
        }, errors="ignore")
    return grouped


if __name__ == "__main__":

    from datetime import date, timedelta
    from pandas import to_datetime

    today = date.today()
    ten_weeks_ago = today - timedelta(weeks=10)

    # two clusters of dates -- an older gappy stretch and a recent solid stretch --
    # to exercise both the day-padding and week-padding paths
    older = [ten_weeks_ago - timedelta(days=r) for r in range(21) if r % 3 != 0]
    recent = [today - timedelta(days=r) for r in range(21)]
    dates = older[::-1] + recent[::-1]
    df = DataFrame({"dates": dates, "values": list(range(len(dates)))})

    # pad missing weekdays, filling padded rows' values with 0
    df_days = pad_missing_dates(df, "dates", "days", fill_padded_values={"values": 0})
    print(df_days.head(10))

    # annotate each row with its week number and week start
    df = add_week_info_to_data(df, date_column="dates")
    print(df.head(10))

    # aggregate per week, then pad any weeks with no data
    weekly = groupby_date(df, group_col=("week_number", "week_of"), value_col="values", significant=False)
    print(weekly)

    df_weeks = pad_missing_dates(weekly, "week_of", how="weeks", fill_padded_values={"rules": 0})
    print(df_weeks)