File size: 4,511 Bytes
260bcd1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
"""
自动化扩展系统实现
提供负载监控、资源调度和自动扩缩容功能
"""
import threading
import time
import psutil
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from ..logging.logging_service import get_logging_service
from ..monitoring.metrics_service import get_metrics_service
class ScalingAction(Enum):
    """Closed set of decisions a scaling evaluation can produce."""

    # Add capacity.
    SCALE_UP = "scale_up"
    # Remove capacity.
    SCALE_DOWN = "scale_down"
    # Leave the instance count unchanged.
    MAINTAIN = "maintain"
class ResourceType(Enum):
    """Kinds of resource a scaling threshold can be attached to.

    The string value of each member is used as the lookup key when a
    threshold is registered with the auto scaler.
    """

    CPU = "cpu"
    MEMORY = "memory"
    DISK = "disk"
    NETWORK = "network"
    # Any metric that is not one of the built-in resource kinds.
    CUSTOM = "custom"
@dataclass
class ResourceThreshold:
    """Scaling threshold configuration for one resource type.

    Attributes:
        resource_type: Resource this configuration applies to.
        scale_up_threshold: Threshold for scaling up.
        scale_down_threshold: Threshold for scaling down.
        min_instances: Minimum number of instances (default 1).
        max_instances: Maximum number of instances (default 10).
        cooldown_period: Cooldown period in seconds (default 300).
    """

    resource_type: ResourceType
    scale_up_threshold: float
    scale_down_threshold: float
    min_instances: int = 1
    max_instances: int = 10
    cooldown_period: int = 300
@dataclass
class ScalingEvent:
    """Record of a single scaling event.

    Attributes:
        timestamp: When the event happened (presumably a ``time.time()``
            value -- confirm against the code that creates events).
        action: The scaling action associated with the event.
        resource_type: Resource the event relates to.
        current_value: Observed value of the resource.
        threshold: Threshold value recorded for the event.
        instances_before: Instance count before the action.
        instances_after: Instance count after the action.
        reason: Free-text explanation of the event.
    """

    timestamp: float
    action: ScalingAction
    resource_type: ResourceType
    current_value: float
    threshold: float
    instances_before: int
    instances_after: int
    reason: str
class AutoScaler:
    """Auto-scaling manager (simplified).

    Keeps a table of per-resource scaling thresholds and a list of scaling
    events, and can report a snapshot of current system usage. Access to
    the threshold table and the event list goes through a re-entrant lock.
    """

    def __init__(self) -> None:
        """Acquire the logging/metrics services and start with empty state."""
        self._logger = get_logging_service()
        self._metrics = get_metrics_service()

        # Threshold configuration, keyed by ``ResourceType.value``.
        self._thresholds: Dict[str, ResourceThreshold] = {}

        # Scaling history; get_scaling_history() returns the tail of this list.
        self._scaling_history: List[ScalingEvent] = []
        # NOTE(review): presumably the time of the last scaling action per
        # resource key -- nothing in this class reads or writes it yet.
        self._last_scaling: Dict[str, float] = {}

        # Re-entrant so a method holding the lock may call another one.
        self._lock = threading.RLock()

        self._logger.info("自动扩缩容管理器初始化完成")

    def add_threshold(self, threshold: "ResourceThreshold") -> None:
        """Register a resource threshold configuration.

        A configuration already stored for the same resource type is
        replaced.

        Args:
            threshold: Threshold configuration to store.
        """
        with self._lock:
            key = threshold.resource_type.value
            self._thresholds[key] = threshold
            self._logger.info(f"添加资源阈值: {key}")

    def get_scaling_history(self, limit: int = 100) -> List["ScalingEvent"]:
        """Return the most recent scaling events.

        Args:
            limit: Maximum number of events to return. Zero or a negative
                value yields an empty list.

        Returns:
            A new list holding at most ``limit`` events taken from the end
            of the history, in their stored order.
        """
        with self._lock:
            # ``seq[-0:]`` is the whole sequence and a negative limit would
            # slice from the front, so non-positive limits need a guard.
            if limit <= 0:
                return []
            return self._scaling_history[-limit:]

    def get_current_stats(self) -> Dict[str, Any]:
        """Return a snapshot of system usage and scaler configuration.

        ``psutil.cpu_percent()`` is called without an interval; per the
        psutil documentation that reports usage since the previous call.

        Returns:
            A dict with three keys: ``'system'`` (``cpu_usage``,
            ``memory_usage``, ``timestamp``), ``'thresholds'`` (the
            configured limits per resource key) and
            ``'scaling_events_count'``. If anything fails while building
            the snapshot, the error is logged and a zeroed snapshot with no
            thresholds is returned instead of raising.
        """
        with self._lock:
            try:
                # System resources.
                cpu_percent = psutil.cpu_percent()
                memory_percent = psutil.virtual_memory().percent

                return {
                    'system': {
                        'cpu_usage': cpu_percent,
                        'memory_usage': memory_percent,
                        'timestamp': time.time(),
                    },
                    'thresholds': {
                        key: {
                            'scale_up_threshold': th.scale_up_threshold,
                            'scale_down_threshold': th.scale_down_threshold,
                            'min_instances': th.min_instances,
                            'max_instances': th.max_instances,
                        }
                        for key, th in self._thresholds.items()
                    },
                    'scaling_events_count': len(self._scaling_history),
                }
            except Exception as e:
                # Best effort: callers always get a dict of the same shape.
                self._logger.error("获取系统统计失败", exception=e)
                return {
                    'system': {
                        'cpu_usage': 0.0,
                        'memory_usage': 0.0,
                        'timestamp': time.time(),
                    },
                    'thresholds': {},
                    'scaling_events_count': 0,
                }
# Global auto-scaler instance; stays None until get_auto_scaler() creates it.
_auto_scaler_instance: Optional[AutoScaler] = None
# Lock that get_auto_scaler() holds while creating the instance.
_auto_scaler_lock = threading.Lock()
def get_auto_scaler() -> "AutoScaler":
    """Return the shared AutoScaler instance, creating it on first use.

    The first check runs without the lock so that calls made after the
    instance exists do not contend on it. The second check runs while
    holding ``_auto_scaler_lock`` so that two callers who both saw ``None``
    do not each construct an instance.

    Returns:
        The module-wide AutoScaler instance.
    """
    global _auto_scaler_instance
    if _auto_scaler_instance is None:
        with _auto_scaler_lock:
            # Re-check: another caller may have created the instance while
            # this one was waiting for the lock.
            if _auto_scaler_instance is None:
                _auto_scaler_instance = AutoScaler()
    return _auto_scaler_instance