File size: 3,949 Bytes
5760448
 
4099797
 
 
 
 
5760448
 
4099797
 
 
 
 
5760448
 
4099797
 
 
 
 
 
 
 
 
 
5760448
 
4099797
 
 
 
 
 
755099f
4099797
 
 
5760448
 
4099797
 
5760448
 
4099797
 
5760448
4099797
 
 
 
 
 
 
 
 
 
 
5760448
eadae2e
4099797
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eadae2e
 
4099797
eadae2e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import asyncio
import logging
from typing import AsyncGenerator

from fastapi import HTTPException
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import AsyncAdaptedQueuePool, QueuePool

from app.db.models import Base

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# SQLite database file, accessed through the aiosqlite async driver
DATABASE_URL = "sqlite+aiosqlite:///./sql_app.db"

# Configure the engine with connection pooling and timeouts
engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # Log every SQL statement the engine emits
    pool_size=20,  # Number of connections kept open in the pool
    max_overflow=10,  # Extra connections allowed beyond pool_size under load
    pool_timeout=30,  # Seconds to wait for a free connection before giving up
    pool_recycle=1800,  # Recycle connections after 30 minutes
    pool_pre_ping=True,  # Enable connection health checks
    # create_async_engine() rejects the thread-based QueuePool with
    # "Pool class QueuePool cannot be used with asyncio engine";
    # AsyncAdaptedQueuePool is the asyncio-compatible equivalent and accepts
    # the same sizing options used above.
    poolclass=AsyncAdaptedQueuePool,
)

# Session factory producing AsyncSession objects bound to the engine.
# expire_on_commit=False keeps already-loaded attributes readable after a
# commit instead of forcing a reload on next access.
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Semaphore to limit concurrent database operations
MAX_CONCURRENT_DB_OPS = 10
db_semaphore = asyncio.Semaphore(MAX_CONCURRENT_DB_OPS)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the duration of one unit of work.

    Written as an async generator so it can be used as a FastAPI dependency:
    the code before ``yield`` runs first, the consumer uses the session, and
    any exception raised by the consumer is thrown back in at the ``yield``.

    The module-level ``db_semaphore`` is held for as long as the session is
    in use, so at most ``MAX_CONCURRENT_DB_OPS`` sessions are open at once.

    Yields:
        An ``AsyncSession`` created by ``AsyncSessionLocal``.

    Raises:
        HTTPException: re-raised unchanged when the consumer raised one;
            503 when a ``SQLAlchemyError`` occurred; 500 for anything else.
    """
    async with db_semaphore:  # Limit concurrent database operations
        session = AsyncSessionLocal()
        try:
            yield session
        except HTTPException:
            # A deliberate HTTP error from the consumer (404, 403, ...) must
            # reach the client as-is; without this branch the generic handler
            # below would turn it into a 500.
            await session.rollback()
            raise
        except SQLAlchemyError as e:
            logger.exception("Database error: %s", e)
            await session.rollback()
            raise HTTPException(
                status_code=503,
                detail="Database service temporarily unavailable. Please try again."
            ) from e
        except Exception as e:
            logger.exception("Unexpected error: %s", e)
            await session.rollback()
            raise HTTPException(
                status_code=500,
                detail="An unexpected error occurred. Please try again."
            ) from e
        finally:
            # Always release the session, whatever happened above.
            await session.close()

# Rate limiting configuration
from fastapi import Request
import time
from collections import defaultdict

class RateLimiter:
    """In-memory sliding-window rate limiter keyed by client IP.

    Each client may make at most ``requests_per_minute`` requests within any
    ``WINDOW_SECONDS``-second window. State lives only in this process.

    Attributes:
        requests_per_minute: Maximum number of requests allowed per window.
        requests: Maps a client IP to the timestamps of its requests that
            were still inside the window when that client was last checked.
    """

    # Length of the sliding window, in seconds.
    WINDOW_SECONDS = 60

    def __init__(self, requests_per_minute=60, *, clock=None):
        """Create a limiter.

        Args:
            requests_per_minute: Maximum requests per client per window.
            clock: Optional zero-argument callable returning the current time
                in seconds. When omitted, ``time.time()`` is used.
        """
        self.requests_per_minute = requests_per_minute
        self.requests = defaultdict(list)
        self._clock = clock
        # Time at which idle clients are next swept out; set on first use.
        self._next_sweep = None

    def _now(self) -> float:
        """Return the current time from the injected clock or ``time.time()``."""
        if self._clock is None:
            return time.time()
        return self._clock()

    def _evict_idle_clients(self, cutoff: float) -> None:
        """Drop every client whose recorded requests all fall outside the window."""
        idle = [
            ip for ip, stamps in self.requests.items()
            if all(stamp <= cutoff for stamp in stamps)
        ]
        for ip in idle:
            del self.requests[ip]

    def is_allowed(self, client_ip: str) -> bool:
        """Record a request from ``client_ip`` and report whether it is allowed.

        Args:
            client_ip: Identifier of the caller.

        Returns:
            ``True`` if the request fits in the current window (and has been
            recorded), ``False`` if the client is already at its limit.
        """
        now = self._now()
        cutoff = now - self.WINDOW_SECONDS

        # Sweep at most once per window so clients that stopped sending do
        # not stay in memory forever, without paying O(clients) per request.
        if self._next_sweep is None:
            self._next_sweep = now + self.WINDOW_SECONDS
        elif now >= self._next_sweep:
            self._evict_idle_clients(cutoff)
            self._next_sweep = now + self.WINDOW_SECONDS

        # .get() avoids creating an empty bucket just by looking.
        recent = [stamp for stamp in self.requests.get(client_ip, ()) if stamp > cutoff]

        if len(recent) >= self.requests_per_minute:
            if recent:
                self.requests[client_ip] = recent
            else:
                # Only reachable with a non-positive limit; keep no empty bucket.
                self.requests.pop(client_ip, None)
            return False

        recent.append(now)
        self.requests[client_ip] = recent
        return True


rate_limiter = RateLimiter(requests_per_minute=60)

# Add this to your database initialization
async def init_db() -> None:
    """Create every table declared on ``Base.metadata`` that does not exist yet.

    ``database_exists`` only probes for the ``invoices`` table, so it is used
    here just to pick the log message. ``create_all`` is always run: by
    default it checks each table before creating it, so existing tables are
    left untouched while tables for models added later are still created.

    Raises:
        Exception: any error raised while connecting or creating tables is
            logged and re-raised.
    """
    try:
        async with engine.begin() as conn:
            if await database_exists(conn):
                logger.info("Database already exists. Ensuring all tables are present...")
            else:
                logger.info("Initializing database: Creating tables...")
            await conn.run_sync(Base.metadata.create_all)
            logger.info("Database initialization completed successfully")
    except Exception as e:
        logger.error(f"Error initializing database: {str(e)}")
        raise

async def database_exists(conn) -> bool:
    """Report whether the ``invoices`` table is present on ``conn``.

    Only that single table is probed; its presence is treated as the sign
    that the schema has already been created.

    Args:
        conn: Connection object exposing an awaitable ``run_sync`` that calls
            the given function with a synchronous connection.

    Returns:
        The result of the dialect's ``has_table`` check, or ``False`` if the
        check raised (the error is logged, not propagated).
    """
    def _has_invoices_table(sync_conn):
        # Ask the dialect directly whether the table exists.
        return sync_conn.dialect.has_table(sync_conn, "invoices")

    try:
        return await conn.run_sync(_has_invoices_table)
    except Exception as exc:
        logger.error(f"Error checking database existence: {str(exc)}")
        return False
            
            

###sdf