CANedge Data Lake Python SDK
A production-ready Python package for querying and analyzing CAN/LIN data lakes created from CSS Electronics CANedge MDF4 logs, using AWS Athena.
Features
- AWS Athena Integration: Query Parquet data using SQL via Athena
- CloudFormation Configuration: Automatic configuration from CloudFormation stack outputs
- Scalable: Leverage Athena's distributed query engine for large datasets
- Type-safe: Full type hints and docstrings
- Well-architected: Clean module design with logging and error handling
Installation
```shell
# Clone or download project
cd CSS

# Install in development mode
pip install -e .

# Or install from requirements
pip install -r requirements.txt
```
Prerequisites
AWS Account with:
- CloudFormation stack named `datalake-stack` (or specify a custom name)
- Athena database configured
- S3 bucket with Parquet data
- AWS Glue catalog with table definitions

CloudFormation Stack Outputs: Your `datalake-stack` must have the following outputs:
- `DatabaseName`: Athena database name
- `S3OutputLocation`: S3 location for Athena query results (e.g., `s3://bucket/athena-results/`)
- `WorkGroup`: (Optional) Athena workgroup name
- `Region`: (Optional) AWS region
AWS Credentials:
- AWS CLI configured: `aws configure`
- Or IAM role (for EC2/ECS/Lambda)
- Or environment variables
Quick Start
Option 1: Using Explicit Credentials (Recommended for Testing)
```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config with explicit credentials
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# Query data
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    limit=1000
)
print(f"Loaded {len(df)} records")
```
Option 2: Using CloudFormation Stack
```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog
from datalake.query import DataLakeQuery

# Load config from CloudFormation stack
config = DataLakeConfig.from_cloudformation(
    stack_name="datalake-stack",
    region=None,   # Auto-detect from stack or use default
    profile=None,  # Use default profile or IAM role
)

# Initialize Athena and catalog
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)
query = DataLakeQuery(athena, catalog)
```
Configuration
Option 1: Using Explicit Credentials
For direct access with AWS credentials:
```python
config = DataLakeConfig.from_credentials(
    database_name="dbparquetdatalake05",
    workgroup="athenaworkgroup-datalake05",
    s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
    region="eu-north-1",
    access_key_id="YOUR_ACCESS_KEY_ID",
    secret_access_key="YOUR_SECRET_ACCESS_KEY",
)
```
Parameters:
- `database_name`: Athena database name
- `workgroup`: Athena workgroup name
- `s3_output_location`: S3 path for query results (must end with `/`)
- `region`: AWS region
- `access_key_id`: AWS access key ID
- `secret_access_key`: AWS secret access key
Option 2: Using CloudFormation Stack
CloudFormation Stack Setup
Your CloudFormation stack (datalake-stack) should output:
```yaml
Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: s3://my-bucket/athena-results/
  WorkGroup:
    Description: Athena workgroup name (optional)
    Value: primary
  Region:
    Description: AWS region
    Value: us-east-1
```
Loading Configuration
```python
from datalake.config import DataLakeConfig

# Load from CloudFormation stack (default: 'datalake-stack')
config = DataLakeConfig.from_cloudformation()

# Or specify a custom stack name
config = DataLakeConfig.from_cloudformation(
    stack_name="my-custom-stack",
    region="us-east-1",   # Optional: override region
    profile="myprofile",  # Optional: use named AWS profile
)
```
Data Lake Structure
Athena Database Organization
The data lake is organized in Athena with:
- Database: Contains all tables (from CloudFormation output `DatabaseName`)
- Tables: Named by device and message (e.g., `device_001_EngineData`)
- Partitions: Date-based partitioning for efficient queries
- Schema: Each table has a `t` (timestamp) column plus signal columns from DBC files
Table Naming Convention
Tables are typically named:
- `{device_id}_{message_rule}` (e.g., `device_001_EngineData`)
- Or `{device_id}__{message_rule}` (double underscore)
- The catalog automatically detects the pattern
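The detection logic can be sketched in plain Python. This is a hypothetical helper, not part of the SDK; in the single-underscore case it assumes the message name itself contains no underscores:

```python
def parse_table_name(table_name: str) -> tuple:
    """Split an Athena table name into (device_id, message).

    Tries the double-underscore convention first; otherwise splits on the
    last single underscore, assuming the message name has no underscores.
    Hypothetical helper, not part of the SDK.
    """
    if "__" in table_name:
        device_id, message = table_name.split("__", 1)
    else:
        device_id, message = table_name.rsplit("_", 1)
    return device_id, message

print(parse_table_name("device_001__EngineData"))  # ('device_001', 'EngineData')
print(parse_table_name("device_001_EngineData"))   # ('device_001', 'EngineData')
```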
Usage Patterns
1. Explore Data Lake
```python
from datalake.config import DataLakeConfig
from datalake.athena import AthenaQuery
from datalake.catalog import DataLakeCatalog

config = DataLakeConfig.from_cloudformation()
athena = AthenaQuery(config)
catalog = DataLakeCatalog(athena, config)

# List all tables
tables = catalog.list_tables()
print(f"Tables: {tables}")

# List devices
devices = catalog.list_devices()
print(f"Devices: {devices}")

# List messages for device
messages = catalog.list_messages("device_001")
print(f"Messages: {messages}")

# Get schema
schema = catalog.get_schema("device_001", "EngineData")
print(f"Columns: {list(schema.keys())}")

# List partitions (dates)
partitions = catalog.list_partitions("device_001", "EngineData")
print(f"Dates: {partitions}")
```
2. Query Data
```python
from datalake.query import DataLakeQuery

query = DataLakeQuery(athena, catalog)

# Read all data for device/message
df = query.read_device_message(
    device_id="device_001",
    message="EngineData",
    date_range=("2024-01-15", "2024-01-20"),
    columns=["t", "RPM", "Temperature"],
    limit=10000
)
print(f"Loaded {len(df)} records")
```
3. Time Series Query
```python
import pandas as pd

# Query single signal over time window
df_ts = query.time_series_query(
    device_id="device_001",
    message="EngineData",
    signal_name="RPM",
    start_time=1000000000000000,  # microseconds
    end_time=2000000000000000,
    limit=10000
)

# Convert timestamp and plot
df_ts['timestamp'] = pd.to_datetime(df_ts['t'], unit='us')
print(df_ts[['timestamp', 'RPM']].head())
```
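If pandas is not at hand, the same microsecond-epoch conversion can be done with the standard library alone (assuming `t` is microseconds since the Unix epoch, as the example above implies):

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def us_to_datetime(t_us: int) -> datetime:
    """Convert a microsecond Unix epoch value (the `t` column) to a
    timezone-aware UTC datetime without floating-point precision loss."""
    return EPOCH + timedelta(microseconds=t_us)

print(us_to_datetime(1_000_000_000_000_000))  # 2001-09-09 01:46:40+00:00
```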
4. Custom SQL Queries
```python
# Execute custom SQL
# Note: Use path-based filtering for date ranges
# Data structure: {device_id}/{message}/{year}/{month}/{day}/file.parquet
sql = """
SELECT
    COUNT(*) as record_count,
    AVG(RPM) as avg_rpm,
    MAX(Temperature) as max_temp
FROM canedge_datalake.device_001_EngineData
WHERE try_cast(element_at(split("$path", '/'), -4) AS INTEGER) = 2024
  AND try_cast(element_at(split("$path", '/'), -3) AS INTEGER) >= 1
  AND try_cast(element_at(split("$path", '/'), -2) AS INTEGER) >= 15
"""
df = query.execute_sql(sql)
print(df)
```
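The `"$path"` predicates index path segments from the end; the same extraction in plain Python, for a hypothetical object key following the layout in the comment (Athena's `element_at` also counts from the end for negative indices):

```python
def path_date_parts(s3_path: str) -> tuple:
    """Extract (year, month, day) from a key shaped like
    .../{device_id}/{message}/{year}/{month}/{day}/file.parquet,
    mirroring Athena's element_at(split("$path", '/'), -4 / -3 / -2).
    """
    parts = s3_path.split("/")
    return int(parts[-4]), int(parts[-3]), int(parts[-2])

key = "s3://canedge-raw-data-parquet/device_001/EngineData/2024/01/15/data.parquet"
print(path_date_parts(key))  # (2024, 1, 15)
```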
5. Aggregation Queries
```python
# Use built-in aggregation method
# For date filtering, use path-based extraction
path_year = "try_cast(element_at(split(\"$path\", '/'), -4) AS INTEGER)"
path_month = "try_cast(element_at(split(\"$path\", '/'), -3) AS INTEGER)"
path_day = "try_cast(element_at(split(\"$path\", '/'), -2) AS INTEGER)"
where_clause = f"{path_year} = 2024 AND {path_month} >= 1 AND {path_day} >= 15"

df_agg = query.aggregate(
    device_id="device_001",
    message="EngineData",
    aggregation="COUNT(*) as count, AVG(RPM) as avg_rpm, MIN(RPM) as min_rpm",
    where_clause=where_clause
)
print(df_agg)
```
6. Batch Processing
```python
from datalake.batch import BatchProcessor

processor = BatchProcessor(query)

# Compute statistics across all data
stats = processor.aggregate_by_device_message(
    aggregation_func=processor.compute_statistics,
    message_filter="Engine.*"
)
for device, messages in stats.items():
    for message, metrics in messages.items():
        print(f"{device}/{message}: {metrics['count']} records")

# Export to CSV
processor.export_to_csv(
    device_id="device_001",
    message="EngineData",
    output_path="engine_export.csv",
    limit=100000
)
```
Running Examples
```shell
# Test connection first
python test_connection.py

# Explore data lake structure
python examples/explore_example.py

# Query and analyze data
python examples/query_example.py

# Batch processing
python examples/batch_example.py
```
Note: All examples use explicit credentials. Update them with your actual credentials, or modify them to load configuration from the CloudFormation stack.
CloudFormation Stack Requirements
Required Stack Outputs
`DatabaseName` (required)
- Athena database name containing your tables
- Example: `canedge_datalake`

`S3OutputLocation` (required)
- S3 bucket/path for Athena query results
- Must end with `/`
- Example: `s3://my-bucket/athena-results/`
- Athena must have write permissions to this location

`WorkGroup` (optional)
- Athena workgroup name
- If not provided, uses the default workgroup

`Region` (optional)
- AWS region
- If not provided, uses the default region or the stack's region
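A minimal sketch of this output validation, assuming the outputs list has the shape returned by boto3's CloudFormation `describe_stacks` call (the helper name is illustrative, not part of the SDK):

```python
REQUIRED_OUTPUTS = ("DatabaseName", "S3OutputLocation")

def outputs_to_config(outputs: list) -> dict:
    """Turn a CloudFormation Outputs list
    ([{'OutputKey': ..., 'OutputValue': ...}, ...]) into a dict,
    failing fast if a required output is missing."""
    config = {o["OutputKey"]: o["OutputValue"] for o in outputs}
    missing = [key for key in REQUIRED_OUTPUTS if key not in config]
    if missing:
        raise KeyError(f"Required output not found: {', '.join(missing)}")
    return config
```

With boto3, the list would come from `client.describe_stacks(StackName=...)['Stacks'][0]['Outputs']`.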
Example CloudFormation Template
```yaml
Resources:
  AthenaDatabase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Name: canedge_datalake

Outputs:
  DatabaseName:
    Description: Athena database name
    Value: canedge_datalake
    Export:
      Name: !Sub "${AWS::StackName}-DatabaseName"
  S3OutputLocation:
    Description: S3 location for Athena query results
    Value: !Sub "s3://${ResultsBucket}/athena-results/"
    Export:
      Name: !Sub "${AWS::StackName}-S3OutputLocation"
  WorkGroup:
    Description: Athena workgroup name
    Value: primary
    Export:
      Name: !Sub "${AWS::StackName}-WorkGroup"
  Region:
    Description: AWS region
    Value: !Ref AWS::Region
    Export:
      Name: !Sub "${AWS::StackName}-Region"
```
Performance Notes
- Athena Query Limits: Use the `limit` parameter to control result size
- Partition Pruning: Date-based queries automatically use partition pruning
- Query Costs: Athena charges per TB scanned, so use column selection and filters
- Result Caching: Athena caches query results for 24 hours
- Concurrent Queries: Athena supports multiple concurrent queries
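Since Athena bills per byte scanned, a quick back-of-envelope helper makes the cost of an unfiltered scan concrete. The $5/TB default rate is an assumption here; check current AWS pricing for your region:

```python
def athena_cost_usd(data_scanned_bytes: int, usd_per_tb: float = 5.0) -> float:
    """Estimate Athena query cost from bytes scanned.
    The default $5/TB rate is an assumption - verify against current AWS pricing."""
    return data_scanned_bytes / 1024**4 * usd_per_tb

# boto3's get_query_execution reports Statistics.DataScannedInBytes per query
print(athena_cost_usd(1024**4))  # 5.0
```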
Troubleshooting
"Stack not found"
- Verify the stack name: `aws cloudformation describe-stacks --stack-name datalake-stack`
- Check AWS credentials and region
- Ensure you have CloudFormation read permissions
"Required output not found"
- Verify stack outputs: `aws cloudformation describe-stacks --stack-name datalake-stack --query 'Stacks[0].Outputs'`
- Ensure the `DatabaseName` and `S3OutputLocation` outputs exist
"Query execution failed"
- Check Athena permissions (Glue catalog access, S3 read permissions)
- Verify table names exist in the database
- Check S3 output location has write permissions
"Table not found"
- List tables with `catalog.list_tables()` to see what is available
- Verify the table naming convention matches the expected pattern
- Check Glue catalog for table definitions
License
MIT