Spaces:
Runtime error
Runtime error
| """ | |
| Connection pooling utilities for database and Redis connections. | |
| This module provides optimized connection pooling for both PostgreSQL and Redis | |
| to improve performance under concurrent load. | |
| """ | |
| import os | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from contextlib import contextmanager | |
| import redis | |
| from redis.connection import ConnectionPool | |
| from sqlalchemy import create_engine, event | |
| from sqlalchemy.engine import Engine | |
| from sqlalchemy.pool import QueuePool, StaticPool | |
| logger = logging.getLogger(__name__) | |
class DatabaseConnectionPool:
    """Manages optimized database connection pooling.

    Builds a SQLAlchemy engine whose pool settings are read from environment
    variables (and can be overridden through keyword arguments), registers
    pool event listeners on that engine, and exposes helpers for borrowing
    connections and reading pool statistics.
    """

    # Engine options that only queue-based pools accept.
    _QUEUE_POOL_OPTIONS = ('pool_size', 'max_overflow', 'pool_timeout')

    def __init__(self, database_url: str, **kwargs):
        """
        Initialize database connection pool.

        Args:
            database_url: Database connection URL
            **kwargs: Additional engine options; these override the
                environment-derived defaults. ``poolclass`` may be supplied
                to replace the default ``QueuePool``.
        """
        self.database_url = database_url

        # Default pool configuration optimized for chat workload
        engine_options = {
            'pool_size': int(os.getenv('DB_POOL_SIZE', '10')),
            'max_overflow': int(os.getenv('DB_MAX_OVERFLOW', '20')),
            'pool_recycle': int(os.getenv('DB_POOL_RECYCLE', '3600')),  # 1 hour
            'pool_pre_ping': True,  # Validate connections before use
            'pool_timeout': int(os.getenv('DB_POOL_TIMEOUT', '30')),
            'echo': os.getenv('SQLALCHEMY_ECHO', 'False').lower() == 'true',
        }

        # Override with provided kwargs
        engine_options.update(kwargs)

        # Keep QueuePool as the default, but let callers choose another pool.
        # Passing poolclass both explicitly and through **kwargs would raise
        # "got multiple values for keyword argument 'poolclass'".
        if engine_options.get('poolclass') is None:
            engine_options['poolclass'] = QueuePool
        if not issubclass(engine_options['poolclass'], QueuePool):
            # Non-queue pools reject the queue sizing options, so drop the
            # defaults the caller did not ask for explicitly.
            for option in self._QUEUE_POOL_OPTIONS:
                if option not in kwargs:
                    engine_options.pop(option, None)

        # Create engine with optimized settings
        self.engine = create_engine(database_url, **engine_options)

        # Add connection event listeners for monitoring
        self._setup_connection_events()

        logger.info("Database connection pool initialized", extra={
            'pool_size': engine_options.get('pool_size'),
            'max_overflow': engine_options.get('max_overflow'),
            'pool_recycle': engine_options.get('pool_recycle'),
        })

    def _setup_connection_events(self):
        """Setup SQLAlchemy event listeners for connection monitoring.

        The listeners are registered on ``self.engine``; defining them
        without registration would leave them as dead code.
        """
        # Ask the dialect rather than searching the URL text, so a non-SQLite
        # URL that merely contains "sqlite" never receives PRAGMA statements.
        is_sqlite = self.engine.dialect.name == 'sqlite'

        @event.listens_for(self.engine, "connect")
        def set_sqlite_pragma(dbapi_connection, connection_record):
            """Set SQLite pragmas for better performance (if using SQLite)."""
            if not is_sqlite:
                return
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute("PRAGMA foreign_keys=ON")
                cursor.execute("PRAGMA journal_mode=WAL")
                cursor.execute("PRAGMA synchronous=NORMAL")
                cursor.execute("PRAGMA cache_size=10000")
                cursor.execute("PRAGMA temp_store=MEMORY")
            finally:
                # Release the cursor even when a PRAGMA fails.
                cursor.close()

        @event.listens_for(self.engine, "checkout")
        def receive_checkout(dbapi_connection, connection_record, connection_proxy):
            """Log connection checkout for monitoring."""
            logger.debug("Database connection checked out from pool")

        @event.listens_for(self.engine, "checkin")
        def receive_checkin(dbapi_connection, connection_record):
            """Log connection checkin for monitoring."""
            logger.debug("Database connection returned to pool")

    def get_pool_status(self) -> Dict[str, Any]:
        """
        Get current pool status for monitoring.

        Statistics the active pool type does not provide are reported as 0.

        Returns:
            Dictionary with pool statistics
        """
        pool = self.engine.pool

        def read_stat(method_name: str) -> Any:
            # Not every pool type implements every statistic method.
            method = getattr(pool, method_name, None)
            return method() if callable(method) else 0

        return {
            'pool_size': read_stat('size'),
            'checked_in': read_stat('checkedin'),
            'checked_out': read_stat('checkedout'),
            'overflow': read_stat('overflow'),
            'invalid': read_stat('invalid'),
        }

    @contextmanager
    def get_connection(self):
        """
        Context manager for getting database connections.

        The connection is closed when the ``with`` block exits, whether
        normally or through an exception.

        Yields:
            Database connection from the pool
        """
        connection = self.engine.connect()
        try:
            yield connection
        finally:
            connection.close()
class RedisConnectionPool:
    """Manages optimized Redis connection pooling.

    Creates a redis-py ``ConnectionPool`` from a URL, wraps it in a
    ``redis.Redis`` client, and verifies connectivity with a ping at
    construction time.
    """

    def __init__(self, redis_url: str, **kwargs):
        """
        Initialize Redis connection pool.

        Args:
            redis_url: Redis connection URL
            **kwargs: Additional pool options; these override the
                environment-derived defaults.

        Raises:
            redis.RedisError: If the initial ping fails. The pool is
                disconnected before the error is re-raised.
        """
        self.redis_url = redis_url

        # Default pool configuration optimized for chat workload
        pool_options = {
            'max_connections': int(os.getenv('REDIS_MAX_CONNECTIONS', '20')),
            'retry_on_timeout': True,
            'socket_timeout': int(os.getenv('REDIS_SOCKET_TIMEOUT', '5')),
            'socket_connect_timeout': int(os.getenv('REDIS_CONNECT_TIMEOUT', '5')),
            'socket_keepalive': True,
            'socket_keepalive_options': {},
            'health_check_interval': int(os.getenv('REDIS_HEALTH_CHECK_INTERVAL', '30')),
        }

        # Override with provided kwargs
        pool_options.update(kwargs)

        # Create connection pool
        self.connection_pool = ConnectionPool.from_url(redis_url, **pool_options)

        # Create Redis client with the pool
        self.redis_client = redis.Redis(connection_pool=self.connection_pool)

        # Test connection
        try:
            self.redis_client.ping()
        except redis.RedisError as e:
            logger.error(f"Failed to initialize Redis connection pool: {e}")
            # Do not leave a half-initialized pool holding sockets; close()
            # swallows its own errors, so the original exception propagates.
            self.close()
            raise

        logger.info("Redis connection pool initialized", extra={
            'max_connections': pool_options['max_connections'],
            'socket_timeout': pool_options['socket_timeout'],
        })

    def get_client(self) -> redis.Redis:
        """
        Get Redis client with connection pooling.

        Returns:
            Redis client instance
        """
        return self.redis_client

    def get_pool_status(self) -> Dict[str, Any]:
        """
        Get current pool status for monitoring.

        Counters that the pool object does not expose are reported as 0.

        Returns:
            Dictionary with pool statistics
        """
        pool = self.connection_pool

        # Attribute names may vary by Redis version: prefer the private
        # counter, then the public one, then 0.
        if hasattr(pool, '_created_connections'):
            created = pool._created_connections
        else:
            created = getattr(pool, 'created_connections', 0)

        return {
            'max_connections': getattr(pool, 'max_connections', 0),
            'created_connections': created,
            'available_connections': len(getattr(pool, '_available_connections', ())),
            'in_use_connections': len(getattr(pool, '_in_use_connections', ())),
        }

    def health_check(self) -> bool:
        """
        Perform health check on Redis connection.

        Returns:
            True if Redis is healthy, False otherwise
        """
        try:
            self.redis_client.ping()
            return True
        except redis.RedisError as e:
            logger.warning(f"Redis health check failed: {e}")
            return False

    def close(self):
        """Close all connections in the pool.

        Best effort: any error raised while disconnecting is logged and
        not propagated.
        """
        try:
            self.connection_pool.disconnect()
            logger.info("Redis connection pool closed")
        except Exception as e:
            logger.error(f"Error closing Redis connection pool: {e}")
class ConnectionPoolManager:
    """Manages both database and Redis connection pools."""

    def __init__(self, database_url: str, redis_url: Optional[str] = None):
        """
        Initialize connection pool manager.

        The database pool is always created. The Redis pool is created only
        when a usable URL is given, and a failure to create it is logged as a
        warning rather than raised.

        Args:
            database_url: Database connection URL
            redis_url: Redis connection URL (optional)
        """
        self.database_pool = DatabaseConnectionPool(database_url)
        self.redis_pool = None

        # An empty value or the literal string 'None' both mean "no Redis".
        redis_requested = bool(redis_url) and redis_url != 'None'
        if redis_requested:
            try:
                self.redis_pool = RedisConnectionPool(redis_url)
            except Exception as e:
                logger.warning(f"Failed to initialize Redis pool: {e}")

        logger.info("Connection pool manager initialized")

    def get_database_engine(self) -> Engine:
        """Get database engine with connection pooling."""
        db_pool = self.database_pool
        return db_pool.engine

    def get_redis_client(self) -> Optional[redis.Redis]:
        """Get Redis client with connection pooling, or None without Redis."""
        if self.redis_pool:
            return self.redis_pool.get_client()
        return None

    def get_status(self) -> Dict[str, Any]:
        """
        Get status of all connection pools.

        Returns:
            Dictionary with a 'database' entry and a 'redis' entry; the
            'redis' entry is None when no Redis pool exists, otherwise it
            holds the pool statistics plus a 'healthy' flag.
        """
        database_status = self.database_pool.get_pool_status()

        redis_status = None
        if self.redis_pool:
            redis_status = self.redis_pool.get_pool_status()
            redis_status['healthy'] = self.redis_pool.health_check()

        return {'database': database_status, 'redis': redis_status}

    def close_all(self):
        """Close all connection pools.

        An error while disposing the database engine is logged and does not
        prevent the Redis pool from being closed.
        """
        try:
            self.database_pool.engine.dispose()
            logger.info("Database connection pool closed")
        except Exception as e:
            logger.error(f"Error closing database pool: {e}")

        redis_pool = self.redis_pool
        if redis_pool:
            redis_pool.close()
# Global connection pool manager instance (None until initialize_connection_pools runs)
_connection_pool_manager: Optional[ConnectionPoolManager] = None


def initialize_connection_pools(database_url: str, redis_url: Optional[str] = None) -> ConnectionPoolManager:
    """
    Initialize global connection pool manager.

    The manager is created on the first call only; later calls return the
    existing instance and ignore their arguments.

    Args:
        database_url: Database connection URL
        redis_url: Redis connection URL (optional)

    Returns:
        ConnectionPoolManager instance
    """
    global _connection_pool_manager

    if _connection_pool_manager is not None:
        return _connection_pool_manager

    _connection_pool_manager = ConnectionPoolManager(database_url, redis_url)
    return _connection_pool_manager
def get_connection_pool_manager() -> Optional[ConnectionPoolManager]:
    """
    Return the module-wide connection pool manager.

    Returns:
        The current ConnectionPoolManager, or None when no manager has been
        initialized (or it has been cleaned up).
    """
    manager = _connection_pool_manager
    return manager
def cleanup_connection_pools():
    """Close every pool held by the global manager and forget the manager.

    Does nothing when no manager is currently set.
    """
    global _connection_pool_manager

    if not _connection_pool_manager:
        return

    _connection_pool_manager.close_all()
    _connection_pool_manager = None