|
import os |
|
import time |
|
from datetime import datetime |
|
|
|
import pandas as pd |
|
import schedule |
|
from datasets import Dataset, DatasetDict, load_dataset |
|
from huggingface_hub import login |
|
|
|
from utilities.data_collator import get_latest_data, merge_and_filter_data |
|
from utilities.my_logger import setup_logger |
|
from utilities.praw_downloader import dummy_data |
|
from utilities.readme_update import update_readme |
|
|
|
|
|
# --- Runtime configuration, read from environment variables ---------------
# SUBREDDIT and USERNAME are required; a missing one raises KeyError at import.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"

# FREQUENCY is case-insensitive and must be one of the supported values;
# anything else (including unset) aborts start-up immediately.
frequency = os.environ.get("FREQUENCY", "").lower()
if frequency not in ("daily", "hourly"):
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate against the Hugging Face Hub with the token from the environment.
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

# Module-level logger shared by every function in this file.
logger = setup_logger(__name__)
|
|
|
|
|
def get_dataset():
    """
    Return the dataset stored under ``dataset_name`` on the Hugging Face Hub.

    If loading raises ``FileNotFoundError`` the repository is bootstrapped:
    a dataset whose 'train' split is built from ``dummy_data`` is pushed,
    the dataset is loaded back from the Hub, and the placeholder 'train'
    split is removed before returning. In that case the returned dataset
    has no 'train' split.
    """
    try:
        existing = load_dataset(dataset_name)
        logger.debug("Loading existing dataset")
        return existing
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Create the initial repo by pushing a placeholder split.
        placeholder = DatasetDict({'train': Dataset.from_dict(dummy_data)})
        placeholder.push_to_hub(repo_id=dataset_name, token=auth_token)

        # Pull back what was just pushed.
        fresh = load_dataset(dataset_name)

        # Drop the dummy rows so callers never see them.
        del fresh['train']
        return fresh
|
|
|
|
|
def main():
    """
    Run one collection cycle and publish the result.

    Steps:
      1. Fetch the current dataset via ``get_dataset()``.
      2. Fetch new rows via ``get_latest_data()``.
      3. If the dataset already has a 'train' split, merge the new rows into
         it with ``merge_and_filter_data``; otherwise use the new rows as-is.
      4. Update the README with the date and the number of added rows.
      5. Push the dataset to the Hugging Face Hub.
    """
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")
    dataset = get_dataset()

    # Get the latest data and merge it with the historic data, if any.
    new_df = get_latest_data()
    if 'train' in dataset:
        # The membership check above already guarantees the split exists,
        # so no empty-DataFrame fallback is needed here.
        old_df = dataset['train'].to_pandas()
        df = merge_and_filter_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # No 'train' split yet: everything fetched counts as new.
        df = new_df
        new_rows = len(new_df)
    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    # Update the README before pushing the data.
    update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date, new_rows=new_rows)
    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face Hub.
    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")
|
|
|
|
|
def schedule_periodic_task():
    """
    Schedule the main task to run at the user-defined frequency, then poll
    the scheduler once per second forever. This function never returns.
    """
    if frequency == 'daily':
        # NOTE(review): the log message says UTC+00, but nothing here sets a
        # timezone -- confirm the host clock is UTC.
        daily_at = '05:00'
        logger.info(f'Scheduling tasks to run every day at: {daily_at} UTC+00')
        schedule.every().day.at(daily_at).do(main)
    elif frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)

    # Drive the scheduler: run whatever is due, then sleep briefly.
    while True:
        schedule.run_pending()
        time.sleep(1)
|
|
|
|
|
# Script entry point: start the scheduling loop only when executed directly.
if __name__ == '__main__':
    schedule_periodic_task()
|
|