Spaces:
Running
Running
| import os | |
| import logging | |
| from typing import List, Any | |
| from dotenv import load_dotenv | |
| import psycopg2 # For PostgreSQL | |
| import pymysql # For MySQL | |
| from urllib.parse import urlparse, parse_qs | |
| from datetime import datetime | |
| from pydantic import BaseModel | |
# Load environment variables before any of them are read below.
load_dotenv()

# Raw comma-separated URI lists; get_uris_from_env() splits them later.
# Both default to an empty string when the variable is not set.
mysql_uris_env = os.environ.get("AIVEN_MYSQL_URIS", "")
postgres_uris_env = os.environ.get("AIVEN_POSTGRES_URIS", "")

# Configure logging: INFO and above, written through a single stream handler.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Name of the scratch table that the CRUD health checks read and write.
HEARTBEAT_TABLE = "service_health_heartbeat"
| # ---------- Pydantic Response Model ---------- | |
# Result of one health check against a single database service.
# (Documented with comments rather than a docstring so the generated
# pydantic schema is left exactly as it was.)
class AivenPingResponse(BaseModel):
    # Database name on success; a fixed placeholder name on failure.
    service_name: str
    # True when the whole check completed without raising.
    success: bool
    # Text of the exception that made the check fail, if any.
    error: str | None = None
    # ISO-formatted timestamp captured when the check started.
    time: str
    # Hostname taken from the connection URI.
    host: str | None = None
    # Backend that was checked: "mysql" or "postgres".
    db_type: str | None = None
# Aggregate payload: one AivenPingResponse per configured service URI.
class AivenAllPingResponse(BaseModel):
    # MySQL results come first, followed by PostgreSQL results.
    aiven_services: List[AivenPingResponse]
| # ---------- Helper: URI Parsing ---------- | |
def get_uris_from_env(env_str: str) -> List[str]:
    """Split a comma-separated string of URIs into a clean list.

    Whitespace around each entry is removed and entries that end up empty
    are dropped. An empty (or otherwise falsy) input yields an empty list.
    """
    raw_entries = (env_str or "").split(",")
    return [uri for entry in raw_entries if (uri := entry.strip())]
| # ========================================== | |
| # MySQL CRUD Operations (Modular) | |
| # ========================================== | |
def mysql_ensure_table(cursor):
    """Create the heartbeat table on MySQL unless it is already there."""
    cursor.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {HEARTBEAT_TABLE} (
            id INT AUTO_INCREMENT PRIMARY KEY,
            check_time DATETIME,
            message VARCHAR(255)
        )
        """
    )
    logger.debug("MySQL: Table checked/created.")
def mysql_insert_data(cursor) -> int:
    """Insert one heartbeat row and return the id reported by the cursor."""
    cursor.execute(
        f"INSERT INTO {HEARTBEAT_TABLE} (check_time, message) "
        "VALUES (NOW(), 'Health Check Init')"
    )
    # The cursor exposes the auto-increment value of the row just inserted.
    new_id = cursor.lastrowid
    logger.debug(f"MySQL: Inserted record ID {new_id}")
    return new_id
def mysql_update_data(cursor, row_id: int):
    """Rewrite the message of the heartbeat row identified by ``row_id``."""
    statement = (
        f"UPDATE {HEARTBEAT_TABLE} "
        "SET message = 'Health Check Updated' WHERE id = %s"
    )
    # The id travels as a bound parameter, not as part of the SQL text.
    params = (row_id,)
    cursor.execute(statement, params)
    logger.debug(f"MySQL: Updated record ID {row_id}")
def mysql_delete_data(cursor, row_id: int):
    """Remove the heartbeat row identified by ``row_id``."""
    # The id travels as a bound parameter, not as part of the SQL text.
    params = (row_id,)
    cursor.execute(f"DELETE FROM {HEARTBEAT_TABLE} WHERE id = %s", params)
    logger.debug(f"MySQL: Deleted record ID {row_id}")
def execute_mysql_crud_cycle(connection):
    """Run the full MySQL health-check cycle and commit it.

    The cycle ensures the heartbeat table exists, inserts a row, updates
    it and deletes it again, so a successful run leaves no row behind.

    Args:
        connection: An open MySQL connection providing ``cursor()``,
            ``commit()`` and ``rollback()``.

    Raises:
        Exception: Whatever a step (or the commit) raised, re-raised
            unchanged after the transaction has been rolled back.
    """
    try:
        with connection.cursor() as cursor:
            mysql_ensure_table(cursor)
            pk = mysql_insert_data(cursor)
            mysql_update_data(cursor, pk)
            mysql_delete_data(cursor, pk)
        connection.commit()  # Commit transaction to finalize
    except Exception:
        # Mirror execute_postgres_crud_cycle: do not leave a half-finished
        # transaction open on the connection when a step fails.
        connection.rollback()
        raise
| # ========================================== | |
| # PostgreSQL CRUD Operations (Modular) | |
| # ========================================== | |
def pg_ensure_table(cursor):
    """Create the heartbeat table on PostgreSQL unless it is already there."""
    cursor.execute(
        f"""
        CREATE TABLE IF NOT EXISTS {HEARTBEAT_TABLE} (
            id SERIAL PRIMARY KEY,
            check_time TIMESTAMP,
            message TEXT
        )
        """
    )
    logger.debug("Postgres: Table checked/created.")
def pg_insert_data(cursor) -> int:
    """Insert one heartbeat row and return the id handed back by the query."""
    cursor.execute(
        f"INSERT INTO {HEARTBEAT_TABLE} (check_time, message) "
        "VALUES (NOW(), 'Health Check Init') RETURNING id"
    )
    # RETURNING id yields a single row whose first column is the new id.
    returned_row = cursor.fetchone()
    new_id = returned_row[0]
    logger.debug(f"Postgres: Inserted record ID {new_id}")
    return new_id
def pg_update_data(cursor, row_id: int):
    """Rewrite the message of the heartbeat row identified by ``row_id``."""
    statement = (
        f"UPDATE {HEARTBEAT_TABLE} "
        "SET message = 'Health Check Updated' WHERE id = %s"
    )
    # The id travels as a bound parameter, not as part of the SQL text.
    params = (row_id,)
    cursor.execute(statement, params)
    logger.debug(f"Postgres: Updated record ID {row_id}")
def pg_delete_data(cursor, row_id: int):
    """Remove the heartbeat row identified by ``row_id``."""
    # The id travels as a bound parameter, not as part of the SQL text.
    params = (row_id,)
    cursor.execute(f"DELETE FROM {HEARTBEAT_TABLE} WHERE id = %s", params)
    logger.debug(f"Postgres: Deleted record ID {row_id}")
def execute_postgres_crud_cycle(connection):
    """Run the full PostgreSQL health-check cycle inside one transaction.

    Ensures the heartbeat table exists, then inserts, updates and deletes
    a single row before committing. Any failure rolls the transaction back
    and is re-raised; the cursor is closed in every case.
    """
    cur = connection.cursor()
    try:
        pg_ensure_table(cur)
        record_id = pg_insert_data(cur)
        pg_update_data(cur, record_id)
        pg_delete_data(cur, record_id)
        connection.commit()  # Commit transaction
    except Exception:
        connection.rollback()
        raise
    finally:
        cur.close()
| # ========================================== | |
| # Main Checkers | |
| # ========================================== | |
def check_mysql(uri: str) -> AivenPingResponse:
    """Run the CRUD health check against one MySQL service.

    Args:
        uri: Connection URI. Host, port, credentials and database name are
            taken from it; the port defaults to 3306 and the database to
            ``defaultdb`` when absent. An ``ssl-mode`` query parameter is
            honoured and defaults to ``REQUIRED``.

    Returns:
        An ``AivenPingResponse``. On success ``service_name`` is the
        database name; on any failure it is ``"mysql_service"`` and
        ``error`` carries the exception text. This function does not raise.
    """
    now = datetime.utcnow().isoformat()
    host = "unknown"
    try:
        parsed_uri = urlparse(uri)
        host = parsed_uri.hostname
        port = parsed_uri.port or 3306
        user = parsed_uri.username
        password = parsed_uri.password
        database = parsed_uri.path.lstrip('/') or 'defaultdb'
        query_params = parse_qs(parsed_uri.query)
        ssl_mode = query_params.get('ssl-mode', ['REQUIRED'])[0]
        # NOTE(review): confirm that PyMySQL gives the "mode" key the
        # intended meaning; its documented ssl options are ca/cert/key-style.
        connection = pymysql.connect(
            host=host, port=port, user=user, password=password,
            database=database, ssl={'mode': ssl_mode} if ssl_mode else None
        )
        try:
            # Run the CRUD Cycle
            execute_mysql_crud_cycle(connection)
        finally:
            # Close on failure as well; previously a failing cycle skipped
            # the close and leaked the connection.
            connection.close()
        logger.info(f"MySQL CRUD check successful: {host}")
        return AivenPingResponse(
            service_name=database, success=True, time=now, host=host, db_type="mysql"
        )
    except Exception as e:
        # Top-level boundary of the check: report the failure, never raise.
        logger.error(f"MySQL CRUD failed: {str(e)}")
        return AivenPingResponse(
            service_name="mysql_service", success=False, error=str(e), time=now, host=host, db_type="mysql"
        )
def check_postgres(uri: str) -> AivenPingResponse:
    """Run the CRUD health check against one PostgreSQL service.

    Args:
        uri: Connection URI. It is passed unchanged to ``psycopg2.connect``;
            the host and database name are parsed from it for reporting only.

    Returns:
        An ``AivenPingResponse``. On success ``service_name`` is the
        database name from the URI path; on any failure it is
        ``"postgres_service"`` and ``error`` carries the exception text.
        This function does not raise.
    """
    now = datetime.utcnow().isoformat()
    host = "unknown"
    try:
        parsed_uri = urlparse(uri)
        host = parsed_uri.hostname
        db_name = parsed_uri.path.lstrip('/')
        connection = psycopg2.connect(uri)
        try:
            # Run the CRUD Cycle
            execute_postgres_crud_cycle(connection)
        finally:
            # Close on failure as well; previously a failing cycle skipped
            # the close and leaked the connection.
            connection.close()
        logger.info(f"Postgres CRUD check successful: {host}")
        return AivenPingResponse(
            service_name=db_name, success=True, time=now, host=host, db_type="postgres"
        )
    except Exception as e:
        # Top-level boundary of the check: report the failure, never raise.
        logger.error(f"Postgres CRUD failed: {str(e)}")
        return AivenPingResponse(
            service_name="postgres_service", success=False, error=str(e), time=now, host=host, db_type="postgres"
        )
| # ========================================== | |
| # Entry Point | |
| # ========================================== | |
def ping_aiven_projects() -> AivenAllPingResponse:
    """Health-check every configured Aiven database and collect the results.

    The URI lists come from the module-level environment strings. All MySQL
    services are checked first, then all PostgreSQL services, and the
    results are returned in that order.
    """
    logger.info("Starting Deep Health Checks (CRUD)...")
    mysql_targets = get_uris_from_env(mysql_uris_env)
    postgres_targets = get_uris_from_env(postgres_uris_env)

    outcomes = [check_mysql(target) for target in mysql_targets]
    outcomes.extend(check_postgres(target) for target in postgres_targets)
    return AivenAllPingResponse(aiven_services=outcomes)