"""Periodically collect new subreddit data and push it to a Hugging Face dataset.

Configuration is read from environment variables at import time:

* ``SUBREDDIT`` and ``USERNAME`` -- used to build the hub dataset name.
* ``FREQUENCY`` -- ``'daily'`` or ``'hourly'`` (case-insensitive).
* ``HUGGINGFACE_AUTH_TOKEN`` -- token used to log in and to push the dataset.
"""
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login

from utilities.user_defined_functions import get_latest_data, merge_data
from utilities.my_logger import setup_logger
from utilities.readme_update import update_readme

# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"

# Fail fast on a missing or unsupported frequency instead of silently never scheduling.
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ("daily", "hourly"):
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)


def _load_or_create_dataset():
    """Return the dataset stored on the hub, or an empty ``DatasetDict`` if it does not exist yet."""
    try:
        # NOTE(review): no token is passed here; this presumably relies on the
        # credentials stored by ``login()`` above for private repos -- confirm.
        return load_dataset(dataset_name)
    except FileNotFoundError:
        # ``datasets`` reports a missing/empty hub repository with
        # FileNotFoundError (or a subclass of it); treat that as a first run.
        logger.warning(f"Dataset {dataset_name} not found on the hub; creating a new one")
        return DatasetDict()


def main() -> None:
    """Fetch the latest data, merge it into the hub dataset, update the README and push.

    If the hub dataset already has a ``'train'`` split, the new data is merged
    into it with ``merge_data``; otherwise the new data becomes the ``'train'``
    split of a fresh dataset.
    """
    date = datetime.now().strftime('%Y-%m-%d')
    logger.warning(f"Running main function for date: {date}")
    dataset = _load_or_create_dataset()

    # Get Latest Data and merge with historic data
    new_df = get_latest_data()

    # Using dataset from hub
    if 'train' in dataset:
        old_df = dataset['train'].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        # Report the net growth of the merged frame rather than len(new_df).
        new_rows = len(df) - len(old_df)

    # New dataset
    else:
        df = new_df
        new_rows = len(new_df)

    dataset['train'] = Dataset.from_pandas(df, preserve_index=False)

    # Update README
    update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date, new_rows=new_rows)
    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face hub
    logger.debug(f"Pushing data for {date} to the Hugging Face hub")
    dataset.push_to_hub(dataset_name, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")


def schedule_periodic_task() -> None:
    """
    Schedule the main task to run at the user-defined frequency

    ``main`` is run once immediately, then registered with ``schedule``
    (hourly at minute ``:00`` or daily at ``05:00``). This function then polls
    the scheduler once per second and never returns.
    """
    main()

    if frequency == 'hourly':
        logger.info('Scheduling tasks to run every hour at the top of the hour')
        schedule.every().hour.at(":00").do(main)
    elif frequency == 'daily':
        start_time = '05:00'
        # NOTE(review): no timezone is passed to ``at()``, so this presumably
        # fires at 05:00 host-local time; the "UTC+00" in the message is only
        # accurate if the host clock runs on UTC -- confirm deployment.
        logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
        schedule.every().day.at(start_time).do(main)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()