Spaces:
Sleeping
Sleeping
| """ | |
| OXON Technologies - Professional Streamlit Dashboard | |
| A comprehensive dashboard for analyzing device data from AWS Athena data lake. | |
| """ | |
| import sys | |
| from pathlib import Path | |
| _project_root = Path(__file__).resolve().parent.parent | |
| if str(_project_root) not in sys.path: | |
| sys.path.insert(0, str(_project_root)) | |
| import streamlit as st | |
| from warnings import filterwarnings | |
| import base64 | |
| from PIL import Image | |
| import pandas as pd | |
| import numpy as np | |
| import yaml | |
| import re | |
| import plotly.graph_objects as go | |
| from typing import Dict, Optional, List, Tuple | |
| from ydata_profiling import ProfileReport | |
| import plotly.express as px | |
| from src.datalake.config import DataLakeConfig | |
| from src.datalake.athena import AthenaQuery | |
| from src.datalake.catalog import DataLakeCatalog | |
| from src.datalake.query import DataLakeQuery | |
| from src.datalake.batch import BatchProcessor | |
| from src.utils.correlation import CorrelationMatrixGenerator | |
| from src.utils.dimension_reduction import DimensionReduction | |
| from src.utils.feature_class import DetectFeatureClasses | |
# Directory containing this file; config.yaml and image assets are resolved
# relative to it (see load_config and display_sidebar).
_SRC_DIR = Path(__file__).resolve().parent
# Ignore warnings: no category or module filter is given, so this silences
# every Python warning for the whole process.
filterwarnings("ignore")
| # ============================================================================ | |
| # Configuration Management | |
| # ============================================================================ | |
def load_config(config_path: Optional[str] = None) -> Dict:
    """
    Load configuration from YAML file.

    Args:
        config_path: Path to the configuration YAML file. When None, the
            ``config.yaml`` located next to this module is used.

    Returns:
        Dictionary containing configuration settings. An empty YAML file
        yields an empty dictionary instead of None.

    Raises:
        FileNotFoundError: If config file doesn't exist
        yaml.YAMLError: If config file is invalid YAML
    """
    if config_path is None:
        config_path = _SRC_DIR / "config.yaml"
    config_file = Path(config_path)
    if not config_file.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    # Explicit encoding: the platform default is not UTF-8 everywhere, and the
    # config may contain non-ASCII text.
    with open(config_file, 'r', encoding='utf-8') as f:
        config = yaml.safe_load(f)
    # safe_load returns None for an empty document; honour the Dict contract so
    # callers can use .get() without a None check.
    return config if config is not None else {}
def initialize_aws_services(config: Dict) -> Tuple[DataLakeConfig, AthenaQuery, DataLakeCatalog, DataLakeQuery, BatchProcessor]:
    """
    Initialize AWS services using configuration.

    Args:
        config: Configuration dictionary; the ``aws`` section must provide
            database_name, workgroup, s3_output_location, region,
            access_key_id and secret_access_key.

    Returns:
        Tuple of (config, athena, catalog, query, processor)

    Raises:
        KeyError: If required configuration keys are missing
        Exception: If AWS service initialization fails
    """
    # An `aws:` key with no value parses to None; treat that like a missing
    # section so the caller gets the documented KeyError, not a TypeError.
    aws_config = config.get('aws') or {}
    required_keys = ['database_name', 'workgroup', 's3_output_location', 'region',
                     'access_key_id', 'secret_access_key']
    missing_keys = [key for key in required_keys if key not in aws_config]
    if missing_keys:
        raise KeyError(f"Missing required AWS configuration keys: {missing_keys}")

    data_lake_config = DataLakeConfig.from_credentials(
        database_name=aws_config['database_name'],
        workgroup=aws_config['workgroup'],
        s3_output_location=aws_config['s3_output_location'],
        region=aws_config['region'],
        access_key_id=aws_config['access_key_id'],
        secret_access_key=aws_config['secret_access_key'],
    )
    # Each service wraps the previous one: Athena client -> catalog -> query -> batch.
    athena = AthenaQuery(data_lake_config)
    catalog = DataLakeCatalog(athena, data_lake_config)
    query = DataLakeQuery(athena, catalog)
    processor = BatchProcessor(query)
    return data_lake_config, athena, catalog, query, processor
| # ============================================================================ | |
| # Session State Management | |
| # ============================================================================ | |
def initialize_session_state():
    """Seed ``st.session_state`` with a default for every key the dashboard uses.

    Keys that already exist are left untouched; only missing keys are created.
    """
    state = st.session_state

    # Configuration: load once, and keep the error text when loading fails.
    if 'app_config' not in state:
        try:
            state['app_config'] = load_config()
        except Exception as exc:
            state['app_config'] = None
            state['config_error'] = str(exc)

    defaults = {
        # AWS services (only initialized when needed)
        'aws_initialized': False,
        'aws_error': None,
        # User selections
        'selected_device': None,
        'selected_message': None,
        'message_mapping': None,
        # Date range filter
        'date_range_enabled': False,
        # Selected dates (what the user picks in the UI)
        'date_range_start': None,
        'date_range_end': None,
        # Applied dates (what is actually used for filtering)
        'applied_date_range_start': None,
        'applied_date_range_end': None,
        # Data cache
        'device_list': None,
        'message_list': None,
        'current_data': None,
        # Correlations tab
        'correlations_run_clicked': False,
        'correlations_data': None,
        'correlation_matrix': None,
        'feature_clusters': None,
    }
    for key, value in defaults.items():
        if key not in state:
            state[key] = value
def initialize_aws_if_needed():
    """
    Create the AWS service objects on first use and store them in session state.

    Returns True when the services are available (already initialized or
    initialized now), False when no configuration is loaded or when
    initialization raised; in the latter case the error text is stored under
    ``aws_error``.
    """
    state = st.session_state
    if state['aws_initialized']:
        return True

    app_config = state['app_config']
    if app_config is None:
        return False

    try:
        config, athena, catalog, query, processor = initialize_aws_services(app_config)
        state['config'] = config
        state['athena'] = athena
        state['catalog'] = catalog
        state['query'] = query
        state['processor'] = processor
        state['aws_initialized'] = True
        state['aws_error'] = None
    except Exception as exc:
        state['aws_error'] = str(exc)
        state['aws_initialized'] = False
        return False
    return True
| # ============================================================================ | |
| # UI Components | |
| # ============================================================================ | |
def get_base64_image(image_path: str) -> Optional[str]:
    """
    Read a file and return its contents as base64 text.

    Args:
        image_path: Path to the image file

    Returns:
        Base64 encoded string, or None if the file does not exist or any
        error occurs while reading it
    """
    try:
        source = Path(image_path)
        if source.exists():
            return base64.b64encode(source.read_bytes()).decode()
    except Exception:
        # Best effort: callers treat None as "no image available".
        pass
    return None
def display_header(logo_path: str, title: str):
    """
    Display header with logo and title.

    When the logo can be read it is embedded as a base64 data URI next to the
    title; otherwise only the title is shown.

    Args:
        logo_path: Path to logo image
        title: Header title text. NOTE(review): it is interpolated into raw
            HTML without escaping, so it should come from a trusted source.
    """
    # None when the logo file is missing or could not be read.
    logo_base64 = get_base64_image(logo_path)
    if logo_base64:
        st.markdown(
            f"""
            <div style="display: flex; align-items: center;">
            <img src="data:image/png;base64,{logo_base64}" alt="Logo"
            style="height: 200px; margin-right: 10px;">
            <h1 style="display: inline; margin: 0;">{title} ??</h1>
            </div>
            """,
            unsafe_allow_html=True,
        )
    else:
        # No logo available: fall back to a plain Streamlit title.
        st.title(f"{title} ??")
def display_sidebar():
    """Render the sidebar: logo, welcome text and the device picker.

    The device choice is committed only when the user presses **Go!**; that
    press also clears all state derived from the previously selected device.
    """
    state = st.session_state
    with st.sidebar:
        # Logo, with a text fallback when the image cannot be opened
        dashboard_cfg = state['app_config'].get('dashboard', {})
        logo_file = _SRC_DIR / dashboard_cfg.get('logo_path', 'images/logo.png')
        try:
            st.image(Image.open(logo_file), width='stretch')
        except Exception:
            st.write("OXON Technologies")

        st.title("OXON Technologies")
        st.write(
            "Welcome to the OXON Technologies dashboard. "
            "Select a device ID and click **Go!** to begin analysis."
        )

        # The device picker needs the AWS clients
        if not state['aws_initialized']:
            st.warning("?? AWS services not initialized. Please check configuration.")
            return

        # Fetch the device list once and keep it cached
        if state['device_list'] is None:
            try:
                with st.spinner("Loading devices..."):
                    state['device_list'] = state['catalog'].list_devices()
            except Exception as exc:
                st.error(f"Error loading devices: {str(exc)}")
                return

        devices = state['device_list']
        if not devices:
            st.warning("No devices found in the data lake.")
            return

        # Pre-select the confirmed device when it is still in the list
        confirmed = state['selected_device']
        start_index = devices.index(confirmed) if confirmed in devices else 0
        picked_device = st.selectbox(
            "Device ID",
            devices,
            index=start_index,
            key="sidebar_device_select",
        )

        # Commit the choice and drop everything tied to the old device
        if st.button("Go!", key="device_go_btn", width='stretch'):
            state['selected_device'] = picked_device
            stale_state = (
                ('selected_message', None),
                ('message_list', None),
                ('message_mapping', None),
                ('current_data', None),
                ('date_range_enabled', False),
                ('date_range_start', None),
                ('date_range_end', None),
                ('applied_date_range_start', None),
                ('applied_date_range_end', None),
                ('correlations_run_clicked', False),
                ('correlations_data', None),
                ('correlation_matrix', None),
                ('feature_clusters', None),
            )
            for key, cleared_value in stale_state:
                state[key] = cleared_value
            st.rerun()

        # Only shown once a device has been confirmed
        if state['selected_device']:
            st.success(f"? Selected: {state['selected_device']}")
| # ============================================================================ | |
| # Message Processing | |
| # ============================================================================ | |
def build_message_mapping(messages_list: List[str], mapping_config: Dict) -> Tuple[Dict[str, str], List[str]]:
    """
    Map display names to raw message names.

    Messages that do not start with ``can1`` map to themselves. For ``can1``
    messages an ID is built from the two digits after ``s`` and the two hex
    digits after the final ``m`` (upper-cased) and looked up in
    ``mapping_config``; the entry's ``name`` becomes the display name.
    ``can1`` messages that do not match the pattern are skipped entirely.

    Args:
        messages_list: List of raw message names
        mapping_config: Configuration dictionary keyed by message ID

    Returns:
        Tuple of (display-name -> raw-message dict, can1 messages whose ID is
        not in the mapping)
    """
    id_pattern = re.compile(r"s(?P<s>\d{2})pid.*m(?P<m>[0-9a-fA-F]{2})$")
    name_to_message: Dict[str, str] = {}
    unmapped: List[str] = []

    for raw_name in messages_list:
        if raw_name.startswith('can1'):
            found = id_pattern.search(raw_name)
            if found is None:
                continue
            lookup_id = (found.group("s") + found.group("m")).upper()
            if lookup_id not in mapping_config:
                unmapped.append(raw_name)
            else:
                name_to_message[mapping_config[lookup_id]['name']] = raw_name
        else:
            # Names outside can1 are shown unchanged
            name_to_message[raw_name] = raw_name

    return name_to_message, unmapped
def load_message_list(device_id: str) -> Optional[List[str]]:
    """
    Ask the catalog stored in session state for a device's messages.

    Args:
        device_id: Device ID to load messages for

    Returns:
        List of message names, or None if the lookup raised (the error is
        shown in the UI)
    """
    messages = None
    try:
        messages = st.session_state['catalog'].list_messages(device_id)
    except Exception as exc:
        st.error(f"Error loading messages: {str(exc)}")
    return messages
| # ============================================================================ | |
| # Tab Components | |
| # ============================================================================ | |
def render_message_viewer_tab():
    """Render the Message Viewer tab.

    Flow: check that AWS services and a device are available, load and cache
    the device's message list, map raw message names to display names, let the
    user pick a message (committed with **Show!**), offer an optional
    date-range filter (committed with **Apply Filter**), then delegate to
    ``render_message_data``.
    """
    state = st.session_state

    # Check prerequisites
    if not state['aws_initialized']:
        st.error("AWS services not initialized. Please check configuration.")
        return
    if not state['selected_device']:
        st.info("?? Please select a device from the sidebar and click **Go!** to begin.")
        return
    device_id = state['selected_device']

    # Load message list if not cached
    if state['message_list'] is None:
        with st.spinner(f"Loading messages for device {device_id}..."):
            state['message_list'] = load_message_list(device_id)
    if state['message_list'] is None:
        return
    messages_list = state['message_list']
    if not messages_list:
        st.warning(f"No messages found for device {device_id}.")
        return

    # Get message mapping configuration
    mapping_config = state['app_config'].get('message_mapping', {})

    # Build message mapping (cached per device)
    if state['message_mapping'] is None:
        messages_mapping_dict, lost_messages_list = build_message_mapping(
            messages_list, mapping_config
        )
        state['message_mapping'] = messages_mapping_dict
        if lost_messages_list:
            st.warning(
                f"The following messages were not found in the mapping: "
                f"{', '.join(lost_messages_list[:10])}"
                f"{'...' if len(lost_messages_list) > 10 else ''}"
            )
    else:
        messages_mapping_dict = state['message_mapping']
    if not messages_mapping_dict:
        st.warning("No valid messages found after mapping.")
        return

    # Message selection: pre-select the display name of the applied message
    message_names = list(messages_mapping_dict)
    applied_name = None
    if state['selected_message']:
        applied_name = next(
            (name for name, raw in messages_mapping_dict.items()
             if raw == state['selected_message']),
            None,
        )
    current_index = message_names.index(applied_name) if applied_name is not None else 0

    st.markdown('<div style="text-align: center;"><h2>Message Viewer</h2></div>', unsafe_allow_html=True)
    st.divider()
    selected_message_name = st.selectbox(
        "Select Message",
        message_names,
        index=current_index,
        key="message_selectbox"
    )
    message_clicked = st.button("Show!", key="message_show_btn", width='stretch')
    selected_message = messages_mapping_dict[selected_message_name]

    # Apply message selection only when user clicks the button
    if message_clicked:
        state['selected_message'] = selected_message
        state['current_data'] = None
        st.rerun()

    if not state['selected_message']:
        st.info("Select a message and click **Show!** to load data.")
        return

    # Label the applied message with its own display name; the selectbox may
    # already point at a different, not yet applied, message.
    shown_name = applied_name if applied_name is not None else selected_message_name
    st.info(f"?? Selected message: `{state['selected_message']}` ({shown_name})")

    # Date range selection (optional filter)
    st.divider()
    date_range_enabled = st.checkbox(
        "Filter by Date Range",
        value=state.get('date_range_enabled', False),
        key="date_range_checkbox",
        help="Enable to filter data by date range"
    )
    if date_range_enabled:
        # Get min/max dates from cached data if available
        min_date = None
        max_date = None
        cached_df = state.get('current_data')
        if cached_df is not None:
            # Best effort: without usable bounds the pickers stay unconstrained.
            try:
                if 'timestamp' in cached_df.columns:
                    min_date = cached_df['timestamp'].min().date()
                    max_date = cached_df['timestamp'].max().date()
            except Exception:
                pass

        col_start, col_end = st.columns([1, 1])
        with col_start:
            date_start = st.date_input(
                "Start Date",
                value=state.get('date_range_start') or min_date,
                min_value=min_date,
                max_value=max_date,
                key="date_range_start_input",
                help="Select start date for filtering"
            )
        with col_end:
            date_end = st.date_input(
                "End Date",
                value=state.get('date_range_end') or max_date,
                min_value=min_date,
                max_value=max_date,
                key="date_range_end_input",
                help="Select end date for filtering"
            )
        # Same width option as the other buttons in this module.
        apply_filter_clicked = st.button(
            "Apply Filter",
            key="apply_date_filter_btn",
            width='stretch'
        )

        # Update selected dates in session state
        state['date_range_start'] = date_start
        state['date_range_end'] = date_end

        # Apply filter only when button is clicked
        if apply_filter_clicked:
            if date_start is None or date_end is None:
                # An empty picker yields None; comparing None values would raise.
                st.error("Please select both a start date and an end date.")
            elif date_start > date_end:
                st.error("?? Start date must be before or equal to end date.")
            else:
                state['applied_date_range_start'] = date_start
                state['applied_date_range_end'] = date_end
                st.rerun()

        # Show current applied filter status
        if state.get('applied_date_range_start') and state.get('applied_date_range_end'):
            st.success(
                f"?? **Applied filter:** {state['applied_date_range_start']} to "
                f"{state['applied_date_range_end']}"
            )
        elif date_start and date_end:
            if date_start <= date_end:
                st.info("?? Select dates and click **Apply Filter** to filter the data.")
            else:
                st.error("?? Start date must be before or equal to end date.")
    else:
        # Clear selected and applied dates when the filter was just switched off
        if state.get('date_range_enabled'):
            state['applied_date_range_start'] = None
            state['applied_date_range_end'] = None
            state['date_range_start'] = None
            state['date_range_end'] = None

    # Update enabled state
    state['date_range_enabled'] = date_range_enabled
    render_message_data(device_id, state['selected_message'])
def render_message_data(device_id: str, message: str):
    """
    Render data and plot for a selected message.

    Loads the message data (cached in ``st.session_state['current_data']``),
    applies the date-range filter when one is enabled and applied, then shows
    overview metrics, the data table with a profile report, and a plot of
    daily means.

    Args:
        device_id: Device ID
        message: Message name
    """
    # Load data if not cached
    if st.session_state['current_data'] is None:
        with st.spinner("Loading data..."):
            try:
                df = st.session_state['query'].read_device_message(
                    device_id=device_id,
                    message=message,
                )
                if df is None or df.empty:
                    st.warning("No data found for the selected message.")
                    return
                # Process data: parse column 't', sort by it, expose it as 'timestamp'
                df['t'] = pd.to_datetime(df['t'])
                df = df.sort_values(by='t').reset_index(drop=True)
                df = df.rename(columns={'t': 'timestamp'})
                st.session_state['current_data'] = df
            except Exception as e:
                st.error(f"Error loading data: {str(e)}")
                return

    # Work on a copy so filtering never touches the cached frame.
    df = st.session_state['current_data'].copy()
    df = df.drop(columns=['date_created'], errors='ignore')
    if df.empty:
        return

    # Apply date range filter if enabled and applied dates are set
    original_row_count = len(df)
    if (st.session_state.get('date_range_enabled') and
            st.session_state.get('applied_date_range_start') and
            st.session_state.get('applied_date_range_end')):
        start_date = pd.to_datetime(st.session_state['applied_date_range_start'])
        # Half-open upper bound (start of the following day) so the whole end
        # date is included, down to its final fractional second.
        end_exclusive = pd.to_datetime(st.session_state['applied_date_range_end']) + pd.Timedelta(days=1)
        df = df[(df['timestamp'] >= start_date) & (df['timestamp'] < end_exclusive)].copy()
        if len(df) == 0:
            st.warning(
                f"?? No data found in the selected date range "
                f"({st.session_state['applied_date_range_start']} to {st.session_state['applied_date_range_end']})."
            )
            st.info("Try selecting a different date range or disable the filter to see all data.")
            return
        elif len(df) < original_row_count:
            st.info(f"?? Showing {len(df):,} of {original_row_count:,} records (filtered by date range).")

    # Display statistics
    st.divider()
    st.markdown('<div style="text-align: center;"><h2>Overview</h2></div>', unsafe_allow_html=True)
    st.divider()
    col1, col2, col3, col4 = st.columns([1, 2, 1, 1])
    with col1:
        st.metric("Total Records", len(df))
    with col2:
        st.metric("Date Range", f"{df['timestamp'].min().date()} to {df['timestamp'].max().date()}")
    with col3:
        st.metric("Data Columns", len(df.columns) - 1)  # Exclude timestamp
    with col4:
        st.metric("Time Span", f"{(df['timestamp'].max() - df['timestamp'].min()).days} days")

    # Display data section
    st.divider()
    st.markdown('<div style="text-align: center;"><h2>Data & Profile Report</h2></div>', unsafe_allow_html=True)
    st.divider()
    col1, col2 = st.columns([1, 2])
    with col1:
        try:
            st.dataframe(df.set_index('timestamp'), width='stretch', height=700)
        except Exception as e:  # dataframe was too large
            st.warning(f"Dataframe was too large to display: {str(e)}")
            st.info("Dataframe was too large to display. Please use the profile report to analyze the data.")
    with col2:
        try:
            pr = ProfileReport(df, title="Data Profile", explorative=False, vars={"num": {"low_categorical_threshold": 0}})
            st.components.v1.html(pr.to_html(), scrolling=True, height=700)
        except Exception as e:
            st.warning(f"Profile report could not be generated: {e}")

    # Display plot section
    st.divider()
    st.markdown('<div style="text-align: center;"><h2>Visualization</h2></div>', unsafe_allow_html=True)
    st.divider()
    try:
        # Prepare aggregated data: daily mean of the numeric columns.
        # numeric_only=True keeps a non-numeric column from aborting the plot.
        daily_aggregated_df = (
            df.groupby(pd.Grouper(key='timestamp', freq='D'))
            .mean(numeric_only=True)
            .reset_index()
            .fillna(0)
        )

        # Create plot: one trace per data column
        fig = go.Figure()
        data_columns = [col for col in daily_aggregated_df.columns
                        if col not in ['timestamp']]
        for column in data_columns:
            fig.add_trace(
                go.Scatter(
                    x=daily_aggregated_df['timestamp'],
                    y=daily_aggregated_df[column],
                    name=column,
                    mode='lines+markers'
                )
            )

        # Red vertical line at the configured dosing date (default 2025-12-16)
        # with legend entry "Dosing Stage"
        dosing_date = st.session_state['app_config'].get('dashboard', {}).get('dosing_stage_date', '2025-12-16')
        try:
            dosing_datetime = pd.to_datetime(dosing_date)
            if data_columns:
                y_min = daily_aggregated_df[data_columns].min().min()
                y_max = daily_aggregated_df[data_columns].max().max()
                if y_min == y_max:
                    # Give a flat series some height so the line is visible
                    y_min, y_max = y_min - 0.1, y_max + 0.1
            else:
                y_min, y_max = 0, 1
            # Add vertical line as a trace so it appears in the legend as "Dosing Stage"
            fig.add_trace(
                go.Scatter(
                    x=[dosing_datetime, dosing_datetime],
                    y=[y_min, y_max],
                    mode='lines',
                    name='Dosing Stage',
                    line=dict(color='red', width=2)
                )
            )
        except Exception:
            # The marker is optional: a bad date must not prevent the chart.
            pass

        # Update layout with legend
        fig.update_layout(
            title="Daily Aggregated Data",
            xaxis_title="Date",
            yaxis_title="Value",
            hovermode='x unified',
            width=800,
            height=700,
            showlegend=True,
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1,
                title_text=""
            )
        )
        st.plotly_chart(fig, width='stretch')
    except Exception as e:
        st.error(f"Error creating visualization: {str(e)}")
def load_all_device_messages(device_id: str) -> Optional[pd.DataFrame]:
    """
    Load all messages for a device, aggregate daily, and merge on timestamp.

    Messages whose name starts with ``can9`` are skipped. Each remaining
    message is reduced to the daily mean of its numeric columns, its columns
    are renamed to ``<message>__<column>``, and all messages are outer-joined
    on ``timestamp``. Numeric gaps in the merged frame are filled with 0.

    Args:
        device_id: Device ID to load messages for

    Returns:
        Merged DataFrame with all messages aggregated daily, or None if the
        device has no messages, no message could be loaded, or an error
        occurred
    """
    try:
        messages_list = st.session_state['catalog'].list_messages(device_id)
        if not messages_list:
            return None

        aggregated_dfs = []
        failed_messages = []
        progress_bar = st.progress(0)
        status_text = st.empty()
        total_messages = len(messages_list)

        try:
            for idx, message in enumerate(messages_list):
                if message.startswith('can9'):
                    continue
                status_text.text(f"Loading message {idx + 1}/{total_messages}: {message}")
                progress_bar.progress((idx + 1) / total_messages)
                try:
                    # Load message data
                    df = st.session_state['query'].read_device_message(
                        device_id=device_id,
                        message=message,
                    )
                    if df is None or df.empty:
                        failed_messages.append(message)
                        continue

                    # Process data
                    df['t'] = pd.to_datetime(df['t'])
                    df = df.sort_values(by='t').reset_index(drop=True)
                    df = df.rename(columns={'t': 'timestamp'})
                    # Drop date_created column
                    df = df.drop(columns=['date_created'], errors='ignore')

                    # Aggregate daily by mean. numeric_only=True keeps one
                    # non-numeric column from failing the whole message.
                    daily_df = df.groupby(
                        pd.Grouper(key='timestamp', freq='D')
                    ).mean(numeric_only=True).reset_index()

                    value_columns = [col for col in daily_df.columns if col != 'timestamp']
                    if not value_columns:
                        # Nothing numeric to correlate in this message
                        failed_messages.append(message)
                        continue

                    # Remove rows with all NaN (days with no data)
                    daily_df = daily_df.dropna(how='all', subset=value_columns)
                    if daily_df.empty:
                        failed_messages.append(message)
                        continue

                    # Prefix every data column with the message name so that
                    # columns from different messages cannot collide
                    daily_df = daily_df.rename(
                        columns={col: f"{message}__{col}" for col in value_columns}
                    )
                    aggregated_dfs.append(daily_df)
                except Exception as e:
                    failed_messages.append(f"{message} ({str(e)})")
                    continue
        finally:
            # Always remove the progress widgets, even if the loop raised
            progress_bar.empty()
            status_text.empty()

        if not aggregated_dfs:
            if failed_messages:
                st.warning(f"Failed to load all messages. Errors: {', '.join(failed_messages[:5])}")
            return None
        if failed_messages:
            st.warning(f"Failed to load {len(failed_messages)} message(s). Continuing with {len(aggregated_dfs)} messages.")

        # Merge all dataframes on timestamp
        merged_df = aggregated_dfs[0]
        for df in aggregated_dfs[1:]:
            merged_df = pd.merge(
                merged_df,
                df,
                on='timestamp',
                how='outer'  # Keep all days from all messages
            )

        # Sort by timestamp
        merged_df = merged_df.sort_values(by='timestamp').reset_index(drop=True)

        # Fill NaN with 0 for numeric columns (days on which a message has no data)
        numeric_cols = merged_df.select_dtypes(include=[np.number]).columns
        merged_df[numeric_cols] = merged_df[numeric_cols].fillna(0)
        return merged_df
    except Exception as e:
        st.error(f"Error loading device messages: {str(e)}")
        return None
def _reset_correlations():
    """Return the Correlations tab to its initial state (used by Start over button).

    Clears the "run clicked" flag and drops the cached data, correlation
    matrix and feature clusters.
    """
    state = st.session_state
    state['correlations_run_clicked'] = False
    for cache_key in ('correlations_data', 'correlation_matrix', 'feature_clusters'):
        state[cache_key] = None
| def render_correlations_tab(): | |
| """Render the Correlations tab with correlation matrix and feature clusters.""" | |
| # Check prerequisites | |
| if not st.session_state['aws_initialized']: | |
| st.error("AWS services not initialized. Please check configuration.") | |
| return | |
| if not st.session_state['selected_device']: | |
| st.info("?? Please select a device from the sidebar and click **Go!** to begin.") | |
| return | |
| device_id = st.session_state['selected_device'] | |
| st.markdown('<div style="text-align: center;"><h2>Correlation Analysis</h2></div>', unsafe_allow_html=True) | |
| st.divider() | |
| # Run button: calculations start only after user presses it | |
| if not st.session_state.get('correlations_run_clicked'): | |
| st.info( | |
| "This analysis loads **all messages** for the selected device, aggregates them daily, " | |
| "and computes correlations and feature cohorts. Click the button below to start." | |
| ) | |
| if st.button("Run Correlation Analysis", key="run_correlations_btn", type="primary", use_container_width=True): | |
| st.session_state['correlations_run_clicked'] = True | |
| st.rerun() | |
| return | |
| # Load all device messages if not cached | |
| if st.session_state['correlations_data'] is None: | |
| with st.spinner(f"Loading all messages for device {device_id}..."): | |
| st.session_state['correlations_data'] = load_all_device_messages(device_id) | |
| if st.session_state['correlations_data'] is None or st.session_state['correlations_data'].empty: | |
| st.error("No data available for correlation analysis.") | |
| if st.button("Start over", key="correlations_start_over_btn"): | |
| _reset_correlations() | |
| st.rerun() | |
| return | |
| df = st.session_state['correlations_data'].copy() | |
| # Remove timestamp column for correlation analysis | |
| df_features = df.drop(columns=['timestamp']) | |
| if df_features.empty: | |
| st.error("No features available for correlation analysis.") | |
| return | |
| st.info(f"?? Analyzing {len(df_features.columns)} features from {len(df)} days of data.") | |
| # Detect feature classes | |
| st.subheader("1. Feature Classification") | |
| with st.spinner("Classifying features..."): | |
| try: | |
| detector = DetectFeatureClasses(df_features, categorical_threshold=0.5, string_data_policy='drop') | |
| feature_classes, dropped_features = detector.feature_classes() | |
| if dropped_features: | |
| st.warning(f"Dropped {len(dropped_features)} non-numeric features: {', '.join(dropped_features[:5])}") | |
| df_features = df_features.drop(columns=dropped_features) | |
| # Display feature class summary | |
| class_counts = {} | |
| for cls in feature_classes.values(): | |
| class_counts[cls] = class_counts.get(cls, 0) + 1 | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("Continuous", class_counts.get('Continuous', 0)) | |
| with col2: | |
| st.metric("Binary", class_counts.get('Binary', 0)) | |
| with col3: | |
| st.metric("Categorical", class_counts.get('Categorical', 0)) | |
| except Exception as e: | |
| st.error(f"Error classifying features: {str(e)}") | |
| return | |
| # Generate correlation matrix | |
| st.subheader("2. Correlation Matrix") | |
| if st.session_state['correlation_matrix'] is None: | |
| with st.spinner("Generating correlation matrix (this may take a while)..."): | |
| try: | |
| corr_generator = CorrelationMatrixGenerator( | |
| df=df_features, | |
| feature_classes=feature_classes, | |
| continuous_vs_continuous_method='pearson' | |
| ) | |
| st.session_state['correlation_matrix'] = corr_generator.generate_matrix() | |
| except Exception as e: | |
| st.error(f"Error generating correlation matrix: {str(e)}") | |
| return | |
| corr_matrix = st.session_state['correlation_matrix'] | |
| # Display interactive heatmap | |
| st.markdown("**Interactive Correlation Heatmap**") | |
| try: | |
| # Create heatmap using plotly | |
| fig = px.imshow( | |
| corr_matrix, | |
| color_continuous_scale='RdBu', | |
| aspect='auto', | |
| labels=dict(x="Feature", y="Feature", color="Correlation"), | |
| title="Feature Correlation Matrix" | |
| ) | |
| fig.update_layout( | |
| height=max(800, len(corr_matrix) * 40), | |
| width=max(800, len(corr_matrix) * 40) | |
| ) | |
| st.plotly_chart(fig, use_container_width=True) | |
| except Exception as e: | |
| st.error(f"Error displaying heatmap: {str(e)}") | |
| # Find feature clusters using dimension reduction | |
| st.subheader("3. Feature Clusters (Cohorts)") | |
| if st.session_state['feature_clusters'] is None: | |
| with st.spinner("Finding feature clusters..."): | |
| try: | |
| dim_reduction = DimensionReduction( | |
| dataframe=df_features, | |
| feature_classes=feature_classes, | |
| method='pearson', | |
| projection_dimension=1 | |
| ) | |
| # Find clusters at different correlation thresholds; store (lower, upper) with each band for correct labeling | |
| st.session_state['feature_clusters'] = [ | |
| ((0.95, 1.0), dim_reduction.find_clusters(lower_bound=0.95, upper_bound=1.0)), | |
| ((0.90, 0.95), dim_reduction.find_clusters(lower_bound=0.90, upper_bound=0.95)), | |
| ((0.85, 0.90), dim_reduction.find_clusters(lower_bound=0.85, upper_bound=0.90)), | |
| ((0.80, 0.85), dim_reduction.find_clusters(lower_bound=0.80, upper_bound=0.85)), | |
| ((0.75, 0.80), dim_reduction.find_clusters(lower_bound=0.75, upper_bound=0.80)), | |
| ((0.70, 0.75), dim_reduction.find_clusters(lower_bound=0.70, upper_bound=0.75)), | |
| ] | |
| except Exception as e: | |
| st.error(f"Error finding clusters: {str(e)}") | |
| return | |
| cluster_bands = st.session_state['feature_clusters'] | |
| # Display clusters with band-bound labels so captions match the shown matrices | |
| for (lower, upper), cluster_list in cluster_bands: | |
| band_label = f"[{lower}, {upper}]" | |
| if cluster_list: | |
| st.markdown(f"**Cohorts with pairwise correlation in {band_label}**") | |
| for idx, cluster in enumerate(cluster_list): | |
| with st.expander(f"Cohort {idx + 1}: {len(cluster)} features (all pairs in {band_label})"): | |
| for feature in cluster: | |
| st.write(f" � {feature}") | |
| if len(cluster) > 1: | |
| st.markdown("**Pairwise correlations (values lie in " + band_label + "):**") | |
| cluster_corr = corr_matrix.loc[cluster, cluster] | |
| st.dataframe(cluster_corr, use_container_width=True) | |
| # Sanity check: ensure displayed matrix matches the band | |
| vals = cluster_corr.values | |
| off_diag = vals[~np.eye(len(cluster), dtype=bool)] | |
| if off_diag.size > 0: | |
| in_range = np.sum((off_diag >= lower) & (off_diag <= upper)) == off_diag.size | |
| if in_range: | |
| st.caption(f"All off-diagonal values in {band_label}.") | |
| else: | |
| st.caption(f"Note: some values fall outside {band_label} (may include NaNs or rounding).") | |
| else: | |
| st.info(f"No cohorts found with pairwise correlation in {band_label}.") | |
| # Summary statistics | |
| st.subheader("4. Summary") | |
| total_clusters = sum(len(cluster_list) for (_, cluster_list) in cluster_bands) | |
| total_features_in_clusters = sum( | |
| len(cluster) for (_, cluster_list) in cluster_bands for cluster in cluster_list | |
| ) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.metric("Total Cohorts Found", total_clusters) | |
| with col2: | |
| st.metric("Features in Cohorts", total_features_in_clusters) | |
| st.divider() | |
| if st.button("Start over", key="correlations_start_over_bottom", use_container_width=True): | |
| _reset_correlations() | |
| st.rerun() | |
def render_placeholder_tab():
    """Show an informational notice in place of a tab that is not built yet."""
    notice = "?? This feature is under development."
    st.info(notice)
| # ============================================================================ | |
| # Main Application | |
| # ============================================================================ | |
def main():
    """Main application entry point.

    Initializes session state, then halts the script run with an error
    message when either the application configuration or the AWS services
    are unavailable. Otherwise applies the page settings found in the
    ``dashboard`` section of the app config and renders the header, the
    sidebar and the three main tabs.
    """
    # Initialize session state
    initialize_session_state()

    # Load configuration
    app_config = st.session_state['app_config']
    if app_config is None:
        st.error(
            f"? Configuration Error: {st.session_state.get('config_error', 'Unknown error')}\n\n"
            "Please ensure `src/config.yaml` exists and is properly formatted."
        )
        st.stop()

    # Initialize AWS services. A failed initialization always ends the run
    # with a visible message, even when no specific error was recorded, so
    # the user never lands on a blank page or on tabs backed by
    # uninitialized services.
    if not initialize_aws_if_needed():
        aws_error = st.session_state.get('aws_error') or 'Unknown error'
        st.error(
            f"? AWS Initialization Error: {aws_error}\n\n"
            "Please check your AWS credentials in `src/config.yaml`."
        )
        st.stop()

    # Get dashboard configuration. `or {}` also covers a `dashboard:` key
    # that is present in the YAML file but has no value (parsed as None).
    dashboard_config = app_config.get('dashboard') or {}

    # Set page config
    st.set_page_config(
        page_title=dashboard_config.get('page_title', 'OXON Technologies'),
        page_icon=dashboard_config.get('page_icon', ':mag:'),
        layout=dashboard_config.get('layout', 'wide'),
    )

    # Custom sidebar styling
    sidebar_color = dashboard_config.get('sidebar_background_color', '#74b9ff')
    st.markdown(
        f"""
        <style>
        section[data-testid="stSidebar"] {{
            background-color: {sidebar_color};
        }}
        </style>
        """,
        unsafe_allow_html=True,
    )

    # Display header (logo path is resolved relative to this file's directory)
    header_logo_rel = dashboard_config.get('header_logo_path', 'images/analysis.png')
    header_logo = str(_SRC_DIR / header_logo_rel)
    header_title = dashboard_config.get('page_title', 'Analytical Dashboard')
    display_header(header_logo, header_title)

    # Display sidebar
    display_sidebar()

    # Main content tabs
    tabs = st.tabs(['Message Viewer', 'Correlations', 'To be Implemented'])
    with tabs[0]:
        render_message_viewer_tab()
    with tabs[1]:
        render_correlations_tab()
    with tabs[2]:
        render_placeholder_tab()
# Run the dashboard only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()