# Page-header residue from the source listing: jebin2 / "db services" / 50c20bf
"""
BaseQuery - Base class for all query types with shared filtering logic.
Uses DBServiceConfig for plug-and-play configuration of models and columns.
"""
import os
import logging
from typing import Type
from fastapi import HTTPException, status as http_status
from sqlalchemy import Select
from sqlalchemy.ext.asyncio import AsyncSession
from core.models import User
from services.db_service.config import DBServiceConfig
# Module-level logger named after this module; used by BaseQuery for access/audit messages.
logger = logging.getLogger(__name__)
class BaseQuery:
    """
    Base class for all query operations.

    Uses DBServiceConfig for model scopes and column names.
    Configuration must be registered at application startup.

    Instance state set by ``__init__``:
        user: the user on whose behalf queries are executed.
        db: the async database session.
        _is_system: True when the operation is issued by a system process.
        _config: the DBServiceConfig class holding scopes and column names.
    """

    # Maps an operation name to the DBServiceConfig attributes holding its
    # (system-only, admin-only, user-allowed) model sets.
    _SCOPE_ATTRIBUTES = {
        'read': ('system_read_scoped', 'admin_read_only', 'user_read_scoped'),
        'create': ('system_create_scoped', 'admin_create_only', 'user_create_scoped'),
        'update': ('system_update_scoped', 'admin_update_only', 'user_update_scoped'),
        'delete': ('system_delete_scoped', 'admin_delete_only', 'user_delete_scoped'),
    }

    def __init__(self, user: User, db: AsyncSession, is_system: bool = False):
        """
        Initialize base query with user and database session.

        Args:
            user: User the query runs for.
            db: Async SQLAlchemy session.
            is_system: Mark the query as a system operation.

        Calls ``DBServiceConfig.assert_registered()`` first, so construction
        fails early when the configuration has not been registered.
        """
        # Ensure config is registered
        DBServiceConfig.assert_registered()
        self.user = user
        self.db = db
        self._is_system = is_system
        self._config = DBServiceConfig

    @property
    def is_admin(self) -> bool:
        """
        Check if current user is an admin.

        The admin list is read from the comma-separated ``ADMIN_EMAILS``
        environment variable on every access. Blank entries (for example
        from a trailing comma) are ignored, and a missing user or an empty
        email is never treated as admin.
        """
        email = getattr(self.user, "email", None)
        if not email:
            return False

        admin_emails = {
            entry.strip()
            for entry in os.getenv("ADMIN_EMAILS", "").split(",")
            if entry.strip()
        }
        if email not in admin_emails:
            return False

        logger.info("Admin access granted for %s", email)
        return True

    def _iter_query_models(self, query: Select):
        """
        Yield candidate model classes referenced by ``query``, lazily.

        Candidates come first from ``query.column_descriptions`` (the
        ``'entity'`` value, falling back to ``'type'``), then from the
        query's FROM clauses. Yielded values may be ``None`` or non-class
        objects; callers filter them by set membership or ``hasattr``.
        """
        if hasattr(query, 'column_descriptions'):
            for description in query.column_descriptions:
                yield description.get('entity') or description.get('type')

        # Select.froms is deprecated since SQLAlchemy 1.4.23;
        # get_final_froms() is the documented replacement.
        for from_clause in query.get_final_froms():
            # NOTE(review): this lookup is kept from the original code; confirm
            # that entity_namespace actually exposes '__class__' for mapped
            # tables, otherwise this fallback never matches a model.
            yield from_clause.entity_namespace.get('__class__')

    def _verify_operation_access(self, model_class: Type, operation: str) -> None:
        """
        Check if user has permission for this operation on this model.

        Permission hierarchy: SYSTEM > ADMIN > USER
        Uses DBServiceConfig for scope checking.

        Args:
            model_class: Model the operation targets.
            operation: One of 'read', 'create', 'update', 'delete'. Any other
                value matches no scope set and is therefore denied for
                regular users.

        Raises:
            HTTPException: 403 when the model is system-only, admin-only, or
                not in the user-allowed set for this operation.
        """
        # System operations have highest priority
        if self._is_system:
            logger.info("System operation: %s on %s", operation, model_class.__name__)
            return

        # Admins can do anything
        if self.is_admin:
            return

        attribute_names = self._SCOPE_ATTRIBUTES.get(operation)
        if attribute_names is None:
            system_only = admin_only = user_allowed = frozenset()
        else:
            system_only, admin_only, user_allowed = (
                getattr(self._config, name) for name in attribute_names
            )

        # Check if this is system-only operation
        if (
            model_class in system_only
            and model_class not in user_allowed
            and model_class not in admin_only
        ):
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only system processes can {operation} {model_class.__name__}"
            )

        # Check if admin-only
        if model_class in admin_only:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only administrators can {operation} {model_class.__name__}"
            )

        # Check if user is allowed
        if model_class not in user_allowed:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"You do not have permission to {operation} {model_class.__name__}"
            )

    def _apply_ownership_filter(self, stmt, model_class: Type, operation: str):
        """
        Shared method to apply ownership filter for UPDATE/DELETE operations.

        Uses DBServiceConfig.user_filter_column for filtering.

        Args:
            stmt: Statement to restrict (must support ``.where``).
            model_class: Model the statement targets.
            operation: Operation name, used only in log messages.

        Returns:
            ``stmt`` unchanged for admins; otherwise ``stmt`` restricted to
            rows whose filter column equals ``self.user.id``. When the model
            has no such column the statement is returned unfiltered and a
            warning is logged.

        NOTE(review): ``_is_system`` is not consulted here, so system
        operations are filtered like regular users - confirm this is intended.
        """
        # Admins can modify all records
        if self.is_admin:
            logger.info(
                "Admin %s %sing %s records",
                self.user.email, operation, model_class.__name__,
            )
            return stmt

        # Non-admins can only modify their own records
        filter_column = self._config.user_filter_column
        if hasattr(model_class, filter_column):
            stmt = stmt.where(getattr(model_class, filter_column) == self.user.id)
            logger.info(
                "User %s %sing own %s records",
                self.user.email, operation, model_class.__name__,
            )
        else:
            # Behaviour is unchanged (statement stays unfiltered), but make the
            # missing ownership restriction visible instead of logging "own".
            logger.warning(
                "No ownership filter applied: %s has no '%s' column (user %s, %s)",
                model_class.__name__, filter_column, self.user.email, operation,
            )
        return stmt

    def _verify_admin_access(self, query: Select) -> None:
        """
        Check if query is for admin-only models (READ operation).

        Uses DBServiceConfig.admin_read_only.

        Raises:
            HTTPException: 403 when a non-admin query references a model in
                the admin-read-only set.
        """
        if self.is_admin:
            return

        admin_read_only = self._config.admin_read_only
        for model in self._iter_query_models(query):
            if model in admin_read_only:
                raise HTTPException(
                    status_code=http_status.HTTP_403_FORBIDDEN,
                    detail=f"Only administrators can read {model.__name__}"
                )

    def _apply_user_filter(self, query: Select) -> Select:
        """
        Automatically apply user_id filter to READ queries.

        Uses DBServiceConfig for model scopes and filter column.

        Returns:
            The query unchanged for admins or when no user-read-scoped model
            is detected; otherwise the query restricted to the current user,
            based on the first filterable user-scoped model found.

        Raises:
            HTTPException: 403 (via ``_verify_admin_access``) when a
                non-admin reads an admin-only model.
        """
        # First check if this is an admin-only query
        self._verify_admin_access(query)

        # Admins see all data
        if self.is_admin:
            logger.debug("Admin query - no user filter applied")
            return query

        # Get filter column from config
        filter_column = self._config.user_filter_column
        special_user_model = self._config.special_user_model
        user_id_column = self._config.user_id_column
        user_read_scoped = self._config.user_read_scoped

        for model in self._iter_query_models(query):
            if model not in user_read_scoped:
                continue
            logger.debug("Applying user filter to %s query", model.__name__)

            # Special handling for User model (uses id instead of user_id)
            if model == special_user_model:
                return query.where(getattr(model, user_id_column) == self.user.id)

            # Standard user_id filtering
            if hasattr(model, filter_column):
                return query.where(getattr(model, filter_column) == self.user.id)

            # Scoped model without a usable column: keep scanning as before,
            # but surface the gap because the query may end up unfiltered.
            logger.warning(
                "%s is user-read-scoped but has no '%s' column; no user filter applied for it",
                model.__name__, filter_column,
            )
        return query

    def _filter_deleted(self, query: Select) -> Select:
        """
        Add filter to exclude soft-deleted records.

        Uses DBServiceConfig.soft_delete_column. The filter is applied for
        the first detected model that has that column; if none has it the
        query is returned unchanged.
        """
        delete_column = self._config.soft_delete_column
        for model in self._iter_query_models(query):
            if model and hasattr(model, delete_column):
                # .is_(None) renders IS NULL, same as the former "== None".
                return query.where(getattr(model, delete_column).is_(None))
        return query