derek-thomas's picture
derek-thomas HF staff
Updating function name
826ed51
raw
history blame
No virus
3.86 kB
import os
import time
from datetime import datetime
import pandas as pd
import schedule
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login
from utilities.data_collator import get_latest_data, merge_data
from utilities.my_logger import setup_logger
from utilities.praw_downloader import dummy_data
from utilities.readme_update import update_readme
# Set dataset name, path to README.md, and existing dataset details
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"
dataset_readme_path = "README.md"
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")
# Authenticate with Hugging Face using an auth token
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)
logger = setup_logger(__name__)
def load_or_create_dataset():
"""
Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist.
This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found,
it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it.
After reloading, the dummy data is removed from the dataset.
Returns:
dataset (DatasetDict): The loaded or newly created dataset.
Raises:
FileNotFoundError: If the dataset cannot be loaded or created.
"""
# Load the existing dataset from the Hugging Face hub or create a new one
try:
dataset = load_dataset(dataset_name)
logger.debug("Loading existing dataset")
except FileNotFoundError:
logger.warning("Creating new dataset")
# Creating Initial Repo
dataset = DatasetDict()
dataset['train'] = Dataset.from_dict(dummy_data)
dataset.push_to_hub(repo_id=dataset_name, token=auth_token)
# Pulling from Initial Repo
dataset = load_dataset(dataset_name)
# Remove dummy data
del dataset['train']
return dataset
def main():
date = datetime.now().strftime('%Y-%m-%d')
logger.warning(f"Running main function for date: {date}")
dataset = load_dataset()
# Get Latest Data and merge with historic data
new_df = get_latest_data()
# Using dataset from hub
if 'train' in dataset.keys():
old_df = dataset['train'].to_pandas() if 'train' in dataset.keys() else pd.DataFrame()
df = merge_data(old_df=old_df, new_df=new_df)
new_rows = len(df) - len(old_df)
# New dataset
else:
df = new_df
new_rows = len(new_df)
dataset['train'] = Dataset.from_pandas(df, preserve_index=False)
# Update README
update_readme(dataset_name=dataset_name, subreddit=subreddit, latest_date=date, new_rows=new_rows)
logger.info(f"Adding {new_rows} rows for {date}.")
# Push the augmented dataset to the Hugging Face hub
logger.debug(f"Pushing data for {date} to the Hugging Face hub")
dataset.push_to_hub(dataset_name, token=auth_token)
logger.info(f"Processed and pushed data for {date} to the Hugging Face Hub")
def schedule_periodic_task():
"""
Schedule the main task to run at the user-defined frequency
"""
main()
if frequency == 'hourly':
logger.info(f'Scheduling tasks to run every hour at the top of the hour')
schedule.every().hour.at(":00").do(main)
elif frequency == 'daily':
start_time = '05:00'
logger.info(f'Scheduling tasks to run every day at: {start_time} UTC+00')
schedule.every().day.at(start_time).do(main)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == "__main__":
schedule_periodic_task()