File size: 2,642 Bytes
749d1d8
 
5d9e0b8
749d1d8
 
285612d
61f9cd0
749d1d8
e77b07f
285612d
5ec6657
749d1d8
 
 
32235fd
285612d
32235fd
749d1d8
5d9e0b8
 
 
 
749d1d8
 
 
 
 
 
285612d
 
 
61f9cd0
285612d
5d9e0b8
bcf2055
 
5d9e0b8
 
bcf2055
5d9e0b8
bcf2055
5d9e0b8
 
0f45b88
 
5d9e0b8
e77b07f
5d9e0b8
285612d
 
b65cbe6
285612d
 
 
 
 
5ec6657
 
285612d
 
5d9e0b8
285612d
5d9e0b8
285612d
5d9e0b8
 
 
 
 
 
e69ab4e
285612d
 
 
 
 
 
 
5d9e0b8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset

from utilities.user_defined_functions import get_latest_data, merge_data, load_or_create_dataset, remove_filtered_rows
from utilities.my_logger import setup_logger
from utilities.readme_update import update_dataset_readme

# Set dataset name, path to README.md, and existing dataset details
# Required env vars: SUBREDDIT and USERNAME together determine the Hub repo id.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
# Hub repository id in the form "<user>/dataset-creator-reddit-<subreddit>".
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"

# FREQUENCY controls the scheduler cadence; validated eagerly at import time
# so a misconfigured container fails fast instead of silently never running.
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
# NOTE(review): token is read at import time; a missing var raises KeyError here.
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]

# Module-level logger (project helper; presumably wraps logging.getLogger — confirm).
logger = setup_logger(__name__)


def main():
    """Fetch the latest subreddit data, merge it into the dataset, and push to the Hub.

    Loads (or creates) the dataset, merges newly fetched rows into the existing
    'train' split (or seeds a brand-new split), drops filtered rows, pushes the
    result to the Hugging Face Hub, and updates the dataset README with the
    number of rows added.
    """
    date = datetime.now().strftime('%Y-%m-%d')
    # Routine start-of-run message — info, not warning (was logger.warning).
    logger.info(f"Running main function for date: {date}")
    dataset = load_or_create_dataset()

    new_df = get_latest_data()

    # Membership test directly on the DatasetDict; .keys() was redundant, and
    # the inner ternary re-checking 'train' could never take its else branch.
    if 'train' in dataset:
        # Existing dataset from the hub: merge new rows into the old ones.
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # Brand-new dataset: every fetched row is new.
        df = new_df
        df['new'] = True
        df['updated'] = False
        new_rows = len(new_df)

    df = remove_filtered_rows(df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub, then refresh the README.
    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")
    update_dataset_readme(dataset_name=dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")


def schedule_periodic_task():
    """Schedule ``main`` at the user-defined frequency and block forever.

    Uses the module-level ``frequency`` ('hourly' or 'daily', validated at
    import time) to register the job, then polls pending jobs in a loop.
    This function never returns.
    """
    if frequency == 'hourly':
        # Plain string — the original f-string had no placeholders.
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)
    elif frequency == 'daily':
        start_time = '05:00'
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        schedule.every().day.at(start_time).do(main)

    # Poll once per second: cheap on CPU, responsive enough for hourly/daily jobs.
    while True:
        schedule.run_pending()
        time.sleep(1)


# Entry point: start the scheduler loop (blocks forever) when run as a script.
if __name__ == "__main__":
    schedule_periodic_task()