import os
import time
from datetime import datetime, timedelta
from textwrap import dedent

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from my_logger import setup_logger
from utilities.pushshift_data import scrape_submissions_by_day, submissions_to_dataframe

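# Required environment variables:
#   SUBREDDIT               - subreddit to scrape
#   USERNAME                - Hugging Face username that owns the dataset repo
#   HUGGINGFACE_AUTH_TOKEN  - Hugging Face token with write access, used to push to the Hub
#   START_DATE              - first day to fetch, in "YYYY-MM-DD" format
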
# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-{subreddit}"
dataset_readme_path = "README.md"

# Authenticate with Hugging Face using an auth token
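# login() caches the token locally; add_to_git_credential=True also stores it in the git credential helper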
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def update_readme(dataset_name, subreddit, date_to_fetch):
    """Build the dataset card text describing the current state of the dataset."""
    readme_text = f"""
    # {dataset_name}

    ## Dataset Overview
    The goal is to have an open dataset of `{subreddit}` submissions, taken from the Pushshift API.

    ## Data Collection
    Data is collected with sequential calls that follow the pagination of the Pushshift requests.

    ## Data Structure
    - `all_days`: All the data collected since `{os.environ["START_DATE"]}`

    ## Update Frequency
    The dataset is updated daily and covers the period from `{os.environ["START_DATE"]}` to two days ago.

    ## Attribution
    Data sourced from the Pushshift API.

    ## Change Log
    <details>
    <summary>Click to expand</summary>

    - **{datetime.now().strftime('%Y-%m-%d')}:** Added data for {date_to_fetch} to the 'all_days' split and saved as CSV

    </details>
    """

    # Strip the leading indentation so the Markdown headings render correctly on the Hub
    return dedent(readme_text)


def main(date_to_fetch):
    """
    Runs the main data processing function to fetch and process subreddit data for the specified date.

    Args:
        date_to_fetch (str): The date to fetch subreddit data for, in the format "YYYY-MM-DD".

    Returns:
        most_recent_date (str): Most recent date in dataset
    """

    # Load the existing dataset from the Hugging Face Hub, or start a new one
    try:
        dataset = load_dataset(dataset_name)
        logger.info("Loaded existing dataset")
        if "all_days" in dataset and "__index_level_0__" in dataset["all_days"].column_names:
            dataset = dataset.remove_columns(["__index_level_0__"])
    except FileNotFoundError:
        logger.info("Creating new dataset")
        dataset = DatasetDict()

    # Call get_subreddit_day with the calculated date
    logger.info(f"Fetching data for {date_to_fetch}")
    submissions = scrape_submissions_by_day(subreddit, date_to_fetch)
    df = submissions_to_dataframe(submissions)
    logger.info(f"Data fetched for {date_to_fetch}")
    most_recent_date = datetime.strptime(date_to_fetch, '%Y-%m-%d').date()

    # Append DataFrame to split 'all_days' or create new split
    if "all_days" in dataset:
        logger.info("Appending data to split 'all_days'")
        # Merge the new submissions
        old_data = dataset['all_days'].to_pandas()
        new_data = pd.concat([old_data, df], ignore_index=True)

        # Drop duplicate submissions (matched on `id`), keeping the first occurrence
        new_data = new_data.drop_duplicates(subset=['id'], keep="first")

        # Find the most recent date covered by the merged data
        new_data_most_recent_date_raw = new_data['created_utc'].max()
        new_data_most_recent_date_dt = datetime.strptime(new_data_most_recent_date_raw.split(' ')[0], '%Y-%m-%d').date()
        # Back off by one day as a safety margin against rounding/timezone error
        most_recent_date = max(new_data_most_recent_date_dt - timedelta(days=1), most_recent_date)

        # Convert back to dataset
        dataset["all_days"] = Dataset.from_pandas(new_data)
    else:
        logger.info("Creating new split 'all_days'")
        dataset["all_days"] = Dataset.from_pandas(df)
    # Log appending or creating split 'all_days'
    logger.info("Appended or created split 'all_days'")

    # Push the augmented dataset to the Hugging Face Hub
    logger.info(f"Pushing data for {date_to_fetch} to the Hugging Face Hub")
    readme_text = update_readme(dataset_name, subreddit, date_to_fetch)
    dataset.description = readme_text
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date_to_fetch} to the Hugging Face Hub")
    return most_recent_date


def run_main_continuously():
    """
    This function runs the given `main_function` continuously, starting from the date specified
    in the environment variable "START_DATE" until two days ago. Once it reaches two days ago,
    it will wait until tomorrow to start again at the same time as when it started today.
    """
    start_date_str = os.environ.get("START_DATE")
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d").date()

    # Record the time of day the loop started; the daily run is scheduled at this same time
    start_time = datetime.now().time()

    while True:
        today = datetime.now().date()
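        # Stay two days behind "today", presumably to give Pushshift time to index recent submissions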
        two_days_ago = today - timedelta(days=2)

        if start_date <= two_days_ago:
            logger.info(f"Running main function for date: {start_date}")
            most_recent_date = main(str(start_date))
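            # Resume from the day after the newest date already in the dataset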
            start_date = most_recent_date + timedelta(days=1)
        else:
            tomorrow = today + timedelta(days=1)
            now = datetime.now()
            start_of_tomorrow = datetime.combine(tomorrow, start_time)
            wait_until_tomorrow = (start_of_tomorrow - now).total_seconds()
            logger.info(f"Waiting until tomorrow: {wait_until_tomorrow} seconds")
            time.sleep(wait_until_tomorrow)


if __name__ == '__main__':
    run_main_continuously()