
CANedge Data Lake Python SDK

Production-ready Python package for querying and analyzing CAN/LIN data lakes created from CSS Electronics CANedge MDF4 logs using AWS Athena.

Features

  • AWS Athena Integration: Query Parquet data using SQL via Athena
  • CloudFormation Configuration: Automatic configuration from CloudFormation stack outputs
  • Scalable: Leverage Athena's distributed query engine for large datasets
  • Type-safe: Full type hints and docstrings
  • Well-architected: Clean module design with logging and error handling

Installation

# Clone or download project
cd CSS

# Install in development mode
pip install -e .

# Or install from requirements
pip install -r requirements.txt

Prerequisites

  1. AWS Account with:

    • CloudFormation stack named datalake-stack (or specify a custom name)
    • Athena database configured
    • S3 bucket with Parquet data
    • AWS Glue catalog with table definitions
  2. CloudFormation Stack Outputs: Your datalake-stack must have the following outputs:

    • DatabaseName: Athena database name
    • S3OutputLocation: S3 location for Athena query results (e.g., s3://bucket/athena-results/)
    • WorkGroup: (Optional) Athena workgroup name
    • Region: (Optional) AWS region
  3. AWS Credentials:

    • AWS CLI configured: aws configure
    • Or IAM role (for EC2/ECS/Lambda)
    • Or environment variables

Quick Start

Option 1: Using Explicit Credentials (Recommended for Testing)

from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config with explicit credentials
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# Query data
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    limit=1000
)
print(f"Loaded {len(df)} records")

Option 2: Using CloudFormation Stack

from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config from CloudFormation stack
config = DataLakeConfig.from_cloudformation(
    stack_name="datalake-stack",
    region=None,  # Auto-detect from stack or use default
    profile=None,  # Use default profile or IAM role
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)

Configuration

Option 1: Using Explicit Credentials

For direct access with AWS credentials:

config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",  # placeholder - never hardcode or commit real keys
    secret_access_key="YOUR_SECRET_KEY",
)

Parameters:

  • database_name: Athena database name
  • workgroup: Athena workgroup name
  • s3_output_location: S3 path for query results (must end with /)
  • region: AWS region
  • access_key_id: AWS access key ID
  • secret_access_key: AWS secret access key

Option 2: Using CloudFormation Stack

CloudFormation Stack Setup

Your CloudFormation stack (datalake-stack) should output:

Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
  
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: s3://my-bucket/athena-results/
  
  WorkGroup:
    Description: Athena workgroup name (optional)
    Value: primary
  
  Region:
    Description: AWS region
    Value: us-east-1

Loading Configuration

from datalake.config import DataLakeConfig

# Load from CloudFormation stack (default: 'datalake-stack')
config = DataLakeConfig.from_cloudformation()

# Or specify custom stack name
config = DataLakeConfig.from_cloudformation(
    stack_name="my-custom-stack",
    region="us-east-1",  # Optional: override region
    profile="myprofile",  # Optional: use named AWS profile
)
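Under the hood, `from_cloudformation` presumably fetches the stack outputs via boto3's `describe_stacks` and turns them into a flat mapping. A minimal sketch of that lookup step (the helper name `outputs_to_config` is illustrative, not the package's actual API; the input shape is exactly what boto3 returns under `Stacks[0].Outputs`):

```python
def outputs_to_config(outputs, required=("DatabaseName", "S3OutputLocation")):
    """Convert a CloudFormation Outputs list into a flat dict.

    `outputs` is the list of {"OutputKey": ..., "OutputValue": ...} dicts
    returned by boto3's cloudformation.describe_stacks under
    Stacks[0].Outputs. Raises KeyError if a required output is missing.
    """
    mapping = {o["OutputKey"]: o["OutputValue"] for o in outputs}
    for key in required:
        if key not in mapping:
            raise KeyError(f"Required output not found: {key}")
    return mapping

# Example using the shape boto3 returns:
sample = [
    {"OutputKey": "DatabaseName", "OutputValue": "canedge_datalake"},
    {"OutputKey": "S3OutputLocation", "OutputValue": "s3://my-bucket/athena-results/"},
]
print(outputs_to_config(sample)["DatabaseName"])  # canedge_datalake
```

A missing required output raises immediately, which matches the "Required output not found" error covered in Troubleshooting below.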

Data Lake Structure

Athena Database Organization

The data lake is organized in Athena with:

  • Database: Contains all tables (from CloudFormation output DatabaseName)
  • Tables: Named by device and message (e.g., device_001_EngineData)
  • Partitions: Date-based partitioning for efficient queries
  • Schema: Each table has columns: t (timestamp), signal columns from DBC files
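The per-file date encoding can also be recovered in plain Python when post-processing S3 keys; a minimal sketch, assuming the `{device_id}/{message}/{year}/{month}/{day}/file.parquet` layout that the SQL examples further down split out of `"$path"`:

```python
def date_from_path(path: str) -> tuple[int, int, int]:
    """Extract (year, month, day) from a Parquet object key laid out as
    {device_id}/{message}/{year}/{month}/{day}/file.parquet."""
    parts = path.strip("/").split("/")
    # Same offsets as element_at(split("$path", '/'), -4/-3/-2) in Athena SQL
    return int(parts[-4]), int(parts[-3]), int(parts[-2])

print(date_from_path("device_001/EngineData/2024/01/15/part-000.parquet"))
# (2024, 1, 15)
```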

Table Naming Convention

Tables are typically named:

  • {device_id}_{message_rule} (e.g., device_001_EngineData)
  • Or {device_id}__{message_rule} (double underscore)
  • The catalog automatically detects the pattern
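An illustrative sketch of how such detection might work. This is not the package's actual implementation; in particular, the regex for single-underscore names assumes device ids look like `device_<number>`, which is an assumption made here for the example:

```python
import re

def split_table_name(table: str) -> tuple[str, str]:
    """Split an Athena table name into (device_id, message)."""
    if "__" in table:  # double-underscore convention is unambiguous
        device, message = table.split("__", 1)
        return device, message
    # Single-underscore convention: assume ids look like 'device_<number>'
    m = re.match(r"^(device_\d+)_(.+)$", table)
    if not m:
        raise ValueError(f"Unrecognized table name: {table}")
    return m.group(1), m.group(2)

print(split_table_name("device_001_EngineData"))   # ('device_001', 'EngineData')
print(split_table_name("device_001__EngineData"))  # ('device_001', 'EngineData')
```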

Usage Patterns

1. Explore Data Lake

from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog

config = DataLakeConfig.from_cloudformation()
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)

# List all tables
tables = catalog.list_tables()
print(f"Tables: {tables}")

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# List messages for device
messages = catalog.list_messages("device_001")
print(f"Messages: {messages}")

# Get schema
schema = catalog.get_schema("device_001", "EngineData")
print(f"Columns: {list(schema.keys())}")

# List partitions (dates)
partitions = catalog.list_partitions("device_001", "EngineData")
print(f"Dates: {partitions}")

2. Query Data

from datalake.query import DataLakeQuery

query = DataLakeQuery(athena, catalog)

# Read all data for device/message
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    columns=["t", "RPM", "Temperature"],
    limit=10000
)
print(f"Loaded {len(df)} records")

3. Time Series Query

import pandas as pd

# Query single signal over time window
df_ts = query.time_series_query(
    device_id="device_001",
    message="EngineData",
    signal_name="RPM",
    start_time=1000000000000000,  # microseconds
    end_time=2000000000000000,
    limit=10000
)

# Convert timestamp and plot
df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us')
print(df_ts[['timestamp', 'RPM']].head())
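The `start_time`/`end_time` arguments above are raw epoch microseconds; a small helper makes them easier to produce from ISO date strings (a sketch that assumes naive input timestamps are UTC):

```python
from datetime import datetime, timezone

def to_epoch_us(iso_date: str) -> int:
    """Convert an ISO date/datetime string to epoch microseconds.

    Assumes naive inputs are UTC; inputs with an explicit offset would
    need different handling.
    """
    dt = datetime.fromisoformat(iso_date).replace(tzinfo=timezone.utc)
    return int(dt.timestamp() * 1_000_000)

print(to_epoch_us("2024-01-15"))  # 1705276800000000
```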

4. Custom SQL Queries

# Execute custom SQL
# Note: Use path-based filtering for date ranges
# Data structure: {device_id}/{message}/{year}/{month}/{day}/file.parquet
sql = """
SELECT 
    COUNT(*) as record_count,
    AVG(RPM) as avg_rpm,
    MAX(Temperature) as max_temp
FROM canedge_datalake.device_001_EngineData
WHERE try_cast(element_at(split("$path", '/'), -4) AS INTEGER) = 2024
  AND try_cast(element_at(split("$path", '/'), -3) AS INTEGER) = 1
  AND try_cast(element_at(split("$path", '/'), -2) AS INTEGER) BETWEEN 15 AND 20
"""

df = query.execute_sql(sql)
print(df)

5. Aggregation Queries

# Use built-in aggregation method
# For date filtering, use path-based extraction
path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
where_clause = f"{path_year} = 2024 AND {path_month} = 1 AND {path_day} >= 15"

df_agg = query.aggregate(
    device_id="device_001",
    message="EngineData",
    aggregation="COUNT(*) as count, AVG(RPM) as avg_rpm, MIN(RPM) as min_rpm",
    where_clause=where_clause
)
print(df_agg)
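The path-expression boilerplate in the last two examples can be wrapped in a small reusable helper. A sketch (the helper name is illustrative; the quoting matches the Athena `"$path"` syntax used above):

```python
def path_date_filter(year: int, month: int = None, day: int = None) -> str:
    """Build a WHERE fragment filtering on date components embedded in "$path".

    Offsets follow the {device_id}/{message}/{year}/{month}/{day}/file.parquet
    layout: -4 = year, -3 = month, -2 = day.
    """
    def part(offset: int) -> str:
        return f"try_cast(element_at(split(\"$path\", '/'), {offset}) AS INTEGER)"

    clauses = [f"{part(-4)} = {year}"]
    if month is not None:
        clauses.append(f"{part(-3)} = {month}")
    if day is not None:
        clauses.append(f"{part(-2)} = {day}")
    return " AND ".join(clauses)

print(path_date_filter(2024, 1, 15))
```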

6. Batch Processing

from datalake.batch import BatchProcessor

processor = BatchProcessor(query)

# Compute statistics across all data
stats = processor.aggregate_by_device_message(
    aggregation_func=processor.compute_statistics,
    message_filter="Engine.*"
)

for device, messages in stats.items():
    for message, metrics in messages.items():
        print(f"{device}/{message}: {metrics['count']} records")

# Export to CSV
processor.export_to_csv(
    device_id="device_001",
    message="EngineData",
    output_path="engine_export.csv",
    limit=100000
)
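The `message_filter` argument is a regular expression. A sketch of the kind of matching presumably applied internally (the standalone function here is illustrative, not part of the package):

```python
import re

def filter_messages(messages: list[str], pattern: str) -> list[str]:
    """Keep message names that fully match the given regex pattern."""
    rx = re.compile(pattern)
    return [m for m in messages if rx.fullmatch(m)]

print(filter_messages(["EngineData", "EngineSpeed", "GnssPosition"], "Engine.*"))
# ['EngineData', 'EngineSpeed']
```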

Running Examples

# Test connection first
python test_connection.py

# Explore data lake structure
python examples/explore_example.py

# Query and analyze data
python examples/query_example.py

# Batch processing
python examples/batch_example.py

Note: All examples use explicit credentials. Update them with your actual credentials, or modify them to use a CloudFormation stack.

CloudFormation Stack Requirements

Required Stack Outputs

  1. DatabaseName (required)

    • Athena database name containing your tables
    • Example: canedge_datalake
  2. S3OutputLocation (required)

    • S3 bucket/path for Athena query results
    • Must end with /
    • Example: s3://my-bucket/athena-results/
    • Must have write permissions for Athena
  3. WorkGroup (optional)

    • Athena workgroup name
    • If not provided, uses default workgroup
  4. Region (optional)

    • AWS region
    • If not provided, uses default region or stack region

Example CloudFormation Template

Resources:
  AthenaDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: canedge_datalake

Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
    Export:
      Name: !Sub "${AWS::StackName}-DatabaseName"
  
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: !Sub "s3://${ResultsBucket}/athena-results/"
    Export:
      Name: !Sub "${AWS::StackName}-S3OutputLocation"
  
  WorkGroup:
    Description: Athena workgroup name
    Value: primary
    Export:
      Name: !Sub "${AWS::StackName}-WorkGroup"
  
  Region:
    Description: AWS region
    Value: !Ref AWS::Region
    Export:
      Name: !Sub "${AWS::StackName}-Region"

Performance Notes

  • Athena Query Limits: Use limit parameter to control result size
  • Partition Pruning: Date-based queries automatically use partition pruning
  • Query Costs: Athena bills per TB of data scanned (commonly $5/TB, region-dependent) - reduce scans with column selection, date filters, and limit
  • Result Reuse: Athena can optionally reuse recent query results (configurable maximum age) instead of rescanning data
  • Concurrent Queries: Athena supports multiple concurrent queries
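To keep an eye on scan costs, the bytes scanned by a finished query can be read from boto3's Athena `get_query_execution` call and converted to an approximate charge. A sketch; the $5/TB default below is the common list price and should be checked against your region's pricing:

```python
def estimate_query_cost(data_scanned_bytes: int, price_per_tb: float = 5.0) -> float:
    """Approximate Athena cost in USD from DataScannedInBytes.

    Athena bills per TB scanned with a 10 MB minimum per query.
    The byte count comes from, e.g.:
      boto3.client("athena").get_query_execution(QueryExecutionId=qid)
          ["QueryExecution"]["Statistics"]["DataScannedInBytes"]
    """
    billed = max(data_scanned_bytes, 10 * 1024 * 1024)  # 10 MB minimum
    return billed / (1024 ** 4) * price_per_tb

print(estimate_query_cost(50 * 1024 ** 3))  # cost of scanning ~50 GB
```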

Troubleshooting

"Stack not found"

  • Verify stack name: aws cloudformation describe-stacks --stack-name datalake-stack
  • Check AWS credentials and region
  • Ensure you have CloudFormation read permissions

"Required output not found"

  • Verify stack outputs: aws cloudformation describe-stacks --stack-name datalake-stack --query 'Stacks[0].Outputs'
  • Ensure DatabaseName and S3OutputLocation outputs exist

"Query execution failed"

  • Check Athena permissions (Glue catalog access, S3 read permissions)
  • Verify table names exist in the database
  • Check S3 output location has write permissions

"Table not found"

  • List tables: catalog.list_tables() to see available tables
  • Verify table naming convention matches expected pattern
  • Check Glue catalog for table definitions

License

MIT
