| """ |
| This class create a connection to Snowflake, run queries (read and write) |
| """ |
| import json |
|
|
| import numpy as np |
| import pandas as pd |
| from snowflake.snowpark import Session |
| from sympy.strategies.branch import condition |
|
|
|
|
class SnowFlakeConn:
    """
    Wrapper around a Snowpark ``Session`` that reads brand-scoped data from
    Snowflake and writes result DataFrames back to Snowflake tables.
    """

    def __init__(self, session, brand):
        """
        :param session: an open ``snowflake.snowpark.Session``
        :param brand: brand name used to filter every query (e.g. "drumeo")
        """
        self.session = session  # fix: original had a stray space ("self. session")
        self.brand = brand

        # Canonical column names (and order) produced by adjust_dataframe().
        self.final_columns = ['user_id', "email", "user_info", "permission",
                              "expiration_date", "recsys_result", "message", "brand",
                              "recommendation", "segment_name", "timestamp"]

        # Campaign ids per brand (presumably Customer.io campaign ids — confirm).
        self.campaign_id = {
            "singeo": 460,
            "pianote": 457,
            "guitareo": 458,
            "drumeo": 392
        }

    def run_read_query(self, query, data):
        """
        Execute a read-only SQL query on Snowflake.

        :param query: SQL query string to execute
        :param data: human-readable dataset name, used for logging only
        :return: pandas DataFrame with lower-cased column names, or ``None``
                 when the query fails (callers must handle the None case)
        """
        try:
            dataframe = self.session.sql(query).to_pandas()
            # Downstream code expects lower-case column names.
            dataframe.columns = dataframe.columns.str.lower()
            print(f"reading {data} table successfully")
            return dataframe
        except Exception as e:
            # fix: original message said "Error in creating/updating table"
            # even though this method only reads.
            print(f"Error reading {data} table: {e}")
            return None

    def is_json_parsed_to_collection(self, s):
        """
        Return True if *s* is a JSON string that parses to a dict or a list.

        Scalars (e.g. '"x"', '1'), invalid JSON, and non-string inputs all
        return False.
        """
        try:
            parsed = json.loads(s)
        except (TypeError, ValueError):
            # fix: was a bare ``except:`` that swallowed everything, including
            # KeyboardInterrupt. json.JSONDecodeError is a ValueError subclass;
            # TypeError covers non-str/bytes input (e.g. None).
            return False
        return isinstance(parsed, (dict, list))

    def store_df_to_snowflake(self, table_name, dataframe, database="ONLINE_RECSYS", schema="GENERATED_DATA"):
        """
        Write a pandas DataFrame to a Snowflake table, creating or overwriting it.

        :param table_name: target table name (stripped and upper-cased)
        :param dataframe: pandas DataFrame to store
        :param database: target database
        :param schema: target schema
        :return: None (errors are logged, not raised)
        """
        try:
            self.session.use_database(database)
            self.session.use_schema(schema)

            dataframe = dataframe.reset_index(drop=True)
            # Snowflake identifiers are conventionally upper-case.
            dataframe.columns = dataframe.columns.str.upper()

            self.session.write_pandas(df=dataframe,
                                      table_name=table_name.strip().upper(),
                                      auto_create_table=True,
                                      overwrite=True,
                                      use_logical_type=True)
            print(f"Data inserted into {table_name} successfully.")

        except Exception as e:
            print(f"Error in creating/updating/inserting table: {e}")

    def get_data(self, data, list_of_ids=None):
        """
        Fetch one of the supported datasets for the current brand.

        :param data: one of {'users', 'contents', 'interactions', 'recsys',
                     'popular_contents'}
        :param list_of_ids: optional list of user ids to restrict the query to
        :return: pandas DataFrame (or None if the read failed)
        :raises ValueError: when *data* is not a supported dataset name
        :raises NotImplementedError: when the matching ``_get_*`` builder is missing
        """
        valid_data = {'users', 'contents', 'interactions', 'recsys', 'popular_contents'}

        if data not in valid_data:
            raise ValueError(f"Invalid data type: {data}")

        # Dispatch to the matching private query builder (e.g. _get_users).
        method_name = f"_get_{data}"
        method = getattr(self, method_name, None)
        if method is None:
            raise NotImplementedError(f"The method {method_name} is not implemented.")

        query = method(list_of_ids)
        # fix: original rebound the ``data`` parameter with the result,
        # shadowing the dataset name.
        result = self.run_read_query(query, data)
        return result

    @staticmethod
    def _user_id_filter(list_of_ids, keyword):
        """
        Build the USER_ID restriction clause shared by the _get_* builders.

        :param list_of_ids: iterable of user ids, or None for no restriction
        :param keyword: "AND" or "WHERE", depending on the enclosing query
        :return: e.g. "AND USER_ID in (1, 2)" — or "" when list_of_ids is None

        NOTE(review): ids are interpolated directly into the SQL string; safe
        only for trusted integer ids (Snowpark ``session.sql`` is used without
        parameter binding throughout this class).
        """
        if list_of_ids is None:
            return ""
        ids_str = "(" + ", ".join(map(str, list_of_ids)) + ")"
        return f"{keyword} USER_ID in {ids_str}"

    def _get_contents(self, list_of_ids=None):
        """Build the query for all vectorized contents of the current brand."""
        query = f"""
        select CONTENT_ID, CONTENT_TYPE, CONTENT_PROFILE as content_info --, CONTENT_PROFILE_VECTOR
        from ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT
        where BRAND = '{self.brand}'
        """
        return query

    def _get_users(self, list_of_ids=None):
        """
        Build the query for brand users, optionally restricted to *list_of_ids*.

        ``birthday_reminder`` is the number of days until the user's next
        birthday (rolls over to next year when this year's has passed).
        """
        condition = self._user_id_filter(list_of_ids, "AND")

        query = f"""
        select USER_ID, BRAND, FIRST_NAME, BIRTHDAY, TIMEZONE, EMAIL, CURRENT_TIMESTAMP() AS TIMESTAMP, DIFFICULTY, SELF_REPORT_DIFFICULTY, USER_PROFILE as user_info, PERMISSION, EXPIRATION_DATE,
        DATEDIFF(
            day,
            CURRENT_DATE(),
            CASE
                WHEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY)) < CURRENT_DATE()
                    THEN DATE_FROM_PARTS(YEAR(CURRENT_DATE()) + 1, EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
                ELSE DATE_FROM_PARTS(YEAR(CURRENT_DATE()), EXTRACT(MONTH FROM BIRTHDAY), EXTRACT(DAY FROM BIRTHDAY))
            END) AS birthday_reminder
        from ONLINE_RECSYS.PREPROCESSED.USERS
        where BRAND = '{self.brand}' {condition}
        """
        return query

    def _get_interactions(self, list_of_ids=None):
        """
        Build the query for each user's most recent video interaction, joined
        with the content profile of that interaction.
        """
        condition = self._user_id_filter(list_of_ids, "AND")

        # NOTE(review): 'RECSYS_INTEACTIONS' looks misspelled but is kept
        # verbatim — presumably the actual table name in Snowflake; confirm.
        query = f"""
        WITH latest_interactions AS(
            SELECT
                USER_ID, CONTENT_ID, CONTENT_TYPE, EVENT_TEXT, TIMESTAMP,
                ROW_NUMBER() OVER(PARTITION BY USER_ID ORDER BY TIMESTAMP DESC) AS rn
            FROM ONLINE_RECSYS.PREPROCESSED.RECSYS_INTEACTIONS
            WHERE BRAND = '{self.brand}' AND EVENT_TEXT IN('Video Completed', 'Video Playing') {condition})

        SELECT i.USER_ID, i.CONTENT_ID, i.CONTENT_TYPE, c.content_profile as last_completed_content, i.EVENT_TEXT, i.TIMESTAMP, DATEDIFF('week', i.TIMESTAMP, CURRENT_TIMESTAMP) AS weeks_since_last_interaction
        FROM latest_interactions i
        LEFT JOIN
            ONLINE_RECSYS.VECTOR_DB.VECTORIZED_CONTENT c ON c.CONTENT_ID = i.CONTENT_ID
        WHERE rn = 1;
        """
        return query

    def _get_recsys(self, list_of_ids=None):
        """Build the query for existing recsys results of the current brand."""
        condition = self._user_id_filter(list_of_ids, "WHERE")

        recsys_col = f"{self.brand}_recsys_v3"
        query = f"""
        select USER_ID, {recsys_col} as recsys_result
        from RECSYS_V3.RECSYS_CIO.RECSYS_V3_CUSTOMER_IO_OLD
        {condition}
        """
        return query

    def _get_popular_contents(self, list_of_ids=None):
        """Build the query for the brand's popular contents (ids are ignored)."""
        query = f"""
        select POPULAR_CONTENT
        from RECSYS_V3.RECSYS_CIO.POPULAR_CONTENT_CUSTOMER_IO_OLD
        where brand = '{self.brand.lower()}'
        """
        return query

    def extract_id_from_email(self, emails):
        """
        Look up user ids for a collection of email addresses.

        :param emails: iterable of email address strings
        :return: DataFrame with user_id and email columns (lower-cased by
                 run_read_query), or None when the read fails
        """
        # NOTE(review): emails are quoted and interpolated directly into SQL —
        # safe only for trusted input; confirm upstream sanitization. The table
        # name 'USORA_USERS' (not MUSORA_USERS) is kept verbatim — confirm.
        email_list_str = ', '.join(f"'{email}'" for email in emails)
        query = f"""
        SELECT id as USER_ID, email as EMAIL
        FROM STITCH.MUSORA_ECOM_DB.USORA_USERS
        WHERE email IN ({email_list_str})
        """

        user_ids_df = self.run_read_query(query, data="User_ids")
        return user_ids_df

    def adjust_dataframe(self, dataframe):
        """
        Conform a DataFrame to ``self.final_columns``.

        Keeps only the expected columns, adds any missing ones filled with
        None, and returns them in the order of ``self.final_columns``.
        The input DataFrame is not modified.
        """
        final_df = dataframe.copy()

        # Compare case-insensitively: source columns may be upper-cased.
        final_df.columns = final_df.columns.str.lower()
        expected_cols = [col.lower() for col in self.final_columns]

        # Drop anything not expected.
        available = [col for col in final_df.columns if col in expected_cols]
        final_df = final_df[available]

        # Add expected-but-missing columns as None.
        for col in expected_cols:
            if col not in final_df.columns:
                final_df[col] = None

        # Enforce canonical order.
        final_df = final_df[expected_cols]

        # Restore the exact casing declared in self.final_columns.
        rename_mapping = {col.lower(): col for col in self.final_columns}
        final_df.rename(columns=rename_mapping, inplace=True)

        return final_df

    def close_connection(self):
        """Close the underlying Snowpark session."""
        self.session.close()

    def get_inactive_users_by_brand(self, brand):
        """
        Return inactive users for *brand*: singeo users come from the
        re-engagement campaign view, other brands from a local test file.
        """
        if brand == "singeo":
            return self.get_users_in_campaign(brand=brand)
        else:
            return self._get_inactive_users(brand=brand)

    def get_users_in_campaign(self, brand, campaign_name="Singeo - Inactive Members (for 3 days) - Re-engagement", stage=1, campaign_view='singeo_re_engagement'):
        """
        Fetch campaign users who are due for message *stage*.

        Stage 1: users in the campaign view who have not yet received a
        stage-1 message. Stage N>1: users who received stage N-1 but not yet
        stage N.

        :param brand: brand to filter campaign rows by
        :param campaign_name: campaign to filter by
        :param stage: message stage to select users for
        :param campaign_view: view name under MESSAGING_SYSTEM.CAMPAIGNS
        :return: DataFrame of emails (or None when the read fails)
        """
        # NOTE(review): the trailing comma after n.EMAIL in the SELECT lists
        # below relies on Snowflake's trailing-comma support — kept verbatim.
        if stage == 1:
            query = f"""
            WITH eligible_users AS (
                SELECT
                    s.USER_ID,
                    s.EMAIL,
                    s.BRAND,
                    s.CAMPAIGN_NAME
                FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s
                WHERE s.CAMPAIGN_NAME = '{campaign_name}'
                  AND s.BRAND = '{brand}'
            ),
            msgs AS (
                -- Only messages from the same brand/campaign
                SELECT
                    m.EMAIL,
                    m.BRAND,
                    m.CAMPAIGN_NAME,
                    m.STAGE,
                    m.TIMESTAMP
                FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m
                WHERE m.CAMPAIGN_NAME = '{campaign_name}'
                  AND m.BRAND = '{brand}'
            ),
            no_stage1 AS (
                -- Users who do not already have stage 1
                SELECT e.*
                FROM eligible_users e
                LEFT JOIN msgs m1
                       ON m1.EMAIL = e.EMAIL
                      AND m1.STAGE = 1
                WHERE m1.EMAIL IS NULL
            )
            SELECT
                n.EMAIL,
            FROM no_stage1 n
            ;
            """

        else:
            query = f"""
            WITH eligible_users AS (
                SELECT
                    s.USER_ID,
                    s.EMAIL,
                    s.BRAND,
                    s.CAMPAIGN_NAME
                FROM MESSAGING_SYSTEM.CAMPAIGNS.{campaign_view} s
                WHERE s.CAMPAIGN_NAME = '{campaign_name}'
                  AND s.BRAND = '{brand}'
            ),
            msgs AS (
                -- Only messages from the same brand/campaign
                SELECT
                    m.EMAIL,
                    m.BRAND,
                    m.CAMPAIGN_NAME,
                    m.STAGE,
                    m.TIMESTAMP
                FROM MESSAGING_SYSTEM.GENERATED_DATA.INITIAL_MESSAGES m
                WHERE m.CAMPAIGN_NAME = '{campaign_name}'
                  AND m.BRAND = '{brand}'
            ),
            has_prev AS (
                -- Users who have the immediately previous stage (N-1)
                SELECT e.*
                FROM eligible_users e
                JOIN msgs mp
                  ON mp.EMAIL = e.EMAIL
                 AND mp.STAGE = {stage} - 1
            ),
            no_current_stage AS (
                -- Exclude users who already have the current stage (N)
                SELECT e.*
                FROM has_prev e
                LEFT JOIN msgs mn
                       ON mn.EMAIL = e.EMAIL
                      AND mn.STAGE = {stage}
                WHERE mn.EMAIL IS NULL
            ),
            latest_msg AS (
                -- Most-recent message timestamp per user (within this brand/campaign)
                SELECT
                    EMAIL,
                    MAX(TIMESTAMP) AS LAST_TS
                FROM msgs
                GROUP BY EMAIL
            )
            SELECT
                n.EMAIL,
            FROM no_current_stage n
            JOIN latest_msg l
              ON l.EMAIL = n.EMAIL
            -- Ensure no messages for this user in this brand/campaign in the last 36 hours
            --WHERE l.LAST_TS < DATEADD(HOUR, -36, CURRENT_TIMESTAMP())
            ;
            """

        users_df = self.run_read_query(query, data=f"{brand}_campaign")
        return users_df

    def _get_inactive_users(self, brand: str):
        """
        Load users for *brand* from a local test CSV.

        NOTE(review): the original docstring claimed this returns up to 50
        inactive users queried from Snowflake; the implementation actually
        reads a static file ``Data/<brand>_test.csv`` — presumably a
        temporary stand-in. Confirm before relying on it in production.
        """
        path = f"Data/{brand.lower()}_test.csv"
        users_df = pd.read_csv(path)
        return users_df
|
|
|
|
|
|