Spaces:
Build error
Build error
| """Configuration Application Service for settings management.""" | |
| import logging | |
| import os | |
| import json | |
| from typing import Dict, List, Any, Optional, Union | |
| from pathlib import Path | |
| from dataclasses import asdict | |
| from ..error_handling.error_mapper import ErrorMapper | |
| from ..error_handling.structured_logger import StructuredLogger, LogContext, get_structured_logger | |
| from ...infrastructure.config.app_config import AppConfig, TTSConfig, STTConfig, TranslationConfig, ProcessingConfig, LoggingConfig | |
| from ...infrastructure.config.dependency_container import DependencyContainer | |
| from ...domain.exceptions import DomainException | |
| logger = get_structured_logger(__name__) | |
| class ConfigurationException(DomainException): | |
| """Exception raised for configuration-related errors.""" | |
| pass | |
| class ConfigurationApplicationService: | |
| """Application service for managing application configuration and settings.""" | |
| def __init__( | |
| self, | |
| container: DependencyContainer, | |
| config: Optional[AppConfig] = None | |
| ): | |
| """ | |
| Initialize the configuration application service. | |
| Args: | |
| container: Dependency injection container | |
| config: Application configuration (optional, will be resolved from container) | |
| """ | |
| self._container = container | |
| self._config = config or container.resolve(AppConfig) | |
| # Initialize error handling | |
| self._error_mapper = ErrorMapper() | |
| logger.info("ConfigurationApplicationService initialized") | |
| def get_current_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Get the current application configuration. | |
| Returns: | |
| Dict[str, Any]: Current configuration as dictionary | |
| """ | |
| try: | |
| return { | |
| 'tts': self._config.get_tts_config(), | |
| 'stt': self._config.get_stt_config(), | |
| 'translation': self._config.get_translation_config(), | |
| 'processing': self._config.get_processing_config(), | |
| 'logging': self._config.get_logging_config() | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get current configuration: {e}") | |
| raise ConfigurationException(f"Failed to retrieve configuration: {str(e)}") | |
| def get_tts_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Get TTS-specific configuration. | |
| Returns: | |
| Dict[str, Any]: TTS configuration | |
| """ | |
| try: | |
| return self._config.get_tts_config() | |
| except Exception as e: | |
| logger.error(f"Failed to get TTS configuration: {e}") | |
| raise ConfigurationException(f"Failed to retrieve TTS configuration: {str(e)}") | |
| def get_stt_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Get STT-specific configuration. | |
| Returns: | |
| Dict[str, Any]: STT configuration | |
| """ | |
| try: | |
| return self._config.get_stt_config() | |
| except Exception as e: | |
| logger.error(f"Failed to get STT configuration: {e}") | |
| raise ConfigurationException(f"Failed to retrieve STT configuration: {str(e)}") | |
| def get_translation_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Get translation-specific configuration. | |
| Returns: | |
| Dict[str, Any]: Translation configuration | |
| """ | |
| try: | |
| return self._config.get_translation_config() | |
| except Exception as e: | |
| logger.error(f"Failed to get translation configuration: {e}") | |
| raise ConfigurationException(f"Failed to retrieve translation configuration: {str(e)}") | |
| def get_processing_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Get processing-specific configuration. | |
| Returns: | |
| Dict[str, Any]: Processing configuration | |
| """ | |
| try: | |
| return self._config.get_processing_config() | |
| except Exception as e: | |
| logger.error(f"Failed to get processing configuration: {e}") | |
| raise ConfigurationException(f"Failed to retrieve processing configuration: {str(e)}") | |
| def get_logging_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Get logging-specific configuration. | |
| Returns: | |
| Dict[str, Any]: Logging configuration | |
| """ | |
| try: | |
| return self._config.get_logging_config() | |
| except Exception as e: | |
| logger.error(f"Failed to get logging configuration: {e}") | |
| raise ConfigurationException(f"Failed to retrieve logging configuration: {str(e)}") | |
| def update_tts_configuration(self, updates: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Update TTS configuration. | |
| Args: | |
| updates: Configuration updates to apply | |
| Returns: | |
| Dict[str, Any]: Updated TTS configuration | |
| Raises: | |
| ConfigurationException: If update fails or validation fails | |
| """ | |
| try: | |
| # Validate updates | |
| self._validate_tts_updates(updates) | |
| # Apply updates to current config | |
| current_config = self._config.get_tts_config() | |
| for key, value in updates.items(): | |
| if key in current_config: | |
| # Update the actual config object | |
| if hasattr(self._config.tts, key): | |
| setattr(self._config.tts, key, value) | |
| logger.info(f"Updated TTS config: {key} = {value}") | |
| else: | |
| logger.warning(f"Unknown TTS configuration key: {key}") | |
| # Return updated configuration | |
| updated_config = self._config.get_tts_config() | |
| logger.info(f"TTS configuration updated: {list(updates.keys())}") | |
| return updated_config | |
| except Exception as e: | |
| logger.error(f"Failed to update TTS configuration: {e}") | |
| raise ConfigurationException(f"Failed to update TTS configuration: {str(e)}") | |
| def update_stt_configuration(self, updates: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Update STT configuration. | |
| Args: | |
| updates: Configuration updates to apply | |
| Returns: | |
| Dict[str, Any]: Updated STT configuration | |
| Raises: | |
| ConfigurationException: If update fails or validation fails | |
| """ | |
| try: | |
| # Validate updates | |
| self._validate_stt_updates(updates) | |
| # Apply updates to current config | |
| current_config = self._config.get_stt_config() | |
| for key, value in updates.items(): | |
| if key in current_config: | |
| # Update the actual config object | |
| if hasattr(self._config.stt, key): | |
| setattr(self._config.stt, key, value) | |
| logger.info(f"Updated STT config: {key} = {value}") | |
| else: | |
| logger.warning(f"Unknown STT configuration key: {key}") | |
| # Return updated configuration | |
| updated_config = self._config.get_stt_config() | |
| logger.info(f"STT configuration updated: {list(updates.keys())}") | |
| return updated_config | |
| except Exception as e: | |
| logger.error(f"Failed to update STT configuration: {e}") | |
| raise ConfigurationException(f"Failed to update STT configuration: {str(e)}") | |
| def update_translation_configuration(self, updates: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Update translation configuration. | |
| Args: | |
| updates: Configuration updates to apply | |
| Returns: | |
| Dict[str, Any]: Updated translation configuration | |
| Raises: | |
| ConfigurationException: If update fails or validation fails | |
| """ | |
| try: | |
| # Validate updates | |
| self._validate_translation_updates(updates) | |
| # Apply updates to current config | |
| current_config = self._config.get_translation_config() | |
| for key, value in updates.items(): | |
| if key in current_config: | |
| # Update the actual config object | |
| if hasattr(self._config.translation, key): | |
| setattr(self._config.translation, key, value) | |
| logger.info(f"Updated translation config: {key} = {value}") | |
| else: | |
| logger.warning(f"Unknown translation configuration key: {key}") | |
| # Return updated configuration | |
| updated_config = self._config.get_translation_config() | |
| logger.info(f"Translation configuration updated: {list(updates.keys())}") | |
| return updated_config | |
| except Exception as e: | |
| logger.error(f"Failed to update translation configuration: {e}") | |
| raise ConfigurationException(f"Failed to update translation configuration: {str(e)}") | |
| def update_processing_configuration(self, updates: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| Update processing configuration. | |
| Args: | |
| updates: Configuration updates to apply | |
| Returns: | |
| Dict[str, Any]: Updated processing configuration | |
| Raises: | |
| ConfigurationException: If update fails or validation fails | |
| """ | |
| try: | |
| # Validate updates | |
| self._validate_processing_updates(updates) | |
| # Apply updates to current config | |
| current_config = self._config.get_processing_config() | |
| for key, value in updates.items(): | |
| if key in current_config: | |
| # Update the actual config object | |
| if hasattr(self._config.processing, key): | |
| setattr(self._config.processing, key, value) | |
| logger.info(f"Updated processing config: {key} = {value}") | |
| else: | |
| logger.warning(f"Unknown processing configuration key: {key}") | |
| # Return updated configuration | |
| updated_config = self._config.get_processing_config() | |
| logger.info(f"Processing configuration updated: {list(updates.keys())}") | |
| return updated_config | |
| except Exception as e: | |
| logger.error(f"Failed to update processing configuration: {e}") | |
| raise ConfigurationException(f"Failed to update processing configuration: {str(e)}") | |
| def _validate_tts_updates(self, updates: Dict[str, Any]) -> None: | |
| """ | |
| Validate TTS configuration updates. | |
| Args: | |
| updates: Updates to validate | |
| Raises: | |
| ConfigurationException: If validation fails | |
| """ | |
| valid_providers = ['kokoro', 'dia', 'cosyvoice2', 'dummy'] | |
| valid_languages = ['en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh'] | |
| for key, value in updates.items(): | |
| if key == 'preferred_providers': | |
| if not isinstance(value, list): | |
| raise ConfigurationException("preferred_providers must be a list") | |
| for provider in value: | |
| if provider not in valid_providers: | |
| raise ConfigurationException(f"Invalid TTS provider: {provider}") | |
| elif key == 'default_speed': | |
| if not isinstance(value, (int, float)) or not (0.1 <= value <= 3.0): | |
| raise ConfigurationException("default_speed must be between 0.1 and 3.0") | |
| elif key == 'default_language': | |
| if value not in valid_languages: | |
| raise ConfigurationException(f"Invalid language: {value}") | |
| elif key == 'enable_streaming': | |
| if not isinstance(value, bool): | |
| raise ConfigurationException("enable_streaming must be a boolean") | |
| elif key == 'max_text_length': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("max_text_length must be a positive integer") | |
| def _validate_stt_updates(self, updates: Dict[str, Any]) -> None: | |
| """ | |
| Validate STT configuration updates. | |
| Args: | |
| updates: Updates to validate | |
| Raises: | |
| ConfigurationException: If validation fails | |
| """ | |
| valid_providers = ['whisper', 'parakeet'] | |
| for key, value in updates.items(): | |
| if key == 'preferred_providers': | |
| if not isinstance(value, list): | |
| raise ConfigurationException("preferred_providers must be a list") | |
| for provider in value: | |
| if provider not in valid_providers: | |
| raise ConfigurationException(f"Invalid STT provider: {provider}") | |
| elif key == 'default_model': | |
| if value not in valid_providers: | |
| raise ConfigurationException(f"Invalid STT model: {value}") | |
| elif key == 'chunk_length_s': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("chunk_length_s must be a positive integer") | |
| elif key == 'batch_size': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("batch_size must be a positive integer") | |
| elif key == 'enable_vad': | |
| if not isinstance(value, bool): | |
| raise ConfigurationException("enable_vad must be a boolean") | |
| def _validate_translation_updates(self, updates: Dict[str, Any]) -> None: | |
| """ | |
| Validate translation configuration updates. | |
| Args: | |
| updates: Updates to validate | |
| Raises: | |
| ConfigurationException: If validation fails | |
| """ | |
| for key, value in updates.items(): | |
| if key == 'default_provider': | |
| if not isinstance(value, str) or not value: | |
| raise ConfigurationException("default_provider must be a non-empty string") | |
| elif key == 'model_name': | |
| if not isinstance(value, str) or not value: | |
| raise ConfigurationException("model_name must be a non-empty string") | |
| elif key == 'max_chunk_length': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("max_chunk_length must be a positive integer") | |
| elif key == 'batch_size': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("batch_size must be a positive integer") | |
| elif key == 'cache_translations': | |
| if not isinstance(value, bool): | |
| raise ConfigurationException("cache_translations must be a boolean") | |
| def _validate_processing_updates(self, updates: Dict[str, Any]) -> None: | |
| """ | |
| Validate processing configuration updates. | |
| Args: | |
| updates: Updates to validate | |
| Raises: | |
| ConfigurationException: If validation fails | |
| """ | |
| for key, value in updates.items(): | |
| if key == 'temp_dir': | |
| if not isinstance(value, str) or not value: | |
| raise ConfigurationException("temp_dir must be a non-empty string") | |
| # Try to create directory to validate path | |
| try: | |
| Path(value).mkdir(parents=True, exist_ok=True) | |
| except Exception as e: | |
| raise ConfigurationException(f"Invalid temp_dir path: {e}") | |
| elif key == 'cleanup_temp_files': | |
| if not isinstance(value, bool): | |
| raise ConfigurationException("cleanup_temp_files must be a boolean") | |
| elif key == 'max_file_size_mb': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("max_file_size_mb must be a positive integer") | |
| elif key == 'supported_audio_formats': | |
| if not isinstance(value, list): | |
| raise ConfigurationException("supported_audio_formats must be a list") | |
| valid_formats = ['wav', 'mp3', 'flac', 'ogg', 'm4a'] | |
| for fmt in value: | |
| if fmt not in valid_formats: | |
| raise ConfigurationException(f"Invalid audio format: {fmt}") | |
| elif key == 'processing_timeout_seconds': | |
| if not isinstance(value, int) or value <= 0: | |
| raise ConfigurationException("processing_timeout_seconds must be a positive integer") | |
| def save_configuration_to_file(self, file_path: str) -> None: | |
| """ | |
| Save current configuration to file. | |
| Args: | |
| file_path: Path to save configuration file | |
| Raises: | |
| ConfigurationException: If save fails | |
| """ | |
| try: | |
| self._config.save_configuration(file_path) | |
| logger.info(f"Configuration saved to {file_path}") | |
| except Exception as e: | |
| logger.error(f"Failed to save configuration to {file_path}: {e}") | |
| raise ConfigurationException(f"Failed to save configuration: {str(e)}") | |
| def load_configuration_from_file(self, file_path: str) -> Dict[str, Any]: | |
| """ | |
| Load configuration from file. | |
| Args: | |
| file_path: Path to configuration file | |
| Returns: | |
| Dict[str, Any]: Loaded configuration | |
| Raises: | |
| ConfigurationException: If load fails | |
| """ | |
| try: | |
| if not os.path.exists(file_path): | |
| raise ConfigurationException(f"Configuration file not found: {file_path}") | |
| # Create new config instance with the file | |
| new_config = AppConfig(config_file=file_path) | |
| # Update current config | |
| self._config = new_config | |
| # Update container with new config | |
| self._container.register_singleton(AppConfig, new_config) | |
| logger.info(f"Configuration loaded from {file_path}") | |
| return self.get_current_configuration() | |
| except Exception as e: | |
| logger.error(f"Failed to load configuration from {file_path}: {e}") | |
| raise ConfigurationException(f"Failed to load configuration: {str(e)}") | |
| def reload_configuration(self) -> Dict[str, Any]: | |
| """ | |
| Reload configuration from original sources. | |
| Returns: | |
| Dict[str, Any]: Reloaded configuration | |
| Raises: | |
| ConfigurationException: If reload fails | |
| """ | |
| try: | |
| self._config.reload_configuration() | |
| logger.info("Configuration reloaded successfully") | |
| return self.get_current_configuration() | |
| except Exception as e: | |
| logger.error(f"Failed to reload configuration: {e}") | |
| raise ConfigurationException(f"Failed to reload configuration: {str(e)}") | |
| def get_provider_availability(self) -> Dict[str, Dict[str, bool]]: | |
| """ | |
| Get availability status of all providers. | |
| Returns: | |
| Dict[str, Dict[str, bool]]: Provider availability by category | |
| """ | |
| try: | |
| availability = { | |
| 'tts': {}, | |
| 'stt': {}, | |
| 'translation': {} | |
| } | |
| # Check TTS providers | |
| tts_factory = self._container.resolve(type(self._container._get_tts_factory())) | |
| for provider in ['kokoro', 'dia', 'cosyvoice2', 'dummy']: | |
| try: | |
| tts_factory.create_provider(provider) | |
| availability['tts'][provider] = True | |
| except Exception: | |
| availability['tts'][provider] = False | |
| # Check STT providers | |
| stt_factory = self._container.resolve(type(self._container._get_stt_factory())) | |
| for provider in ['whisper', 'parakeet']: | |
| try: | |
| stt_factory.create_provider(provider) | |
| availability['stt'][provider] = True | |
| except Exception: | |
| availability['stt'][provider] = False | |
| # Check translation providers | |
| translation_factory = self._container.resolve(type(self._container._get_translation_factory())) | |
| try: | |
| translation_factory.get_default_provider() | |
| availability['translation']['nllb'] = True | |
| except Exception: | |
| availability['translation']['nllb'] = False | |
| return availability | |
| except Exception as e: | |
| logger.error(f"Failed to check provider availability: {e}") | |
| raise ConfigurationException(f"Failed to check provider availability: {str(e)}") | |
| def get_system_info(self) -> Dict[str, Any]: | |
| """ | |
| Get system information and configuration summary. | |
| Returns: | |
| Dict[str, Any]: System information | |
| """ | |
| try: | |
| return { | |
| 'config_file': self._config.config_file, | |
| 'temp_directory': self._config.processing.temp_dir, | |
| 'log_level': self._config.logging.level, | |
| 'provider_availability': self.get_provider_availability(), | |
| 'supported_languages': [ | |
| 'en', 'es', 'fr', 'de', 'it', 'pt', 'ru', 'ja', 'ko', 'zh', | |
| 'ar', 'hi', 'tr', 'pl', 'nl', 'sv', 'da', 'no', 'fi' | |
| ], | |
| 'supported_audio_formats': self._config.processing.supported_audio_formats, | |
| 'max_file_size_mb': self._config.processing.max_file_size_mb, | |
| 'processing_timeout_seconds': self._config.processing.processing_timeout_seconds | |
| } | |
| except Exception as e: | |
| logger.error(f"Failed to get system info: {e}") | |
| raise ConfigurationException(f"Failed to get system info: {str(e)}") | |
| def validate_configuration(self) -> Dict[str, List[str]]: | |
| """ | |
| Validate current configuration and return any issues. | |
| Returns: | |
| Dict[str, List[str]]: Validation issues by category | |
| """ | |
| issues = { | |
| 'tts': [], | |
| 'stt': [], | |
| 'translation': [], | |
| 'processing': [], | |
| 'logging': [] | |
| } | |
| try: | |
| # Validate TTS configuration | |
| tts_config = self._config.get_tts_config() | |
| if not (0.1 <= tts_config['default_speed'] <= 3.0): | |
| issues['tts'].append(f"Invalid default_speed: {tts_config['default_speed']}") | |
| if tts_config['max_text_length'] <= 0: | |
| issues['tts'].append(f"Invalid max_text_length: {tts_config['max_text_length']}") | |
| # Validate STT configuration | |
| stt_config = self._config.get_stt_config() | |
| if stt_config['chunk_length_s'] <= 0: | |
| issues['stt'].append(f"Invalid chunk_length_s: {stt_config['chunk_length_s']}") | |
| if stt_config['batch_size'] <= 0: | |
| issues['stt'].append(f"Invalid batch_size: {stt_config['batch_size']}") | |
| # Validate processing configuration | |
| processing_config = self._config.get_processing_config() | |
| if not os.path.exists(processing_config['temp_dir']): | |
| issues['processing'].append(f"Temp directory does not exist: {processing_config['temp_dir']}") | |
| if processing_config['max_file_size_mb'] <= 0: | |
| issues['processing'].append(f"Invalid max_file_size_mb: {processing_config['max_file_size_mb']}") | |
| # Validate logging configuration | |
| logging_config = self._config.get_logging_config() | |
| valid_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] | |
| if logging_config['level'].upper() not in valid_levels: | |
| issues['logging'].append(f"Invalid log level: {logging_config['level']}") | |
| except Exception as e: | |
| issues['general'] = [f"Configuration validation error: {str(e)}"] | |
| return issues | |
| def reset_to_defaults(self) -> Dict[str, Any]: | |
| """ | |
| Reset configuration to default values. | |
| Returns: | |
| Dict[str, Any]: Reset configuration | |
| Raises: | |
| ConfigurationException: If reset fails | |
| """ | |
| try: | |
| # Create new config with defaults | |
| default_config = AppConfig() | |
| # Update current config | |
| self._config = default_config | |
| # Update container with new config | |
| self._container.register_singleton(AppConfig, default_config) | |
| logger.info("Configuration reset to defaults") | |
| return self.get_current_configuration() | |
| except Exception as e: | |
| logger.error(f"Failed to reset configuration: {e}") | |
| raise ConfigurationException(f"Failed to reset configuration: {str(e)}") | |
| def cleanup(self) -> None: | |
| """Cleanup configuration service resources.""" | |
| logger.info("Cleaning up ConfigurationApplicationService") | |
| # No specific cleanup needed for configuration service | |
| logger.info("ConfigurationApplicationService cleanup completed") | |
| def __enter__(self): | |
| """Context manager entry.""" | |
| return self | |
| def __exit__(self, exc_type, exc_val, exc_tb): | |
| """Context manager exit with cleanup.""" | |
| self.cleanup() |