Commit 2703fdd
Parent(s): 826ed51

Refactoring to make it clear which functions are user-defined

Files changed:
- main.py (+2, -37)
- utilities/{data_collator.py → data_processing.py} (+2, -43)
- utilities/praw_downloader.py (+0, -14)
- utilities/user_defined_functions.py (+95, -0)
main.py
CHANGED

@@ -4,12 +4,11 @@ from datetime import datetime
 
 import pandas as pd
 import schedule
-from datasets import Dataset, DatasetDict, load_dataset
+from datasets import Dataset, load_dataset
 from huggingface_hub import login
 
-from utilities.data_collator import get_latest_data, merge_data
+from utilities.user_defined_functions import get_latest_data, merge_data
 from utilities.my_logger import setup_logger
-from utilities.praw_downloader import dummy_data
 from utilities.readme_update import update_readme
 
 # Set dataset name, path to README.md, and existing dataset details

@@ -29,40 +28,6 @@ login(auth_token, add_to_git_credential=True)
 logger = setup_logger(__name__)
 
 
-def load_or_create_dataset():
-    """
-    Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist.
-
-    This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found,
-    it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it.
-    After reloading, the dummy data is removed from the dataset.
-
-    Returns:
-        dataset (DatasetDict): The loaded or newly created dataset.
-
-    Raises:
-        FileNotFoundError: If the dataset cannot be loaded or created.
-    """
-    # Load the existing dataset from the Hugging Face hub or create a new one
-    try:
-        dataset = load_dataset(dataset_name)
-        logger.debug("Loading existing dataset")
-    except FileNotFoundError:
-        logger.warning("Creating new dataset")
-
-        # Creating Initial Repo
-        dataset = DatasetDict()
-        dataset['train'] = Dataset.from_dict(dummy_data)
-        dataset.push_to_hub(repo_id=dataset_name, token=auth_token)
-
-        # Pulling from Initial Repo
-        dataset = load_dataset(dataset_name)
-
-        # Remove dummy data
-        del dataset['train']
-    return dataset
-
-
 def main():
     date = datetime.now().strftime('%Y-%m-%d')
     logger.warning(f"Running main function for date: {date}")
utilities/{data_collator.py → data_processing.py}
RENAMED

@@ -1,16 +1,7 @@
 import pandas as pd
 
-from utilities.praw_downloader import praw_downloader
-from utilities.praw_processor import preprocess_praw_data
 
-
-def get_latest_data():
-    submissions = praw_downloader()
-    df = preprocess_praw_data(submissions=submissions)
-    return df
-
-
-def filter_redundant_ids(df: pd.DataFrame) -> pd.DataFrame:
+def data_processing(df: pd.DataFrame) -> pd.DataFrame:
     """
     For each id, creates a new row with the longest content and the highest score
     from the available rows with the same id. Adds a boolean column 'updated'

@@ -56,38 +47,6 @@ def filter_redundant_ids(df: pd.DataFrame) -> pd.DataFrame:
     return df_merged
 
 
-def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
-    """
-    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.
-
-    The function first marks rows from the new dataframe, then concatenates the old and new dataframes.
-    It sorts the resulting dataframe by the 'date_utc' column. Rows from the new dataframe that are not
-    in the old dataframe are marked as 'new'.
-
-    Args:
-    - old_df (pd.DataFrame): The original dataframe.
-    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe.
-
-    Returns:
-    - pd.DataFrame: The merged, sorted, and marked dataframe.
-    """
-
-    # Mark rows in old and new dataframes
-    old_df['new'] = False
-    new_df['new'] = True
-
-    # Concatenate old and new dataframes, sort by 'date_utc', and reset index
-    df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)
-
-    # Optional: If you have a function to filter redundant IDs, you can use it here
-    df = filter_redundant_ids(df)
-
-    # Identify new rows (present in new_df but not in old_df)
-    df['new'] = df['new'] & ~df['id'].duplicated(keep=False)
-
-    return df
-
-
 if __name__ == '__main__':
     # Mock data
     data = {

@@ -102,4 +61,4 @@ if __name__ == '__main__':
     print("Original DataFrame:")
     print(df)
     print("\nFiltered DataFrame:")
-    print(filter_redundant_ids(df))
+    print(data_processing(df))
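Note: the body of data_processing (formerly filter_redundant_ids) is collapsed in this diff. Going only by its docstring, an equivalent pandas routine would look roughly like the sketch below; this is illustrative, not the committed implementation.

import pandas as pd

def dedupe_by_id_sketch(df: pd.DataFrame) -> pd.DataFrame:
    """Keep one row per id with the longest content and highest score; mark merged ids as updated."""
    rows = []
    for _, group in df.groupby('id', sort=False):
        row = group.iloc[-1].copy()
        row['content'] = max(group['content'], key=len)   # longest content seen for this id
        row['score'] = group['score'].max()               # highest score seen for this id
        row['updated'] = len(group) > 1                   # True when duplicate rows were collapsed
        rows.append(row)
    return pd.DataFrame(rows).reset_index(drop=True)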
utilities/praw_downloader.py
CHANGED

@@ -13,20 +13,6 @@ logger = setup_logger(__name__)
 subreddit_var = os.getenv("SUBREDDIT")
 reddit_pull_limit = int(os.getenv("REDDIT_PULL_LIMIT"))
 
-# Dummy row for when we create a new repo
-dummy_data = {
-    "id": ['id'],
-    "content": ["This is a sample post content. Just for demonstration purposes!"],
-    "poster": ["sampleUser123"],
-    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
-    "flair": ["Discussion"],
-    "title": ["Sample Post Title: How to Use Hugging Face?"],
-    "score": [457],
-    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
-    "updated": False,
-    "new": False,
-}
-
 
 def get_reddit_instance() -> praw.Reddit:
     """Initialize and return a Reddit instance using PRAW."""
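Note: get_reddit_instance itself is untouched by this commit and its body is not in the hunk. For context, a PRAW initializer of that shape typically looks like the sketch below; the environment-variable names are assumptions, not taken from this repository.

import os
import praw

def get_reddit_instance_sketch() -> praw.Reddit:
    """Initialize and return a Reddit instance using PRAW."""
    return praw.Reddit(
        client_id=os.getenv("REDDIT_CLIENT_ID"),          # assumed variable names,
        client_secret=os.getenv("REDDIT_CLIENT_SECRET"),  # not from this repo
        user_agent="reddit-dataset-builder",
    )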
utilities/user_defined_functions.py
ADDED

@@ -0,0 +1,95 @@
+from datetime import datetime
+
+import pandas as pd
+from datasets import Dataset, DatasetDict, load_dataset
+
+from main import auth_token, dataset_name, logger
+from utilities.data_processing import data_processing
+from utilities.praw_downloader import praw_downloader
+from utilities.praw_processor import preprocess_praw_data
+
+# Dummy row for when we create a new repo
+dummy_data = {
+    "id": ['id'],
+    "content": ["This is a sample post content. Just for demonstration purposes!"],
+    "poster": ["sampleUser123"],
+    "date_utc": [datetime.strptime("2023-10-26 14:30:45", '%Y-%m-%d %H:%M:%S')],
+    "flair": ["Discussion"],
+    "title": ["Sample Post Title: How to Use Hugging Face?"],
+    "score": [457],
+    "permalink": ["/r/sampleSubreddit/comments/sampleID/sample_post_title_how_to_use_hugging_face/"],
+    "updated": False,
+    "new": False,
+}
+
+
+def load_or_create_dataset():
+    """
+    Loads an existing dataset from the Hugging Face hub or creates a new one if it doesn't exist.
+
+    This function attempts to load a dataset specified by 'dataset_name'. If the dataset is not found,
+    it creates a new dataset with 'dummy_data', pushes it to the Hugging Face hub, and then reloads it.
+    After reloading, the dummy data is removed from the dataset.
+
+    Returns:
+        dataset (DatasetDict): The loaded or newly created dataset.
+
+    Raises:
+        FileNotFoundError: If the dataset cannot be loaded or created.
+    """
+    # Load the existing dataset from the Hugging Face hub or create a new one
+    try:
+        dataset = load_dataset(dataset_name)
+        logger.debug("Loading existing dataset")
+    except FileNotFoundError:
+        logger.warning("Creating new dataset")
+
+        # Creating Initial Repo
+        dataset = DatasetDict()
+        dataset['train'] = Dataset.from_dict(dummy_data)
+        dataset.push_to_hub(repo_id=dataset_name, token=auth_token)
+
+        # Pulling from Initial Repo
+        dataset = load_dataset(dataset_name)
+
+        # Remove dummy data
+        del dataset['train']
+    return dataset
+
+
+def merge_data(old_df: pd.DataFrame, new_df: pd.DataFrame) -> pd.DataFrame:
+    """
+    Merges two dataframes, sorts them by 'date_utc', and marks new IDs.
+
+    The function first marks rows from the new dataframe, then concatenates the old and new dataframes.
+    It sorts the resulting dataframe by the 'date_utc' column. Rows from the new dataframe that are not
+    in the old dataframe are marked as 'new'.
+
+    Args:
+    - old_df (pd.DataFrame): The original dataframe.
+    - new_df (pd.DataFrame): The new dataframe to be merged with the original dataframe.
+
+    Returns:
+    - pd.DataFrame: The merged, sorted, and marked dataframe.
+    """
+
+    # Mark rows in old and new dataframes
+    old_df['new'] = False
+    new_df['new'] = True
+
+    # Concatenate old and new dataframes, sort by 'date_utc', and reset index
+    df = pd.concat([old_df, new_df], ignore_index=True).sort_values(by='date_utc').reset_index(drop=True)
+
+    # Process data accordingly
+    df = data_processing(df)
+
+    # Identify new rows (present in new_df but not in old_df)
+    df['new'] = df['new'] & ~df['id'].duplicated(keep=False)
+
+    return df
+
+
+def get_latest_data():
+    submissions = praw_downloader()
+    df = preprocess_praw_data(submissions=submissions)
+    return df