| | """ |
| | PostgreSQL connection pool management. |
| | Provides async connection pool for PostgreSQL operations. |
| | """ |
| | import asyncpg |
| | import ssl |
| | from typing import Optional, Dict, Any |
| | from app.core.logging import get_logger |
| | from app.core.config import settings |
| | import time |
| |
|
| | logger = get_logger(__name__) |
| |
|
| |
|
| | class PostgreSQLConnectionPool: |
| | """Singleton class to manage PostgreSQL connection pool""" |
| | _pool: Optional[asyncpg.Pool] = None |
| | |
| | _metrics = { |
| | "connections_acquired": 0, |
| | "connections_released": 0, |
| | "connections_failed": 0, |
| | "health_check_failures": 0, |
| | "acquisition_times": [], |
| | } |
| |
|
| | @classmethod |
| | async def initialize(cls) -> None: |
| | """Initialize PostgreSQL connection pool.""" |
| | if cls._pool is not None: |
| | logger.warning("PostgreSQL connection pool already initialized") |
| | return |
| |
|
| | try: |
| | logger.info("Initializing PostgreSQL connection pool", extra={ |
| | "host": settings.POSTGRES_HOST, |
| | "port": settings.POSTGRES_PORT, |
| | "database": settings.POSTGRES_DB, |
| | "user": settings.POSTGRES_USER, |
| | "min_pool_size": settings.POSTGRES_MIN_POOL_SIZE, |
| | "max_pool_size": settings.POSTGRES_MAX_POOL_SIZE |
| | }) |
| |
|
| | |
| | ssl_context = None |
| | mode = (settings.POSTGRES_SSL_MODE or "disable").lower() |
| | if mode != "disable": |
| | if mode == "verify-full": |
| | ssl_context = ssl.create_default_context() |
| | ssl_context.check_hostname = True |
| | ssl_context.verify_mode = ssl.CERT_REQUIRED |
| | else: |
| | ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) |
| | ssl_context.check_hostname = False |
| | ssl_context.verify_mode = ssl.CERT_NONE |
| | logger.info("PostgreSQL pool SSL enabled", extra={"ssl_mode": settings.POSTGRES_SSL_MODE}) |
| |
|
| | |
| | cls._pool = await asyncpg.create_pool( |
| | host=settings.POSTGRES_HOST, |
| | port=settings.POSTGRES_PORT, |
| | database=settings.POSTGRES_DB, |
| | user=settings.POSTGRES_USER, |
| | password=settings.POSTGRES_PASSWORD, |
| | min_size=settings.POSTGRES_MIN_POOL_SIZE, |
| | max_size=settings.POSTGRES_MAX_POOL_SIZE, |
| | command_timeout=30.0, |
| | timeout=30.0, |
| | ssl=ssl_context, |
| | ) |
| |
|
| | |
| | async with cls._pool.acquire() as conn: |
| | await conn.fetchval("SELECT 1") |
| |
|
| | logger.info("PostgreSQL connection pool initialized successfully") |
| |
|
| | except Exception as e: |
| | logger.error("Failed to initialize PostgreSQL connection pool", exc_info=e) |
| | raise |
| |
|
| | @classmethod |
| | async def get_connection(cls) -> asyncpg.Connection: |
| | """Acquire a connection from the pool.""" |
| | if cls._pool is None: |
| | raise RuntimeError("PostgreSQL connection pool not initialized. Call initialize() first.") |
| |
|
| | start_time = time.time() |
| | |
| | try: |
| | conn = await cls._pool.acquire() |
| | |
| | |
| | try: |
| | await conn.fetchval("SELECT 1") |
| | except Exception as health_check_error: |
| | logger.warning("Connection health check failed, releasing dead connection", |
| | exc_info=health_check_error) |
| | cls._metrics["health_check_failures"] += 1 |
| | await cls._pool.release(conn) |
| | conn = await cls._pool.acquire() |
| | await conn.fetchval("SELECT 1") |
| | |
| | acquisition_time = (time.time() - start_time) * 1000 |
| | cls._metrics["connections_acquired"] += 1 |
| | cls._metrics["acquisition_times"].append(acquisition_time) |
| | |
| | if len(cls._metrics["acquisition_times"]) > 1000: |
| | cls._metrics["acquisition_times"] = cls._metrics["acquisition_times"][-1000:] |
| | |
| | return conn |
| | |
| | except Exception as e: |
| | cls._metrics["connections_failed"] += 1 |
| | logger.error("Failed to acquire PostgreSQL connection", exc_info=e) |
| | raise |
| |
|
| | @classmethod |
| | async def release_connection(cls, conn: asyncpg.Connection) -> None: |
| | """Release a connection back to the pool.""" |
| | if cls._pool is None: |
| | raise RuntimeError("PostgreSQL connection pool not initialized") |
| |
|
| | try: |
| | await cls._pool.release(conn) |
| | cls._metrics["connections_released"] += 1 |
| | except Exception as e: |
| | logger.error("Failed to release PostgreSQL connection", exc_info=e) |
| |
|
| | @classmethod |
| | async def close(cls) -> None: |
| | """Close all connections in the pool.""" |
| | if cls._pool is None: |
| | logger.warning("PostgreSQL connection pool not initialized, nothing to close") |
| | return |
| |
|
| | try: |
| | logger.info("Closing PostgreSQL connection pool") |
| | await cls._pool.close() |
| | cls._pool = None |
| | logger.info("PostgreSQL connection pool closed successfully") |
| | except Exception as e: |
| | logger.error("Error closing PostgreSQL connection pool", exc_info=e) |
| | cls._pool = None |
| |
|
| | @classmethod |
| | def is_initialized(cls) -> bool: |
| | """Check if connection pool is initialized.""" |
| | return cls._pool is not None |
| |
|
| |
|
| | |
| | async def connect_to_postgres() -> None: |
| | """Initialize PostgreSQL connection pool.""" |
| | await PostgreSQLConnectionPool.initialize() |
| |
|
| |
|
| | async def close_postgres_connection() -> None: |
| | """Close PostgreSQL connection pool.""" |
| | await PostgreSQLConnectionPool.close() |
| |
|
| |
|
| | async def get_postgres_connection() -> asyncpg.Connection: |
| | """Get a connection from the PostgreSQL pool.""" |
| | return await PostgreSQLConnectionPool.get_connection() |
| |
|
| |
|
| | async def release_postgres_connection(conn: asyncpg.Connection) -> None: |
| | """Release a connection back to the pool.""" |
| | await PostgreSQLConnectionPool.release_connection(conn) |
| |
|
| |
|
| | def is_postgres_connected() -> bool: |
| | """Check if PostgreSQL connection pool is initialized.""" |
| | return PostgreSQLConnectionPool.is_initialized() |
| |
|