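"""Collect Reddit data for a subreddit on a schedule and push it to a Hugging Face dataset repo."""
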
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from utilities.data_collator import get_latest_data, merge_data
from utilities.my_logger import setup_logger
from utilities.praw_downloader import dummy_data
from utilities.readme_update import update_readme

# Derive the dataset name and README path from the SUBREDDIT and USERNAME environment variables
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"

frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def get_dataset():
    """Load the existing dataset from the Hugging Face Hub, or create a new one if it doesn't exist."""
    try:
        dataset = load_dataset(dataset_name)
        logger.debug("Loading existing dataset")
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Create the initial repo on the Hub by pushing a dummy 'train' split
        dataset = DatasetDict()
        dataset['train'] = Dataset.from_dict(dummy_data)
        dataset.push_to_hub(repo_id=dataset_name, token=auth_token)

        # Reload from the newly created repo
        dataset = load_dataset(dataset_name)

        # Drop the dummy split so the first real run starts from an empty dataset
        del dataset['train']
    return dataset


def main():
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")
    dataset = get_dataset()

    # Get Latest Data and merge with historic data
    new_df = get_latest_data()

    # Using dataset from hub
    if 'train' in dataset.keys():
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    # New dataset
    else:
        df = new_df
        new_rows = len(new_df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    # Update README
    update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date, new_rows=new_rows)
    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")


def schedule_periodic_task():
    """
    Schedule the main task to run at the user-defined frequency
    """
    main()
    if frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)
    elif frequency == 'daily':
        start_time = '05:00'
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        schedule.every().day.at(start_time).do(main)

    # Keep the process alive, firing any due jobs roughly once per second
    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()