File size: 4,511 Bytes
260bcd1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
自动化扩展系统实现
提供负载监控、资源调度和自动扩缩容功能
"""

import threading
import time
import psutil
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum

from ..logging.logging_service import get_logging_service
from ..monitoring.metrics_service import get_metrics_service


class ScalingAction(Enum):
    """扩缩容动作枚举"""
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    MAINTAIN = "maintain"


class ResourceType(Enum):
    """资源类型枚举"""
    CPU = "cpu"
    MEMORY = "memory"
    DISK = "disk"
    NETWORK = "network"
    CUSTOM = "custom"


@dataclass
class ResourceThreshold:
    """资源阈值配置"""
    resource_type: ResourceType
    scale_up_threshold: float       # 扩容阈值
    scale_down_threshold: float     # 缩容阈值
    min_instances: int = 1          # 最小实例数
    max_instances: int = 10         # 最大实例数
    cooldown_period: int = 300      # 冷却期(秒)


@dataclass
class ScalingEvent:
    """扩缩容事件"""
    timestamp: float
    action: ScalingAction
    resource_type: ResourceType
    current_value: float
    threshold: float
    instances_before: int
    instances_after: int
    reason: str


class AutoScaler:
    """自动扩缩容管理器(简化版)"""

    def __init__(self):
        """初始化自动扩缩容管理器"""
        self._logger = get_logging_service()
        self._metrics = get_metrics_service()

        # 资源阈值配置
        self._thresholds: Dict[str, ResourceThreshold] = {}

        # 扩缩容历史
        self._scaling_history: List[ScalingEvent] = []
        self._last_scaling: Dict[str, float] = {}

        # 锁
        self._lock = threading.RLock()

        self._logger.info("自动扩缩容管理器初始化完成")

    def add_threshold(self, threshold: ResourceThreshold):
        """添加资源阈值配置

        Args:
            threshold: 资源阈值配置
        """
        with self._lock:
            key = threshold.resource_type.value
            self._thresholds[key] = threshold
            self._logger.info(f"添加资源阈值: {key}")

    def get_scaling_history(self, limit: int = 100) -> List[ScalingEvent]:
        """获取扩缩容历史

        Args:
            limit: 返回记录数限制

        Returns:
            扩缩容事件列表
        """
        with self._lock:
            return self._scaling_history[-limit:]

    def get_current_stats(self) -> Dict[str, Any]:
        """获取当前状态统计

        Returns:
            状态统计字典
        """
        with self._lock:
            try:
                # 系统资源
                cpu_percent = psutil.cpu_percent()
                memory_percent = psutil.virtual_memory().percent

                return {
                    'system': {
                        'cpu_usage': cpu_percent,
                        'memory_usage': memory_percent,
                        'timestamp': time.time()
                    },
                    'thresholds': {
                        key: {
                            'scale_up_threshold': th.scale_up_threshold,
                            'scale_down_threshold': th.scale_down_threshold,
                            'min_instances': th.min_instances,
                            'max_instances': th.max_instances
                        }
                        for key, th in self._thresholds.items()
                    },
                    'scaling_events_count': len(self._scaling_history)
                }
            except Exception as e:
                self._logger.error("获取系统统计失败", exception=e)
                return {
                    'system': {
                        'cpu_usage': 0.0,
                        'memory_usage': 0.0,
                        'timestamp': time.time()
                    },
                    'thresholds': {},
                    'scaling_events_count': 0
                }


# 全局自动扩缩容实例
_auto_scaler_instance: Optional[AutoScaler] = None
_auto_scaler_lock = threading.Lock()


def get_auto_scaler() -> AutoScaler:
    """获取自动扩缩容管理器单例实例"""
    global _auto_scaler_instance

    if _auto_scaler_instance is None:
        with _auto_scaler_lock:
            if _auto_scaler_instance is None:
                _auto_scaler_instance = AutoScaler()

    return _auto_scaler_instance