Spaces:
Sleeping
Sleeping
| """ | |
| Query interface for data lake using AWS Athena SQL queries. | |
| Provides methods to read and filter data from the Athena data lake | |
| using SQL queries with support for device/message filtering and date ranges. | |
| """ | |
| from typing import List, Optional, Tuple | |
| import pandas as pd | |
| from .athena import AthenaQuery | |
| from .catalog import DataLakeCatalog | |
| from .config import DataLakeConfig | |
| from .logger import setup_logger | |
| logger = setup_logger(__name__) | |
class DataLakeQuery:
    """
    Query interface for Athena-based data lake.

    Provides efficient methods to read data using SQL queries, with support
    for filtering by device, message, date range, and time windows.

    NOTE(review): table names, column names, and caller-supplied fragments
    (``aggregation``, ``where_clause``, ``signal_name``) are interpolated
    directly into SQL text. Athena's execution API offers no identifier
    binding, so callers must not pass untrusted strings into these methods.
    """

    def __init__(self, athena_query: AthenaQuery, catalog: DataLakeCatalog):
        """
        Initialize query engine.

        Args:
            athena_query: AthenaQuery instance used to execute SQL.
            catalog: Data lake catalog used to resolve table names and schemas.
        """
        self.athena = athena_query
        self.catalog = catalog
        logger.info("Initialized DataLakeQuery")

    @staticmethod
    def _date_filter_conditions(start_date: str, end_date: str) -> List[str]:
        """
        Build SQL WHERE conditions restricting rows to [start_date, end_date].

        Dates in numeric YYYY-MM-DD form are translated into filters on the
        year/month/day components extracted from the Hive file path
        (``"$path"``), matching the hierarchical layout
        ``{device_id}/{message}/{year}/{month}/{day}/file.parquet``.
        Anything else falls back to filtering on a ``date`` column, which is
        assumed to exist in that case (TODO confirm against table schemas).

        Args:
            start_date: Inclusive start date (ideally YYYY-MM-DD).
            end_date: Inclusive end date (ideally YYYY-MM-DD).

        Returns:
            List of SQL condition strings (without the WHERE keyword).
        """
        start_parts = start_date.split("-")
        end_parts = end_date.split("-")
        if len(start_parts) == 3 and len(end_parts) == 3:
            try:
                # int() both validates the components and normalizes
                # zero-padded values ("03" -> 3) into clean SQL literals.
                start_year, start_month, start_day = (int(p) for p in start_parts)
                end_year, end_month, end_day = (int(p) for p in end_parts)
            except ValueError:
                # Non-numeric components: use the date-column fallback below
                # instead of emitting broken SQL.
                pass
            else:
                # Path structure: .../year/month/day/file.parquet, so the date
                # components are the 4th/3rd/2nd path elements from the end.
                path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
                path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
                path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
                return [
                    # (year, month, day) >= (start_year, start_month, start_day)
                    f"({path_year} > {start_year} OR "
                    f"({path_year} = {start_year} AND "
                    f"({path_month} > {start_month} OR "
                    f"({path_month} = {start_month} AND {path_day} >= {start_day}))))",
                    # (year, month, day) <= (end_year, end_month, end_day)
                    f"({path_year} < {end_year} OR "
                    f"({path_year} = {end_year} AND "
                    f"({path_month} < {end_month} OR "
                    f"({path_month} = {end_month} AND {path_day} <= {end_day}))))",
                ]
        # Fallback: try a 'date' column when path-based filtering isn't possible.
        return [f"date >= '{start_date}' AND date <= '{end_date}'"]

    def read_device_message(
        self,
        device_id: str,
        message: str,
        date_range: Optional[Tuple[str, str]] = None,
        columns: Optional[List[str]] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Read all data for a device/message combination using SQL.

        Args:
            device_id: Device identifier.
            message: Message/rule name.
            date_range: Optional (start_date, end_date) tuple (YYYY-MM-DD format).
            columns: Optional column subset to read (improves performance).
            limit: Optional row limit. ``0`` is honored and returns no rows.

        Returns:
            DataFrame with query results.
        """
        table_name = self.catalog.get_table_name(device_id, message)

        # SELECT clause: restrict to columns the catalog schema knows about;
        # fall back to '*' when validation is impossible or nothing matches.
        select_clause = "*"
        if columns:
            schema = self.catalog.get_schema(device_id, message)
            if schema:
                valid_columns = [c for c in columns if c in schema]
                if valid_columns:
                    select_clause = ", ".join(valid_columns)
                else:
                    logger.warning("None of requested columns found, using all columns")

        # WHERE clause from the optional date range.
        where_conditions: List[str] = []
        if date_range:
            where_conditions.extend(self._date_filter_conditions(*date_range))
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        # BUGFIX: test `is not None` so limit=0 emits LIMIT 0 instead of
        # being silently dropped (falsy-zero bug in the original).
        limit_clause = f"LIMIT {limit}" if limit is not None else ""

        query = f"""
        SELECT {select_clause}
        FROM {self.catalog.config.database_name}.{table_name}
        {where_clause}
        {limit_clause}
        """
        logger.info("Executing query for %s/%s", device_id, message)
        return self.athena.query_to_dataframe(query)

    def read_date_range(
        self,
        device_id: str,
        message: str,
        start_date: str,
        end_date: str,
        columns: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """
        Read data for a specific date range.

        Convenience method wrapping read_device_message with a date range.

        Args:
            device_id: Device identifier.
            message: Message name.
            start_date: Start date (YYYY-MM-DD format).
            end_date: End date (YYYY-MM-DD format).
            columns: Optional column subset.

        Returns:
            DataFrame with data for the date range.
        """
        return self.read_device_message(
            device_id=device_id,
            message=message,
            date_range=(start_date, end_date),
            columns=columns,
        )

    def time_series_query(
        self,
        device_id: str,
        message: str,
        signal_name: str,
        start_time: Optional[int] = None,
        end_time: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> pd.DataFrame:
        """
        Query a single signal as a time series.

        Args:
            device_id: Device identifier.
            message: Message name.
            signal_name: Signal column name.
            start_time: Min timestamp (microseconds since epoch), inclusive.
            end_time: Max timestamp (microseconds since epoch), inclusive.
            limit: Optional row limit. ``0`` is honored and returns no rows.

        Returns:
            DataFrame with 't' (timestamp) and signal columns, sorted by time.
        """
        table_name = self.catalog.get_table_name(device_id, message)

        # Optional inclusive time-window bounds on the 't' column.
        where_conditions = []
        if start_time is not None:
            where_conditions.append(f"t >= {start_time}")
        if end_time is not None:
            where_conditions.append(f"t <= {end_time}")
        where_clause = ""
        if where_conditions:
            where_clause = "WHERE " + " AND ".join(where_conditions)

        # BUGFIX: `is not None` so limit=0 is not silently dropped.
        limit_clause = f"LIMIT {limit}" if limit is not None else ""

        query = f"""
        SELECT t, {signal_name}
        FROM {self.catalog.config.database_name}.{table_name}
        {where_clause}
        ORDER BY t
        {limit_clause}
        """
        logger.info("Time series query for %s/%s/%s", device_id, message, signal_name)
        return self.athena.query_to_dataframe(query)

    def execute_sql(self, sql: str) -> pd.DataFrame:
        """
        Execute a custom SQL query.

        Args:
            sql: SQL query string.

        Returns:
            DataFrame with query results.

        Note:
            Query should reference tables in the format:
            {database_name}.{table_name}
        """
        logger.info("Executing custom SQL query")
        return self.athena.query_to_dataframe(sql)

    def aggregate(
        self,
        device_id: str,
        message: str,
        aggregation: str,
        group_by: Optional[List[str]] = None,
        where_clause: Optional[str] = None,
    ) -> pd.DataFrame:
        """
        Execute an aggregation query.

        Args:
            device_id: Device identifier.
            message: Message name.
            aggregation: Aggregation expression (e.g., "COUNT(*), AVG(RPM)").
            group_by: Optional list of columns to group by.
            where_clause: Optional WHERE clause (without the WHERE keyword).

        Returns:
            DataFrame with aggregation results.

        Example:
            df = query.aggregate(
                "device_001", "EngineData",
                "COUNT(*) as count, AVG(RPM) as avg_rpm, MIN(RPM) as min_rpm",
                group_by=["date"]
            )
        """
        table_name = self.catalog.get_table_name(device_id, message)

        group_by_clause = ""
        if group_by:
            group_by_clause = f"GROUP BY {', '.join(group_by)}"
        where_clause_sql = ""
        if where_clause:
            where_clause_sql = f"WHERE {where_clause}"

        query = f"""
        SELECT {aggregation}
        FROM {self.catalog.config.database_name}.{table_name}
        {where_clause_sql}
        {group_by_clause}
        """
        logger.info("Aggregation query for %s/%s", device_id, message)
        return self.athena.query_to_dataframe(query)