| """ |
| Data loader module for Sentiment Analysis Visualization |
| Handles Snowflake connection and data loading with caching |
| """ |
| import sys |
| import os |
| import re |
| import pandas as pd |
| import numpy as np |
| import streamlit as st |
| from pathlib import Path |
| import json |
| from datetime import datetime, timedelta |
| from dateutil.relativedelta import relativedelta |
|
|
| |
| parent_dir = Path(__file__).resolve().parent.parent.parent |
| sys.path.append(str(parent_dir)) |
|
|
| from visualization.SnowFlakeConnection import SnowFlakeConn |
|
|
|
|
| class SentimentDataLoader: |
| """ |
| Loads sentiment analysis data from Snowflake with caching. |
| |
| Three data loading modes: |
| - load_dashboard_data() : lightweight (no text), cached 24h |
| - load_sa_data(...) : top-N content stats + sampled comments, on-demand |
| - load_reply_required_data() : reply-queue comments with text, on-demand |
| """ |
|
|
| def __init__(self, config_path=None): |
| if config_path is None: |
| config_path = Path(__file__).parent.parent / "config" / "viz_config.json" |
|
|
| with open(config_path, 'r') as f: |
| self.config = json.load(f) |
|
|
| self.query = self.config['snowflake']['query'] |
| self.dashboard_query = self.config['snowflake'].get('dashboard_query', self.query) |
| self.demographics_query = self.config['snowflake'].get('demographics_query', None) |
|
|
| |
| |
| |
|
|
| @st.cache_data(ttl=86400) |
| def load_dashboard_data(_self): |
| """ |
| Load lightweight dashboard data from Snowflake (no text columns). |
| Includes demographics merge if demographics_query is configured. |
| |
| Returns: |
| pd.DataFrame |
| """ |
| try: |
| conn = SnowFlakeConn() |
| df = conn.run_read_query(_self.dashboard_query, "dashboard data") |
| conn.close_connection() |
|
|
| if df is None or df.empty: |
| st.error("No dashboard data returned from Snowflake") |
| return pd.DataFrame() |
|
|
| df = _self._process_dashboard_dataframe(df) |
|
|
| if _self.demographics_query: |
| demographics_df = _self.load_demographics_data() |
| df = _self.merge_demographics_with_comments(df, demographics_df) |
|
|
| return df |
|
|
| except Exception as e: |
| st.error(f"Error loading dashboard data from Snowflake: {e}") |
| return pd.DataFrame() |
|
|
| def _process_dashboard_dataframe(self, df): |
| """Process lightweight dashboard dataframe (no text columns).""" |
| df.columns = df.columns.str.lower() |
|
|
| if 'comment_timestamp' in df.columns: |
| df['comment_timestamp'] = pd.to_datetime(df['comment_timestamp'], errors='coerce') |
|
|
| if 'processed_at' in df.columns: |
| df['processed_at'] = pd.to_datetime(df['processed_at'], errors='coerce') |
|
|
| df['sentiment_polarity'] = df['sentiment_polarity'].fillna('unknown') |
| df['intent'] = df['intent'].fillna('unknown') |
| df['platform'] = df['platform'].fillna('unknown').str.lower() |
| df['brand'] = df['brand'].fillna('unknown').str.lower() |
|
|
| if 'requires_reply' in df.columns: |
| df['requires_reply'] = df['requires_reply'].astype(bool) |
|
|
| return df |
|
|
| |
| |
| |
|
|
| @st.cache_data(ttl=86400) |
| def load_data(_self, reload=False): |
| """ |
| Load full sentiment data (with text). Kept for compatibility. |
| Prefer load_dashboard_data() for dashboard views. |
| """ |
| try: |
| conn = SnowFlakeConn() |
| df = conn.run_read_query(_self.query, "sentiment features") |
| conn.close_connection() |
|
|
| if df is None or df.empty: |
| st.error("No data returned from Snowflake") |
| return pd.DataFrame() |
|
|
| df = _self._process_dataframe(df) |
|
|
| if _self.demographics_query: |
| demographics_df = _self.load_demographics_data() |
| df = _self.merge_demographics_with_comments(df, demographics_df) |
|
|
| return df |
|
|
| except Exception as e: |
| st.error(f"Error loading data from Snowflake: {e}") |
| return pd.DataFrame() |
|
|
| def _process_dataframe(self, df): |
| """Process full dataframe including vectorized display_text computation.""" |
| df.columns = df.columns.str.lower() |
|
|
| if 'comment_timestamp' in df.columns: |
| df['comment_timestamp'] = pd.to_datetime(df['comment_timestamp'], errors='coerce') |
|
|
| if 'processed_at' in df.columns: |
| df['processed_at'] = pd.to_datetime(df['processed_at'], errors='coerce') |
|
|
| df['sentiment_polarity'] = df['sentiment_polarity'].fillna('unknown') |
| df['intent'] = df['intent'].fillna('unknown') |
| df['platform'] = df['platform'].fillna('unknown').str.lower() |
| df['brand'] = df['brand'].fillna('unknown').str.lower() |
|
|
| if 'requires_reply' in df.columns: |
| df['requires_reply'] = df['requires_reply'].astype(bool) |
|
|
| |
| if 'translated_text' in df.columns and 'is_english' in df.columns: |
| mask_translate = (df['is_english'] == False) & df['translated_text'].notna() |
| df['display_text'] = df.get('original_text', pd.Series('', index=df.index)).fillna('') |
| df.loc[mask_translate, 'display_text'] = df.loc[mask_translate, 'translated_text'] |
| elif 'original_text' in df.columns: |
| df['display_text'] = df['original_text'].fillna('') |
| else: |
| df['display_text'] = '' |
|
|
| |
| text = df['display_text'].astype(str) |
| df['display_text_short'] = text.where(text.str.len() <= 100, text.str[:100] + '...') |
|
|
| return df |
|
|
| |
| |
| |
|
|
| def load_sa_data(self, platform, brand, top_n=10, min_comments=10, |
| sort_by='severity_score', sentiments=None, intents=None, |
| date_range=None): |
| """ |
| Load Sentiment Analysis page data: |
| 1. Content aggregation stats for top-N contents |
| 2. Sampled comments (up to 50 neg + 50 pos + 50 other per content) |
| |
| Args: |
| platform: Selected platform string |
| brand: Selected brand string |
| top_n: Max number of contents to return |
| min_comments: Minimum comment threshold for inclusion |
| sort_by: 'severity_score' | 'sentiment_percentage' | 'sentiment_count' | 'total_comments' |
| sentiments: List of sentiments to filter by (dominant_sentiment) |
| intents: List of intents to filter by |
| date_range: Tuple (start_date, end_date) or None |
| |
| Returns: |
| tuple: (contents_df, comments_df) |
| """ |
| sentiments_key = tuple(sorted(sentiments)) if sentiments else () |
| intents_key = tuple(sorted(intents)) if intents else () |
| date_key = (str(date_range[0]), str(date_range[1])) if date_range and len(date_range) == 2 else () |
|
|
| return self._fetch_sa_data( |
| platform, brand, top_n, min_comments, sort_by, |
| sentiments_key, intents_key, date_key |
| ) |
|
|
| @st.cache_data(ttl=86400) |
| def _fetch_sa_data(_self, platform, brand, top_n, min_comments, sort_by, |
| sentiments, intents, date_range): |
| """Cached SA data fetch β returns (contents_df, comments_df).""" |
| try: |
| conn = SnowFlakeConn() |
|
|
| |
| content_query = _self._build_sa_content_query( |
| platform, brand, min_comments, sort_by, date_range |
| ) |
| contents_df = conn.run_read_query(content_query, "SA content aggregation") |
|
|
| if contents_df is None or contents_df.empty: |
| conn.close_connection() |
| return pd.DataFrame(), pd.DataFrame() |
|
|
| |
| contents_df = _self._process_sa_content_stats(contents_df) |
|
|
| |
| if sentiments: |
| contents_df = contents_df[contents_df['dominant_sentiment'].isin(sentiments)] |
|
|
| |
| contents_df = contents_df.head(top_n) |
|
|
| if contents_df.empty: |
| conn.close_connection() |
| return pd.DataFrame(), pd.DataFrame() |
|
|
| |
| content_sk_list = contents_df['content_sk'].tolist() |
| comments_query = _self._build_sa_comments_query( |
| platform, brand, content_sk_list, date_range |
| ) |
| comments_df = conn.run_read_query(comments_query, "SA sampled comments") |
| conn.close_connection() |
|
|
| if comments_df is not None and not comments_df.empty: |
| comments_df = _self._process_sa_comments(comments_df) |
|
|
| |
| |
| if intents: |
| pattern = '|'.join(re.escape(i) for i in intents) |
| valid_sks = comments_df[ |
| comments_df['intent'].str.contains(pattern, na=False, case=False) |
| ]['content_sk'].unique() |
| contents_df = contents_df[contents_df['content_sk'].isin(valid_sks)] |
| comments_df = comments_df[comments_df['content_sk'].isin(valid_sks)] |
| else: |
| comments_df = pd.DataFrame() |
|
|
| return contents_df, comments_df |
|
|
| except Exception as e: |
| st.error(f"Error loading SA data: {e}") |
| return pd.DataFrame(), pd.DataFrame() |
|
|
    def _build_sa_content_query(self, platform, brand, min_comments, sort_by, date_range):
        """Build dynamic SQL for content-level aggregation (no text columns).

        Selects from either the social table or the Musora table depending on
        `platform`, then groups by CONTENT_SK and counts comments per
        sentiment polarity. Contents with fewer than `min_comments` comments
        are dropped; ordering is controlled by `sort_by`.

        Args:
            platform: platform name; 'musora_app' selects the Musora table.
            brand: channel/brand name (sanitized before interpolation).
            min_comments: HAVING threshold on comments per content.
            sort_by: key into sort_exprs; unknown values fall back to
                'severity_score'.
            date_range: (start, end) tuple or falsy for no date filter.

        Returns:
            str: complete SQL text.
        """
        # Social side is aliased 's' (it has a JOIN); Musora side is not.
        social_date_clause = self._build_date_clause(date_range, table_alias='s')
        musora_date_clause = self._build_date_clause(date_range)

        # Values are interpolated into SQL string literals, so strip
        # quote/semicolon/backslash characters first (see _sanitize_value).
        safe_brand = self._sanitize_value(brand.lower())
        safe_platform = self._sanitize_value(platform.lower())

        # ORDER BY expressions over the grouped rows. 'severity_score' is the
        # negative share weighted by sqrt(volume).
        sort_exprs = {
            'severity_score': (
                "(SUM(CASE WHEN SENTIMENT_POLARITY IN ('negative','very_negative') THEN 1 ELSE 0 END)"
                " * 100.0 / COUNT(*)) * SQRT(COUNT(*))"
            ),
            'sentiment_percentage': (
                "SUM(CASE WHEN SENTIMENT_POLARITY IN ('negative','very_negative') THEN 1 ELSE 0 END)"
                " * 100.0 / COUNT(*)"
            ),
            'sentiment_count': (
                "SUM(CASE WHEN SENTIMENT_POLARITY IN ('negative','very_negative') THEN 1 ELSE 0 END)"
            ),
            'total_comments': "COUNT(*)",
        }
        sort_expr = sort_exprs.get(sort_by, sort_exprs['severity_score'])

        parts = []

        # Social-media source (every platform except the Musora app). The
        # table has no thumbnail, so a NULL column keeps the UNION aligned.
        if platform != 'musora_app':
            parts.append(f"""
                SELECT
                    s.COMMENT_SK, s.CONTENT_SK, s.CONTENT_DESCRIPTION,
                    c.PERMALINK_URL, CAST(NULL AS VARCHAR) AS THUMBNAIL_URL,
                    s.SENTIMENT_POLARITY, s.INTENT, s.REQUIRES_REPLY, s.COMMENT_TIMESTAMP
                FROM SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES s
                LEFT JOIN SOCIAL_MEDIA_DB.CORE.DIM_CONTENT c ON s.CONTENT_SK = c.CONTENT_SK
                WHERE LOWER(s.CHANNEL_NAME) = '{safe_brand}'
                  AND LOWER(s.PLATFORM) = '{safe_platform}'
                {social_date_clause}
            """)

        # Musora in-app source (carries its own PERMALINK/THUMBNAIL columns).
        if platform == 'musora_app':
            parts.append(f"""
                SELECT
                    COMMENT_SK, CONTENT_SK, CONTENT_DESCRIPTION,
                    PERMALINK_URL, THUMBNAIL_URL,
                    SENTIMENT_POLARITY, INTENT, REQUIRES_REPLY, COMMENT_TIMESTAMP
                FROM SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES
                WHERE LOWER(CHANNEL_NAME) = '{safe_brand}'
                {musora_date_clause}
            """)

        # Only one branch ever fires, but UNION ALL keeps the template shape
        # shared with _build_rr_query.
        combined = " UNION ALL ".join(parts)

        # Aggregate per content; MAX() collapses the per-comment duplicates
        # of the content-level metadata columns.
        return f"""
            WITH combined AS ({combined})
            SELECT
                CONTENT_SK,
                MAX(CONTENT_DESCRIPTION) AS CONTENT_DESCRIPTION,
                MAX(PERMALINK_URL) AS PERMALINK_URL,
                MAX(THUMBNAIL_URL) AS THUMBNAIL_URL,
                COUNT(*) AS TOTAL_COMMENTS,
                SUM(CASE WHEN REQUIRES_REPLY THEN 1 ELSE 0 END) AS REPLY_REQUIRED_COUNT,
                SUM(CASE WHEN SENTIMENT_POLARITY = 'very_negative' THEN 1 ELSE 0 END) AS VERY_NEGATIVE_COUNT,
                SUM(CASE WHEN SENTIMENT_POLARITY = 'negative' THEN 1 ELSE 0 END) AS NEGATIVE_COUNT_RAW,
                SUM(CASE WHEN SENTIMENT_POLARITY = 'neutral' THEN 1 ELSE 0 END) AS NEUTRAL_COUNT,
                SUM(CASE WHEN SENTIMENT_POLARITY = 'positive' THEN 1 ELSE 0 END) AS POSITIVE_COUNT_RAW,
                SUM(CASE WHEN SENTIMENT_POLARITY = 'very_positive' THEN 1 ELSE 0 END) AS VERY_POSITIVE_COUNT
            FROM combined
            GROUP BY CONTENT_SK
            HAVING COUNT(*) >= {int(min_comments)}
            ORDER BY {sort_expr} DESC
        """
|
|
| def _process_sa_content_stats(self, df): |
| """ |
| Derive all columns expected by the existing SA page UI from the |
| raw content-aggregation result. |
| """ |
| df['negative_count'] = df['very_negative_count'] + df['negative_count_raw'] |
| df['positive_count'] = df['positive_count_raw'] + df['very_positive_count'] |
|
|
| df['negative_percentage'] = ( |
| df['negative_count'] / df['total_comments'] * 100 |
| ).round(2) |
| df['positive_percentage'] = ( |
| df['positive_count'] / df['total_comments'] * 100 |
| ).round(2) |
|
|
| df['severity_score'] = ( |
| df['negative_percentage'] * (df['total_comments'] ** 0.5) |
| ).round(2) |
|
|
| |
| df['dynamic_severity_score'] = df['severity_score'] |
| df['selected_sentiment_count'] = df['negative_count'] |
| df['selected_sentiment_percentage'] = df['negative_percentage'] |
|
|
| |
| sentiment_cols = pd.DataFrame({ |
| 'very_negative': df['very_negative_count'], |
| 'negative': df['negative_count_raw'], |
| 'neutral': df['neutral_count'], |
| 'positive': df['positive_count_raw'], |
| 'very_positive': df['very_positive_count'], |
| }) |
| df['dominant_sentiment'] = sentiment_cols.idxmax(axis=1) |
|
|
| return df |
|
|
    def _build_sa_comments_query(self, platform, brand, content_sk_list, date_range):
        """
        Build SQL for sampled comments for a list of content_sks.
        Samples up to 50 per (content_sk, sentiment_group) - neg, pos, other.
        display_text is computed in SQL (no need to fetch both original + translated).

        Args:
            platform: platform name; 'musora_app' selects the Musora table.
            brand: channel/brand name (sanitized before interpolation).
            content_sk_list: content surrogate keys to fetch comments for.
            date_range: (start, end) tuple or falsy for no date filter.

        Returns:
            str: complete SQL text.
        """
        # Social side is aliased 's' (it has a JOIN); Musora side is not.
        social_date_clause = self._build_date_clause(date_range, table_alias='s')
        musora_date_clause = self._build_date_clause(date_range)
        safe_brand = self._sanitize_value(brand.lower())
        # Each key is sanitized and single-quoted for the IN (...) list.
        content_sks_str = ", ".join(f"'{self._sanitize_value(str(sk))}'" for sk in content_sk_list)

        parts = []

        # Social-media source (every platform except the Musora app).
        if platform != 'musora_app':
            parts.append(f"""
                SELECT
                    s.COMMENT_SK, s.COMMENT_ID, s.CONTENT_SK, s.CONTENT_DESCRIPTION,
                    CASE WHEN s.IS_ENGLISH = FALSE AND s.TRANSLATED_TEXT IS NOT NULL
                         THEN s.TRANSLATED_TEXT ELSE s.ORIGINAL_TEXT END AS DISPLAY_TEXT,
                    s.ORIGINAL_TEXT,
                    LOWER(s.PLATFORM) AS PLATFORM,
                    LOWER(s.CHANNEL_NAME) AS BRAND,
                    s.COMMENT_TIMESTAMP, s.AUTHOR_NAME,
                    s.DETECTED_LANGUAGE, s.SENTIMENT_POLARITY, s.INTENT,
                    s.REQUIRES_REPLY, s.SENTIMENT_CONFIDENCE, s.IS_ENGLISH,
                    c.PERMALINK_URL
                FROM SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES s
                LEFT JOIN SOCIAL_MEDIA_DB.CORE.DIM_CONTENT c ON s.CONTENT_SK = c.CONTENT_SK
                WHERE s.CONTENT_SK IN ({content_sks_str})
                  AND LOWER(s.CHANNEL_NAME) = '{safe_brand}'
                {social_date_clause}
            """)

        # Musora in-app source.
        if platform == 'musora_app':
            parts.append(f"""
                SELECT
                    COMMENT_SK, COMMENT_ID, CONTENT_SK, CONTENT_DESCRIPTION,
                    CASE WHEN IS_ENGLISH = FALSE AND TRANSLATED_TEXT IS NOT NULL
                         THEN TRANSLATED_TEXT ELSE ORIGINAL_TEXT END AS DISPLAY_TEXT,
                    ORIGINAL_TEXT,
                    'musora_app' AS PLATFORM,
                    LOWER(CHANNEL_NAME) AS BRAND,
                    COMMENT_TIMESTAMP, AUTHOR_NAME,
                    DETECTED_LANGUAGE, SENTIMENT_POLARITY, INTENT,
                    REQUIRES_REPLY, SENTIMENT_CONFIDENCE, IS_ENGLISH,
                    PERMALINK_URL
                FROM SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES
                WHERE CONTENT_SK IN ({content_sks_str})
                  AND LOWER(CHANNEL_NAME) = '{safe_brand}'
                {musora_date_clause}
            """)

        combined = " UNION ALL ".join(parts)

        # Sampling: within each (content, neg/pos/other) partition, keep at
        # most 50 rows, preferring the extreme polarities (very_* rank 1,
        # plain rank 2, everything else rank 3) with a random tie-break.
        return f"""
            WITH combined AS ({combined})
            SELECT *
            FROM combined
            QUALIFY ROW_NUMBER() OVER (
                PARTITION BY CONTENT_SK,
                    CASE
                        WHEN SENTIMENT_POLARITY IN ('negative', 'very_negative') THEN 'neg'
                        WHEN SENTIMENT_POLARITY IN ('positive', 'very_positive') THEN 'pos'
                        ELSE 'other'
                    END
                ORDER BY
                    CASE SENTIMENT_POLARITY
                        WHEN 'very_negative' THEN 1 WHEN 'negative' THEN 2
                        WHEN 'very_positive' THEN 1 WHEN 'positive' THEN 2
                        ELSE 3
                    END,
                    RANDOM()
            ) <= 50
        """
|
|
| def _process_sa_comments(self, df): |
| """Process sampled comments dataframe for the SA page.""" |
| if 'comment_timestamp' in df.columns: |
| df['comment_timestamp'] = pd.to_datetime(df['comment_timestamp'], errors='coerce') |
|
|
| df['sentiment_polarity'] = df['sentiment_polarity'].fillna('unknown') |
| df['intent'] = df['intent'].fillna('unknown') |
| df['platform'] = df['platform'].fillna('unknown').str.lower() |
|
|
| if 'requires_reply' in df.columns: |
| df['requires_reply'] = df['requires_reply'].astype(bool) |
|
|
| |
| if 'display_text' in df.columns: |
| text = df['display_text'].astype(str) |
| df['display_text_short'] = text.where( |
| text.str.len() <= 100, text.str[:100] + '...' |
| ) |
|
|
| return df |
|
|
| |
| |
| |
|
|
| def load_reply_required_data(self, platforms=None, brands=None, date_range=None): |
| """ |
| Load comments requiring reply, filtered by platform/brand/date. |
| |
| Args: |
| platforms: List of platform strings (or None for all) |
| brands: List of brand strings (or None for all) |
| date_range: Tuple (start_date, end_date) or None |
| |
| Returns: |
| pd.DataFrame |
| """ |
| platforms_key = tuple(sorted(platforms)) if platforms else () |
| brands_key = tuple(sorted(brands)) if brands else () |
| date_key = (str(date_range[0]), str(date_range[1])) if date_range and len(date_range) == 2 else () |
|
|
| return self._fetch_rr_data(platforms_key, brands_key, date_key) |
|
|
| @st.cache_data(ttl=86400) |
| def _fetch_rr_data(_self, platforms, brands, date_range): |
| """Cached Reply Required data fetch.""" |
| try: |
| query = _self._build_rr_query(platforms, brands, date_range) |
| if not query: |
| return pd.DataFrame() |
|
|
| conn = SnowFlakeConn() |
| df = conn.run_read_query(query, "reply required comments") |
| conn.close_connection() |
|
|
| if df is None or df.empty: |
| return pd.DataFrame() |
|
|
| |
| if 'comment_timestamp' in df.columns: |
| df['comment_timestamp'] = pd.to_datetime(df['comment_timestamp'], errors='coerce') |
|
|
| df['sentiment_polarity'] = df['sentiment_polarity'].fillna('unknown') |
| df['intent'] = df['intent'].fillna('unknown') |
| df['platform'] = df['platform'].fillna('unknown').str.lower() |
| df['brand'] = df['brand'].fillna('unknown').str.lower() |
|
|
| if 'requires_reply' in df.columns: |
| df['requires_reply'] = df['requires_reply'].astype(bool) |
|
|
| if 'comment_timestamp' in df.columns: |
| df = df.sort_values('comment_timestamp', ascending=False) |
|
|
| |
| if 'display_text' in df.columns: |
| text = df['display_text'].astype(str) |
| df['display_text_short'] = text.where( |
| text.str.len() <= 100, text.str[:100] + '...' |
| ) |
|
|
| return df |
|
|
| except Exception as e: |
| st.error(f"Error loading reply required data: {e}") |
| return pd.DataFrame() |
|
|
| def _build_rr_query(self, platforms, brands, date_range): |
| """Build dynamic SQL for the Reply Required page.""" |
| |
| |
|
|
| |
| social_date_clause = "" |
| musora_date_clause = "" |
| if date_range and len(date_range) == 2: |
| social_date_clause = ( |
| f"AND s.COMMENT_TIMESTAMP >= '{date_range[0]}'" |
| f" AND s.COMMENT_TIMESTAMP <= '{date_range[1]}'" |
| ) |
| musora_date_clause = ( |
| f"AND COMMENT_TIMESTAMP >= '{date_range[0]}'" |
| f" AND COMMENT_TIMESTAMP <= '{date_range[1]}'" |
| ) |
|
|
| |
| social_brand_clause = "" |
| musora_brand_clause = "" |
| if brands: |
| brands_str = "', '".join(self._sanitize_value(b.lower()) for b in brands) |
| social_brand_clause = f"AND LOWER(s.CHANNEL_NAME) IN ('{brands_str}')" |
| musora_brand_clause = f"AND LOWER(CHANNEL_NAME) IN ('{brands_str}')" |
|
|
| |
| include_social = True |
| include_musora = True |
| social_platform_clause = "" |
|
|
| if platforms: |
| non_musora = [p for p in platforms if p != 'musora_app'] |
| include_musora = 'musora_app' in platforms |
| include_social = len(non_musora) > 0 |
| if non_musora: |
| plat_str = "', '".join(self._sanitize_value(p.lower()) for p in non_musora) |
| social_platform_clause = f"AND LOWER(s.PLATFORM) IN ('{plat_str}')" |
|
|
| if not include_social and not include_musora: |
| return None |
|
|
| parts = [] |
|
|
| if include_social: |
| parts.append(f""" |
| SELECT |
| s.COMMENT_SK, s.COMMENT_ID, s.CONTENT_SK, s.CONTENT_DESCRIPTION, |
| CASE WHEN s.IS_ENGLISH = FALSE AND s.TRANSLATED_TEXT IS NOT NULL |
| THEN s.TRANSLATED_TEXT ELSE s.ORIGINAL_TEXT END AS DISPLAY_TEXT, |
| s.ORIGINAL_TEXT, |
| LOWER(s.PLATFORM) AS PLATFORM, |
| LOWER(s.CHANNEL_NAME) AS BRAND, |
| s.COMMENT_TIMESTAMP, s.AUTHOR_NAME, |
| s.DETECTED_LANGUAGE, s.SENTIMENT_POLARITY, s.INTENT, |
| s.REQUIRES_REPLY, s.SENTIMENT_CONFIDENCE, s.IS_ENGLISH, |
| c.PERMALINK_URL |
| FROM SOCIAL_MEDIA_DB.ML_FEATURES.COMMENT_SENTIMENT_FEATURES s |
| LEFT JOIN SOCIAL_MEDIA_DB.CORE.DIM_CONTENT c ON s.CONTENT_SK = c.CONTENT_SK |
| WHERE s.REQUIRES_REPLY = TRUE |
| {social_platform_clause} |
| {social_brand_clause} |
| {social_date_clause} |
| """) |
|
|
| if include_musora: |
| parts.append(f""" |
| SELECT |
| COMMENT_SK, COMMENT_ID, CONTENT_SK, CONTENT_DESCRIPTION, |
| CASE WHEN IS_ENGLISH = FALSE AND TRANSLATED_TEXT IS NOT NULL |
| THEN TRANSLATED_TEXT ELSE ORIGINAL_TEXT END AS DISPLAY_TEXT, |
| ORIGINAL_TEXT, |
| 'musora_app' AS PLATFORM, |
| LOWER(CHANNEL_NAME) AS BRAND, |
| COMMENT_TIMESTAMP, AUTHOR_NAME, |
| DETECTED_LANGUAGE, SENTIMENT_POLARITY, INTENT, |
| REQUIRES_REPLY, SENTIMENT_CONFIDENCE, IS_ENGLISH, |
| PERMALINK_URL |
| FROM SOCIAL_MEDIA_DB.ML_FEATURES.MUSORA_COMMENT_SENTIMENT_FEATURES |
| WHERE REQUIRES_REPLY = TRUE |
| {musora_brand_clause} |
| {musora_date_clause} |
| """) |
|
|
| combined = " UNION ALL ".join(parts) |
| return f""" |
| WITH combined AS ({combined}) |
| SELECT * FROM combined |
| ORDER BY COMMENT_TIMESTAMP DESC |
| """ |
|
|
| |
| |
| |
|
|
| @st.cache_data(ttl=86400) |
| def load_demographics_data(_self): |
| """Load user demographic data from Snowflake.""" |
| if not _self.demographics_query: |
| return pd.DataFrame() |
|
|
| try: |
| conn = SnowFlakeConn() |
| query_with_cast = _self.demographics_query.replace( |
| "u.birthday as BIRTHDAY", |
| "TO_VARCHAR(u.birthday, 'YYYY-MM-DD HH24:MI:SS.FF6 TZHTZM') as BIRTHDAY" |
| ) |
| df = conn.run_read_query(query_with_cast, "user demographics") |
| conn.close_connection() |
|
|
| if df is None or df.empty: |
| return pd.DataFrame() |
|
|
| return _self._process_demographics_dataframe(df) |
|
|
| except Exception as e: |
| st.warning(f"Could not load demographic data: {str(e)}") |
| return pd.DataFrame() |
|
|
| def _process_demographics_dataframe(self, df): |
| """Process and enrich demographic dataframe.""" |
| df.columns = df.columns.str.lower() |
|
|
| if 'birthday' in df.columns: |
| df['birthday'] = df['birthday'].astype(str) |
| df['birthday'] = pd.to_datetime(df['birthday'], errors='coerce', utc=True) |
| df['birthday'] = df['birthday'].dt.tz_localize(None) |
| df['age'] = df['birthday'].apply(self._calculate_age) |
| df['age_group'] = df['age'].apply(self._categorize_age) |
|
|
| if 'timezone' in df.columns: |
| df['timezone_region'] = df['timezone'].apply(self._extract_timezone_region) |
|
|
| if 'experience_level' in df.columns: |
| df['experience_group'] = df['experience_level'].apply(self._categorize_experience) |
|
|
| if 'user_id' in df.columns: |
| df = df[df['user_id'].notna()] |
|
|
| return df |
|
|
| @staticmethod |
| def _calculate_age(birthday): |
| if pd.isna(birthday): |
| return None |
| try: |
| age = relativedelta(datetime.now(), birthday).years |
| return age if 0 <= age <= 120 else None |
| except Exception: |
| return None |
|
|
| def _categorize_age(self, age): |
| if pd.isna(age) or age is None: |
| return 'Unknown' |
| for group_name, (min_age, max_age) in self.config.get('demographics', {}).get('age_groups', {}).items(): |
| if min_age <= age <= max_age: |
| return group_name |
| return 'Unknown' |
|
|
| @staticmethod |
| def _extract_timezone_region(timezone): |
| if pd.isna(timezone) or not isinstance(timezone, str): |
| return 'Unknown' |
| parts = timezone.split('/') |
| return parts[0] if parts else 'Unknown' |
|
|
| def _categorize_experience(self, experience_level): |
| if pd.isna(experience_level): |
| return 'Unknown' |
| try: |
| exp_level = float(experience_level) |
| except Exception: |
| return 'Unknown' |
| for group_name, (min_exp, max_exp) in self.config.get('demographics', {}).get('experience_groups', {}).items(): |
| if min_exp <= exp_level <= max_exp: |
| return group_name |
| return 'Unknown' |
|
|
| def merge_demographics_with_comments(self, comments_df, demographics_df): |
| """Merge demographic data with comment data for musora_app platform only.""" |
| if demographics_df.empty: |
| for col, val in [('age', None), ('age_group', 'Unknown'), |
| ('timezone', None), ('timezone_region', 'Unknown'), |
| ('experience_level', None), ('experience_group', 'Unknown')]: |
| comments_df[col] = val |
| return comments_df |
|
|
| if 'author_id' in comments_df.columns and 'user_id' in demographics_df.columns: |
| comments_df = comments_df.copy() |
| comments_df['author_id_str'] = comments_df['author_id'].astype(str) |
| demographics_df['user_id_str'] = demographics_df['user_id'].astype(str) |
|
|
| merged_df = comments_df.merge( |
| demographics_df[['user_id_str', 'age', 'age_group', 'timezone', |
| 'timezone_region', 'experience_level', 'experience_group']], |
| left_on='author_id_str', |
| right_on='user_id_str', |
| how='left' |
| ) |
| merged_df.drop(columns=['author_id_str', 'user_id_str'], errors='ignore', inplace=True) |
|
|
| for col in ['age_group', 'timezone_region', 'experience_group']: |
| if col in merged_df.columns: |
| merged_df[col] = merged_df[col].fillna('Unknown') |
|
|
| return merged_df |
|
|
| return comments_df |
|
|
| |
| |
| |
|
|
| @staticmethod |
| def get_filter_options(df): |
| """Get unique values for sidebar filters.""" |
| return { |
| 'platforms': sorted(df['platform'].unique().tolist()), |
| 'brands': sorted(df['brand'].unique().tolist()), |
| 'sentiments': sorted(df['sentiment_polarity'].unique().tolist()), |
| 'languages': sorted(df['detected_language'].dropna().unique().tolist()) |
| if 'detected_language' in df.columns else [] |
| } |
|
|
| @staticmethod |
| def apply_filters(df, platforms=None, brands=None, sentiments=None, |
| date_range=None, languages=None): |
| """Apply sidebar filters to a dataframe (no copy β boolean indexing only).""" |
| filtered_df = df |
|
|
| if platforms: |
| filtered_df = filtered_df[filtered_df['platform'].isin(platforms)] |
|
|
| if brands: |
| filtered_df = filtered_df[filtered_df['brand'].isin(brands)] |
|
|
| if sentiments: |
| filtered_df = filtered_df[filtered_df['sentiment_polarity'].isin(sentiments)] |
|
|
| if languages: |
| filtered_df = filtered_df[filtered_df['detected_language'].isin(languages)] |
|
|
| if date_range and len(date_range) == 2 and 'comment_timestamp' in filtered_df.columns: |
| start_date, end_date = date_range |
| filtered_df = filtered_df[ |
| (filtered_df['comment_timestamp'] >= pd.Timestamp(start_date)) & |
| (filtered_df['comment_timestamp'] <= pd.Timestamp(end_date)) |
| ] |
|
|
| return filtered_df |
|
|
| @staticmethod |
| def get_date_range(df, default_days=30): |
| """Get default date range from dataframe.""" |
| if 'comment_timestamp' in df.columns and not df.empty: |
| max_date = df['comment_timestamp'].max() |
| min_date = max_date - timedelta(days=default_days) |
| return (min_date, max_date) |
| return (datetime.now() - timedelta(days=default_days), datetime.now()) |
|
|
| |
| |
| |
|
|
| @staticmethod |
| def _sanitize_value(value): |
| """Remove characters that could break SQL string literals.""" |
| return re.sub(r"['\";\\]", '', str(value)) |
|
|
| @staticmethod |
| def _build_date_clause(date_range, table_alias=None): |
| """ |
| Build a SQL AND COMMENT_TIMESTAMP ... clause, or empty string. |
| |
| Args: |
| date_range: tuple of (start, end) or None |
| table_alias: optional table alias prefix (e.g. 's') to avoid |
| ambiguous column errors when a JOIN is present |
| """ |
| if date_range and len(date_range) == 2: |
| col = f"{table_alias}.COMMENT_TIMESTAMP" if table_alias else "COMMENT_TIMESTAMP" |
| return f"AND {col} >= '{date_range[0]}' AND {col} <= '{date_range[1]}'" |
| return "" |