File size: 4,058 Bytes
1756d68
2703fdd
 
 
e69ab4e
1756d68
2703fdd
 
1756d68
2703fdd
 
 
1756d68
 
 
 
 
 
 
 
 
 
 
 
 
 
61f9cd0
 
2703fdd
 
 
 
 
 
 
 
 
61f9cd0
 
2703fdd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
0f72ff6
b55063e
2703fdd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61f9cd0
2703fdd
 
 
 
 
 
 
61f9cd0
2703fdd
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os
from datetime import datetime

import pandas as pd
from datasets import Dataset, DatasetDict, load_dataset, DownloadMode
from huggingface_hub import login

from utilities.data_processing import data_processing
from utilities.my_logger import setup_logger
from utilities.praw_downloader import praw_downloader
from utilities.praw_processor import preprocess_praw_data

# Build the Hub dataset id from the SUBREDDIT and USERNAME environment variables.
# Both are required: a missing variable raises KeyError as soon as this module runs.
subreddit = os.environ["SUBREDDIT"]
username = os.environ["USERNAME"]
dataset_name = f"{username}/dataset-creator-reddit-{subreddit}"

# FREQUENCY is compared case-insensitively and must be 'daily' or 'hourly';
# any other value (including unset, which defaults to '') aborts with ValueError.
frequency = os.environ.get("FREQUENCY", '').lower()
if frequency not in ["daily", "hourly"]:
    raise ValueError("FREQUENCY environment variable must be 'daily' or 'hourly'")

# Authenticate with Hugging Face using the token from HUGGINGFACE_AUTH_TOKEN (required).
# The same token is reused below when pushing a freshly created dataset.
auth_token = os.environ["HUGGINGFACE_AUTH_TOKEN"]
login(auth_token, add_to_git_credential=True)

logger = setup_logger(__name__)

# Single placeholder row used by load_or_create_dataset() to seed a brand-new repo.
# Every value is wrapped in a list because the dict is handed to Dataset.from_dict.
dummy_data = {
    "id": ['id'],
    "content": ["This is a sample post content. Just for demonstration purposes!"],
    "poster": ["sampleUser123"],
    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
    "flair": ["Discussion"],
    "title": ["Sample Post Title: How to Use Hugging Face?"],
    "score": [457],
    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
    "updated": [False],
    "new": [False],
    }


def load_or_create_dataset():
    """
    Fetch 'dataset_name' from the Hugging Face hub, bootstrapping the repo when it is missing.

    The first attempt forces a fresh download. If that raises FileNotFoundError, a repo is
    created by pushing a one-row 'train' split built from 'dummy_data', the dataset is loaded
    back from the hub, and the placeholder 'train' split is deleted before returning.

    Returns:
        dataset (DatasetDict): The existing dataset, or the newly created one with its
        placeholder 'train' split removed.

    Raises:
        FileNotFoundError: Only the one raised by the initial load is handled here; errors
        raised while pushing or reloading the new repo propagate to the caller.
    """
    try:
        logger.debug(f"Trying to download {dataset_name}")
        existing = load_dataset(dataset_name, download_mode=DownloadMode.FORCE_REDOWNLOAD)
        logger.debug("Loading existing dataset")
        return existing
    except FileNotFoundError:
        logger.warning("Creating new dataset")

        # Seed the repo with a single placeholder row so that it exists on the hub
        seed = DatasetDict({'train': Dataset.from_dict(dummy_data)})
        seed.push_to_hub(repo_id=dataset_name, token=auth_token)

        # Read the repo back, then discard the placeholder split
        created = load_dataset(dataset_name)
        del created['train']
        return created


def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
    """
    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.

    The stale 'new' and 'updated' flag columns are dropped from a copy of the old dataframe (the
    caller's dataframe is not modified), then the old and new dataframes are concatenated, sorted
    by the 'date_utc' column, and passed through `data_processing`. Finally every row whose 'id'
    appears in the new dataframe but not in the old one is flagged in the 'new' column.

    Args:
    - old_df (pd.DataFrame): The original dataframe. It may lack the 'new', 'updated' or 'id'
      columns (e.g. an empty dataframe on the first run); in that case no id counts as old.
    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe.
      Must contain the 'id' and 'date_utc' columns.

    Returns:
    - pd.DataFrame: The merged, sorted, and marked dataframe.
    """

    # Drop the previous run's flags without mutating the caller's dataframe;
    # errors='ignore' tolerates an old dataframe that never had these columns.
    old_df = old_df.drop(columns=['new', 'updated'], errors='ignore')

    # Concatenate old and new dataframes, sort by 'date_utc', and reset index
    df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)

    # Process data accordingly
    df = data_processing(df)

    # Identify new rows (present in new_df but not in old_df).
    # The id sets are built once here rather than once per row.
    old_ids = set(old_df['id']) if 'id' in old_df.columns else set()
    new_ids = set(new_df['id']) - old_ids
    df['new'] = df['id'].isin(new_ids)

    return df


def get_latest_data():
    """
    Download the latest submissions and return them in preprocessed form.

    Calls `praw_downloader()` and passes its result to `preprocess_praw_data` as the
    'submissions' argument.

    Returns:
        The value returned by `preprocess_praw_data` (presumably a pandas DataFrame; confirm
        against utilities.praw_processor).
    """
    return preprocess_praw_data(submissions=praw_downloader())