Spaces:
Sleeping
Sleeping
| """ | |
| BaseQuery - Base class for all query types with shared filtering logic. | |
| Uses DBServiceConfig for plug-and-play configuration of models and columns. | |
| """ | |
| import os | |
| import logging | |
| from typing import Type | |
| from fastapi import HTTPException, status as http_status | |
| from sqlalchemy import Select | |
| from sqlalchemy.ext.asyncio import AsyncSession | |
| from core.models import User | |
| from services.db_service.config import DBServiceConfig | |
| logger = logging.getLogger(__name__) | |
class BaseQuery:
    """
    Base class for all query operations.

    Uses DBServiceConfig for model scopes and column names.
    Configuration must be registered at application startup.

    Permission hierarchy: SYSTEM > ADMIN > USER.

    NOTE: ``is_admin`` is a regular method, not a property. Internal checks
    must *call* it (``self.is_admin()``); testing the bare attribute is always
    truthy and would silently treat every user as an administrator.
    """

    def __init__(self, user: User, db: AsyncSession, is_system: bool = False):
        """Initialize base query with user and database session.

        Args:
            user: User on whose behalf queries run; ``email`` and ``id`` are
                read by the permission and filter helpers.
            db: Database session, stored on the instance as ``self.db``.
            is_system: When True, ``_verify_operation_access`` allows every
                operation without further checks.
        """
        # Ensure config is registered
        DBServiceConfig.assert_registered()
        self.user = user
        self.db = db
        self._is_system = is_system
        self._config = DBServiceConfig

    def is_admin(self) -> bool:
        """Check if current user is an admin.

        Admins are the addresses listed in the comma-separated ``ADMIN_EMAILS``
        environment variable. Entries are whitespace-stripped and compared
        exactly against ``self.user.email``.

        Returns:
            True when the user's email is listed, False otherwise (including
            when ``ADMIN_EMAILS`` is unset or empty).
        """
        admin_emails_str = os.getenv("ADMIN_EMAILS", "")
        if not admin_emails_str:
            return False
        # Drop blank entries so a stray or trailing comma cannot make an
        # empty email address count as an administrator.
        admin_emails = {
            email.strip()
            for email in admin_emails_str.split(",")
            if email.strip()
        }
        is_admin = self.user.email in admin_emails
        if is_admin:
            logger.info("Admin access granted for %s", self.user.email)
        return is_admin

    def _verify_operation_access(self, model_class: Type, operation: str) -> None:
        """
        Check if user has permission for this operation on this model.

        Permission hierarchy: SYSTEM > ADMIN > USER
        Uses DBServiceConfig for scope checking.

        Args:
            model_class: Model class the operation targets.
            operation: One of 'read', 'create', 'update', 'delete'. Any other
                value matches no scope set and is therefore rejected.

        Raises:
            HTTPException: 403 when the model is system-only, admin-only, or
                not in the user-allowed set for this operation.
        """
        # System operations have highest priority
        if self._is_system:
            logger.info(
                "System operation: %s on %s", operation, model_class.__name__
            )
            return

        # Admins can do anything
        if self.is_admin():
            return

        # Map operation to config scope sets
        config = self._config
        admin_only_sets = {
            'read': config.admin_read_only,
            'create': config.admin_create_only,
            'update': config.admin_update_only,
            'delete': config.admin_delete_only,
        }
        user_allowed_sets = {
            'read': config.user_read_scoped,
            'create': config.user_create_scoped,
            'update': config.user_update_scoped,
            'delete': config.user_delete_scoped,
        }
        system_only_sets = {
            'read': config.system_read_scoped,
            'create': config.system_create_scoped,
            'update': config.system_update_scoped,
            'delete': config.system_delete_scoped,
        }
        admin_only = admin_only_sets.get(operation, set())
        user_allowed = user_allowed_sets.get(operation, set())
        system_only = system_only_sets.get(operation, set())

        # Check if this is system-only operation
        if (
            model_class in system_only
            and model_class not in user_allowed
            and model_class not in admin_only
        ):
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only system processes can {operation} {model_class.__name__}"
            )

        # Check if admin-only
        if model_class in admin_only:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"Only administrators can {operation} {model_class.__name__}"
            )

        # Check if user is allowed
        if model_class not in user_allowed:
            raise HTTPException(
                status_code=http_status.HTTP_403_FORBIDDEN,
                detail=f"You do not have permission to {operation} {model_class.__name__}"
            )

    def _owner_column(self, model_class: Type):
        """Return the attribute of ``model_class`` that holds the owning user's id.

        The configured ``special_user_model`` is matched on
        ``user_id_column``; every other model is matched on
        ``user_filter_column``. Returns None when the model has no such
        attribute.
        """
        config = self._config
        if model_class == config.special_user_model:
            return getattr(model_class, config.user_id_column)
        return getattr(model_class, config.user_filter_column, None)

    @staticmethod
    def _query_froms(query):
        """Return the FROM elements of ``query``.

        Prefers ``get_final_froms()`` and falls back to the older ``froms``
        attribute for objects that do not provide it.
        """
        get_final_froms = getattr(query, "get_final_froms", None)
        if callable(get_final_froms):
            return get_final_froms()
        return getattr(query, "froms", ())

    @staticmethod
    def _from_clause_entity(from_clause):
        """Best-effort lookup of the model class behind one FROM element.

        Returns None instead of raising when nothing can be resolved.
        """
        namespace = getattr(from_clause, "entity_namespace", None)
        if isinstance(namespace, type):
            # NOTE(review): for ORM-mapped FROM elements the namespace appears
            # to be the mapped class itself - confirm against SQLAlchemy docs.
            return namespace
        lookup = getattr(namespace, "get", None)
        if callable(lookup):
            return lookup('__class__')
        return None

    def _iter_query_entities(self, query):
        """Yield candidate model classes referenced by ``query``.

        Entities from ``column_descriptions`` come first, then entities
        resolved from the FROM elements. Items may be None or non-model
        values, so callers must filter them.
        """
        for description in getattr(query, 'column_descriptions', ()):
            yield description.get('entity') or description.get('type')
        for from_clause in self._query_froms(query):
            yield self._from_clause_entity(from_clause)

    def _apply_ownership_filter(self, stmt, model_class: Type, operation: str):
        """
        Shared method to apply ownership filter for UPDATE/DELETE operations.

        Admins get ``stmt`` back unchanged. For everyone else the statement is
        restricted to rows owned by ``self.user``, using the same column rules
        as the READ filter (``user_id_column`` for the special user model,
        ``user_filter_column`` otherwise).

        Args:
            stmt: Statement exposing ``where()``.
            model_class: Model class the statement targets.
            operation: Verb stem used only for log messages.

        Returns:
            The (possibly filtered) statement. If the model has no owner
            column the statement is returned unfiltered and a warning is
            logged.
        """
        # Admins can modify all records
        if self.is_admin():
            logger.info(
                "Admin %s %sing %s records",
                self.user.email, operation, model_class.__name__,
            )
            return stmt

        # Non-admins can only modify their own records
        owner_col = self._owner_column(model_class)
        if owner_col is None:
            logger.warning(
                "No ownership column on %s; %s statement left unfiltered",
                model_class.__name__, operation,
            )
            return stmt
        stmt = stmt.where(owner_col == self.user.id)
        logger.info(
            "User %s %sing own %s records",
            self.user.email, operation, model_class.__name__,
        )
        return stmt

    def _verify_admin_access(self, query: Select) -> None:
        """
        Check if query is for admin-only models (READ operation).

        Uses DBServiceConfig.admin_read_only.

        Raises:
            HTTPException: 403 when a non-admin query references a model in
                ``admin_read_only``.
        """
        if self.is_admin():
            return
        admin_only = self._config.admin_read_only
        for entity in self._iter_query_entities(query):
            if entity in admin_only:
                raise HTTPException(
                    status_code=http_status.HTTP_403_FORBIDDEN,
                    detail=f"Only administrators can read {entity.__name__}"
                )

    def _apply_user_filter(self, query: Select) -> Select:
        """
        Automatically apply user_id filter to READ queries.

        Uses DBServiceConfig for model scopes and filter column. Only the
        first user-scoped entity that has an owner column is filtered.

        Returns:
            ``query`` unchanged for admins or when no scoped entity is found,
            otherwise ``query`` restricted to rows owned by ``self.user``.

        Raises:
            HTTPException: 403 when a non-admin query references an
                admin-only model.
        """
        # First check if this is an admin-only query
        self._verify_admin_access(query)

        # Admins see all data
        if self.is_admin():
            logger.debug("Admin query - no user filter applied")
            return query

        scoped_models = self._config.user_read_scoped
        for entity in self._iter_query_entities(query):
            if entity not in scoped_models:
                continue
            logger.debug("Applying user filter to %s query", entity.__name__)
            owner_col = self._owner_column(entity)
            if owner_col is not None:
                return query.where(owner_col == self.user.id)
        return query

    def _filter_deleted(self, query: Select) -> Select:
        """
        Add filter to exclude soft-deleted records.

        Uses DBServiceConfig.soft_delete_column. Only the first entity that
        has that column is filtered.

        Returns:
            ``query`` with an ``IS NULL`` condition on the soft-delete column,
            or ``query`` unchanged when no entity has that column.
        """
        delete_column = self._config.soft_delete_column
        for entity in self._iter_query_entities(query):
            if entity is None:
                continue
            deleted_at_col = getattr(entity, delete_column, None)
            if deleted_at_col is not None:
                return query.where(deleted_at_col.is_(None))
        return query