Spaces:
Running
Running
# import logging | |
# from typing import Dict, List, Optional, AsyncGenerator | |
# from pydantic import BaseSettings, PostgresDsn | |
# import pg8000 | |
# from pg8000 import Connection, Cursor | |
# from pg8000.exceptions import DatabaseError | |
# import asyncio | |
# from contextlib import asynccontextmanager | |
# from dataclasses import dataclass | |
# from threading import Lock | |
# # Set up structured logging | |
# logging.basicConfig( | |
# level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" | |
# ) | |
# logger = logging.getLogger(__name__) | |
# class DatabaseSettings(BaseSettings): | |
# db_url: PostgresDsn | |
# pool_size: int = 5 | |
# class Config: | |
# env_file = ".env" | |
# @dataclass | |
# class DatabaseConfig: | |
# username: str | |
# password: str | |
# hostname: str | |
# port: int | |
# database: str | |
# class DatabaseError(Exception): | |
# """Custom exception for database errors.""" | |
# pass | |
# class Database: | |
# def __init__(self, db_url: str, pool_size: int): | |
# self.db_url = db_url | |
# self.pool_size = pool_size | |
# self.pool: List[Connection] = [] | |
# self.lock = Lock() | |
# self.config = self._parse_db_url() | |
# def _parse_db_url(self) -> DatabaseConfig: | |
# """Parse the database URL into components.""" | |
# result = urlparse(self.db_url) | |
# return DatabaseConfig( | |
# username=result.username, | |
# password=result.password, | |
# hostname=result.hostname, | |
# port=result.port or 5432, | |
# database=result.path.lstrip("/"), | |
# ) | |
# async def connect(self) -> None: | |
# """Create a connection pool.""" | |
# try: | |
# for _ in range(self.pool_size): | |
# conn = await self._create_connection() | |
# self.pool.append(conn) | |
# logger.info( | |
# f"Database connection pool created with {self.pool_size} connections." | |
# ) | |
# except DatabaseError as e: | |
# logger.error(f"Failed to create database connection pool: {e}") | |
# raise | |
# async def _create_connection(self) -> Connection: | |
# """Create a single database connection.""" | |
# try: | |
# conn = pg8000.connect( | |
# user=self.config.username, | |
# password=self.config.password, | |
# host=self.config.hostname, | |
# port=self.config.port, | |
# database=self.config.database, | |
# ) | |
# return conn | |
# except DatabaseError as e: | |
# logger.error(f"Failed to create database connection: {e}") | |
# raise DatabaseError("Failed to create database connection.") | |
# async def disconnect(self) -> None: | |
# """Close all connections in the pool.""" | |
# with self.lock: | |
# for conn in self.pool: | |
# conn.close() | |
# self.pool.clear() | |
# logger.info("Database connection pool closed.") | |
# @asynccontextmanager | |
# async def get_connection(self) -> AsyncGenerator[Connection, None]: | |
# """Acquire a connection from the pool.""" | |
# with self.lock: | |
# if not self.pool: | |
# raise DatabaseError("Database connection pool is empty.") | |
# conn = self.pool.pop() | |
# try: | |
# yield conn | |
# finally: | |
# with self.lock: | |
# self.pool.append(conn) | |
# async def fetch(self, query: str, *args) -> List[Dict]: | |
# """ | |
# Execute a SELECT query and return the results as a list of dictionaries. | |
# Args: | |
# query (str): The SQL query to execute. | |
# *args: Query parameters. | |
# Returns: | |
# List[Dict]: A list of dictionaries where keys are column names and values are column values. | |
# """ | |
# try: | |
# async with self.get_connection() as conn: | |
# cursor: Cursor = conn.cursor() | |
# cursor.execute(query, args) | |
# rows = cursor.fetchall() | |
# columns = [desc[0] for desc in cursor.description] | |
# return [dict(zip(columns, row)) for row in rows] | |
# except DatabaseError as e: | |
# logger.error(f"Error executing query: {query}. Error: {e}") | |
# raise DatabaseError(f"Failed to execute query: {query}") | |
# async def execute(self, query: str, *args) -> None: | |
# """ | |
# Execute an INSERT, UPDATE, or DELETE query. | |
# Args: | |
# query (str): The SQL query to execute. | |
# *args: Query parameters. | |
# """ | |
# try: | |
# async with self.get_connection() as conn: | |
# cursor: Cursor = conn.cursor() | |
# cursor.execute(query, args) | |
# conn.commit() | |
# except DatabaseError as e: | |
# logger.error(f"Error executing query: {query}. Error: {e}") | |
# raise DatabaseError(f"Failed to execute query: {query}") | |
# # Dependency to get the database instance | |
# async def get_db() -> AsyncGenerator[Database, None]: | |
# settings = DatabaseSettings() | |
# db = Database(db_url=settings.db_url, pool_size=settings.pool_size) | |
# await db.connect() | |
# try: | |
# yield db | |
# finally: | |
# await db.disconnect() | |
# # Example usage | |
# if __name__ == "__main__": | |
# async def main(): | |
# settings = DatabaseSettings() | |
# db = Database(db_url=settings.db_url, pool_size=settings.pool_size) | |
# await db.connect() | |
# try: | |
# # Example query | |
# query = """ | |
# SELECT | |
# ppt.type AS product_type, | |
# pc.name AS product_category | |
# FROM | |
# product_producttype ppt | |
# INNER JOIN | |
# product_category pc | |
# ON | |
# ppt.category_id = pc.id | |
# """ | |
# result = await db.fetch(query) | |
# print(result) | |
# finally: | |
# await db.disconnect() | |
# asyncio.run(main()) | |
# import logging | |
# from urllib.parse import urlparse | |
# from typing import Dict, List, Optional, AsyncGenerator | |
# from pydantic_settings import BaseSettings | |
# from pydantic import PostgresDsn | |
# import pg8000 | |
# from pg8000 import Connection, Cursor | |
# from pg8000.exceptions import DatabaseError | |
# import asyncio | |
# from contextlib import asynccontextmanager | |
# from dataclasses import dataclass | |
# from threading import Lock | |
# # Set up structured logging | |
# logging.basicConfig( | |
# level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" | |
# ) | |
# logger = logging.getLogger(__name__) | |
# class DatabaseSettings(BaseSettings): | |
# db_url: PostgresDsn | |
# pool_size: int = 5 | |
# class Config: | |
# env_file = ".env" | |
# @dataclass | |
# class DatabaseConfig: | |
# username: str | |
# password: str | |
# hostname: str | |
# port: int | |
# database: str | |
# class DatabaseError(Exception): | |
# """Custom exception for database errors.""" | |
# pass | |
# class Database: | |
# def __init__(self, db_url: str, pool_size: int): | |
# self.db_url = db_url | |
# self.pool_size = pool_size | |
# self.pool: List[Connection] = [] | |
# self.lock = Lock() | |
# self.config = self._parse_db_url() | |
# def _parse_db_url(self) -> DatabaseConfig: | |
# """Parse the database URL into components.""" | |
# # Convert PostgresDsn to a string | |
# db_url_str = str(self.db_url) | |
# result = urlparse(db_url_str) | |
# return DatabaseConfig( | |
# username=result.username, | |
# password=result.password, | |
# hostname=result.hostname, | |
# port=result.port or 5432, | |
# database=result.path.lstrip("/"), | |
# ) | |
# async def connect(self) -> None: | |
# """Create a connection pool.""" | |
# try: | |
# for _ in range(self.pool_size): | |
# conn = await self._create_connection() | |
# self.pool.append(conn) | |
# logger.info( | |
# f"Database connection pool created with {self.pool_size} connections." | |
# ) | |
# except DatabaseError as e: | |
# logger.error(f"Failed to create database connection pool: {e}") | |
# raise | |
# async def _create_connection(self) -> Connection: | |
# """Create a single database connection.""" | |
# try: | |
# conn = pg8000.connect( | |
# user=self.config.username, | |
# password=self.config.password, | |
# host=self.config.hostname, | |
# port=self.config.port, | |
# database=self.config.database, | |
# ) | |
# return conn | |
# except DatabaseError as e: | |
# logger.error(f"Failed to create database connection: {e}") | |
# raise DatabaseError("Failed to create database connection.") | |
# async def disconnect(self) -> None: | |
# """Close all connections in the pool.""" | |
# with self.lock: | |
# for conn in self.pool: | |
# conn.close() | |
# self.pool.clear() | |
# logger.info("Database connection pool closed.") | |
# @asynccontextmanager | |
# async def get_connection(self) -> AsyncGenerator[Connection, None]: | |
# """Acquire a connection from the pool.""" | |
# with self.lock: | |
# if not self.pool: | |
# raise DatabaseError("Database connection pool is empty.") | |
# conn = self.pool.pop() | |
# try: | |
# yield conn | |
# finally: | |
# with self.lock: | |
# self.pool.append(conn) | |
# async def fetch(self, query: str, *args) -> List[Dict]: | |
# """ | |
# Execute a SELECT query and return the results as a list of dictionaries. | |
# Args: | |
# query (str): The SQL query to execute. | |
# *args: Query parameters. | |
# Returns: | |
# List[Dict]: A list of dictionaries where keys are column names and values are column values. | |
# """ | |
# try: | |
# async with self.get_connection() as conn: | |
# cursor: Cursor = conn.cursor() | |
# cursor.execute(query, args) | |
# rows = cursor.fetchall() | |
# columns = [desc[0] for desc in cursor.description] | |
# return [dict(zip(columns, row)) for row in rows] | |
# except DatabaseError as e: | |
# logger.error(f"Error executing query: {query}. Error: {e}") | |
# raise DatabaseError(f"Failed to execute query: {query}") | |
# async def execute(self, query: str, *args) -> None: | |
# """ | |
# Execute an INSERT, UPDATE, or DELETE query. | |
# Args: | |
# query (str): The SQL query to execute. | |
# *args: Query parameters. | |
# """ | |
# try: | |
# async with self.get_connection() as conn: | |
# cursor: Cursor = conn.cursor() | |
# cursor.execute(query, args) | |
# conn.commit() | |
# except DatabaseError as e: | |
# logger.error(f"Error executing query: {query}. Error: {e}") | |
# raise DatabaseError(f"Failed to execute query: {query}") | |
# # Dependency to get the database instance | |
# async def get_db() -> AsyncGenerator[Database, None]: | |
# settings = DatabaseSettings() | |
# db = Database(db_url=settings.db_url, pool_size=settings.pool_size) | |
# await db.connect() | |
# try: | |
# yield db | |
# finally: | |
# await db.disconnect() | |
# # Example usage | |
# if __name__ == "__main__": | |
# async def main(): | |
# settings = DatabaseSettings() | |
# db = Database(db_url=settings.db_url, pool_size=settings.pool_size) | |
# await db.connect() | |
# try: | |
# # Example query | |
# query = "SELECT * FROM your_table LIMIT 10" | |
# query = """ | |
# SELECT | |
# ppt.type AS product_type, | |
# pc.name AS product_category | |
# FROM | |
# product_producttype ppt | |
# INNER JOIN | |
# product_category pc | |
# ON | |
# ppt.category_id = pc.id | |
# """ | |
# result = await db.fetch(query) | |
# print(result) | |
# finally: | |
# await db.disconnect() | |
# asyncio.run(main()) | |
import asyncio
import logging
from contextlib import asynccontextmanager
from threading import Lock
from typing import AsyncGenerator, List, Optional, Dict
from urllib.parse import urlparse

import pg8000
from pg8000 import Connection
from pg8000.exceptions import DatabaseError as Pg8000DatabaseError
from pydantic import PostgresDsn
from pydantic_settings import BaseSettings, SettingsConfigDict
# Configure process-wide logging at import time: INFO level, with timestamp,
# logger name and severity in front of each message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-scoped logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class DatabaseSettings(BaseSettings):
    """Database settings resolved by pydantic-settings.

    Values are read from the process environment and from a ``.env`` file in
    the working directory.

    Attributes:
        db_url: PostgreSQL connection URL (validated as a ``PostgresDsn``).
        pool_size: Number of connections the pool opens; defaults to 5.
    """

    # ``model_config`` replaces the inner ``class Config``, which is the
    # Pydantic v1 spelling and is deprecated in the v2-only pydantic-settings.
    model_config = SettingsConfigDict(env_file=".env")

    db_url: PostgresDsn
    pool_size: int = 5  # Default pool size is 5
# Custom database errors | |
class DatabaseError(Exception):
    """Root of this module's exception hierarchy.

    Catching this type catches every database error defined in this file.
    """
class ConnectionError(DatabaseError):
    """Signals that a database connection could not be opened or used.

    NOTE: within this module the name shadows Python's builtin
    ``ConnectionError``; references after this definition resolve to this
    class, not to the builtin ``OSError`` subclass.
    """
class PoolExhaustedError(DatabaseError):
    """Signals that no idle connection was left in the pool to hand out."""
class QueryExecutionError(DatabaseError):
    """Signals that running a SQL statement failed."""
class HealthCheckError(DatabaseError):
    """Signals that the database health probe did not succeed."""
class Database:
    """Fixed-size pool of pg8000 connections exposed through coroutine methods.

    The pool is a plain list guarded by a ``threading.Lock``. A connection is
    popped for the duration of one operation and pushed back afterwards.

    NOTE: the pg8000 calls made here are synchronous and are not awaited, so
    each one blocks the calling event loop until it returns.
    """

    def __init__(self, db_url: PostgresDsn, pool_size: int):
        """Store the configuration; no connection is opened until ``connect()``.

        Args:
            db_url: PostgreSQL connection URL.
            pool_size: Number of connections ``connect()`` opens.
        """
        self.db_url = db_url
        self.pool_size = pool_size
        self.pool: List[Connection] = []
        self.lock = Lock()

    @staticmethod
    def _close_quietly(connections: List[Connection]) -> None:
        """Close every connection, logging (not raising) individual failures."""
        for conn in connections:
            try:
                conn.close()
            except Exception as e:
                # Best effort: one bad socket must not keep the others open.
                logger.warning("Ignoring error while closing connection: %s", e)

    async def connect(self) -> None:
        """Open ``pool_size`` connections and add them to the pool.

        Either all connections are added or none: if any attempt fails, the
        connections opened so far are closed again before the error is raised.

        Raises:
            ConnectionError: If pg8000 reports a database or interface error
                while connecting.
        """
        # Convert PostgresDsn to a string before splitting it into parts.
        parsed = urlparse(str(self.db_url))
        opened: List[Connection] = []
        try:
            for _ in range(self.pool_size):
                opened.append(
                    pg8000.connect(
                        user=parsed.username,
                        password=parsed.password,
                        host=parsed.hostname,
                        port=parsed.port or 5432,
                        database=parsed.path.lstrip("/"),
                    )
                )
        except (Pg8000DatabaseError, pg8000.InterfaceError) as e:
            # pg8000 signals socket-level failures with InterfaceError, which
            # is not a DatabaseError subclass, so both are translated here.
            self._close_quietly(opened)
            logger.error(f"Failed to create database connection pool: {e}")
            raise ConnectionError("Failed to create database connection pool.") from e
        except Exception:
            # Unrelated failure: still do not leak the sockets already opened.
            self._close_quietly(opened)
            raise
        with self.lock:
            self.pool.extend(opened)
        logger.info(
            f"Database connection pool created with {self.pool_size} connections."
        )

    async def disconnect(self) -> None:
        """Close all connections currently in the pool and empty it."""
        with self.lock:
            self._close_quietly(self.pool)
            self.pool.clear()
        logger.info("Database connection pool closed.")

    @asynccontextmanager
    async def get_connection(self) -> AsyncGenerator[Connection, None]:
        """Borrow a connection from the pool for the body of an ``async with``.

        The connection is always returned to the pool on exit. If the body
        raises, the connection's open transaction is rolled back first so the
        next borrower does not inherit an aborted transaction. The body's
        exception propagates unchanged.

        Yields:
            Connection: A pooled pg8000 connection.

        Raises:
            PoolExhaustedError: If the pool holds no idle connection.
        """
        with self.lock:
            if not self.pool:
                logger.error("Connection pool is exhausted.")
                raise PoolExhaustedError("No available connections in the pool.")
            conn = self.pool.pop()
        try:
            yield conn
        except BaseException:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # Keep the original failure as the one the caller sees.
                logger.warning(
                    "Rollback after failed operation did not succeed: %s",
                    rollback_error,
                )
            raise
        finally:
            with self.lock:
                self.pool.append(conn)

    async def fetch(self, query: str, *args) -> List[Dict]:
        """
        Execute a query and return its rows as a list of dictionaries.

        The transaction is committed once the rows have been read, so the
        pooled connection is not left inside an open transaction.

        Args:
            query (str): The SQL query to execute.
            *args: Query parameters.
        Returns:
            List[Dict]: One dictionary per row, keyed by column name. A
            statement that produces no result set yields an empty list.
        Raises:
            QueryExecutionError: If the query execution fails.
            PoolExhaustedError: If no connection is available.
        """
        try:
            async with self.get_connection() as conn:
                cursor = conn.cursor()
                try:
                    cursor.execute(query, args)
                    if cursor.description is None:
                        # No result set (e.g. a statement without RETURNING).
                        columns, rows = [], []
                    else:
                        columns = [desc[0] for desc in cursor.description]
                        rows = cursor.fetchall()
                finally:
                    cursor.close()
                conn.commit()
                return [dict(zip(columns, row)) for row in rows]
        except Pg8000DatabaseError as e:
            logger.error(f"Query execution failed: {e}")
            raise QueryExecutionError(f"Failed to execute query: {query}") from e

    async def execute(self, query: str, *args) -> None:
        """
        Execute an INSERT, UPDATE, or DELETE query and commit it.

        Args:
            query (str): The SQL query to execute.
            *args: Query parameters.
        Raises:
            QueryExecutionError: If the query execution fails.
            PoolExhaustedError: If no connection is available.
        """
        try:
            async with self.get_connection() as conn:
                cursor = conn.cursor()
                try:
                    cursor.execute(query, args)
                finally:
                    cursor.close()
                conn.commit()
        except Pg8000DatabaseError as e:
            logger.error(f"Query execution failed: {e}")
            raise QueryExecutionError(f"Failed to execute query: {query}") from e

    async def health_check(self) -> bool:
        """
        Perform a health check by executing ``SELECT 1``.

        Returns:
            bool: True when the query returns 1. Every failure is reported by
            raising, so False is never returned.
        Raises:
            HealthCheckError: If the query fails or returns an unexpected value.
            PoolExhaustedError: If no connection is available.
        """
        try:
            async with self.get_connection() as conn:
                cursor = conn.cursor()
                try:
                    cursor.execute("SELECT 1")
                    result = cursor.fetchone()
                finally:
                    cursor.close()
                # End the implicit transaction before the connection is pooled.
                conn.commit()
        except Pg8000DatabaseError as e:
            logger.error(f"Health check failed: {e}")
            raise HealthCheckError("Failed to perform health check.") from e
        # Check if the result is as expected
        if result and result[0] == 1:
            logger.info("Database health check succeeded.")
            return True
        logger.error("Database health check failed: Unexpected result.")
        raise HealthCheckError("Unexpected result from health check query.")
# Dependency to get the database instance | |
async def get_db() -> AsyncGenerator[Database, None]:
    """Yield a connected ``Database`` and close its pool afterwards.

    Settings are loaded through ``DatabaseSettings`` on every call, and a new
    pool is opened each time.

    ``connect()`` runs inside the ``try`` so that ``disconnect()`` also runs
    when the pool fails part-way through being built; otherwise the
    connections opened before the failure would never be closed.

    Yields:
        Database: An instance whose pool has been opened.
    """
    settings = DatabaseSettings()
    db = Database(db_url=settings.db_url, pool_size=settings.pool_size)
    try:
        await db.connect()
        yield db
    finally:
        await db.disconnect()
# Example usage | |
if __name__ == "__main__":

    async def main():
        """Open a pool, run one health check, report the outcome, close the pool."""
        cfg = DatabaseSettings()
        database = Database(db_url=cfg.db_url, pool_size=cfg.pool_size)
        await database.connect()
        try:
            # Probe the database once.
            is_healthy = await database.health_check()
        except HealthCheckError as e:
            print(f"Health check failed: {e}")
        else:
            print(f"Database health check: {'Success' if is_healthy else 'Failure'}")
        finally:
            # Always release the pooled connections, whatever the probe did.
            await database.disconnect()

    asyncio.run(main())