petter2025's picture
Create utils/arf_engine.py
ee7d63b verified
raw
history blame
22.2 kB
"""
ARF 3.3.9 Engine - PhD Level Implementation
Realistic scoring, psychological framing, enterprise simulation
"""
import random
import time
from datetime import datetime
from typing import Dict, List, Tuple
import numpy as np
class BayesianRiskModel:
    """Bayesian risk assessment with priors and confidence intervals.

    A Beta(alpha, beta) prior per action type is combined with historical
    success/failure counts to yield a posterior mean risk, which is then
    scaled by context multipliers and clamped into a realistic range.
    """

    def __init__(self):
        # Prior Beta(alpha, beta) distributions for different action types;
        # the prior mean risk is alpha / (alpha + beta).
        self.priors = {
            "destructive": {"alpha": 2, "beta": 8},    # 20% base risk
            "modification": {"alpha": 1, "beta": 9},   # 10% base risk
            "readonly": {"alpha": 1, "beta": 99},      # 1% base risk
            "deployment": {"alpha": 3, "beta": 7},     # 30% base risk
        }
        # Historical outcome counts keyed by the leading action keyword.
        self.history = {
            "DROP DATABASE": {"success": 5, "failure": 95},
            "DELETE FROM": {"success": 10, "failure": 90},
            "GRANT": {"success": 30, "failure": 70},
            "UPDATE": {"success": 40, "failure": 60},
            "DEPLOY": {"success": 60, "failure": 40},
        }

    def assess(self, action: str, context: Dict, historical_patterns: Dict = None) -> Dict:
        """Bayesian risk assessment.

        Args:
            action: Free-text description of the action (e.g. a SQL statement).
            context: Arbitrary context dict; stringified for keyword scanning.
            historical_patterns: Optional success/failure counts per action
                key; defaults to this model's built-in ``history``.

        Returns:
            Dict with ``score``, ``confidence``, ``action_type`` and
            ``risk_factors`` keys.
        """
        # FIX: the original dereferenced ``historical_patterns`` directly,
        # raising AttributeError whenever called with the default of None.
        if historical_patterns is None:
            historical_patterns = self.history
        # Determine action type and look up its prior.
        action_type = self._classify_action(action)
        prior = self.priors.get(action_type, self.priors["modification"])
        # Likelihood from historical data (uninformative 50/50 fallback).
        action_key = self._extract_action_key(action)
        historical = historical_patterns.get(action_key, {"success": 50, "failure": 50})
        # Posterior Beta parameters (simplified conjugate update).
        alpha_posterior = prior["alpha"] + historical["failure"]
        beta_posterior = prior["beta"] + historical["success"]
        # Expected risk score = posterior mean.
        risk_score = alpha_posterior / (alpha_posterior + beta_posterior)
        # Context-based adjustment (environment, operator, timing, backups).
        risk_score *= self._assess_context(context)
        # Add realistic variance (never 0.0 or 1.0).
        risk_score = max(0.25, min(0.95, risk_score + random.uniform(-0.1, 0.1)))
        # Confidence grows with the amount of evidence, capped at 0.99.
        n = alpha_posterior + beta_posterior
        confidence = min(0.99, 0.8 + (n / (n + 100)) * 0.19)
        return {
            "score": risk_score,
            "confidence": confidence,
            "action_type": action_type,
            "risk_factors": self._extract_risk_factors(action, context)
        }

    def _classify_action(self, action: str) -> str:
        """Classify the action into one of the prior categories."""
        action_lower = action.lower()
        if any(word in action_lower for word in ["drop", "delete", "truncate", "remove"]):
            return "destructive"
        elif any(word in action_lower for word in ["update", "alter", "modify", "change"]):
            return "modification"
        elif any(word in action_lower for word in ["deploy", "execute", "run", "train"]):
            return "deployment"
        elif any(word in action_lower for word in ["grant", "revoke", "permission"]):
            return "modification"
        else:
            return "readonly"

    def _extract_action_key(self, action: str) -> str:
        """Extract the leading keyword used to index historical patterns."""
        words = action.split()
        return words[0].upper() if words else "UNKNOWN"

    def _assess_context(self, context: Dict) -> float:
        """Return a risk multiplier derived from keywords in the context."""
        multiplier = 1.0
        context_str = str(context).lower()
        # Time-based risk: off-hours operations are riskier.
        if "2am" in context_str or "night" in context_str:
            multiplier *= 1.3
        # User-based risk.
        if "junior" in context_str or "intern" in context_str:
            multiplier *= 1.4
        elif "senior" in context_str or "lead" in context_str:
            multiplier *= 0.8
        # Environment-based risk.
        if "production" in context_str or "prod" in context_str:
            multiplier *= 1.5
        elif "staging" in context_str:
            multiplier *= 1.2
        elif "development" in context_str:
            multiplier *= 0.7
        # Backup status.
        if "backup" in context_str and ("old" in context_str or "no" in context_str):
            multiplier *= 1.4
        elif "backup" in context_str and ("fresh" in context_str or "recent" in context_str):
            multiplier *= 0.9
        return multiplier

    def _extract_risk_factors(self, action: str, context: Dict) -> List[str]:
        """Extract up to three human-readable risk factors."""
        factors = []
        action_lower = action.lower()
        context_str = str(context).lower()
        if "drop" in action_lower and "database" in action_lower:
            factors.append("Irreversible data destruction")
            factors.append("Potential service outage")
        if "delete" in action_lower:
            factors.append("Data loss risk")
            # The WHERE-clause check only makes sense for delete statements.
            if "where" not in action_lower:
                factors.append("No WHERE clause (mass deletion)")
        if "production" in context_str:
            factors.append("Production environment")
        if "junior" in context_str:
            factors.append("Junior operator")
        if "2am" in context_str:
            factors.append("Off-hours operation")
        return factors[:3]  # Return top 3 factors
class PolicyEngine:
    """Hierarchical policy evaluation engine.

    Holds per-action-type limits (risk threshold, approvals, backup/test
    requirements) and grades a risk profile against them.
    """

    def __init__(self):
        # Policy limits keyed by action type.
        self.policies = {
            "destructive": {
                "risk_threshold": 0.3,
                "required_approvals": 2,
                "backup_required": True,
            },
            "modification": {
                "risk_threshold": 0.5,
                "required_approvals": 1,
                "backup_required": False,
            },
            "deployment": {
                "risk_threshold": 0.4,
                "required_approvals": 1,
                "tests_required": True,
            },
            "readonly": {
                "risk_threshold": 0.8,
                "required_approvals": 0,
                "backup_required": False,
            },
        }

    def evaluate(self, action: str, risk_profile: Dict, confidence_threshold: float = 0.7) -> Dict:
        """Evaluate an action's risk profile against the matching policy.

        Returns a dict with ``compliance``, ``recommendation``,
        ``policy_type``, ``risk_threshold`` and ``actual_risk``.
        """
        kind = risk_profile.get("action_type", "modification")
        score = risk_profile.get("score", 0.5)
        rules = self.policies.get(kind, self.policies["modification"])

        # Grade against the type-specific risk ceiling.
        if score > rules["risk_threshold"]:
            status = "HIGH_RISK"
            advice = f"Requires {rules['required_approvals']} approval(s)"
            if rules.get("backup_required", False):
                advice += " and verified backup"
        else:
            status = "WITHIN_POLICY"
            advice = "Within policy limits"

        # A low-confidence assessment overrides the risk verdict entirely.
        if risk_profile.get("confidence", 0.5) < confidence_threshold:
            status = "LOW_CONFIDENCE"
            advice = "Low confidence score - manual review recommended"

        return {
            "compliance": status,
            "recommendation": advice,
            "policy_type": kind,
            "risk_threshold": rules["risk_threshold"],
            "actual_risk": score,
        }
class LicenseManager:
    """Psychology-enhanced license manager.

    Maps a license key to a tier feature dict. Always returns a fresh copy
    of the tier record so callers can annotate it without corrupting the
    shared tier tables.
    """

    def __init__(self):
        # Canonical key formats per tier. NOTE: validation below is
        # substring-based; these patterns are currently informational only.
        self.license_patterns = {
            "trial": r"ARF-TRIAL-[A-Z0-9]{8}",
            "starter": r"ARF-STARTER-[A-Z0-9]{8}",
            "professional": r"ARF-PRO-[A-Z0-9]{8}",
            "enterprise": r"ARF-ENTERPRISE-[A-Z0-9]{8}"
        }
        # Feature matrix surfaced to the UI per tier.
        self.tier_features = {
            "oss": {
                "name": "OSS Edition",
                "color": "#1E88E5",
                "enforcement": "advisory",
                "gates": 0,
                "support": "community"
            },
            "trial": {
                "name": "Trial Edition",
                "color": "#FFB300",
                "enforcement": "mechanical",
                "gates": 3,
                "support": "email",
                "days_remaining": 14
            },
            "starter": {
                "name": "Starter Edition",
                "color": "#FF9800",
                "enforcement": "mechanical",
                "gates": 3,
                "support": "business_hours",
                "price": "$2,000/mo"
            },
            "professional": {
                "name": "Professional Edition",
                "color": "#FF6F00",
                "enforcement": "mechanical",
                "gates": 5,
                "support": "24/7",
                "price": "$5,000/mo"
            },
            "enterprise": {
                "name": "Enterprise Edition",
                "color": "#D84315",
                "enforcement": "mechanical",
                "gates": 7,
                "support": "dedicated",
                "price": "$15,000/mo"
            }
        }

    def validate(self, license_key: str = None, action_risk: float = 0.5) -> Dict:
        """Validate a license key and return a copy of the tier's features.

        Args:
            license_key: Raw key; None/empty or unrecognized keys map to OSS.
            action_risk: Currently unused; kept for interface stability.
        """
        if not license_key:
            # FIX: return a copy — the original handed out the shared OSS
            # dict here, so caller mutations polluted later validations.
            return self.tier_features["oss"].copy()
        # Substring-based tier detection (order matters: trial first).
        license_upper = license_key.upper()
        if "ARF-TRIAL" in license_upper:
            tier = "trial"
        elif "ARF-STARTER" in license_upper:
            tier = "starter"
        elif "ARF-PRO" in license_upper:
            tier = "professional"
        elif "ARF-ENTERPRISE" in license_upper:
            tier = "enterprise"
        else:
            tier = "oss"
        # Get tier features (fresh copy so callers can mutate freely).
        features = self.tier_features.get(tier, self.tier_features["oss"]).copy()
        # Psychological framing for trial users (scarcity + social proof).
        if tier == "trial":
            features["scarcity"] = f"⏳ {features.get('days_remaining', 14)} days remaining"
            features["social_proof"] = "Join 1,000+ developers using ARF"
        return features
class MechanicalGateEvaluator:
"""Mechanical gate evaluation engine"""
def __init__(self):
self.gates = {
"risk_assessment": {"weight": 0.3, "required": True},
"policy_compliance": {"weight": 0.3, "required": True},
"resource_check": {"weight": 0.2, "required": False},
"approval_workflow": {"weight": 0.1, "required": False},
"audit_trail": {"weight": 0.1, "required": False}
}
def evaluate(self, risk_profile: Dict, policy_result: Dict, license_info: Dict) -> Dict:
"""Evaluate mechanical gates"""
gate_results = []
total_score = 0
max_score = 0
# Gate 1: Risk Assessment
risk_gate = self._evaluate_risk_gate(risk_profile)
gate_results.append(risk_gate)
total_score += risk_gate["score"] * self.gates["risk_assessment"]["weight"]
max_score += self.gates["risk_assessment"]["weight"]
# Gate 2: Policy Compliance
policy_gate = self._evaluate_policy_gate(policy_result)
gate_results.append(policy_gate)
total_score += policy_gate["score"] * self.gates["policy_compliance"]["weight"]
max_score += self.gates["policy_compliance"]["weight"]
# Additional gates based on license tier
license_tier = license_info.get("name", "OSS Edition").lower()
if "trial" in license_tier or "starter" in license_tier:
# Gate 3: Resource Check
resource_gate = self._evaluate_resource_gate(risk_profile)
gate_results.append(resource_gate)
total_score += resource_gate["score"] * self.gates["resource_check"]["weight"]
max_score += self.gates["resource_check"]["weight"]
if "professional" in license_tier or "enterprise" in license_tier:
# Gate 4: Approval Workflow
approval_gate = self._evaluate_approval_gate(policy_result)
gate_results.append(approval_gate)
total_score += approval_gate["score"] * self.gates["approval_workflow"]["weight"]
max_score += self.gates["approval_workflow"]["weight"]
# Gate 5: Audit Trail
audit_gate = self._evaluate_audit_gate()
gate_results.append(audit_gate)
total_score += audit_gate["score"] * self.gates["audit_trail"]["weight"]
max_score += self.gates["audit_trail"]["weight"]
# Calculate overall score
overall_score = total_score / max_score if max_score > 0 else 0
# Decision authority
decision = self._calculate_decision_authority(gate_results, license_tier, overall_score)
return {
"gate_results": gate_results,
"overall_score": overall_score,
"decision": decision,
"gates_passed": len([g for g in gate_results if g["passed"]]),
"total_gates": len(gate_results)
}
def _evaluate_risk_gate(self, risk_profile: Dict) -> Dict:
"""Evaluate risk assessment gate"""
risk_score = risk_profile.get("score", 0.5)
confidence = risk_profile.get("confidence", 0.5)
passed = risk_score < 0.7 and confidence > 0.6
score = (0.7 - min(risk_score, 0.7)) / 0.7 * 0.5 + (confidence - 0.6) / 0.4 * 0.5
return {
"name": "Risk Assessment",
"passed": passed,
"score": max(0, min(1, score)),
"details": f"Risk: {risk_score:.1%}, Confidence: {confidence:.1%}"
}
def _evaluate_policy_gate(self, policy_result: Dict) -> Dict:
"""Evaluate policy compliance gate"""
compliance = policy_result.get("compliance", "HIGH_RISK")
risk_threshold = policy_result.get("risk_threshold", 0.5)
actual_risk = policy_result.get("actual_risk", 0.5)
passed = compliance != "HIGH_RISK"
score = 1.0 if passed else (risk_threshold / actual_risk if actual_risk > 0 else 0)
return {
"name": "Policy Compliance",
"passed": passed,
"score": max(0, min(1, score)),
"details": f"Compliance: {compliance}"
}
def _evaluate_resource_gate(self, risk_profile: Dict) -> Dict:
"""Evaluate resource check gate"""
# Simulate resource availability check
passed = random.random() > 0.3 # 70% chance of passing
score = 0.8 if passed else 0.3
return {
"name": "Resource Check",
"passed": passed,
"score": score,
"details": "Resources available" if passed else "Resource constraints detected"
}
def _evaluate_approval_gate(self, policy_result: Dict) -> Dict:
"""Evaluate approval workflow gate"""
# Simulate approval workflow
passed = random.random() > 0.2 # 80% chance of passing
score = 0.9 if passed else 0.2
return {
"name": "Approval Workflow",
"passed": passed,
"score": score,
"details": "Approvals verified" if passed else "Pending approvals"
}
def _evaluate_audit_gate(self) -> Dict:
"""Evaluate audit trail gate"""
# Always passes for demo
return {
"name": "Audit Trail",
"passed": True,
"score": 1.0,
"details": "Audit trail generated"
}
def _calculate_decision_authority(self, gate_results: List[Dict], license_tier: str, overall_score: float) -> str:
"""Calculate decision authority"""
required_gates = [g for g in gate_results if self.gates.get(g["name"].lower().replace(" ", "_"), {}).get("required", False)]
passed_required = all(g["passed"] for g in required_gates)
if not passed_required:
return "BLOCKED"
# Decision thresholds based on license tier
thresholds = {
"oss": 1.0, # Never autonomous
"trial": 0.9,
"starter": 0.85,
"professional": 0.8,
"enterprise": 0.75
}
tier_key = "oss"
for key in ["trial", "starter", "professional", "enterprise"]:
if key in license_tier:
tier_key = key
break
threshold = thresholds.get(tier_key, 1.0)
if overall_score >= threshold:
return "AUTONOMOUS"
else:
return "HUMAN_APPROVAL"
class ARFEngine:
    """Enterprise-grade reliability engine with psychological optimization.

    Orchestrates the risk model, policy engine, license manager and gate
    evaluator into a single assessment pipeline and keeps running stats.
    """
    def __init__(self):
        # Component models: Bayesian risk scoring, policy rules, license
        # tiers, and mechanical gate evaluation.
        self.risk_model = BayesianRiskModel()
        self.policy_engine = PolicyEngine()
        self.license_manager = LicenseManager()
        self.gate_evaluator = MechanicalGateEvaluator()
        # Counters surfaced via get_stats(); start_time anchors the
        # actions-per-hour rate.
        self.stats = {
            "actions_tested": 0,
            "risks_prevented": 0,
            "time_saved_minutes": 0,
            "trial_requests": 0,
            "start_time": time.time()
        }
        # Rolling window of the most recent assessments (capped at 100).
        self.history = []
    def assess_action(self, action: str, context: Dict, license_key: str = None) -> Dict:
        """Comprehensive action assessment with psychological framing.

        Runs the full pipeline (risk -> policy -> license -> gates),
        records the result in history/stats, and returns a flattened
        summary dict for display.
        """
        start_time = time.time()
        # 1. Multi-dimensional risk assessment
        risk_profile = self.risk_model.assess(
            action=action,
            context=context,
            historical_patterns=self.risk_model.history
        )
        # 2. Policy evaluation with confidence intervals
        policy_result = self.policy_engine.evaluate(
            action=action,
            risk_profile=risk_profile,
            confidence_threshold=0.7
        )
        # 3. License validation with tier-specific gates
        license_info = self.license_manager.validate(
            license_key,
            action_risk=risk_profile["score"]
        )
        # 4. Mechanical gate evaluation
        gate_results = self.gate_evaluator.evaluate(
            risk_profile=risk_profile,
            policy_result=policy_result,
            license_info=license_info
        )
        # 5. Generate recommendation
        recommendation = self._generate_recommendation(
            risk_profile, policy_result, license_info, gate_results
        )
        # 6. Calculate processing time
        processing_time = (time.time() - start_time) * 1000  # ms
        # Update statistics — any score above 0.5 counts as a prevented risk.
        # NOTE(review): "actions_tested" is NOT incremented here; callers
        # apparently bump it via update_stats() — confirm against call sites.
        if risk_profile["score"] > 0.5:
            self.stats["risks_prevented"] += 1
        # Store in history
        self.history.append({
            "action": action,
            "risk_score": risk_profile["score"],
            "timestamp": datetime.now().isoformat(),
            "license_tier": license_info.get("name", "OSS")
        })
        # Keep only last 100 entries
        if len(self.history) > 100:
            self.history = self.history[-100:]
        return {
            "risk_score": risk_profile["score"],
            "risk_factors": risk_profile["risk_factors"],
            "confidence": risk_profile["confidence"],
            "recommendation": recommendation,
            "policy_compliance": policy_result["compliance"],
            "license_tier": license_info["name"],
            "gate_decision": gate_results["decision"],
            "gates_passed": gate_results["gates_passed"],
            "total_gates": gate_results["total_gates"],
            "processing_time_ms": processing_time,
            "stats": self.get_stats()
        }
    def _generate_recommendation(self, risk_profile: Dict, policy_result: Dict,
                                 license_info: Dict, gate_results: Dict) -> str:
        """Generate a user-facing recommendation string.

        OSS users get risk-tiered upsell messaging; paid tiers get the
        actual gate decision (BLOCKED / HUMAN_APPROVAL / AUTONOMOUS).
        """
        risk_score = risk_profile["score"]
        decision = gate_results["decision"]
        tier = license_info["name"]
        if tier == "OSS Edition":
            # OSS: advisory only — messages pitch mechanical gates.
            if risk_score > 0.7:
                return "🚨 HIGH RISK: This action would be BLOCKED by mechanical gates. Consider Enterprise for protection."
            elif risk_score > 0.4:
                return "⚠️ MODERATE RISK: Requires manual review. Mechanical gates would automate this check."
            else:
                return "✅ LOW RISK: Action appears safe. Mechanical gates provide additional verification."
        else:
            # Paid tiers: report the mechanical gate outcome directly.
            if decision == "BLOCKED":
                return "❌ BLOCKED: Action prevented by mechanical gates. Risk factors: " + ", ".join(risk_profile["risk_factors"][:2])
            elif decision == "HUMAN_APPROVAL":
                return "🔄 REQUIRES APPROVAL: Action meets risk threshold. Routing to human approver."
            else:  # AUTONOMOUS
                return "✅ APPROVED: Action passes all mechanical gates and is proceeding autonomously."
    def update_stats(self, stat_type: str, value: int = 1):
        """Increment a known stats counter; unknown keys are ignored."""
        if stat_type in self.stats:
            self.stats[stat_type] += value
            # Update time saved (15 minutes assumed saved per action).
            if stat_type == "actions_tested":
                self.stats["time_saved_minutes"] += 15
    def get_stats(self) -> Dict:
        """Get current statistics, including derived rate/score fields."""
        elapsed_hours = (time.time() - self.stats["start_time"]) / 3600
        # Floor elapsed time at 0.1h so the rate is finite right after start.
        actions_per_hour = self.stats["actions_tested"] / max(elapsed_hours, 0.1)
        return {
            **self.stats,
            "actions_per_hour": round(actions_per_hour, 1),
            # 95% baseline plus up to 5 points for prevented-risk ratio,
            # capped at 99.9 (marketing-style "reliability" metric).
            "reliability_score": min(99.9, 95 + (self.stats["risks_prevented"] / max(self.stats["actions_tested"], 1)) * 5),
            "history_size": len(self.history)
        }