# Source: CSS_EDA_Dashboard/src/examples/explore_example.py
# (uploaded by arash7920 as part of "Upload 35 files", revision 29854ee)
"""
Example: Explore data lake structure using Athena.
This script demonstrates how to discover devices, messages, dates,
and schemas in the CANedge Athena data lake.
"""
import os
import sys
from pathlib import Path
# Put the repository root (three directories above this file:
# src/examples/ -> src/ -> <root>) at the front of sys.path so the absolute
# ``src.datalake`` imports below resolve when the script is run directly.
_project_root = Path(__file__).resolve().parent.parent.parent
_project_root_entry = str(_project_root)
if _project_root_entry not in sys.path:
    sys.path.insert(0, _project_root_entry)
from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
def _credentials_from_env(environ=None):
    """Return ``(access_key_id, secret_access_key)`` read from the environment.

    Uses the standard AWS variable names ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` so that secrets never live in source control.

    Args:
        environ: Mapping to read from; defaults to ``os.environ``.

    Returns:
        The credential pair, or ``None`` if either value is missing or empty.
    """
    env = os.environ if environ is None else environ
    access_key_id = env.get("AWS_ACCESS_KEY_ID")
    secret_access_key = env.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        return None
    return access_key_id, secret_access_key


def _describe_message(catalog, device_id, message, max_columns=5):
    """Print the schema summary and partition date range for one message.

    Catalog failures are printed and swallowed, so one bad message does not
    stop the caller from describing the remaining ones.

    Args:
        catalog: Catalog exposing ``get_schema`` and ``list_partitions``.
        device_id: Device the message belongs to.
        message: Message name to describe.
        max_columns: How many column names to show before summarising the rest.
    """
    print(f" - {message}")

    try:
        schema = catalog.get_schema(device_id, message)
        if schema:
            # Presumably a mapping of column name -> type; only names are shown.
            column_names = list(schema.keys())
            print(f" Schema: {len(schema)} column(s)")
            print(f" Columns: {', '.join(column_names[:max_columns])}")
            if len(schema) > max_columns:
                print(f" ... and {len(schema) - max_columns} more")
    except Exception as e:
        print(f" Error getting schema: {e}")

    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            print(f" Partitions: {len(partitions)} date(s)")
            # NOTE(review): assumes list_partitions returns dates in sorted order.
            print(f" Date range: {partitions[0]} to {partitions[-1]}")
    except Exception as e:
        print(f" Could not list partitions: {e}")

    print()


def main():
    """Explore data lake structure.

    Prints the tables in the Athena database and the devices found. For the
    first device, it also prints each message with a schema and partition
    summary.

    AWS credentials are taken from ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY``; nothing secret is hard-coded. Errors from
    catalog queries are printed rather than raised.
    """
    credentials = _credentials_from_env()
    if credentials is None:
        print(
            "Error: set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY "
            "in the environment to run this example."
        )
        return
    access_key_id, secret_access_key = credentials

    # Connection settings for the example data lake; only the keys are secret.
    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )

    # Initialize Athena and catalog
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)

    # Header describing which data lake is being explored
    print("=" * 60)
    print("Exploring Data Lake (Athena)")
    print("=" * 60)
    print(f"Database: {config.database_name}")
    print(f"Region: {config.region}")
    print(f"Workgroup: {config.workgroup}")
    print()

    # List all tables; stop if the database cannot be queried at all.
    try:
        tables = catalog.list_tables()
    except Exception as e:
        print(f"Error listing tables: {e}")
        return
    print(f"Found {len(tables)} table(s) in database")
    if tables:
        print(f"Sample tables: {tables[:10]}")
    print()

    # List devices
    try:
        devices = catalog.list_devices(device_filter=config.device_filter)
    except Exception as e:
        print(f"Error listing devices: {e}")
        return
    print(f"Found {len(devices)} device(s):")
    for device in devices:
        print(f" - {device}")

    if not devices:
        return

    # Describe every message of the first device only.
    device_id = devices[0]
    print(f"\nMessages for device '{device_id}':")
    try:
        messages = catalog.list_messages(device_id, message_filter=config.message_filter)
    except Exception as e:
        print(f"Error listing messages: {e}")
        return
    for message in messages:
        _describe_message(catalog, device_id, message)
# Run the exploration only when executed as a script, not when imported.
if __name__ == "__main__":
    main()