petter2025 commited on
Commit
e840d23
·
verified ·
1 Parent(s): 770d0b0

Update infrastructure/policies.py

Browse files
Files changed (1) hide show
  1. infrastructure/policies.py +3 -76
infrastructure/policies.py CHANGED
@@ -1,14 +1,5 @@
1
- # agentic_reliability_framework/infrastructure/policies.py
2
  """
3
  Policy Algebra – Composable, typed policies for infrastructure governance.
4
-
5
- This module defines a composable policy system using a monoid-like structure.
6
- Policies can be combined (AND, OR) and evaluated against intents. The algebra
7
- enables building complex rules from simple primitives while maintaining
8
- deterministic evaluation.
9
-
10
- The design draws from knowledge engineering (rule-based systems), decision
11
- engineering (explicit trade-offs), and platform engineering (pluggable backends).
12
  """
13
 
14
  from __future__ import annotations
@@ -19,7 +10,8 @@ from typing import Any, Callable, Dict, List, Optional, Protocol, Set, TypeVar,
19
 
20
  from pydantic import BaseModel
21
 
22
- from agentic_reliability_framework.infrastructure.intents import (
 
23
  InfrastructureIntent,
24
  ProvisionResourceIntent,
25
  GrantAccessIntent,
@@ -28,48 +20,28 @@ from agentic_reliability_framework.infrastructure.intents import (
28
  Environment,
29
  )
30
 
31
- # -----------------------------------------------------------------------------
32
- # Type Aliases for Readability
33
- # -----------------------------------------------------------------------------
34
  Violation = str
35
  EvalResult = List[Violation]
36
 
37
- # -----------------------------------------------------------------------------
38
- # Abstract Policy Class – using Composite pattern
39
- # -----------------------------------------------------------------------------
40
  class Policy(ABC):
41
- """Abstract base for all policies. Evaluates an intent and returns violations."""
42
-
43
  @abstractmethod
44
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
45
- """Return list of violations. Empty list means the policy is satisfied."""
46
  pass
47
 
48
- # -------------------------------------------------------------------------
49
- # Combinators (returning new composite policies)
50
- # -------------------------------------------------------------------------
51
  def __and__(self, other: Policy) -> Policy:
52
- """Logical AND: violations from both policies."""
53
  return AndPolicy(self, other)
54
 
55
  def __or__(self, other: Policy) -> Policy:
56
- """Logical OR: violations only if both policies produce violations."""
57
  return OrPolicy(self, other)
58
 
59
  def __invert__(self) -> Policy:
60
- """Logical NOT: violations if the original policy yields no violations."""
61
  return NotPolicy(self)
62
 
63
- # -----------------------------------------------------------------------------
64
- # Atomic Policies (Primitives)
65
- # -----------------------------------------------------------------------------
66
  class AtomicPolicy(Policy, ABC):
67
- """Base class for policies that don't contain other policies."""
68
  pass
69
 
70
  @dataclass(frozen=True)
71
  class RegionAllowedPolicy(AtomicPolicy):
72
- """Ensure the intent's region is in an allowed set."""
73
  allowed_regions: Set[str]
74
 
75
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
@@ -80,7 +52,6 @@ class RegionAllowedPolicy(AtomicPolicy):
80
 
81
  @dataclass(frozen=True)
82
  class ResourceTypeRestrictedPolicy(AtomicPolicy):
83
- """Forbid certain resource types."""
84
  forbidden_types: Set[ResourceType]
85
 
86
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
@@ -91,10 +62,8 @@ class ResourceTypeRestrictedPolicy(AtomicPolicy):
91
 
92
  @dataclass(frozen=True)
93
  class MaxPermissionLevelPolicy(AtomicPolicy):
94
- """Limit the maximum permission level that can be granted."""
95
  max_level: PermissionLevel
96
 
97
- # Permission level ordering (read < write < admin)
98
  _LEVEL_ORDER = {
99
  PermissionLevel.READ: 1,
100
  PermissionLevel.WRITE: 2,
@@ -109,26 +78,13 @@ class MaxPermissionLevelPolicy(AtomicPolicy):
109
 
110
  @dataclass(frozen=True)
111
  class CostThresholdPolicy(AtomicPolicy):
112
- """
113
- Enforce a maximum monthly cost.
114
- Note: This policy requires the cost estimate to be provided externally.
115
- We evaluate it only when a cost is supplied via context.
116
- """
117
  max_cost_usd: float
118
 
119
  def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
120
- # This is a special case – we need cost from the simulator.
121
- # We'll handle it by allowing context injection. For composition, we'll use a wrapper.
122
- # In practice, we'll evaluate this in the simulator and add violations manually.
123
- # For now, we keep it as a marker.
124
  return []
125
 
126
- # -----------------------------------------------------------------------------
127
- # Composite Policies (using Combinators)
128
- # -----------------------------------------------------------------------------
129
  @dataclass(frozen=True)
130
  class AndPolicy(Policy):
131
- """Logical AND of two policies."""
132
  left: Policy
133
  right: Policy
134
 
@@ -137,63 +93,42 @@ class AndPolicy(Policy):
137
 
138
  @dataclass(frozen=True)
139
  class OrPolicy(Policy):
140
- """Logical OR – violations only if both sub-policies have violations."""
141
  left: Policy
142
  right: Policy
143
 
144
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
145
  left_violations = self.left.evaluate(intent)
146
  right_violations = self.right.evaluate(intent)
147
- # If either has no violations, overall no violations (OR satisfied)
148
  if not left_violations or not right_violations:
149
  return []
150
- # Both have violations – combine them
151
  return left_violations + right_violations
152
 
153
  @dataclass(frozen=True)
154
  class NotPolicy(Policy):
155
- """Logical NOT – violations if inner policy has no violations."""
156
  inner: Policy
157
 
158
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
159
  inner_violations = self.inner.evaluate(intent)
160
  if inner_violations:
161
- return [] # inner failed (had violations), so NOT is satisfied (no violations)
162
  else:
163
  return ["Condition not met (NOT policy)"]
164
 
165
- # -----------------------------------------------------------------------------
166
- # Context-Aware Evaluation
167
- # -----------------------------------------------------------------------------
168
  class PolicyEvaluator:
169
- """
170
- Evaluates a policy tree with additional context (e.g., cost estimates).
171
- This allows policies that depend on external data to be evaluated.
172
- """
173
-
174
  def __init__(self, root_policy: Policy):
175
  self._root = root_policy
176
 
177
  def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
178
- """
179
- Evaluate the policy tree against the intent, using context for dynamic checks.
180
- Context may contain 'cost_estimate' for CostThresholdPolicy, etc.
181
- """
182
- # For simplicity, we traverse the tree and apply context where needed.
183
- # A more sophisticated implementation would use a visitor pattern.
184
  return self._evaluate_recursive(self._root, intent, context or {})
185
 
186
  def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
187
- # If policy is CostThresholdPolicy, we use context to evaluate.
188
  if isinstance(policy, CostThresholdPolicy):
189
  cost = context.get('cost_estimate')
190
  if cost is not None and cost > policy.max_cost_usd:
191
  return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
192
  return []
193
- # For atomic policies, just evaluate (they don't need context)
194
  if isinstance(policy, AtomicPolicy):
195
  return policy.evaluate(intent)
196
- # For composites, recurse
197
  if isinstance(policy, AndPolicy):
198
  return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context)
199
  if isinstance(policy, OrPolicy):
@@ -207,28 +142,20 @@ class PolicyEvaluator:
207
  if inner:
208
  return []
209
  return ["Condition not met (NOT policy)"]
210
- # Fallback
211
  return []
212
 
213
- # -----------------------------------------------------------------------------
214
- # Policy Builder (DSL) – convenience functions
215
- # -----------------------------------------------------------------------------
216
  def allow_all() -> Policy:
217
- """Policy that never produces violations."""
218
  class _AllowAll(AtomicPolicy):
219
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
220
  return []
221
  return _AllowAll()
222
 
223
  def deny_all() -> Policy:
224
- """Policy that always produces a violation."""
225
  class _DenyAll(AtomicPolicy):
226
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
227
  return ["Action denied by default policy"]
228
  return _DenyAll()
229
 
230
- # Example: policy = (region_allowed({"eastus"}) & ~resource_type_restricted({ResourceType.KUBERNETES_CLUSTER})) | cost_threshold(500)
231
-
232
  __all__ = [
233
  "Policy",
234
  "RegionAllowedPolicy",
 
 
1
  """
2
  Policy Algebra – Composable, typed policies for infrastructure governance.
 
 
 
 
 
 
 
 
3
  """
4
 
5
  from __future__ import annotations
 
10
 
11
  from pydantic import BaseModel
12
 
13
+ # Relative imports for intents
14
+ from .intents import (
15
  InfrastructureIntent,
16
  ProvisionResourceIntent,
17
  GrantAccessIntent,
 
20
  Environment,
21
  )
22
 
 
 
 
23
  Violation = str
24
  EvalResult = List[Violation]
25
 
 
 
 
26
  class Policy(ABC):
 
 
27
  @abstractmethod
28
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
 
29
  pass
30
 
 
 
 
31
  def __and__(self, other: Policy) -> Policy:
 
32
  return AndPolicy(self, other)
33
 
34
  def __or__(self, other: Policy) -> Policy:
 
35
  return OrPolicy(self, other)
36
 
37
  def __invert__(self) -> Policy:
 
38
  return NotPolicy(self)
39
 
 
 
 
40
  class AtomicPolicy(Policy, ABC):
 
41
  pass
42
 
43
  @dataclass(frozen=True)
44
  class RegionAllowedPolicy(AtomicPolicy):
 
45
  allowed_regions: Set[str]
46
 
47
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
 
52
 
53
  @dataclass(frozen=True)
54
  class ResourceTypeRestrictedPolicy(AtomicPolicy):
 
55
  forbidden_types: Set[ResourceType]
56
 
57
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
 
62
 
63
  @dataclass(frozen=True)
64
  class MaxPermissionLevelPolicy(AtomicPolicy):
 
65
  max_level: PermissionLevel
66
 
 
67
  _LEVEL_ORDER = {
68
  PermissionLevel.READ: 1,
69
  PermissionLevel.WRITE: 2,
 
78
 
79
  @dataclass(frozen=True)
80
  class CostThresholdPolicy(AtomicPolicy):
 
 
 
 
 
81
  max_cost_usd: float
82
 
83
  def evaluate(self, intent: InfrastructureIntent, cost: Optional[float] = None) -> EvalResult:
 
 
 
 
84
  return []
85
 
 
 
 
86
  @dataclass(frozen=True)
87
  class AndPolicy(Policy):
 
88
  left: Policy
89
  right: Policy
90
 
 
93
 
94
  @dataclass(frozen=True)
95
  class OrPolicy(Policy):
 
96
  left: Policy
97
  right: Policy
98
 
99
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
100
  left_violations = self.left.evaluate(intent)
101
  right_violations = self.right.evaluate(intent)
 
102
  if not left_violations or not right_violations:
103
  return []
 
104
  return left_violations + right_violations
105
 
106
  @dataclass(frozen=True)
107
  class NotPolicy(Policy):
 
108
  inner: Policy
109
 
110
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
111
  inner_violations = self.inner.evaluate(intent)
112
  if inner_violations:
113
+ return []
114
  else:
115
  return ["Condition not met (NOT policy)"]
116
 
 
 
 
117
  class PolicyEvaluator:
 
 
 
 
 
118
  def __init__(self, root_policy: Policy):
119
  self._root = root_policy
120
 
121
  def evaluate(self, intent: InfrastructureIntent, context: Optional[Dict[str, Any]] = None) -> EvalResult:
 
 
 
 
 
 
122
  return self._evaluate_recursive(self._root, intent, context or {})
123
 
124
  def _evaluate_recursive(self, policy: Policy, intent: InfrastructureIntent, context: Dict[str, Any]) -> EvalResult:
 
125
  if isinstance(policy, CostThresholdPolicy):
126
  cost = context.get('cost_estimate')
127
  if cost is not None and cost > policy.max_cost_usd:
128
  return [f"Cost ${cost:.2f} exceeds threshold ${policy.max_cost_usd:.2f}"]
129
  return []
 
130
  if isinstance(policy, AtomicPolicy):
131
  return policy.evaluate(intent)
 
132
  if isinstance(policy, AndPolicy):
133
  return self._evaluate_recursive(policy.left, intent, context) + self._evaluate_recursive(policy.right, intent, context)
134
  if isinstance(policy, OrPolicy):
 
142
  if inner:
143
  return []
144
  return ["Condition not met (NOT policy)"]
 
145
  return []
146
 
 
 
 
147
  def allow_all() -> Policy:
 
148
  class _AllowAll(AtomicPolicy):
149
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
150
  return []
151
  return _AllowAll()
152
 
153
  def deny_all() -> Policy:
 
154
  class _DenyAll(AtomicPolicy):
155
  def evaluate(self, intent: InfrastructureIntent) -> EvalResult:
156
  return ["Action denied by default policy"]
157
  return _DenyAll()
158
 
 
 
159
  __all__ = [
160
  "Policy",
161
  "RegionAllowedPolicy",