petter2025's picture
Upload folder using huggingface_hub
afa4de7 verified
raw
history blame
14.1 kB
"""
Risk service – integrates ARF risk engine, policy engine, and decision engine.
Deterministic, no random fallbacks, explicit error handling.
Version: 2026-05-04 – added Prometheus metrics for observability.
"""
import json
import logging
import os
import time
from typing import Optional, List, Dict, Any
from agentic_reliability_framework.core.governance.risk_engine import RiskEngine
from agentic_reliability_framework.core.governance.intents import InfrastructureIntent
from agentic_reliability_framework.core.models.event import ReliabilityEvent, HealingAction
from agentic_reliability_framework.core.governance.policy_engine import PolicyEngine
from agentic_reliability_framework.core.decision.decision_engine import DecisionEngine
from agentic_reliability_framework.runtime.memory.rag_graph import RAGGraphMemory
from agentic_reliability_framework.core.research.eclipse_probe import compute_epistemic_risk
# ── optional tracing ─────────────────────────────────────────
# OpenTelemetry is an optional dependency: when it cannot be imported we
# fall back to a no-op (_tracer is None) and every span-creation site in
# this module first checks OTEL_AVAILABLE before starting a span.
try:
    from opentelemetry import trace
    _tracer = trace.get_tracer(__name__)
    OTEL_AVAILABLE = True
except ImportError:
    OTEL_AVAILABLE = False
    _tracer = None
# ── Prometheus metrics (always registered; no‑op if not scraped) ─
from prometheus_client import Counter, Histogram
# Counter of evaluation calls, labelled by engine and outcome status
# ("success"/"error" — see the label values used below in this module).
_EVAL_COUNTER = Counter(
    "arf_evaluations_total",
    "Total evaluation calls (intent + healing), partitioned by engine and status.",
    ["engine", "status"],
)
# End-to-end latency histogram; buckets span 5 ms to 5 s.
_EVAL_DURATION = Histogram(
    "arf_evaluation_duration_seconds",
    "End‑to‑end latency of evaluation calls.",
    ["engine"],
    buckets=(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0),
)
# Shadow-mode comparison: did the Rust enforcer's violation set match the
# Python policy evaluator's?
_RUST_AGREEMENT = Counter(
    "arf_rust_agreement_total",
    "Agreement between Rust enforcer and Python policy evaluation.",
    ["result"],  # "agreed" or "diverged"
)
# ── optional Rust enforcer (shadow mode) ──────────────────────
# Shadow evaluation is opt-in via ARF_USE_RUST_ENFORCER=true; the
# arf_enforcer extension module may be absent, in which case the flag
# stays False and the feature is silently disabled.
_RUST_ENFORCER_AVAILABLE = False
_rust_evaluator = None  # singleton per process, built lazily by _ensure_rust_evaluator
_rust_policy_json: Optional[str] = None  # JSON policy tree handed to the Rust evaluator
if os.getenv("ARF_USE_RUST_ENFORCER", "false").lower() == "true":
    try:
        import arf_enforcer
        _RUST_ENFORCER_AVAILABLE = True
    except ImportError:
        # Extension not installed; callers treat the enforcer as unavailable.
        pass
# Default OSS policy tree – mirrors the hard‑coded rules in the Python PolicyEvaluator
# that check region, resource type, and max permission level.
# Serialized once at import time; passed verbatim to the Rust
# PyPolicyEvaluator constructor in _ensure_rust_evaluator.
_OSS_POLICY_TREE_JSON = json.dumps({
    "And": [
        {"Atomic": {"RegionAllowed": {"allowed_regions": ["eastus"]}}},
        {"Atomic": {"ResourceTypeRestricted": {
            "forbidden_types": ["DATABASE_DROP", "FULL_ROLLOUT", "SYSTEM_SHUTDOWN", "SECRET_ROTATION"]
        }}},
        {"Atomic": {"MaxPermissionLevel": {"max_level": "admin"}}}
    ]
})
def _ensure_rust_evaluator() -> bool:
    """Lazily build the per-process Rust policy evaluator singleton.

    Returns
    -------
    bool
        True when an evaluator instance is available, False when the Rust
        extension is missing or its constructor failed.
    """
    global _rust_evaluator, _rust_policy_json
    # Fast path: already initialised.
    if _rust_evaluator is not None:
        return True
    # The extension module never loaded; nothing to build.
    if not _RUST_ENFORCER_AVAILABLE:
        return False
    try:
        _rust_policy_json = _OSS_POLICY_TREE_JSON
        _rust_evaluator = arf_enforcer.PyPolicyEvaluator(_rust_policy_json)
    except Exception:
        # Construction failed: reset so later calls report unavailability.
        _rust_evaluator = None
        return False
    return True
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)
def evaluate_intent(
    engine: RiskEngine,
    intent: InfrastructureIntent,
    cost_estimate: Optional[float],
    policy_violations: List[str]
) -> dict:
    """
    Evaluate an infrastructure intent using the Bayesian risk engine.
    Optionally shadows the policy evaluation with the Rust enforcer when
    the environment variable ARF_USE_RUST_ENFORCER is set to "true".
    Any divergence is logged and counted as a Prometheus metric.
    Parameters
    ----------
    engine : RiskEngine
        Initialised ARF Bayesian risk engine.
    intent : InfrastructureIntent
        The infrastructure request to evaluate.
    cost_estimate : float or None
        Estimated monthly cost (used by cost‑threshold policies).
    policy_violations : list[str]
        Pre‑computed policy violation strings (from the Python evaluator).
    Returns
    -------
    dict
        Keys: risk_score, explanation, contributions.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_intent")
        span.set_attribute("intent_type", type(intent).__name__)
    # ── Shadow Rust enforcer (best‑effort, non‑blocking) ──────
    # rust_violations stays None unless the shadow evaluation completes.
    # Previously it was only bound inside the try below, so the canary
    # block raised a silently-swallowed NameError whenever the shadow
    # evaluation had not run or had failed.
    rust_violations: Optional[List[str]] = None
    if _RUST_ENFORCER_AVAILABLE and _ensure_rust_evaluator():
        try:
            rust_intent = {
                "action": getattr(intent, "intent_type", "unknown"),
                "component": getattr(intent, "service_name", "unknown"),
                "region": getattr(intent, "region", None),
                "resource_type": getattr(intent, "resource_type", None),
                "permission_level": getattr(intent, "permission_level", None),
                "extra": {}
            }
            rust_raw = _rust_evaluator.evaluate(
                json.dumps(rust_intent), cost_estimate
            )
            rust_violations = json.loads(rust_raw)
            agreed = set(rust_violations) == set(policy_violations)
            _RUST_AGREEMENT.labels(result="agreed" if agreed else "diverged").inc()
            if not agreed:
                msg = (
                    "Rust enforcer divergence: "
                    f"Rust={sorted(rust_violations)} Python={sorted(policy_violations)}"
                )
                logger.warning(msg)
                if span:
                    span.add_event("rust_enforcer_divergence", {
                        "rust_violations": rust_violations,
                        "python_violations": policy_violations
                    })
        except Exception as exc:
            # Shadow mode is best-effort: never let it affect the real path.
            rust_violations = None
            logger.debug("Rust enforcer shadow evaluation failed: %s", exc)
    # ── Automated canary promotion ──────────────────────────
    # Promote the Rust verdict only when the shadow evaluation actually
    # produced one. NOTE(review): "arf_rust_agreement_lower_bound" is not
    # registered in this module — presumably produced by an external
    # recording rule; confirm it exists in the scrape target.
    if (
        rust_violations is not None
        and os.getenv("ARF_RUST_CANARY", "false").lower() == "true"
    ):
        try:
            from prometheus_client import REGISTRY
            lower = REGISTRY.get_sample_value("arf_rust_agreement_lower_bound", {})
            if lower is not None and lower > 0.9999:
                policy_violations = rust_violations
                if span:
                    span.set_attribute("rust_enforcer_active", True)
        except Exception:
            # Opportunistic: any failure leaves the Python verdict in force.
            pass
    # ── Core risk evaluation ──────────────────────────────────
    try:
        score, explanation, contributions = engine.calculate_risk(
            intent=intent,
            cost_estimate=cost_estimate,
            policy_violations=policy_violations
        )
    except Exception:
        _EVAL_COUNTER.labels(engine="python", status="error").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        if span:
            span.end()  # previously leaked on the error path
        raise
    _EVAL_COUNTER.labels(engine="python", status="success").inc()
    _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
    if span:
        span.set_attribute("risk_score", score)
        if _RUST_ENFORCER_AVAILABLE:
            span.set_attribute("rust_enforcer_available", True)
        span.end()
    return {
        "risk_score": score,
        "explanation": explanation,
        "contributions": contributions
    }
def evaluate_healing_decision(
    event: ReliabilityEvent,
    policy_engine: PolicyEngine,
    decision_engine: Optional[DecisionEngine] = None,
    rag_graph: Optional[RAGGraphMemory] = None,
    model=None,
    tokenizer=None,
) -> Dict[str, Any]:
    """
    Evaluate healing actions for a given reliability event using decision‑theoretic selection.
    Includes epistemic risk signals from the eclipse probe.
    Parameters
    ----------
    event : ReliabilityEvent
        The incident event containing latency, error rate, etc.
    policy_engine : PolicyEngine
        The ARF healing policy engine with configured policies.
    decision_engine : DecisionEngine, optional
        If omitted, a default instance is created.
    rag_graph : RAGGraphMemory, optional
        Semantic memory for similar incident retrieval.
    model, tokenizer : optional
        HuggingFace model and tokenizer for epistemic risk computation.
    Returns
    -------
    dict
        Keys: risk_score, selected_action, expected_utility, alternatives,
        explanation, epistemic_signals, and — when candidate actions were
        evaluated — raw_decision.
    """
    t0 = time.monotonic()
    span = None
    if OTEL_AVAILABLE and _tracer:
        span = _tracer.start_span("risk_service.evaluate_healing")
        span.set_attribute("component", event.component)
    # Resolve a DecisionEngine: explicit argument > policy_engine attribute
    # > freshly constructed default (global stats only).
    if decision_engine is None and hasattr(policy_engine, 'decision_engine'):
        decision_engine = policy_engine.decision_engine
    if decision_engine is None:
        logger.debug("No DecisionEngine provided; creating default instance")
        decision_engine = DecisionEngine(rag_graph=rag_graph)
    # Get raw candidate actions by temporarily disabling the policy engine's
    # own decision stage; the flag is restored even if evaluation raises.
    orig_use = policy_engine.use_decision_engine
    try:
        policy_engine.use_decision_engine = False
        raw_actions = policy_engine.evaluate_policies(event)
    finally:
        policy_engine.use_decision_engine = orig_use
    # Short-circuit: nothing triggered, so report NO_ACTION with zero risk.
    if not raw_actions or raw_actions == [HealingAction.NO_ACTION]:
        if span:
            span.set_attribute("selected_action", HealingAction.NO_ACTION.value)
            span.end()
        _EVAL_COUNTER.labels(engine="python", status="success").inc()
        _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
        return {
            "risk_score": 0.0,
            "selected_action": HealingAction.NO_ACTION.value,
            "expected_utility": 0.0,
            "alternatives": [],
            "explanation": "No candidate actions triggered.",
            "epistemic_signals": None,
        }
    # Build reasoning text from policies that triggered the actions
    reasoning_parts = []
    for policy in policy_engine.policies:
        if any(a in policy.actions for a in raw_actions):
            conditions_str = ", ".join(
                f"{c.metric} {c.operator} {c.threshold}" for c in policy.conditions
            )
            reasoning_parts.append(
                f"Policy {policy.name} triggered by {conditions_str} β†’ actions {[a.value for a in policy.actions]}"
            )
    reasoning_text = " ".join(reasoning_parts)
    # Build evidence text from the event
    evidence_text = (
        f"Component: {event.component}, "
        f"latency_p99: {event.latency_p99}, "
        f"error_rate: {event.error_rate}, "
        f"cpu_util: {event.cpu_util}, "
        f"memory_util: {event.memory_util}"
    )
    # Compute epistemic signals when a model/tokenizer pair is supplied.
    # A single zeroed fallback replaces the two duplicated literal dicts
    # the original carried (a drift hazard if the schema changes).
    zero_signals = {
        "entropy": 0.0,
        "contradiction": 0.0,
        "evidence_lift": 0.0,
        "hallucination_risk": 0.0,
    }
    if model is not None and tokenizer is not None:
        try:
            epistemic_signals = compute_epistemic_risk(
                reasoning_text, evidence_text, model, tokenizer
            )
        except Exception as e:
            logger.error(f"Failed to compute epistemic risk: {e}")
            epistemic_signals = zero_signals
    else:
        logger.debug("Epistemic model/tokenizer not provided; using zero signals")
        epistemic_signals = zero_signals
    # Run decision engine to get best action and alternatives
    decision = decision_engine.select_optimal_action(
        raw_actions, event, component=event.component,
        epistemic_signals=epistemic_signals
    )
    # Risk of the selected action: read it off the matching alternative,
    # falling back to a direct computation when no alternative matches.
    risk_score = next(
        (alt.risk for alt in decision.alternatives
         if alt.action == decision.best_action),
        None,
    )
    if risk_score is None:
        risk_score = decision_engine.compute_risk(
            decision.best_action, event, event.component)
    # Format alternatives (top 3 only)
    alt_list = [
        {
            "action": alt.action.value,
            "expected_utility": alt.utility,
            "risk": alt.risk,
        }
        for alt in decision.alternatives[:3]
    ]
    # ── Metrics & span finalisation ───────────────────────────
    _EVAL_COUNTER.labels(engine="python", status="success").inc()
    _EVAL_DURATION.labels(engine="python").observe(time.monotonic() - t0)
    if span:
        span.set_attribute("risk_score", risk_score)
        span.set_attribute("selected_action", decision.best_action.value)
        span.set_attribute("expected_utility", decision.expected_utility)
        span.end()
    return {
        "risk_score": risk_score,
        "selected_action": decision.best_action.value,
        "expected_utility": decision.expected_utility,
        "alternatives": alt_list,
        "explanation": decision.explanation,
        "raw_decision": decision.raw_data,
        "epistemic_signals": epistemic_signals,
    }
def get_system_risk() -> float:
    """Deprecated placeholder for the system-wide aggregated risk score.

    The endpoint this backed has been retired; rather than returning a
    random or stale fallback value, the function fails loudly.

    Raises
    ------
    NotImplementedError
        Always, directing callers to component-level evaluation.
    """
    raise NotImplementedError(
        "get_system_risk is deprecated. Use component‑level risk evaluation instead."
    )