Spaces:
Sleeping
Sleeping
"""
Example: explore the structure of the data lake through Athena.

This script shows how to discover the devices, messages, dates,
and schemas available in the CANedge Athena data lake.
"""

import sys
from pathlib import Path

# Put the directory three levels above this file at the front of sys.path.
# NOTE(review): presumably this is the repository root, so that the
# `src.datalake` imports below resolve when the script is run directly.
_project_root = Path(__file__).resolve().parent.parent.parent
if not any(entry == str(_project_root) for entry in sys.path):
    sys.path.insert(0, str(_project_root))

# These imports must stay below the sys.path adjustment above.
from src.datalake.config import DataLakeConfig
from src.datalake.athena import AthenaQuery
from src.datalake.catalog import DataLakeCatalog
def _describe_message(catalog, device_id, message):
    """Print the schema summary and partition date range for one message.

    Args:
        catalog: Object providing ``get_schema(device_id, message)`` and
            ``list_partitions(device_id, message)``.
        device_id: Identifier of the device the message belongs to.
        message: Message name, as returned by ``catalog.list_messages``.

    The schema lookup and the partition lookup are attempted independently:
    a failure in one is printed and the other still runs.
    """
    print(f" - {message}")

    # Schema: show the column count and at most the first five column names.
    try:
        schema = catalog.get_schema(device_id, message)
        if schema:
            print(f" Schema: {len(schema)} column(s)")
            print(f" Columns: {', '.join(list(schema.keys())[:5])}")
            if len(schema) > 5:
                print(f" ... and {len(schema) - 5} more")
    except Exception as e:
        print(f" Error getting schema: {e}")

    # Partitions (dates): show the count and the first/last entries.
    try:
        partitions = catalog.list_partitions(device_id, message)
        if partitions:
            print(f" Partitions: {len(partitions)} date(s)")
            print(f" Date range: {partitions[0]} to {partitions[-1]}")
    except Exception as e:
        print(f" Could not list partitions: {e}")

    print()


def main():
    """Explore data lake structure.

    Builds a ``DataLakeConfig``, then prints the tables in the database,
    the devices found, and, for the first device, each message with its
    schema summary and partition date range.

    AWS credentials are read from the ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables. If either is missing,
    an error is printed and the function returns without contacting AWS.

    Returns:
        None. All results and errors are reported on stdout.
    """
    # Local import so this function is self-contained.
    import os

    # SECURITY: credentials must never be hard-coded in source. Any key pair
    # that was previously committed in this file should be treated as
    # compromised and rotated.
    access_key_id = os.environ.get("AWS_ACCESS_KEY_ID")
    secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
    if not access_key_id or not secret_access_key:
        print(
            "Error: set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY "
            "in the environment before running this example."
        )
        return

    # Load config with explicit (environment-supplied) credentials
    config = DataLakeConfig.from_credentials(
        database_name="dbparquetdatalake05",
        workgroup="athenaworkgroup-datalake05",
        s3_output_location="s3://canedge-raw-data-parquet/athena-results/",
        region="eu-north-1",
        access_key_id=access_key_id,
        secret_access_key=secret_access_key,
    )

    # Initialize Athena and catalog
    athena = AthenaQuery(config)
    catalog = DataLakeCatalog(athena, config)

    print("=" * 60)
    print("Exploring Data Lake (Athena)")
    print("=" * 60)
    print(f"Database: {config.database_name}")
    print(f"Region: {config.region}")
    print(f"Workgroup: {config.workgroup}")
    print()

    # List all tables; nothing else is attempted if this fails.
    try:
        tables = catalog.list_tables()
        print(f"Found {len(tables)} table(s) in database")
        if tables:
            print(f"Sample tables: {tables[:10]}")
        print()
    except Exception as e:
        print(f"Error listing tables: {e}")
        return

    # List devices
    try:
        devices = catalog.list_devices(device_filter=config.device_filter)
        print(f"Found {len(devices)} device(s):")
        for device in devices:
            print(f" - {device}")
    except Exception as e:
        print(f"Error listing devices: {e}")
        return

    if not devices:
        return

    # List messages for the first device only
    device_id = devices[0]
    print(f"\nMessages for device '{device_id}':")
    try:
        messages = catalog.list_messages(
            device_id, message_filter=config.message_filter
        )
        for message in messages:
            _describe_message(catalog, device_id, message)
    except Exception as e:
        print(f"Error listing messages: {e}")


if __name__ == "__main__":
    main()