# backend / aiven_keep_alive_service.py
# Soumik Bose
# go
# 1eebb1e
import os
import logging
from typing import List, Any
from dotenv import load_dotenv
import psycopg2 # For PostgreSQL
import pymysql # For MySQL
from urllib.parse import urlparse, parse_qs
from datetime import datetime
from pydantic import BaseModel
# Load environment variables (from a .env file, if present) before reading them.
load_dotenv()
# Comma-separated connection URIs; an unset variable yields an empty string.
mysql_uris_env = os.getenv("AIVEN_MYSQL_URIS", "")
postgres_uris_env = os.getenv("AIVEN_POSTGRES_URIS", "")
# Configure logging: INFO and above, written through a StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Constants
# Name of the scratch table used by both the MySQL and PostgreSQL CRUD cycles.
HEARTBEAT_TABLE = "service_health_heartbeat"
# ---------- Pydantic Response Model ----------
class AivenPingResponse(BaseModel):
    # Result of one health check against a single database URI.
    # Database name on success; the checkers use a generic placeholder on failure.
    service_name: str
    # True when the whole CRUD cycle completed.
    success: bool
    # Text of the exception on failure; None otherwise.
    error: str | None = None
    # ISO-8601 timestamp taken when the check started.
    time: str
    # Hostname parsed from the URI ("unknown" if parsing never got that far).
    host: str | None = None
    # Which checker produced the result: "mysql" or "postgres".
    db_type: str | None = None
class AivenAllPingResponse(BaseModel):
    # Aggregate of the per-URI results: MySQL checks first, then PostgreSQL.
    aiven_services: List[AivenPingResponse]
# ---------- Helper: URI Parsing ----------
def get_uris_from_env(env_str: str) -> List[str]:
    """Split a comma-separated string into trimmed, non-empty URIs.

    Args:
        env_str: Raw value of an environment variable; may be empty.

    Returns:
        The URIs in their original order, with surrounding whitespace removed
        and blank entries dropped. An empty input gives an empty list.
    """
    uris: List[str] = []
    if env_str:
        for piece in env_str.split(','):
            cleaned = piece.strip()
            if cleaned:
                uris.append(cleaned)
    return uris
# ==========================================
# MySQL CRUD Operations (Modular)
# ==========================================
def mysql_ensure_table(cursor):
    """Create the heartbeat table on MySQL unless it already exists.

    Args:
        cursor: An open MySQL cursor used to run the DDL statement.
    """
    ddl = (
        "\n"
        f"CREATE TABLE IF NOT EXISTS {HEARTBEAT_TABLE} (\n"
        "id INT AUTO_INCREMENT PRIMARY KEY,\n"
        "check_time DATETIME,\n"
        "message VARCHAR(255)\n"
        ")\n"
    )
    cursor.execute(ddl)
    logger.debug("MySQL: Table checked/created.")
def mysql_insert_data(cursor) -> int:
    """Insert one heartbeat row and hand back its auto-generated id.

    Args:
        cursor: An open MySQL cursor.

    Returns:
        The cursor's ``lastrowid`` after the insert.
    """
    insert_stmt = (
        f"INSERT INTO {HEARTBEAT_TABLE} (check_time, message) "
        "VALUES (NOW(), 'Health Check Init')"
    )
    cursor.execute(insert_stmt)
    new_id = cursor.lastrowid
    logger.debug(f"MySQL: Inserted record ID {new_id}")
    return new_id
def mysql_update_data(cursor, row_id: int):
    """Rewrite the message of the heartbeat row identified by ``row_id``.

    Args:
        cursor: An open MySQL cursor.
        row_id: Primary key of the row to update (bound as a query parameter).
    """
    update_stmt = (
        f"UPDATE {HEARTBEAT_TABLE} "
        "SET message = 'Health Check Updated' WHERE id = %s"
    )
    params = (row_id,)
    cursor.execute(update_stmt, params)
    logger.debug(f"MySQL: Updated record ID {row_id}")
def mysql_delete_data(cursor, row_id: int):
    """Remove the heartbeat row identified by ``row_id``.

    Args:
        cursor: An open MySQL cursor.
        row_id: Primary key of the row to delete (bound as a query parameter).
    """
    delete_stmt = f"DELETE FROM {HEARTBEAT_TABLE} " "WHERE id = %s"
    params = (row_id,)
    cursor.execute(delete_stmt, params)
    logger.debug(f"MySQL: Deleted record ID {row_id}")
def execute_mysql_crud_cycle(connection):
    """Run the create/insert/update/delete heartbeat cycle on MySQL.

    The steps run on one cursor and are committed together. If any step
    fails, the transaction is rolled back (mirroring the PostgreSQL cycle)
    and the original exception is re-raised for the caller to report.

    Args:
        connection: An open MySQL connection providing ``cursor()``,
            ``commit()`` and ``rollback()``.

    Raises:
        Exception: Whatever the failing step raised.
    """
    try:
        with connection.cursor() as cursor:
            mysql_ensure_table(cursor)
            pk = mysql_insert_data(cursor)
            mysql_update_data(cursor, pk)
            mysql_delete_data(cursor, pk)
        connection.commit()  # Commit transaction to finalize
    except Exception:
        # Best-effort cleanup: a broken connection may refuse the rollback,
        # and that secondary error must not hide the real failure.
        try:
            connection.rollback()
        except Exception as rollback_error:
            logger.debug(f"MySQL: rollback failed: {rollback_error}")
        raise
# ==========================================
# PostgreSQL CRUD Operations (Modular)
# ==========================================
def pg_ensure_table(cursor):
    """Create the heartbeat table on PostgreSQL unless it already exists.

    Args:
        cursor: An open PostgreSQL cursor used to run the DDL statement.
    """
    ddl = (
        "\n"
        f"CREATE TABLE IF NOT EXISTS {HEARTBEAT_TABLE} (\n"
        "id SERIAL PRIMARY KEY,\n"
        "check_time TIMESTAMP,\n"
        "message TEXT\n"
        ")\n"
    )
    cursor.execute(ddl)
    logger.debug("Postgres: Table checked/created.")
def pg_insert_data(cursor) -> int:
    """Insert one heartbeat row and hand back its generated id.

    Args:
        cursor: An open PostgreSQL cursor.

    Returns:
        The ``id`` produced by the statement's ``RETURNING`` clause.
    """
    insert_stmt = (
        f"INSERT INTO {HEARTBEAT_TABLE} (check_time, message) "
        "VALUES (NOW(), 'Health Check Init') RETURNING id"
    )
    cursor.execute(insert_stmt)
    returned_row = cursor.fetchone()
    new_id = returned_row[0]
    logger.debug(f"Postgres: Inserted record ID {new_id}")
    return new_id
def pg_update_data(cursor, row_id: int):
    """Rewrite the message of the heartbeat row identified by ``row_id``.

    Args:
        cursor: An open PostgreSQL cursor.
        row_id: Primary key of the row to update (bound as a query parameter).
    """
    update_stmt = (
        f"UPDATE {HEARTBEAT_TABLE} "
        "SET message = 'Health Check Updated' WHERE id = %s"
    )
    params = (row_id,)
    cursor.execute(update_stmt, params)
    logger.debug(f"Postgres: Updated record ID {row_id}")
def pg_delete_data(cursor, row_id: int):
    """Remove the heartbeat row identified by ``row_id``.

    Args:
        cursor: An open PostgreSQL cursor.
        row_id: Primary key of the row to delete (bound as a query parameter).
    """
    delete_stmt = f"DELETE FROM {HEARTBEAT_TABLE} " "WHERE id = %s"
    params = (row_id,)
    cursor.execute(delete_stmt, params)
    logger.debug(f"Postgres: Deleted record ID {row_id}")
def execute_postgres_crud_cycle(connection):
    """Run the create/insert/update/delete heartbeat cycle on PostgreSQL.

    All steps share one cursor and are committed together. On any failure
    the transaction is rolled back and the exception is re-raised. The
    cursor is closed in every case.

    Args:
        connection: An open PostgreSQL connection.

    Raises:
        Exception: Whatever the failing step raised.
    """
    cur = connection.cursor()
    try:
        pg_ensure_table(cur)
        new_id = pg_insert_data(cur)
        pg_update_data(cur, new_id)
        pg_delete_data(cur, new_id)
        # Make the whole cycle durable in one go.
        connection.commit()
    except Exception:
        connection.rollback()
        raise
    finally:
        cur.close()
# ==========================================
# Main Checkers
# ==========================================
def check_mysql(uri: str) -> AivenPingResponse:
    """Run the MySQL CRUD health check against one connection URI.

    Args:
        uri: MySQL connection URI. Host, port (default 3306), credentials,
            database name (default ``defaultdb``) and the optional
            ``ssl-mode`` query parameter (default ``REQUIRED``) are read
            from it.

    Returns:
        An ``AivenPingResponse`` with ``success=True`` when the whole CRUD
        cycle ran. Any ``Exception`` raised along the way is caught, logged,
        and reported as ``success=False`` with the error text.
    """
    now = datetime.utcnow().isoformat()
    host = "unknown"
    connection = None
    try:
        parsed_uri = urlparse(uri)
        host = parsed_uri.hostname
        port = parsed_uri.port or 3306
        user = parsed_uri.username
        password = parsed_uri.password
        database = parsed_uri.path.lstrip('/') or 'defaultdb'
        query_params = parse_qs(parsed_uri.query)
        ssl_mode = query_params.get('ssl-mode', ['REQUIRED'])[0]
        # NOTE(review): a non-empty ssl dict appears to be what switches TLS
        # on in PyMySQL; confirm that the 'mode' key itself is honoured.
        connection = pymysql.connect(
            host=host, port=port, user=user, password=password,
            database=database, ssl={'mode': ssl_mode} if ssl_mode else None
        )
        # Run the CRUD Cycle
        execute_mysql_crud_cycle(connection)
        logger.info(f"MySQL CRUD check successful: {host}")
        return AivenPingResponse(
            service_name=database, success=True, time=now, host=host, db_type="mysql"
        )
    except Exception as e:
        logger.error(f"MySQL CRUD failed: {str(e)}")
        return AivenPingResponse(
            service_name="mysql_service", success=False, error=str(e), time=now, host=host, db_type="mysql"
        )
    finally:
        # Always release the connection, including when the CRUD cycle failed.
        # Errors from close() are swallowed so they cannot replace the result
        # being returned above.
        if connection is not None:
            try:
                connection.close()
            except Exception as close_error:
                logger.debug(f"MySQL: error while closing connection: {close_error}")
def check_postgres(uri: str) -> AivenPingResponse:
    """Run the PostgreSQL CRUD health check against one connection URI.

    Args:
        uri: PostgreSQL connection URI, passed to ``psycopg2.connect`` as is.
            The hostname and database name are parsed from it for reporting.

    Returns:
        An ``AivenPingResponse`` with ``success=True`` when the whole CRUD
        cycle ran. Any ``Exception`` raised along the way is caught, logged,
        and reported as ``success=False`` with the error text.
    """
    now = datetime.utcnow().isoformat()
    host = "unknown"
    connection = None
    try:
        parsed_uri = urlparse(uri)
        host = parsed_uri.hostname
        db_name = parsed_uri.path.lstrip('/')
        connection = psycopg2.connect(uri)
        # Run the CRUD Cycle
        execute_postgres_crud_cycle(connection)
        logger.info(f"Postgres CRUD check successful: {host}")
        return AivenPingResponse(
            service_name=db_name, success=True, time=now, host=host, db_type="postgres"
        )
    except Exception as e:
        logger.error(f"Postgres CRUD failed: {str(e)}")
        return AivenPingResponse(
            service_name="postgres_service", success=False, error=str(e), time=now, host=host, db_type="postgres"
        )
    finally:
        # Always release the connection, including when the CRUD cycle failed.
        # Errors from close() are swallowed so they cannot replace the result
        # being returned above.
        if connection is not None:
            try:
                connection.close()
            except Exception as close_error:
                logger.debug(f"Postgres: error while closing connection: {close_error}")
# ==========================================
# Entry Point
# ==========================================
def ping_aiven_projects() -> AivenAllPingResponse:
    """Health-check every configured Aiven database and collect the results.

    The URI lists come from the module-level environment strings. All MySQL
    URIs are checked first, then all PostgreSQL URIs, each in listed order.

    Returns:
        An ``AivenAllPingResponse`` holding one entry per checked URI.
    """
    logger.info("Starting Deep Health Checks (CRUD)...")
    mysql_targets = get_uris_from_env(mysql_uris_env)
    postgres_targets = get_uris_from_env(postgres_uris_env)
    outcomes = [check_mysql(target) for target in mysql_targets]
    outcomes.extend(check_postgres(target) for target in postgres_targets)
    return AivenAllPingResponse(aiven_services=outcomes)