| """ |
| Main settings module for BackgroundFX Pro. |
| |
| Automatically loads the appropriate configuration based on environment. |
| """ |
|
|
| import os |
| from pathlib import Path |
| from typing import Optional, Dict, Any |
| from dataclasses import dataclass, field |
| import logging |
|
|
| from .base import BaseConfig, config_manager, ConfigurationError |
| from .validators import validate_production_config |
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class Settings(BaseConfig):
    """
    Main application settings with all configuration options.

    Extends ``BaseConfig`` with API, database, storage, processing, model,
    task-queue, authentication, email, monitoring, caching, CDN and billing
    options. Fields declared with ``field(default_factory=...)`` read their
    environment variable each time an instance is created, not at import time.
    """

    # --- API ---
    api_version: str = "v1"
    api_prefix: str = "/api"
    api_title: str = "BackgroundFX Pro API"
    api_description: str = "AI-powered background removal and replacement"
    api_docs_enabled: bool = True

    # --- Relational database (DATABASE_URL, falls back to a local PostgreSQL URL) ---
    database_url: str = field(
        default_factory=lambda: os.getenv(
            "DATABASE_URL",
            "postgresql://user:password@localhost/backgroundfx"
        )
    )
    database_pool_size: int = 20
    database_max_overflow: int = 40
    database_echo: bool = False

    # --- MongoDB ---
    mongodb_url: str = field(
        default_factory=lambda: os.getenv(
            "MONGODB_URL",
            "mongodb://localhost:27017/backgroundfx"
        )
    )
    mongodb_database: str = "backgroundfx"

    # --- Redis ---
    redis_url: str = field(
        default_factory=lambda: os.getenv(
            "REDIS_URL",
            "redis://localhost:6379/0"
        )
    )
    redis_max_connections: int = 50
    redis_decode_responses: bool = True

    # --- Object storage ("s3" or "local" are the backends handled below) ---
    storage_backend: str = "s3"
    storage_bucket: str = "backgroundfx-uploads"
    storage_region: str = "us-east-1"
    storage_access_key: str = field(default_factory=lambda: os.getenv("AWS_ACCESS_KEY_ID", ""))
    storage_secret_key: str = field(default_factory=lambda: os.getenv("AWS_SECRET_ACCESS_KEY", ""))
    storage_endpoint_url: Optional[str] = None
    storage_use_ssl: bool = True
    local_storage_path: Path = field(default_factory=lambda: Path("storage"))

    # --- Processing limits ---
    # NOTE(review): sizes look like byte counts (50 MiB / 500 MiB) and the
    # timeout like seconds -- confirm against the consumers of these values.
    max_image_size: int = 50 * 1024 * 1024
    max_video_size: int = 500 * 1024 * 1024
    max_batch_size: int = 100
    processing_timeout: int = 300
    enable_gpu: bool = True
    gpu_memory_fraction: float = 0.8

    # --- Models ---
    models_dir: Path = field(default_factory=lambda: Path("models"))
    model_cache_dir: Path = field(default_factory=lambda: Path("/tmp/model_cache"))
    default_model: str = "rembg"
    model_download_timeout: int = 600
    # Per-model options, keyed by model name (see get_model_config).
    model_configs: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
        "rembg": {
            "path": "models/rembg",
            "version": "1.0.0",
            "gpu_enabled": True,
            "batch_size": 4
        },
        "u2net": {
            "path": "models/u2net",
            "version": "1.0.0",
            "gpu_enabled": True,
            "batch_size": 2
        },
        "sam2": {
            "path": "models/sam2",
            "version": "2.0.0",
            "gpu_enabled": True,
            "batch_size": 1
        }
    })

    # --- Celery ---
    celery_broker_url: str = field(
        default_factory=lambda: os.getenv(
            "CELERY_BROKER_URL",
            "redis://localhost:6379/1"
        )
    )
    celery_result_backend: str = field(
        default_factory=lambda: os.getenv(
            "CELERY_RESULT_BACKEND",
            "redis://localhost:6379/2"
        )
    )
    celery_task_time_limit: int = 600
    celery_task_soft_time_limit: int = 540
    celery_worker_concurrency: int = 4
    celery_worker_prefetch_multiplier: int = 1

    # --- JWT ---
    # The fallback secret is a placeholder; JWT_SECRET_KEY must be set for
    # any real deployment.
    jwt_secret_key: str = field(
        default_factory=lambda: os.getenv(
            "JWT_SECRET_KEY",
            "change-me-in-production"
        )
    )
    jwt_algorithm: str = "HS256"
    jwt_expiration_delta: int = 3600
    jwt_refresh_expiration_delta: int = 86400 * 7

    # --- OAuth providers ---
    oauth_google_client_id: str = field(default_factory=lambda: os.getenv("GOOGLE_CLIENT_ID", ""))
    oauth_google_client_secret: str = field(default_factory=lambda: os.getenv("GOOGLE_CLIENT_SECRET", ""))
    oauth_github_client_id: str = field(default_factory=lambda: os.getenv("GITHUB_CLIENT_ID", ""))
    oauth_github_client_secret: str = field(default_factory=lambda: os.getenv("GITHUB_CLIENT_SECRET", ""))

    # --- Email (SMTP) ---
    smtp_host: str = field(default_factory=lambda: os.getenv("SMTP_HOST", "smtp.gmail.com"))
    smtp_port: int = 587
    smtp_user: str = field(default_factory=lambda: os.getenv("SMTP_USER", ""))
    smtp_password: str = field(default_factory=lambda: os.getenv("SMTP_PASSWORD", ""))
    smtp_use_tls: bool = True
    email_from: str = "noreply@backgroundfx.pro"
    email_from_name: str = "BackgroundFX Pro"

    # --- Monitoring (Sentry, Prometheus) ---
    sentry_dsn: str = field(default_factory=lambda: os.getenv("SENTRY_DSN", ""))
    sentry_environment: str = field(default_factory=lambda: os.getenv("ENVIRONMENT", "development"))
    sentry_traces_sample_rate: float = 0.1

    prometheus_enabled: bool = True
    prometheus_port: int = 9090

    # --- Webhooks ---
    webhook_timeout: int = 30
    webhook_max_retries: int = 3
    webhook_retry_delay: int = 5

    # --- Cache ---
    cache_ttl: int = 3600
    cache_max_entries: int = 10000
    cache_eviction_policy: str = "lru"

    # --- CDN ---
    cdn_enabled: bool = True
    cdn_base_url: str = "https://cdn.backgroundfx.pro"
    cdn_cache_control: str = "public, max-age=31536000"

    # --- Stripe ---
    stripe_secret_key: str = field(default_factory=lambda: os.getenv("STRIPE_SECRET_KEY", ""))
    stripe_publishable_key: str = field(default_factory=lambda: os.getenv("STRIPE_PUBLISHABLE_KEY", ""))
    stripe_webhook_secret: str = field(default_factory=lambda: os.getenv("STRIPE_WEBHOOK_SECRET", ""))

    # --- Plan limits, keyed by plan name (see get_plan_limits) ---
    # NOTE(review): -1 presumably means "unlimited" -- confirm with the code
    # that enforces these quotas.
    plan_limits: Dict[str, Dict[str, Any]] = field(default_factory=lambda: {
        "free": {
            "images_per_month": 100,
            "videos_per_month": 5,
            "max_image_size": 10 * 1024 * 1024,
            "max_video_size": 50 * 1024 * 1024,
            "api_calls_per_hour": 100,
            "storage_gb": 1,
            "quality": "medium"
        },
        "pro": {
            "images_per_month": 5000,
            "videos_per_month": 100,
            "max_image_size": 50 * 1024 * 1024,
            "max_video_size": 200 * 1024 * 1024,
            "api_calls_per_hour": 1000,
            "storage_gb": 50,
            "quality": "high"
        },
        "enterprise": {
            "images_per_month": -1,
            "videos_per_month": -1,
            "max_image_size": 100 * 1024 * 1024,
            "max_video_size": 500 * 1024 * 1024,
            "api_calls_per_hour": -1,
            "storage_gb": 500,
            "quality": "ultra"
        }
    })

    def __post_init__(self):
        """Initialize and validate settings.

        Runs the base-class initialization, registers the production
        validator when ``environment`` is ``"production"``, and creates
        ``models_dir`` and ``model_cache_dir`` on disk (plus
        ``local_storage_path`` when the storage backend is ``"local"``).
        """
        super().__post_init__()

        if self.environment == "production":
            self.add_validator(validate_production_config)

        # Dataclasses do not enforce annotations, so a caller may have passed
        # plain strings; normalize to Path before touching the filesystem.
        self.models_dir = Path(self.models_dir)
        self.model_cache_dir = Path(self.model_cache_dir)
        for directory in (self.models_dir, self.model_cache_dir):
            directory.mkdir(parents=True, exist_ok=True)

        # The local storage directory is only needed for the local backend.
        if self.storage_backend == "local":
            self.local_storage_path = Path(self.local_storage_path)
            self.local_storage_path.mkdir(parents=True, exist_ok=True)

    def get_database_config(self) -> Dict[str, Any]:
        """Get database configuration as dictionary.

        Returns:
            A new dict with keys ``url``, ``pool_size``, ``max_overflow``
            and ``echo``.
        """
        return dict(
            url=self.database_url,
            pool_size=self.database_pool_size,
            max_overflow=self.database_max_overflow,
            echo=self.database_echo,
        )

    def get_redis_config(self) -> Dict[str, Any]:
        """Get Redis configuration as dictionary.

        Returns:
            A new dict with keys ``url``, ``max_connections`` and
            ``decode_responses``.
        """
        return dict(
            url=self.redis_url,
            max_connections=self.redis_max_connections,
            decode_responses=self.redis_decode_responses,
        )

    def get_storage_config(self) -> Dict[str, Any]:
        """Get storage configuration as dictionary.

        Returns:
            A new dict that always contains ``backend``, ``bucket``,
            ``region`` and ``use_ssl``. The ``"s3"`` backend adds
            ``access_key``, ``secret_key`` and ``endpoint_url``; the
            ``"local"`` backend adds ``path`` (as a string). Any other
            backend gets only the common keys.
        """
        storage: Dict[str, Any] = {
            "backend": self.storage_backend,
            "bucket": self.storage_bucket,
            "region": self.storage_region,
            "use_ssl": self.storage_use_ssl,
        }

        if self.storage_backend == "s3":
            storage["access_key"] = self.storage_access_key
            storage["secret_key"] = self.storage_secret_key
            storage["endpoint_url"] = self.storage_endpoint_url
        elif self.storage_backend == "local":
            storage["path"] = str(self.local_storage_path)

        return storage

    def get_model_config(self, model_name: str) -> Dict[str, Any]:
        """Get configuration for specific model.

        Args:
            model_name: Key into ``model_configs``.

        Returns:
            The stored configuration dict itself (not a copy), so mutating
            it changes this settings instance.

        Raises:
            ConfigurationError: If ``model_name`` has no entry.
        """
        if model_name not in self.model_configs:
            raise ConfigurationError(f"Model configuration not found: {model_name}")
        return self.model_configs[model_name]

    def get_plan_limits(self, plan: str) -> Dict[str, Any]:
        """Get limits for specific plan.

        Args:
            plan: Key into ``plan_limits``.

        Returns:
            The stored limits dict itself (not a copy), so mutating it
            changes this settings instance.

        Raises:
            ConfigurationError: If ``plan`` has no entry.
        """
        if plan not in self.plan_limits:
            raise ConfigurationError(f"Plan not found: {plan}")
        return self.plan_limits[plan]

    def is_feature_enabled(self, feature: str) -> bool:
        """Check if a feature is enabled.

        Args:
            feature: One of ``"video"``, ``"batch"``, ``"ai_backgrounds"``,
                ``"webhooks"`` or ``"gpu"``.

        Returns:
            The truthiness of the matching ``enable_*`` attribute. Returns
            ``False`` for an unknown feature name or when the attribute is
            not defined on this instance.
        """
        # Map feature names to attribute names and read only the one that was
        # asked for. Most of these flags are not declared in this class, so
        # reading all of them up front would make every query fail if a
        # single flag were missing.
        flag_attributes = {
            "video": "enable_video_processing",
            "batch": "enable_batch_processing",
            "ai_backgrounds": "enable_ai_backgrounds",
            "webhooks": "enable_webhooks",
            "gpu": "enable_gpu",
        }
        attribute = flag_attributes.get(feature)
        if attribute is None:
            return False
        return bool(getattr(self, attribute, False))
|
|
|
|
def load_settings(environment: Optional[str] = None) -> Settings:
    """
    Load settings for specified environment.

    The settings class is chosen by environment name. Configuration sources
    are then attached in this order: process environment, an env file
    (``.env.<environment>`` if it exists, otherwise ``.env`` if it exists),
    and ``config/<environment>.yaml`` if it exists. File paths are relative
    to the current working directory. The sources are loaded, the settings
    validated, and the result registered with ``config_manager`` as the
    active configuration.

    Args:
        environment: Environment name (development, staging, production, testing)
                    If not specified, uses ENVIRONMENT env var or defaults to development

    Returns:
        Settings instance configured for the environment. Unknown
        environment names, and environments whose module cannot be
        imported, fall back to the base ``Settings`` class with a warning.
    """
    environment = environment or os.getenv("ENVIRONMENT", "development")

    # Environment modules are imported lazily so that only the selected one
    # is loaded.
    try:
        if environment == "development":
            from .environments.development import DevelopmentSettings as settings_class
        elif environment == "staging":
            from .environments.staging import StagingSettings as settings_class
        elif environment == "production":
            from .environments.production import ProductionSettings as settings_class
        elif environment == "testing":
            from .environments.testing import TestingSettings as settings_class
        else:
            logger.warning("Unknown environment: %s, using base settings", environment)
            settings_class = Settings
    except ImportError:
        # Keep the best-effort fallback, but record the traceback: an
        # ImportError can also come from a broken dependency inside the
        # environment module, which must not be mistaken for "no module".
        logger.warning(
            "Environment module not found: %s, using base settings",
            environment,
            exc_info=True,
        )
        settings_class = Settings

    loaded = settings_class()

    # Source 1: process environment variables.
    from .base import EnvironmentSource
    loaded.add_source(EnvironmentSource())

    # Source 2: the first env file that exists, preferring the
    # environment-specific one over the generic ".env".
    env_candidates = (Path(f".env.{environment}"), Path(".env"))
    env_file = next((path for path in env_candidates if path.exists()), None)
    if env_file is not None:
        from .base import EnvFileSource
        loaded.add_source(EnvFileSource(str(env_file)))

    # Source 3: optional per-environment YAML file.
    config_file = Path(f"config/{environment}.yaml")
    if config_file.exists():
        from .base import YAMLFileSource
        loaded.add_source(YAMLFileSource(str(config_file)))

    loaded.load_from_sources()
    loaded.validate()

    # Publish the instance under its environment name and make it active.
    config_manager.register(environment, loaded)
    config_manager.set_active(environment)

    logger.info("Loaded settings for environment: %s", environment)

    return loaded
|
|
|
|
| |
# Global settings instance. It is built once, when this module is first
# imported, for the environment selected by load_settings() with no argument.
settings = load_settings()

# Convenience module-level aliases. They are snapshots taken at import time
# and do not follow later changes to ``settings``.
DEBUG = settings.debug
SECRET_KEY = settings.secret_key
DATABASE_URL = settings.database_url
REDIS_URL = settings.redis_url