|
""" |
|
自动化扩展系统实现 |
|
提供负载监控、资源调度和自动扩缩容功能 |
|
""" |
|
|
|
import threading |
|
import time |
|
import psutil |
|
from typing import Dict, List, Any, Optional, Callable |
|
from dataclasses import dataclass |
|
from enum import Enum |
|
|
|
from ..logging.logging_service import get_logging_service |
|
from ..monitoring.metrics_service import get_metrics_service |
|
|
|
|
|
class ScalingAction(Enum): |
|
"""扩缩容动作枚举""" |
|
SCALE_UP = "scale_up" |
|
SCALE_DOWN = "scale_down" |
|
MAINTAIN = "maintain" |
|
|
|
|
|
class ResourceType(Enum): |
|
"""资源类型枚举""" |
|
CPU = "cpu" |
|
MEMORY = "memory" |
|
DISK = "disk" |
|
NETWORK = "network" |
|
CUSTOM = "custom" |
|
|
|
|
|
@dataclass |
|
class ResourceThreshold: |
|
"""资源阈值配置""" |
|
resource_type: ResourceType |
|
scale_up_threshold: float |
|
scale_down_threshold: float |
|
min_instances: int = 1 |
|
max_instances: int = 10 |
|
cooldown_period: int = 300 |
|
|
|
|
|
@dataclass |
|
class ScalingEvent: |
|
"""扩缩容事件""" |
|
timestamp: float |
|
action: ScalingAction |
|
resource_type: ResourceType |
|
current_value: float |
|
threshold: float |
|
instances_before: int |
|
instances_after: int |
|
reason: str |
|
|
|
|
|
class AutoScaler: |
|
"""自动扩缩容管理器(简化版)""" |
|
|
|
def __init__(self): |
|
"""初始化自动扩缩容管理器""" |
|
self._logger = get_logging_service() |
|
self._metrics = get_metrics_service() |
|
|
|
|
|
self._thresholds: Dict[str, ResourceThreshold] = {} |
|
|
|
|
|
self._scaling_history: List[ScalingEvent] = [] |
|
self._last_scaling: Dict[str, float] = {} |
|
|
|
|
|
self._lock = threading.RLock() |
|
|
|
self._logger.info("自动扩缩容管理器初始化完成") |
|
|
|
def add_threshold(self, threshold: ResourceThreshold): |
|
"""添加资源阈值配置 |
|
|
|
Args: |
|
threshold: 资源阈值配置 |
|
""" |
|
with self._lock: |
|
key = threshold.resource_type.value |
|
self._thresholds[key] = threshold |
|
self._logger.info(f"添加资源阈值: {key}") |
|
|
|
def get_scaling_history(self, limit: int = 100) -> List[ScalingEvent]: |
|
"""获取扩缩容历史 |
|
|
|
Args: |
|
limit: 返回记录数限制 |
|
|
|
Returns: |
|
扩缩容事件列表 |
|
""" |
|
with self._lock: |
|
return self._scaling_history[-limit:] |
|
|
|
def get_current_stats(self) -> Dict[str, Any]: |
|
"""获取当前状态统计 |
|
|
|
Returns: |
|
状态统计字典 |
|
""" |
|
with self._lock: |
|
try: |
|
|
|
cpu_percent = psutil.cpu_percent() |
|
memory_percent = psutil.virtual_memory().percent |
|
|
|
return { |
|
'system': { |
|
'cpu_usage': cpu_percent, |
|
'memory_usage': memory_percent, |
|
'timestamp': time.time() |
|
}, |
|
'thresholds': { |
|
key: { |
|
'scale_up_threshold': th.scale_up_threshold, |
|
'scale_down_threshold': th.scale_down_threshold, |
|
'min_instances': th.min_instances, |
|
'max_instances': th.max_instances |
|
} |
|
for key, th in self._thresholds.items() |
|
}, |
|
'scaling_events_count': len(self._scaling_history) |
|
} |
|
except Exception as e: |
|
self._logger.error("获取系统统计失败", exception=e) |
|
return { |
|
'system': { |
|
'cpu_usage': 0.0, |
|
'memory_usage': 0.0, |
|
'timestamp': time.time() |
|
}, |
|
'thresholds': {}, |
|
'scaling_events_count': 0 |
|
} |
|
|
|
|
|
|
|
_auto_scaler_instance: Optional[AutoScaler] = None |
|
_auto_scaler_lock = threading.Lock() |
|
|
|
|
|
def get_auto_scaler() -> AutoScaler: |
|
"""获取自动扩缩容管理器单例实例""" |
|
global _auto_scaler_instance |
|
|
|
if _auto_scaler_instance is None: |
|
with _auto_scaler_lock: |
|
if _auto_scaler_instance is None: |
|
_auto_scaler_instance = AutoScaler() |
|
|
|
return _auto_scaler_instance |