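"""Scrape a subreddit's submissions and comments and push them to the Hugging Face Hub.

Reads SUBREDDIT, USERNAME, FREQUENCY, and HF_TOKEN from the environment, then
schedules the scrape to run hourly, daily, or every 3 hours ("custom").

Example invocation (the values and the entry-point filename below are
hypothetical):

    SUBREDDIT=askscience USERNAME=your-hf-username FREQUENCY=daily \
        HF_TOKEN=hf_xxx python app.py
"""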
import gc
import os
import time
from datetime import datetime

import pandas as pd
import schedule
from datasets import Dataset
from utilities.user_defined_functions import (
    get_latest_data,
    merge_data,
    load_or_create_dataset,
    remove_filtered_rows,
    load_or_create_comment_dataset,
)
from utilities.my_logger import setup_logger
from utilities.readme_update import update_dataset_readme

# Set dataset names, README path, and scrape frequency from the environment
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/reddit-{subreddit}"
comment_dataset_name = f"{username}/reddit-comments-{subreddit}"
dataset_readme_path = "README.md"

frequency = os.environ.get("FREQUENCY", "").lower()
if frequency not in ["daily", "hourly", "custom"]:
    raise ValueError("FREQUENCY environment variable must be 'daily', 'hourly', or 'custom'")

# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HF_TOKEN"]

logger = setup_logger(__name__)

def upload(new_df, dataset, hf_dataset_name):
    """Merge new rows into the dataset and push the result to the Hugging Face Hub."""
    date = datetime.now().strftime("%Y-%m-%d")
    subset = f"year_{datetime.now().year}"

    if "train" in dataset.keys():
        # Existing dataset: merge the new rows into the previous data
        old_df = dataset["train"].to_pandas()
        df = merge_data(old_df=old_df, new_df=new_df)
        new_rows = len(df) - len(old_df)
    else:
        # New dataset: every row is new and nothing has been updated yet
        df = new_df
        df["new"] = True
        df["updated"] = False
        new_rows = len(new_df)

    df = remove_filtered_rows(df)
    dataset["train"] = Dataset.from_pandas(df, preserve_index=False)
    logger.info(f"Adding {new_rows} rows for {date}.")

    # Push the augmented dataset to the Hugging Face Hub
    logger.debug(f"Pushing data for {date} to {hf_dataset_name}")
    dataset.push_to_hub(hf_dataset_name, subset, token=auth_token)
    logger.info(f"Processed and pushed data for {date} to {hf_dataset_name}")

    # Update the dataset card with the new row count
    update_dataset_readme(dataset_name=hf_dataset_name, subreddit=subreddit, new_rows=new_rows)
    logger.info("Updated README.")

def main():
    date = datetime.now().strftime("%Y-%m-%d")
    logger.warning(f"Running main function for date: {date}")

    # Submissions dataset
    sub_dataset = load_or_create_dataset()
    new_df, new_df_comment = get_latest_data()
    upload(new_df, sub_dataset, dataset_name)

    # Free the submissions data before loading the comment dataset
    del sub_dataset, new_df
    gc.collect()

    # Comments dataset
    comment_dataset = load_or_create_comment_dataset()
    upload(new_df_comment, comment_dataset, comment_dataset_name)

def schedule_periodic_task():
    """Schedule the main task to run at the user-defined frequency."""
    if frequency == "hourly":
        logger.info("Scheduling tasks to run every hour at the top of the hour")
        schedule.every().hour.at(":00").do(main)
    elif frequency == "daily":
        start_time = "05:00"
        logger.info(f"Scheduling tasks to run every day at: {start_time} UTC+00")
        schedule.every().day.at(start_time).do(main)
    elif frequency == "custom":
        logger.info("Scheduling tasks to run every 3 hours at the top of the hour")
        schedule.every(3).hours.at(":00").do(main)

    while True:
        schedule.run_pending()
        time.sleep(1)


if __name__ == "__main__":
    schedule_periodic_task()