derek-thomas's picture
derek-thomas HF staff
Refactoring updates and minor fixes
61f9cd0
raw
history blame
No virus
3.94 kB
import os
from datetime import datetime
import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset
from huggingface_hub import login
from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data
# Target hub repository, derived from the environment (a missing variable raises KeyError here)
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"

# Update cadence: lower-cased, and only the two supported values are accepted
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency != "daily" and frequency != "hourly":
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Log in to Hugging Face with the token taken from the environment
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)
# Placeholder row used to seed a brand-new repo; each column holds a one-element list
dummy_data = dict(
    id=['id'],
    content=["This is a sample post content. Just for demonstration purposes!"],
    poster=["sampleUser123"],
    date_utc=[datetime(2023, 10, 26, 14, 30, 45)],
    flair=["Discussion"],
    title=["Sample Post Title: How to Use Hugging Face?"],
    score=[457],
    permalink=["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    updated=[False],
    new=[False],
)
def load_or_create_dataset():
    """
    Fetch the dataset named by 'dataset_name' from the Hugging Face hub, bootstrapping it if absent.

    If 'load_dataset' raises FileNotFoundError, a repo is created by pushing a single 'train'
    split built from 'dummy_data'. The dataset is then loaded back from the hub and its 'train'
    split is deleted, so a freshly created dataset is returned without a 'train' split.

    Returns:
        dataset (DatasetDict): The existing dataset, or the newly created one minus its 'train' split.
    """
    try:
        existing = load_dataset(dataset_name)
        logger.debug("Loading existing dataset")
        return existing
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Seed the hub repo with the placeholder row so that the repo exists
        seed = DatasetDict({'train': Dataset.from_dict(dummy_data)})
        seed.push_to_hub(repo_id=dataset_name, token=auth_token)

        # Read the repo back from the hub, then discard the placeholder split
        recreated = load_dataset(dataset_name)
        del recreated['train']
        return recreated
def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.

    The 'new' and 'updated' columns of the old dataframe are discarded (on a copy; the caller's
    dataframe is left untouched). The old and new dataframes are then concatenated, sorted by
    'date_utc', re-indexed, and passed through 'data_processing'. Finally, every row whose 'id'
    occurs in the new dataframe but not in the old one is flagged in the 'new' column.

    Args:
    - old_df (pd.DataFrame): The original dataframe. Must contain 'id'; 'new' and 'updated' are optional.
    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe. Must contain 'id'.

    Returns:
    - pd.DataFrame: The merged, sorted, and marked dataframe.
    """
    # Drop stale flags without mutating the caller's frame; errors='ignore' tolerates
    # an old dataframe that does not carry these columns
    previous = old_df.drop(columns=['new', 'updated'], errors='ignore')

    # Concatenate old and new dataframes, sort by 'date_utc', and reset index
    df = pd.concat([previous, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)

    # Process data accordingly
    df = data_processing(df)

    # Identify new rows (present in new_df but not in old_df). The ID set is built once
    # rather than once per row, and membership is tested with a vectorised isin
    fresh_ids = set(new_df['id']) - set(old_df['id'])
    df['new'] = df['id'].isin(fresh_ids)
    return df
def get_latest_data():
    """
    Download submissions with 'praw_downloader' and run them through 'preprocess_praw_data'.

    Returns:
        The value returned by 'preprocess_praw_data' (presumably a pandas DataFrame; confirm
        against utilities.praw_processor).
    """
    return preprocess_praw_data(submissions=praw_downloader())