petter2025 committed on
Commit
9ec7605
·
verified ·
1 Parent(s): 4dfd09d

Update healing_policies.py

Browse files
Files changed (1) hide show
  1. healing_policies.py +332 -81
healing_policies.py CHANGED
@@ -1,124 +1,375 @@
 
 
 
 
 
1
  import datetime
2
- from models import HealingPolicy, HealingAction, EventSeverity
3
- from typing import Dict, List
 
 
 
 
 
4
 
# Built-in healing policies (pre-refactor, dict-based condition format).
#
# Each policy maps event attribute names to an {"operator", "value"} pair;
# PolicyEngine requires every listed condition to hold before the policy's
# actions are returned.
DEFAULT_HEALING_POLICIES = [
    # Slow responses while the error rate is still low: restart the container.
    HealingPolicy(
        name="high_latency_restart",
        conditions={
            "latency_p99": {"operator": ">", "value": 300},
            "error_rate": {"operator": "<", "value": 0.1},
        },
        actions=[HealingAction.RESTART_CONTAINER],
        priority=2,
    ),
    # Elevated error rate: open the circuit breaker and notify the team.
    HealingPolicy(
        name="cascading_failure",
        conditions={
            "error_rate": {"operator": ">", "value": 0.15},
        },
        actions=[
            HealingAction.CIRCUIT_BREAKER,
            HealingAction.ALERT_TEAM,
        ],
        priority=1,
    ),
    # CPU and memory both above 85%: scale out and notify the team.
    HealingPolicy(
        name="resource_exhaustion",
        conditions={
            "cpu_util": {"operator": ">", "value": 0.85},
            "memory_util": {"operator": ">", "value": 0.85},
        },
        actions=[
            HealingAction.SCALE_OUT,
            HealingAction.ALERT_TEAM,
        ],
        priority=1,
    ),
    # Latency and errors both moderately elevated: shift traffic away.
    HealingPolicy(
        name="moderate_performance_issue",
        conditions={
            "latency_p99": {"operator": ">", "value": 200},
            "error_rate": {"operator": ">", "value": 0.05},
        },
        actions=[HealingAction.TRAFFIC_SHIFT],
        priority=3,
    ),
    # Latency and errors both severely elevated: restart, alert, shift traffic.
    HealingPolicy(
        name="critical_failure",
        conditions={
            "latency_p99": {"operator": ">", "value": 500},
            "error_rate": {"operator": ">", "value": 0.1},
        },
        actions=[
            HealingAction.RESTART_CONTAINER,
            HealingAction.ALERT_TEAM,
            HealingAction.TRAFFIC_SHIFT,
        ],
        priority=1,
    ),
]
52
 
 
class PolicyEngine:
    """Match events against healing policies, honouring per-component cooldowns.

    ``last_execution`` maps ``"<policy name>_<event component>"`` to the epoch
    timestamp at which that policy last matched for that component.
    """

    def __init__(self, policies: "List[HealingPolicy]" = None):
        """Store the policy list (falling back to the module defaults).

        Args:
            policies: Policies to evaluate; ``DEFAULT_HEALING_POLICIES`` is
                used when this is ``None`` or empty.
        """
        self.policies = policies or DEFAULT_HEALING_POLICIES
        self.last_execution: Dict[str, float] = {}

    def evaluate_policies(self, event) -> "List[HealingAction]":
        """Evaluate all policies against the event and return matching actions.

        Disabled policies and policies still inside their cooldown window for
        ``event.component`` are skipped. A policy's cooldown timestamp is only
        refreshed when its conditions actually match.

        Args:
            event: Object exposing ``component`` plus one attribute per
                condition field.

        Returns:
            The matched actions, de-duplicated in first-seen order, or
            ``[HealingAction.NO_ACTION]`` when nothing matched.
        """
        applicable_actions = []
        # One clock read per evaluation keeps every policy on the same instant.
        current_time = datetime.datetime.now().timestamp()

        for policy in self.policies:
            if not policy.enabled:
                continue

            policy_key = f"{policy.name}_{event.component}"
            last_exec = self.last_execution.get(policy_key, 0)
            if current_time - last_exec < policy.cool_down_seconds:
                continue

            if self._evaluate_conditions(policy.conditions, event):
                applicable_actions.extend(policy.actions)
                self.last_execution[policy_key] = current_time

        # dict preserves insertion order, so this drops repeats but keeps order.
        unique_actions = list(dict.fromkeys(applicable_actions))
        return unique_actions or [HealingAction.NO_ACTION]

    def _evaluate_conditions(self, conditions: Dict, event) -> bool:
        """Return True only if every condition holds for the event (AND logic).

        Args:
            conditions: Mapping of event attribute name to a dict with
                ``"operator"`` and ``"value"`` keys.
            event: Object whose attributes are compared; a missing attribute
                is treated as ``None``.
        """
        return all(
            self._compare_values(
                getattr(event, field, None),
                condition["operator"],
                condition["value"],
            )
            for field, condition in conditions.items()
        )

    def _compare_values(self, event_value, operator: str, condition_value) -> bool:
        """Compare ``event_value`` with ``condition_value`` using ``operator``.

        Supported operators: ``>``, ``<``, ``>=``, ``<=``, ``==``, ``in`` and
        ``not_empty``. Unknown operators and comparisons that raise
        ``TypeError``/``ValueError`` (e.g. ``None > 1``) yield ``False``.
        """
        try:
            if operator == ">":
                return event_value > condition_value
            if operator == "<":
                return event_value < condition_value
            if operator == ">=":
                return event_value >= condition_value
            if operator == "<=":
                return event_value <= condition_value
            if operator == "==":
                return event_value == condition_value
            if operator == "in":
                return event_value in condition_value
            if operator == "not_empty":
                # The previous ``len(x) > 0 == expected`` was a chained
                # comparison, i.e. ``len(x) > 0 and 0 == expected``, which
                # could never be True for ``expected=True``. Truthiness covers
                # lists (non-empty) and every other value alike.
                return bool(event_value) == condition_value
            return False
        except (TypeError, ValueError):
            return False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Policy Engine for Automated Healing Actions
3
+ Fixed version with thread safety and memory leak prevention
4
+ """
5
+
6
  import datetime
7
+ import threading
8
+ import logging
9
+ from collections import OrderedDict
10
+ from typing import Dict, List, Optional
11
+ from models import HealingPolicy, HealingAction, EventSeverity, ReliabilityEvent, PolicyCondition
12
+
13
+ logger = logging.getLogger(__name__)
14
 
15
+
# Default healing policies with structured conditions.
#
# PolicyEngine sorts these by ``priority`` (lower number is evaluated first)
# and requires all of a policy's conditions to hold before returning its
# actions. ``cool_down_seconds`` and ``max_executions_per_hour`` are enforced
# per policy/component pair.
DEFAULT_HEALING_POLICIES = [
    # p99 latency above 500: restart the container and alert the team.
    HealingPolicy(
        name="high_latency_restart",
        conditions=[
            PolicyCondition(metric="latency_p99", operator="gt", threshold=500.0),
        ],
        actions=[
            HealingAction.RESTART_CONTAINER,
            HealingAction.ALERT_TEAM,
        ],
        priority=1,
        cool_down_seconds=300,
        max_executions_per_hour=5,
    ),
    # Error rate above 30%: roll back, open the circuit breaker, alert.
    HealingPolicy(
        name="critical_error_rate_rollback",
        conditions=[
            PolicyCondition(metric="error_rate", operator="gt", threshold=0.3),
        ],
        actions=[
            HealingAction.ROLLBACK,
            HealingAction.CIRCUIT_BREAKER,
            HealingAction.ALERT_TEAM,
        ],
        priority=1,
        cool_down_seconds=600,
        max_executions_per_hour=3,
    ),
    # Error rate above 15%: shift traffic and alert.
    HealingPolicy(
        name="high_error_rate_traffic_shift",
        conditions=[
            PolicyCondition(metric="error_rate", operator="gt", threshold=0.15),
        ],
        actions=[
            HealingAction.TRAFFIC_SHIFT,
            HealingAction.ALERT_TEAM,
        ],
        priority=2,
        cool_down_seconds=300,
        max_executions_per_hour=5,
    ),
    # CPU and memory both above 90%: scale out.
    HealingPolicy(
        name="resource_exhaustion_scale",
        conditions=[
            PolicyCondition(metric="cpu_util", operator="gt", threshold=0.9),
            PolicyCondition(metric="memory_util", operator="gt", threshold=0.9),
        ],
        actions=[HealingAction.SCALE_OUT],
        priority=2,
        cool_down_seconds=600,
        max_executions_per_hour=10,
    ),
    # p99 latency above 300: open the circuit breaker.
    HealingPolicy(
        name="moderate_latency_circuit_breaker",
        conditions=[
            PolicyCondition(metric="latency_p99", operator="gt", threshold=300.0),
        ],
        actions=[HealingAction.CIRCUIT_BREAKER],
        priority=3,
        cool_down_seconds=180,
        max_executions_per_hour=8,
    ),
]
70
 
71
+
class PolicyEngine:
    """
    Policy engine with per-component cooldowns and hourly rate limiting.

    Policies are evaluated in ascending ``priority`` order. For every
    policy/component pair the engine keeps:

    - ``last_execution``: timestamp of the last trigger, held in an
      ``OrderedDict`` capped at ``max_cooldown_history`` entries (the least
      recently triggered pair is evicted first);
    - ``execution_timestamps``: recent trigger timestamps used for the
      ``max_executions_per_hour`` limit, capped at ``max_execution_history``
      entries per pair and evicted together with the cooldown entry.

    The cooldown check, rate-limit check, condition evaluation and state
    update for a policy all run while holding a re-entrant lock, so the
    check and the update are atomic with respect to other threads using the
    same engine.
    """

    def __init__(
        self,
        policies: "Optional[List[HealingPolicy]]" = None,
        max_cooldown_history: int = 10000,
        max_execution_history: int = 1000,
    ):
        """
        Initialize policy engine

        Args:
            policies: List of healing policies (uses defaults if None)
            max_cooldown_history: Maximum cooldown entries to keep (LRU)
            max_execution_history: Maximum execution history per policy
        """
        # Only None selects the defaults; an explicit empty list means
        # "no policies" and must not be silently replaced.
        source = DEFAULT_HEALING_POLICIES if policies is None else policies

        self._lock = threading.RLock()

        # Insertion-ordered so the oldest entry can be evicted cheaply.
        self.last_execution: OrderedDict[str, float] = OrderedDict()
        self.max_cooldown_history = max_cooldown_history

        # Rate limiting: trigger timestamps per policy/component key.
        self.execution_timestamps: Dict[str, List[float]] = {}
        self.max_execution_history = max_execution_history

        # sorted() copies, so the caller's list (or the shared module-level
        # default) is never reordered. Lower number = higher priority.
        self.policies = sorted(source, key=lambda p: p.priority)

        logger.info(
            "Initialized PolicyEngine with %s policies, max_cooldown_history=%s",
            len(self.policies),
            max_cooldown_history,
        )

    def evaluate_policies(self, event: "ReliabilityEvent") -> "List[HealingAction]":
        """
        Evaluate all policies against the event and return matching actions

        Disabled policies are skipped. For each remaining policy the cooldown
        and rate limit for ``event.component`` are checked first; cooldown and
        rate-limit state is only updated when the policy's conditions match.

        Args:
            event: Reliability event to evaluate

        Returns:
            Matched healing actions, de-duplicated in first-seen order, or
            ``[HealingAction.NO_ACTION]`` when no policy triggered
        """
        applicable_actions = []
        current_time = datetime.datetime.now(datetime.timezone.utc).timestamp()

        for policy in self.policies:
            if not policy.enabled:
                continue

            policy_key = f"{policy.name}_{event.component}"

            # Check + update under one lock acquisition so two threads cannot
            # both pass the cooldown check for the same key.
            with self._lock:
                elapsed = current_time - self.last_execution.get(policy_key, 0)
                if elapsed < policy.cool_down_seconds:
                    logger.debug(
                        "Policy %s for %s on cooldown (%.0fs / %ss)",
                        policy.name,
                        event.component,
                        elapsed,
                        policy.cool_down_seconds,
                    )
                    continue

                if self._is_rate_limited(policy_key, policy, current_time):
                    logger.warning(
                        "Policy %s for %s rate limited (max %s/hour)",
                        policy.name,
                        event.component,
                        policy.max_executions_per_hour,
                    )
                    continue

                if not self._evaluate_conditions(policy.conditions, event):
                    continue

                applicable_actions.extend(policy.actions)

                # Record first, then touch the cooldown table: any LRU
                # eviction triggered by the cooldown update also clears the
                # evicted key's rate-limit history.
                self._record_execution(policy_key, current_time)
                self._update_cooldown(policy_key, current_time)

                logger.info(
                    "Policy %s triggered for %s: actions=%s",
                    policy.name,
                    event.component,
                    [a.value for a in policy.actions],
                )

        # dict preserves insertion order: drops repeats, keeps first-seen order.
        unique_actions = list(dict.fromkeys(applicable_actions))
        return unique_actions if unique_actions else [HealingAction.NO_ACTION]

    def _evaluate_conditions(
        self,
        conditions: "List[PolicyCondition]",
        event: "ReliabilityEvent",
    ) -> bool:
        """
        Evaluate all conditions against event (AND logic)

        A metric that is missing from the event, or set to None, fails the
        condition. An empty condition list evaluates to True.

        Args:
            conditions: List of policy conditions
            event: Reliability event

        Returns:
            True if all conditions match, False otherwise
        """
        for condition in conditions:
            event_value = getattr(event, condition.metric, None)

            if event_value is None:
                logger.debug(
                    "Condition failed: %s is None on event", condition.metric
                )
                return False

            if not self._compare_values(
                event_value, condition.operator, condition.threshold
            ):
                logger.debug(
                    "Condition failed: %s %s %s = False",
                    event_value,
                    condition.operator,
                    condition.threshold,
                )
                return False

        return True

    def _compare_values(
        self,
        event_value: float,
        operator: str,
        threshold: float,
    ) -> bool:
        """
        Compare values based on operator with type safety

        Supported operators are ``gt``, ``lt``, ``eq``, ``gte`` and ``lte``.
        Non-numeric operands and unknown operators are logged and yield False.

        Args:
            event_value: Value from event
            operator: Comparison operator
            threshold: Threshold value

        Returns:
            Comparison result
        """
        try:
            if not isinstance(event_value, (int, float)):
                logger.error(
                    "Invalid event_value type: %s, expected number",
                    type(event_value),
                )
                return False

            if not isinstance(threshold, (int, float)):
                logger.error(
                    "Invalid threshold type: %s, expected number",
                    type(threshold),
                )
                return False

            if operator == "gt":
                return event_value > threshold
            if operator == "lt":
                return event_value < threshold
            if operator == "eq":
                # Absolute tolerance so float metrics compare sensibly.
                return abs(event_value - threshold) < 1e-6
            if operator == "gte":
                return event_value >= threshold
            if operator == "lte":
                return event_value <= threshold

            logger.error("Unknown operator: %s", operator)
            return False

        except (TypeError, ValueError) as e:
            logger.error("Comparison error: %s", e, exc_info=True)
            return False

    def _update_cooldown(self, policy_key: str, timestamp: float) -> None:
        """
        Update cooldown timestamp with LRU eviction

        Called by evaluate_policies while it holds the engine lock. Evicting
        a key also drops its rate-limit history, so ``execution_timestamps``
        cannot outgrow ``last_execution``; an evicted pair starts again with
        neither cooldown nor rate-limit state.

        Args:
            policy_key: Policy identifier
            timestamp: Current timestamp
        """
        self.last_execution[policy_key] = timestamp
        # Re-assigning an existing key keeps its old position; move it to the
        # most-recently-used end explicitly.
        self.last_execution.move_to_end(policy_key)

        while len(self.last_execution) > self.max_cooldown_history:
            evicted_key, _ = self.last_execution.popitem(last=False)
            self.execution_timestamps.pop(evicted_key, None)
            logger.debug("Evicted cooldown entry: %s", evicted_key)

    def _is_rate_limited(
        self,
        policy_key: str,
        policy: "HealingPolicy",
        current_time: float,
    ) -> bool:
        """
        Check if policy is rate limited

        Also prunes timestamps older than one hour for this key, removing the
        key entirely once no recent executions remain. Called by
        evaluate_policies while it holds the engine lock.

        Args:
            policy_key: Policy identifier
            policy: Policy configuration
            current_time: Current timestamp

        Returns:
            True if rate limited, False otherwise
        """
        if policy_key not in self.execution_timestamps:
            return False

        one_hour_ago = current_time - 3600
        recent_executions = [
            ts for ts in self.execution_timestamps[policy_key]
            if ts > one_hour_ago
        ]

        if recent_executions:
            self.execution_timestamps[policy_key] = recent_executions
        else:
            # Do not keep empty lists around for idle policy/component pairs.
            del self.execution_timestamps[policy_key]

        return len(recent_executions) >= policy.max_executions_per_hour

    def _record_execution(self, policy_key: str, timestamp: float) -> None:
        """
        Record policy execution for rate limiting

        Keeps at most ``max_execution_history`` of the newest timestamps per
        key. Called by evaluate_policies while it holds the engine lock.

        Args:
            policy_key: Policy identifier
            timestamp: Execution timestamp
        """
        history = self.execution_timestamps.setdefault(policy_key, [])
        history.append(timestamp)

        # Explicit count instead of ``[-max:]``: a ``-0`` slice would keep
        # everything when the cap is 0.
        overflow = len(history) - self.max_execution_history
        if overflow > 0:
            del history[:overflow]

    def get_policy_stats(self) -> Dict[str, Dict]:
        """
        Get statistics about policy execution

        Returns:
            Dictionary keyed by policy name; ``total_components`` is the
            number of cooldown entries currently held for that policy
        """
        with self._lock:
            stats = {}

            for policy in self.policies:
                # NOTE(review): keys are "<name>_<component>", so a policy
                # whose name is a prefix of another's (followed by "_") would
                # also count that other policy's entries here.
                prefix = f"{policy.name}_"
                stats[policy.name] = {
                    "name": policy.name,
                    "priority": policy.priority,
                    "enabled": policy.enabled,
                    "cooldown_seconds": policy.cool_down_seconds,
                    "max_per_hour": policy.max_executions_per_hour,
                    "total_components": sum(
                        1 for key in self.last_execution
                        if key.startswith(prefix)
                    ),
                }

            return stats