Spaces:
Paused
Paused
| import logging | |
| from typing import Any, Dict, List, Optional | |
| from core.plugin_system.permissions import PluginPermission | |
| logger = logging.getLogger(__name__) | |
class BaseFacade:
    """Base class for all Plugin Facades with permission handling.

    Stores the wrapped service, the id of the plugin the facade was created
    for, and the permissions granted to that plugin. Subclasses use
    ``_check_permission`` to gate the operations they expose.
    """

    def __init__(
        self,
        service: Any,
        plugin_id: str,
        permissions: Optional[List[PluginPermission]] = None,
    ):
        """Create a facade around ``service`` for the plugin ``plugin_id``.

        Args:
            service: The real service object that the facade wraps.
            plugin_id: Identifier of the plugin; used in log messages.
            permissions: Permissions granted to the plugin. ``None`` or an
                empty collection means no permissions at all.
        """
        self._service = service
        self._plugin_id = plugin_id
        # Keep a private copy: if the caller's list were stored by reference,
        # mutating it after construction would silently change the grants.
        self._permissions = list(permissions) if permissions else []

    def _check_permission(self, required_permission: PluginPermission) -> bool:
        """Check if plugin has the required permission.

        Logs a warning naming the plugin, the requested permission and the
        granted permissions when the check fails.

        Returns:
            True if ``required_permission`` was granted, False otherwise.
        """
        if required_permission in self._permissions:
            return True

        logger.warning(
            f"Plugin {self._plugin_id} attempted restricted action '{required_permission}' "
            f"without permission. Granted: {self._permissions}"
        )
        return False
class PluginDBFacade(BaseFacade):
    """Restricted database access for plugins.

    Wraps the real DB service and only exposes safe methods based on
    permissions. Destructive actions (for example deleting a user) are
    explicitly forbidden by NOT implementing them on this facade.
    """

    # Level names a plugin may pass to ``log_activity``; anything else falls
    # back to INFO, which is what every call used before ``level`` was honored.
    _LOG_LEVELS = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "warn": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }

    def __init__(
        self,
        real_db_service: Any,
        plugin_id: str,
        permissions: Optional[List[PluginPermission]] = None,
    ):
        """Create the facade; defaults to READ_ONLY if no permissions provided.

        Args:
            real_db_service: The real DB service being wrapped.
            plugin_id: Identifier of the plugin; used in log and error text.
            permissions: Permissions granted to the plugin. ``None`` or an
                empty list is replaced by ``[PluginPermission.READ_ONLY]``.
        """
        super().__init__(
            real_db_service, plugin_id, permissions or [PluginPermission.READ_ONLY]
        )

    def get_user(self, user_id: str) -> Optional[Any]:
        """Safe user retrieval.

        Allowed when the plugin holds READ_USER or READ_ONLY.

        Args:
            user_id: Identifier passed through to the wrapped service.

        Returns:
            Whatever the wrapped service's ``get_user`` returns.

        Raises:
            PermissionError: If the plugin holds neither READ_USER nor
                READ_ONLY.
        """
        allowed = (
            PluginPermission.READ_USER in self._permissions
            or PluginPermission.READ_ONLY in self._permissions
        )
        if not allowed:
            # Called only for its side effect: it logs the denied attempt.
            # Checking both permissions first avoids logging a misleading
            # "restricted action" warning for READ_ONLY plugins that are
            # in fact allowed to read users.
            self._check_permission(PluginPermission.READ_USER)
            raise PermissionError(
                f"Plugin {self._plugin_id} missing 'READ_USER' permission"
            )
        return self._service.get_user(user_id)

    def query(
        self, model_name: str, filters: Optional[Dict[str, Any]] = None
    ) -> List[Any]:
        """Generic safe query wrapper (currently a stub).

        Allowed when the plugin holds READ_ONLY or READ_DATA. No query is
        executed yet: after the permission check an empty list is returned
        and ``model_name`` / ``filters`` are not used.

        Args:
            model_name: Name of the model to query (unused for now).
            filters: Optional filter mapping (unused for now).

        Returns:
            Always an empty list in the current implementation.

        Raises:
            PermissionError: If the plugin holds neither READ_ONLY nor
                READ_DATA.
        """
        if (
            PluginPermission.READ_ONLY not in self._permissions
            and PluginPermission.READ_DATA not in self._permissions
        ):
            # Log the denied attempt the same way get_user does.
            self._check_permission(PluginPermission.READ_DATA)
            raise PermissionError(f"Plugin {self._plugin_id} missing read permissions")

        # TODO: real implementation. The intended design is an allow-list
        # mapping model names to ORM models, e.g.
        #     models = {"Case": CaseModel, "User": UserModel}
        # returning [] for unknown names and otherwise delegating to
        #     self._service.generic_query(models[model_name], filters)
        # Until then nothing reaches the database, so returning [] is safe.
        return []

    def log_activity(self, message: str, level: str = "info") -> None:
        """Allow plugins to log their own specific activities.

        Logging acts as its own audit trail, so no permission is required.

        Args:
            message: Text to log; it is prefixed with the plugin id.
            level: Case-insensitive level name ("debug", "info", "warning",
                "warn", "error" or "critical"). Unrecognized values are
                logged at INFO.
        """
        level_no = self._LOG_LEVELS.get(str(level).lower(), logging.INFO)
        logger.log(level_no, f"[PLUGIN:{self._plugin_id}] {message}")

    # Explicitly FORBID destructive actions by NOT implementing them
    # def delete_user(self): ... <-- DOES NOT EXIST