| """API endpoints for user-registered database connections. |
| |
| Credential schemas (DbType, PostgresCredentials, etc.) live in |
| `src/models/credentials.py` — they are imported below (with noqa: F401) so |
| FastAPI/Swagger picks them up for OpenAPI schema generation even though they |
| are not referenced by name in this file. |
| """ |
|
|
| from typing import Any, Dict, List, Literal, Optional |
| from datetime import datetime |
|
|
| from fastapi import APIRouter, Depends, HTTPException, Query, Request, status |
| from pydantic import BaseModel, Field |
| from sqlalchemy.ext.asyncio import AsyncSession |
|
|
| from src.database_client.database_client_service import database_client_service |
| from src.db.postgres.connection import get_db |
| from src.middlewares.logging import get_logger, log_execution |
| from src.middlewares.rate_limit import limiter |
| from src.models.credentials import ( |
| BigQueryCredentials, |
| CredentialSchemas, |
| DbType, |
| MysqlCredentials, |
| PostgresCredentials, |
| SnowflakeCredentials, |
| SqlServerCredentials, |
| SupabaseCredentials, |
| ) |
| from src.pipeline.db_pipeline import db_pipeline_service |
| from src.utils.db_credential_encryption import decrypt_credentials_dict |
|
|
# Module-level logger; the same instance is passed to @log_execution on the
# endpoints defined below and used for explicit error logging.
logger = get_logger("database_client_api")


# Router for the endpoints defined below: every path is mounted under the
# "/api/v1" prefix and grouped under the "Database Clients" OpenAPI tag.
router = APIRouter(prefix="/api/v1", tags=["Database Clients"])
|
|
|
|
| |
| |
| |
|
|
|
|
# Postgres-shaped sample shown as the `credentials` example in the API docs.
_CREATE_CREDENTIALS_EXAMPLE = {
    "host": "db.example.com",
    "port": 5432,
    "database": "mydb",
    "username": "admin",
    "password": "s3cr3t!",
    "ssl_mode": "require",
}


class DatabaseClientCreate(BaseModel):
    """
    Payload to register a new external database connection.

    The `credentials` object shape depends on `db_type`:

    | db_type   | Required fields                                                  |
    |-----------|------------------------------------------------------------------|
    | postgres  | host, port, database, username, password, ssl_mode               |
    | mysql     | host, port, database, username, password, ssl                    |
    | sqlserver | host, port, database, username, password, driver?                |
    | supabase  | host, port, database, username, password, ssl_mode               |
    | bigquery  | project_id, dataset_id, location?, service_account_json          |
    | snowflake | account, warehouse, database, schema?, username, password, role? |

    Sensitive fields (`password`, `service_account_json`) are encrypted
    at rest using Fernet symmetric encryption.
    """

    # All three fields are required (Ellipsis default).
    name: str = Field(
        default=...,
        description="Display name for this connection.",
        examples=["Production DB"],
    )
    db_type: DbType = Field(
        default=...,
        description="Type of the database engine.",
        examples=["postgres"],
    )
    # Typed as a free-form mapping: this model does not itself check the
    # credentials against the per-engine shapes listed in the docstring.
    credentials: Dict[str, Any] = Field(
        default=...,
        description="Connection credentials. Shape depends on db_type. See schema descriptions above.",
        examples=[_CREATE_CREDENTIALS_EXAMPLE],
    )
|
|
|
|
class DatabaseClientUpdate(BaseModel):
    """
    Payload to update an existing database connection.

    All fields are optional — only provided fields will be updated.
    If `credentials` is provided, it replaces the entire credentials object
    and sensitive fields are re-encrypted.
    """

    # Every field defaults to None so a request body may omit it.
    name: Optional[str] = Field(
        None,
        description="New display name for this connection.",
        examples=["Staging DB"],
    )
    credentials: Optional[Dict[str, Any]] = Field(
        None,
        description="Updated credentials object. Replaces existing credentials entirely if provided.",
        examples=[
            {
                "host": "new-host.example.com",
                "port": 5432,
                "database": "mydb",
                "username": "admin",
                "password": "n3wP@ss!",
                "ssl_mode": "require",
            }
        ],
    )
    # Restricted to the two literal values; anything else fails validation.
    status: Optional[Literal["active", "inactive"]] = Field(
        None,
        description="Set to 'inactive' to soft-disable the connection without deleting it.",
        examples=["inactive"],
    )
|
|
|
|
class DatabaseClientResponse(BaseModel):
    """
    Database connection record returned by the API.

    Credentials are **never** included in the response for security reasons.
    """

    # Allows building the model from attribute-bearing objects via
    # `model_validate(obj)`, not only from dicts.
    model_config = {"from_attributes": True}

    id: str = Field(
        default=...,
        description="Unique identifier of the database connection.",
    )
    user_id: str = Field(
        default=...,
        description="ID of the user who owns this connection.",
    )
    name: str = Field(
        default=...,
        description="Display name of the connection.",
    )
    db_type: str = Field(
        default=...,
        description="Database engine type.",
    )
    status: str = Field(
        default=...,
        description="Connection status: 'active' or 'inactive'.",
    )
    created_at: datetime = Field(
        default=...,
        description="Timestamp when the connection was registered.",
    )
    # The only optional field: absent until the record has been updated.
    updated_at: Optional[datetime] = Field(
        default=None,
        description="Timestamp of the last update, if any.",
    )
|
|
|
|
| |
| |
| |
|
|
|
|
@router.post(
    "/database-clients",
    response_model=DatabaseClientResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Register a new database connection",
    response_description="The newly created database connection record (credentials excluded).",
    responses={
        201: {"description": "Connection registered successfully."},
        422: {"description": "Validation error — check the credentials shape for the given db_type."},
        500: {"description": "Internal server error."},
    },
)
@limiter.limit("10/minute")
@log_execution(logger)
async def create_database_client(
    request: Request,
    payload: DatabaseClientCreate,
    user_id: str = Query(..., description="ID of the user registering the connection."),
    db: AsyncSession = Depends(get_db),
) -> DatabaseClientResponse:
    """
    Register a new external database connection for a user.

    The `credentials` object must match the shape for the chosen `db_type`
    (see **CredentialSchemas** in the schema section below for exact fields).
    Sensitive fields (`password`, `service_account_json`) are encrypted
    before being persisted — they are never returned in any response.
    """
    # `request` is not used in the body; it is kept in the signature for the
    # rate-limit decorator (presumably required by the limiter — verify).
    # Raises HTTPException(500) when the service call or response validation
    # fails; HTTPExceptions raised further down are passed through untouched.
    try:
        client = await database_client_service.create(
            db=db,
            user_id=user_id,
            name=payload.name,
            db_type=payload.db_type,
            credentials=payload.credentials,
        )
        return DatabaseClientResponse.model_validate(client)
    except HTTPException:
        # Do not mask a deliberate HTTP error from the service layer as a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to create database client for user {user_id}", error=str(e))
        # NOTE(review): the detail echoes the raw exception text to the caller.
        # Confirm that service/driver errors can never contain credential
        # material before relying on this; a generic message would be safer.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to create database client: {str(e)}",
        ) from e
|
|
|
|
@router.get(
    "/database-clients/{user_id}",
    response_model=List[DatabaseClientResponse],
    summary="List all database connections for a user",
    response_description="List of database connections (credentials excluded).",
    responses={
        200: {"description": "Returns an empty list if the user has no connections."},
    },
)
@log_execution(logger)
async def list_database_clients(
    user_id: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Return all database connections registered by the specified user,
    ordered by creation date (newest first).

    Credentials are never included in the response.
    """
    # Ordering is whatever the service returns; nothing is sorted here.
    owned_clients = await database_client_service.get_user_clients(db, user_id)

    # Convert each stored record into the credential-free response model.
    serialized = []
    for record in owned_clients:
        serialized.append(DatabaseClientResponse.model_validate(record))
    return serialized
|
|
|
|
@router.get(
    "/database-clients/{user_id}/{client_id}",
    response_model=DatabaseClientResponse,
    summary="Get a single database connection",
    response_description="Database connection detail (credentials excluded).",
    responses={
        404: {"description": "Connection not found."},
        403: {"description": "Access denied — user_id does not own this connection."},
    },
)
@log_execution(logger)
async def get_database_client(
    user_id: str,
    client_id: str,
    db: AsyncSession = Depends(get_db),
) -> DatabaseClientResponse:
    """
    Return the detail of a single database connection.

    Returns **404** if no connection with `client_id` exists and **403** if
    the `user_id` in the path does not match the owner of the requested
    connection.
    """
    client = await database_client_service.get(db, client_id)

    # Existence is checked before ownership, so a non-owner can tell a missing
    # id (404) apart from someone else's connection (403).
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    return DatabaseClientResponse.model_validate(client)
|
|
|
|
@router.put(
    "/database-clients/{client_id}",
    response_model=DatabaseClientResponse,
    summary="Update a database connection",
    response_description="Updated database connection record (credentials excluded).",
    responses={
        404: {"description": "Connection not found."},
        403: {"description": "Access denied — user_id does not own this connection."},
    },
)
@log_execution(logger)
async def update_database_client(
    client_id: str,
    payload: DatabaseClientUpdate,
    user_id: str = Query(..., description="ID of the user who owns the connection."),
    db: AsyncSession = Depends(get_db),
) -> DatabaseClientResponse:
    """
    Update an existing database connection.

    Only fields present in the request body are updated.
    If `credentials` is provided it **replaces** the entire credentials object
    and sensitive fields are re-encrypted automatically.
    """
    client = await database_client_service.get(db, client_id)

    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    # Only the owner may modify the connection.
    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    # Fields omitted from the body are forwarded as None.
    updated = await database_client_service.update(
        db=db,
        client_id=client_id,
        name=payload.name,
        credentials=payload.credentials,
        status=payload.status,
    )

    # The service's return contract is not visible from this file. If it
    # signals a missing row with None (e.g. the record was deleted between the
    # lookup above and the update), answer 404 instead of letting response
    # validation fail with a 500.
    if updated is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    return DatabaseClientResponse.model_validate(updated)
|
|
|
|
@router.delete(
    "/database-clients/{client_id}",
    status_code=status.HTTP_200_OK,
    summary="Delete a database connection",
    responses={
        200: {"description": "Connection deleted successfully."},
        404: {"description": "Connection not found."},
        403: {"description": "Access denied — user_id does not own this connection."},
    },
)
@log_execution(logger)
async def delete_database_client(
    client_id: str,
    user_id: str = Query(..., description="ID of the user who owns the connection."),
    db: AsyncSession = Depends(get_db),
):
    """
    Permanently delete a database connection.

    This action is irreversible. The stored credentials are also removed.
    """
    client = await database_client_service.get(db, client_id)

    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    # Only the owner may delete the connection.
    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    await database_client_service.delete(db, client_id)
    return {"status": "success", "message": "Database client deleted successfully"}
|
|
|
|
@router.post(
    "/database-clients/{client_id}/ingest",
    status_code=status.HTTP_200_OK,
    summary="Ingest schema from a registered database into the vector store",
    response_description="Count of chunks ingested.",
    responses={
        200: {"description": "Ingestion completed successfully."},
        403: {"description": "Access denied — user_id does not own this connection."},
        404: {"description": "Connection not found."},
        501: {"description": "The connection's db_type is not yet supported by the pipeline."},
        500: {"description": "Ingestion failed (connection error, profiling error, etc.)."},
    },
)
@limiter.limit("5/minute")
@log_execution(logger)
async def ingest_database_client(
    request: Request,
    client_id: str,
    user_id: str = Query(..., description="ID of the user who owns the connection."),
    db: AsyncSession = Depends(get_db),
):
    """
    Decrypt the stored credentials, connect to the user's database, introspect
    its schema, profile each column, embed the descriptions, and store them in
    the shared PGVector collection tagged with `source_type="database"`.

    Chunks become retrievable via the same retriever used for document chunks.
    """
    # `request` is not used in the body; it is kept in the signature for the
    # rate-limit decorator (presumably required by the limiter — verify).
    client = await database_client_service.get(db, client_id)

    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Database client not found",
        )

    # Only the owner may trigger ingestion of a connection.
    if client.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Access denied",
        )

    # Decryption happens outside the try block, so a failure here is not
    # reported as "Ingestion failed" but propagates as an unhandled error.
    creds = decrypt_credentials_dict(client.credentials)

    try:
        # The engine is only valid inside the scope; the pipeline run is
        # awaited before the context manager exits.
        with db_pipeline_service.engine_scope(
            db_type=client.db_type,
            credentials=creds,
        ) as engine:
            total = await db_pipeline_service.run(user_id=user_id, engine=engine)
    except HTTPException:
        # Do not mask a deliberate HTTP error from the pipeline as a 500.
        raise
    except NotImplementedError as e:
        # Mapped to 501, documented above as "db_type not yet supported".
        raise HTTPException(
            status_code=status.HTTP_501_NOT_IMPLEMENTED,
            detail=str(e),
        ) from e
    except Exception as e:
        logger.error(
            f"Ingestion failed for client {client_id}", user_id=user_id, error=str(e)
        )
        # NOTE(review): the detail echoes the raw exception text to the caller;
        # confirm driver/pipeline errors cannot include secrets.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Ingestion failed: {e}",
        ) from e

    return {"status": "success", "client_id": client_id, "chunks_ingested": total}
|
|