import json
import os
from datetime import datetime

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, DownloadMode
from huggingface_hub import login

from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data

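# Target subreddit and Hugging Face dataset repo, configured via environment variables.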
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"

frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

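# Authenticate with the Hugging Face Hub so the dataset pushes below are authorised.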
auth_token = os.environ["HF_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)

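# Single placeholder row used only to initialise a brand-new dataset repo on the Hub;
# it is removed again inside load_or_create_dataset().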
dummy_data = {
    "id": ['id'],
    "content": ["This is a sample post content. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    "nsfw": [False]
}


def load_or_create_dataset():
    """
    Loads an existing dataset from the Hugging Face Hub or creates a new one if it doesn't exist.

    This function attempts to load the dataset specified by 'dataset_name'. If the dataset is not found,
    it creates a new dataset from 'dummy_data', pushes it to the Hugging Face Hub, and then reloads it.
    After reloading, the dummy 'train' split is deleted so no placeholder rows are returned.

    Returns:
        dataset (DatasetDict): The loaded or newly created dataset.
    """
    try:
        logger.debug(f"Trying to download {dataset_name}")
        dataset = load_dataset(dataset_name, download_mode=DownloadMode.FORCE_REDOWNLOAD)
        logger.debug("Loading existing dataset")
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Create the initial repo from the placeholder row
        dataset = DatasetDict()
        dataset['train'] = Dataset.from_dict(dummy_data)
        dataset.push_to_hub(repo_id=dataset_name, token=auth_token)

        # Pull the freshly created repo back down
        dataset = load_dataset(dataset_name)

        # Drop the placeholder split so only real data is returned
        del dataset['train']
    return dataset


def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.

    The function drops the bookkeeping columns from the old dataframe, concatenates the old and new
    dataframes, and sorts the result by the 'date_utc' column. Rows from the new dataframe whose IDs
    are not in the old dataframe are marked as 'new'.

    Args:
    - old_df (pd.DataFrame): The original dataframe.
    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe.

    Returns:
    - pd.DataFrame: The merged, sorted, and marked dataframe.
    """
    old_df.drop(columns=['new', 'updated'], inplace=True)

    df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)

    # Apply the shared cleaning pipeline
    df = data_processing(df)

    # Mark rows whose IDs appear only in the new dataframe
    df['new'] = df['id'].apply(lambda x: x in set(new_df['id']) - set(old_df['id']))

    return df
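

# Worked example (illustrative values only, not part of the pipeline): if old_df holds ids
# {'a', 'b'} and new_df holds ids {'b', 'c'}, only rows with id 'c' end up with new == True
# after the merge, since 'a' and 'b' already existed in old_df.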


def remove_filtered_rows(df: pd.DataFrame) -> pd.DataFrame:
    """
    Removes rows from the DataFrame where the 'id' is present in filter_ids.json.

    :param df: Input DataFrame to be filtered.
    :return: DataFrame with rows containing IDs present in filter_ids.json removed.
    """
    with open('filter_ids.json', 'r') as file:
        filter_ids = json.load(file)

    filtered_df = df[~df['id'].isin(filter_ids)]
    logger.info(f"Filtered {len(df) - len(filtered_df)} rows from the DataFrame")

    return filtered_df
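

# filter_ids.json is expected to contain a JSON list of submission ids to drop, e.g.
# (illustrative contents, not a real file shipped here):
#   ["abc123", "def456"]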


def get_latest_data():
    """Downloads the latest submissions via PRAW and returns them as a preprocessed DataFrame."""
    submissions = praw_downloader()
    df = preprocess_praw_data(submissions=submissions)
    return df
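

# Sketch of how these helpers are typically wired together (assumed orchestration; the real
# entry point that runs on the configured FREQUENCY lives outside this module):
#   dataset = load_or_create_dataset()
#   new_df = get_latest_data()
#   old_df = dataset['train'].to_pandas() if 'train' in dataset else new_df.iloc[0:0]
#   df = remove_filtered_rows(merge_data(old_df=old_df, new_df=new_df))
#   dataset['train'] = Dataset.from_pandas(df, preserve_index=False)
#   dataset.push_to_hub(repo_id=dataset_name, token=auth_token)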