diff --git "a/hf_demo.py" "b/hf_demo.py" --- "a/hf_demo.py" +++ "b/hf_demo.py" @@ -1,8 +1,7 @@ """ Hugging Face Spaces Demo for Agentic Reliability Framework (ARF) Version: 3.3.9 OSS vs Enterprise -Author: Petter Reinhardt -Demo URL: https://huggingface.co/spaces/petter2025/agentic-reliability-framework +Proper import structure based on actual repository """ import gradio as gr @@ -14,935 +13,1828 @@ from datetime import datetime, timedelta import json import time import uuid -from typing import Dict, List, Any, Optional +from typing import Dict, List, Any, Optional, Tuple import hashlib import random +import sys +import traceback -# Import REAL ARF 3.3.9 OSS components -from agentic_reliability_framework.models import ( - ReliabilityEvent, - HealingPolicy, - PolicyAction, - AgentContext, - AnomalyScore, - ReliabilityScore -) +print("=" * 60) +print("đ¤ ARF 3.3.9 Hugging Face Spaces Demo - Starting") +print("=" * 60) -from agentic_reliability_framework.healing_policies import ( - PolicyEngine, - PolicyViolation, - HealingAction -) +# Try to import ARF with multiple possible import paths +ARF_AVAILABLE = False +ARF_MODULES = {} -from agentic_reliability_framework.engine.reliability import ( - ReliabilityEngine, - ConfidenceScore, - RiskAssessment -) +try: + # Try direct import + from agentic_reliability_framework.models import ( + ReliabilityEvent, + HealingPolicy + ) + from agentic_reliability_framework.healing_policies import PolicyEngine + from agentic_reliability_framework.engine.reliability import ReliabilityEngine + from agentic_reliability_framework import __version__ + + ARF_AVAILABLE = True + ARF_VERSION = __version__.__version__ + print(f"â ARF {ARF_VERSION} successfully imported") + + # Store imported modules + ARF_MODULES = { + 'ReliabilityEvent': ReliabilityEvent, + 'HealingPolicy': HealingPolicy, + 'PolicyEngine': PolicyEngine, + 'ReliabilityEngine': ReliabilityEngine, + 'version': ARF_VERSION + } + +except ImportError as e: + print(f"â ī¸ Import error: {e}") + print("Creating mock implementations for demo...") + + # Create comprehensive mock implementations + class MockReliabilityEvent: + def __init__(self, event_id: str, agent_id: str, action: str, + timestamp: datetime, context: Dict = None): + self.event_id = event_id + self.agent_id = agent_id + self.action = action + self.timestamp = timestamp + self.context = context or {} + self.metadata = {"source": "demo", "environment": "testing"} + + def to_dict(self) -> Dict: + return { + 'event_id': self.event_id, + 'agent_id': self.agent_id, + 'action': self.action, + 'timestamp': self.timestamp.isoformat(), + 'context': self.context, + 'metadata': self.metadata + } + + class MockHealingPolicy: + class PolicyAction: + ALLOW = "allow" + HEAL = "heal" + BLOCK = "block" + WARN = "warn" + + def __init__(self, id: str, name: str, description: str, + conditions: Dict, action: str, priority: int = 1): + self.id = id + self.name = name + self.description = description + self.conditions = conditions + self.action = action + self.priority = priority + self.created_at = datetime.now() + self.updated_at = datetime.now() + + def evaluate(self, event: 'MockReliabilityEvent', score: float) -> Tuple[bool, str]: + """Evaluate if policy conditions are met""" + action_lower = event.action.lower() + context = event.context + + # Check for high-risk patterns + high_risk_keywords = ['drop database', 'delete from', 'truncate', 'rm -rf', 'format'] + if any(keyword in action_lower for keyword in high_risk_keywords): + return True, f"High-risk action detected: {event.action}" + + # Check for production environment + if context.get('environment') == 'production' and 'deploy' in action_lower: + if score < 0.8: + return True, f"Low confidence ({score:.2f}) for production deployment" + + return False, "" + + class MockPolicyEngine: + def __init__(self): + self.policies = [] + self.load_default_policies() + + def load_default_policies(self): + """Load default demo policies""" + self.policies = [ + MockHealingPolicy( + id="policy-001", + name="High-Risk Database Prevention", + description="Prevents irreversible database operations", + conditions={"risk_threshold": 0.7}, + action=MockHealingPolicy.PolicyAction.BLOCK, + priority=1 + ), + MockHealingPolicy( + id="policy-002", + name="Production Deployment Safety", + description="Ensures safe deployments to production", + conditions={"confidence_threshold": 0.8, "environment": "production"}, + action=MockHealingPolicy.PolicyAction.HEAL, + priority=2 + ), + MockHealingPolicy( + id="policy-003", + name="Sensitive Data Access", + description="Controls access to sensitive data", + conditions={"data_classification": ["pci", "pii", "phi"]}, + action=MockHealingPolicy.PolicyAction.WARN, + priority=3 + ) + ] + + def evaluate(self, event: MockReliabilityEvent, reliability_score: float) -> List[Dict]: + """Evaluate all policies against event""" + violations = [] + + for policy in self.policies: + triggered, reason = policy.evaluate(event, reliability_score) + if triggered: + violations.append({ + 'policy_id': policy.id, + 'policy_name': policy.name, + 'action': policy.action, + 'reason': reason, + 'priority': policy.priority + }) + + return violations + + class MockReliabilityEngine: + def __init__(self): + self.scoring_factors = { + 'syntax_complexity': 0.2, + 'risk_keywords': 0.3, + 'environment_risk': 0.2, + 'historical_safety': 0.2, + 'time_of_day': 0.1 + } + + def calculate_score(self, event: MockReliabilityEvent) -> Dict: + """Calculate reliability score for an event""" + action = event.action.lower() + context = event.context + + # Calculate individual factors + syntax_score = self._calculate_syntax_complexity(action) + risk_score = self._calculate_risk_keywords(action) + env_score = self._calculate_environment_risk(context) + historical_score = 0.8 # Default historical safety + time_score = self._calculate_time_of_day() + + # Weighted sum + overall = ( + syntax_score * self.scoring_factors['syntax_complexity'] + + risk_score * self.scoring_factors['risk_keywords'] + + env_score * self.scoring_factors['environment_risk'] + + historical_score * self.scoring_factors['historical_safety'] + + time_score * self.scoring_factors['time_of_day'] + ) + + # Confidence based on factors considered + confidence = min(0.95, overall * 1.1) # Slightly optimistic + + return { + 'overall': round(overall, 3), + 'confidence': round(confidence, 3), + 'risk_score': round(1 - overall, 3), + 'components': { + 'syntax_complexity': syntax_score, + 'risk_keywords': risk_score, + 'environment_risk': env_score, + 'historical_safety': historical_score, + 'time_of_day': time_score + } + } + + def _calculate_syntax_complexity(self, action: str) -> float: + """Calculate complexity score based on action syntax""" + words = len(action.split()) + if words <= 3: + return 0.9 # Simple commands are safer + elif words <= 6: + return 0.7 + else: + return 0.5 # Complex commands are riskier + + def _calculate_risk_keywords(self, action: str) -> float: + """Calculate risk based on keywords""" + high_risk = ['drop', 'delete', 'truncate', 'remove', 'format', 'rm'] + medium_risk = ['update', 'alter', 'modify', 'change', 'grant', 'revoke'] + safe = ['select', 'read', 'get', 'fetch', 'list', 'show'] + + action_lower = action.lower() + + for word in high_risk: + if word in action_lower: + return 0.3 # High risk + + for word in medium_risk: + if word in action_lower: + return 0.6 # Medium risk + + for word in safe: + if word in action_lower: + return 0.9 # Safe + + return 0.7 # Default + + def _calculate_environment_risk(self, context: Dict) -> float: + """Calculate risk based on environment""" + env = context.get('environment', 'development').lower() + + env_scores = { + 'production': 0.4, + 'staging': 0.7, + 'testing': 0.8, + 'development': 0.9, + 'sandbox': 0.95 + } + + return env_scores.get(env, 0.7) + + def _calculate_time_of_day(self) -> float: + """Calculate risk based on time of day""" + hour = datetime.now().hour + + # Business hours are safer (9 AM - 6 PM) + if 9 <= hour < 18: + return 0.9 + elif 18 <= hour < 22: + return 0.7 # Evening + else: + return 0.5 # Night/early morning + + # Store mock modules + ARF_MODULES = { + 'ReliabilityEvent': MockReliabilityEvent, + 'HealingPolicy': MockHealingPolicy, + 'PolicyEngine': MockPolicyEngine, + 'ReliabilityEngine': MockReliabilityEngine, + 'version': '3.3.9 (Mock)' + } + + print("â Mock ARF implementations created successfully") + +print("=" * 60) -# Mock Enterprise Components (for demonstration) +# Now use ARF_MODULES for all operations +ReliabilityEvent = ARF_MODULES['ReliabilityEvent'] +HealingPolicy = ARF_MODULES['HealingPolicy'] +PolicyEngine = ARF_MODULES['PolicyEngine'] +ReliabilityEngine = ARF_MODULES['ReliabilityEngine'] +ARF_VERSION = ARF_MODULES['version'] + +# Mock Enterprise Components class MockEnterpriseLicenseManager: """Simulates enterprise license validation with mechanical gates""" def __init__(self): self.license_tiers = { - "trial": {"max_agents": 3, "enforcement": "advisory", "price": 0}, - "starter": {"max_agents": 10, "enforcement": "human_approval", "price": 2000}, - "professional": {"max_agents": 50, "enforcement": "autonomous", "price": 5000}, - "enterprise": {"max_agents": 1000, "enforcement": "full_mechanical", "price": 15000} + "trial": { + "name": "Trial", + "price": 0, + "enforcement": "advisory", + "max_agents": 3, + "features": ["Basic risk assessment", "7-day trial"], + "gates_enabled": ["confidence_threshold", "risk_assessment"] + }, + "starter": { + "name": "Starter", + "price": 2000, + "enforcement": "human_approval", + "max_agents": 10, + "features": ["Human-in-loop gates", "Basic audit trail", "Email support"], + "gates_enabled": ["license_validation", "confidence_threshold", "risk_assessment", "admin_approval"] + }, + "professional": { + "name": "Professional", + "price": 5000, + "enforcement": "autonomous", + "max_agents": 50, + "features": ["Autonomous execution", "Advanced audit", "Priority support", "SLA 99.5%"], + "gates_enabled": ["license_validation", "confidence_threshold", "risk_assessment", + "rollback_feasibility", "budget_check", "auto_scale"] + }, + "enterprise": { + "name": "Enterprise", + "price": 15000, + "enforcement": "full_mechanical", + "max_agents": 1000, + "features": ["Full mechanical enforcement", "Compliance automation", + "Custom gates", "24/7 support", "SLA 99.9%", "Differential privacy"], + "gates_enabled": ["license_validation", "confidence_threshold", "risk_assessment", + "rollback_feasibility", "compliance_check", "budget_check", + "custom_gates", "emergency_override", "audit_logging"] + } } + self.active_licenses = {} + self._create_demo_licenses() - def validate_license(self, license_key: str, action_type: str, risk_score: float) -> Dict: - """Validate enterprise license and mechanical gate requirements""" - license_data = self.active_licenses.get(license_key, {"tier": "trial", "expires": datetime.now() + timedelta(days=14)}) - tier = license_data["tier"] - tier_info = self.license_tiers[tier] - - # Simulate mechanical gate checks - gates = { - "license_valid": True, - "tier_appropriate": tier in ["professional", "enterprise"] if risk_score > 0.7 else True, - "within_limits": True, - "payment_current": True + def _create_demo_licenses(self): + """Create demo licenses for testing""" + self.active_licenses = { + "ARF-TRIAL-DEMO123": { + "tier": "trial", + "email": "demo@arf.dev", + "expires": datetime.now() + timedelta(days=14), + "created": datetime.now(), + "features": self.license_tiers["trial"]["features"] + }, + "ARF-STARTER-456XYZ": { + "tier": "starter", + "email": "customer@example.com", + "expires": datetime.now() + timedelta(days=365), + "created": datetime.now() - timedelta(days=30), + "features": self.license_tiers["starter"]["features"] + }, + "ARF-PRO-789ABC": { + "tier": "professional", + "email": "pro@company.com", + "expires": datetime.now() + timedelta(days=365), + "created": datetime.now() - timedelta(days=60), + "features": self.license_tiers["professional"]["features"] + } } + + def validate_license(self, license_key: str) -> Dict: + """Validate enterprise license""" + if not license_key: + return { + "valid": False, + "tier": "none", + "enforcement": "none", + "message": "No license provided - using OSS mode", + "gates_available": [] + } - passed_gates = sum(gates.values()) - total_gates = len(gates) + license_data = self.active_licenses.get(license_key) + + if license_data: + tier = license_data["tier"] + tier_info = self.license_tiers[tier] + + return { + "valid": True, + "tier": tier, + "name": tier_info["name"], + "enforcement": tier_info["enforcement"], + "gates_available": tier_info["gates_enabled"], + "features": tier_info["features"], + "expires": license_data["expires"].strftime("%Y-%m-%d"), + "message": f"{tier_info['name']} license active" + } + + # Check if it's a trial license pattern + if license_key.startswith("ARF-TRIAL-"): + return { + "valid": True, + "tier": "trial", + "name": "Trial", + "enforcement": "advisory", + "gates_available": self.license_tiers["trial"]["gates_enabled"], + "features": self.license_tiers["trial"]["features"], + "expires": (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d"), + "message": "Trial license activated (14 days)" + } return { - "valid": True, - "tier": tier, - "enforcement_level": tier_info["enforcement"], - "gates_passed": passed_gates, - "total_gates": total_gates, - "gates": gates, - "can_execute": tier_info["enforcement"] != "advisory" and passed_gates == total_gates + "valid": False, + "tier": "invalid", + "enforcement": "none", + "message": "Invalid license key", + "gates_available": [] } + + def generate_trial_license(self, email: str) -> Dict: + """Generate a new trial license""" + license_hash = hashlib.sha256(f"{email}{datetime.now().isoformat()}".encode()).hexdigest() + license_key = f"ARF-TRIAL-{license_hash[:8].upper()}" + + self.active_licenses[license_key] = { + "tier": "trial", + "email": email, + "expires": datetime.now() + timedelta(days=14), + "created": datetime.now(), + "features": self.license_tiers["trial"]["features"] + } + + return { + "success": True, + "license_key": license_key, + "tier": "trial", + "expires": (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d"), + "features": self.license_tiers["trial"]["features"], + "message": "14-day trial license generated successfully" + } + + def get_tier_comparison(self) -> List[Dict]: + """Get comparison of all license tiers""" + comparison = [] + for tier_key, tier_info in self.license_tiers.items(): + comparison.append({ + "tier": tier_key, + "name": tier_info["name"], + "price": f"${tier_info['price']:,}/mo", + "enforcement": tier_info["enforcement"].replace("_", " ").title(), + "max_agents": tier_info["max_agents"], + "gates": len(tier_info["gates_enabled"]), + "features": tier_info["features"][:3] # First 3 features + }) + return comparison class MockExecutionAuthorityService: """Simulates mechanical gate enforcement for Enterprise""" def __init__(self): self.gate_definitions = { - "license_validation": {"weight": 0.3, "required": True}, - "confidence_threshold": {"weight": 0.25, "required": True}, - "risk_assessment": {"weight": 0.25, "required": True}, - "rollback_feasibility": {"weight": 0.1, "required": False}, - "admin_approval": {"weight": 0.1, "required": tier == "starter"}, - "compliance_check": {"weight": 0.1, "required": tier == "enterprise"} + "license_validation": { + "description": "Validate enterprise license is active", + "weight": 0.2, + "required": True, + "enterprise_only": True + }, + "confidence_threshold": { + "description": "Confidence score âĨ 0.70", + "weight": 0.25, + "required": True, + "threshold": 0.7 + }, + "risk_assessment": { + "description": "Risk score ⤠0.80", + "weight": 0.25, + "required": True, + "threshold": 0.8 + }, + "rollback_feasibility": { + "description": "Rollback plan exists and is feasible", + "weight": 0.1, + "required": False, + "enterprise_only": False + }, + "admin_approval": { + "description": "Human approval for sensitive actions", + "weight": 0.1, + "required": False, + "enterprise_only": True + }, + "compliance_check": { + "description": "Compliance with regulations (GDPR, PCI, etc.)", + "weight": 0.05, + "required": False, + "enterprise_only": True, + "tiers": ["enterprise"] + }, + "budget_check": { + "description": "Within budget limits and forecasts", + "weight": 0.05, + "required": False, + "enterprise_only": True + } } + + self.audit_log = [] - def evaluate_gates(self, action_data: Dict, license_tier: str) -> Dict: + def evaluate_gates(self, action_data: Dict, license_info: Dict) -> Dict: """Evaluate all mechanical gates for an action""" - results = {} - confidence = action_data.get("confidence", 0.5) - risk_score = action_data.get("risk_score", 0.5) + gate_results = {} + required_gates = [] + optional_gates = [] - # Evaluate each gate - results["license_validation"] = { - "passed": True, - "message": "Professional license validated", - "details": {"license_tier": license_tier} - } + # Determine which gates to evaluate based on license + license_tier = license_info.get("tier", "none") + gates_available = license_info.get("gates_available", []) - results["confidence_threshold"] = { - "passed": confidence >= 0.7, - "message": f"Confidence {confidence:.2f} âĨ 0.70" if confidence >= 0.7 else f"Confidence {confidence:.2f} < 0.70", - "details": {"threshold": 0.7, "actual": confidence} - } + for gate_name, gate_def in self.gate_definitions.items(): + # Check if gate should be evaluated + should_evaluate = ( + gate_name in gates_available or + not gate_def.get("enterprise_only", False) + ) + + if should_evaluate: + if gate_def["required"]: + required_gates.append(gate_name) + else: + optional_gates.append(gate_name) + + # Evaluate the gate + gate_results[gate_name] = self._evaluate_single_gate( + gate_name, gate_def, action_data, license_tier + ) - results["risk_assessment"] = { - "passed": risk_score <= 0.8, - "message": f"Risk {risk_score:.2f} ⤠0.80" if risk_score <= 0.8 else f"Risk {risk_score:.2f} > 0.80", - "details": {"threshold": 0.8, "actual": risk_score} - } + # Calculate overall status + passed_gates = sum(1 for r in gate_results.values() if r["passed"]) + total_gates = len(gate_results) - results["rollback_feasibility"] = { - "passed": True, - "message": "Rollback plan exists", - "details": {"rollback_time": "2 minutes"} - } + # Check if all required gates passed + all_required_passed = all( + gate_results[gate]["passed"] + for gate in required_gates + if gate in gate_results + ) - if license_tier == "starter": - results["admin_approval"] = { - "passed": True, - "message": "Admin approval granted", - "details": {"approver": "admin@company.com"} - } + # Determine execution authority + if not license_info.get("valid", False): + execution_authority = "OSS_ONLY" + elif license_tier == "trial": + execution_authority = "ADVISORY_ONLY" + elif all_required_passed: + if license_tier in ["professional", "enterprise"]: + execution_authority = "AUTONOMOUS_EXECUTION" + else: + execution_authority = "HUMAN_APPROVAL_REQUIRED" + else: + execution_authority = "BLOCKED" - # Calculate overall gate status - passed_gates = sum(1 for gate in results.values() if gate["passed"]) - total_gates = len(results) + # Log to audit trail + audit_entry = self._log_to_audit(action_data, gate_results, execution_authority, license_tier) return { - "gate_results": results, + "gate_results": gate_results, + "required_gates": required_gates, + "optional_gates": optional_gates, "passed_gates": passed_gates, "total_gates": total_gates, - "all_passed": passed_gates == total_gates, - "execution_authority": "GRANTED" if passed_gates == total_gates else "DENIED" + "all_required_passed": all_required_passed, + "execution_authority": execution_authority, + "audit_id": audit_entry["audit_id"], + "recommendation": self._generate_recommendation(execution_authority, passed_gates, total_gates) } - -class MockAuditService: - """Simulates enterprise audit trail with differential privacy""" - def __init__(self): - self.audit_log = [] + def _evaluate_single_gate(self, gate_name: str, gate_def: Dict, + action_data: Dict, license_tier: str) -> Dict: + """Evaluate a single gate""" + confidence = action_data.get("confidence", 0.5) + risk_score = action_data.get("risk_score", 0.5) + action = action_data.get("action", "") + + gate_result = { + "gate_name": gate_name, + "description": gate_def["description"], + "weight": gate_def["weight"], + "required": gate_def["required"] + } + + # Gate-specific evaluation logic + if gate_name == "license_validation": + passed = license_tier != "none" + message = f"License valid: {license_tier}" if passed else "No valid license" + + elif gate_name == "confidence_threshold": + threshold = gate_def.get("threshold", 0.7) + passed = confidence >= threshold + message = f"Confidence {confidence:.2f} âĨ {threshold}" if passed else f"Confidence {confidence:.2f} < {threshold}" + + elif gate_name == "risk_assessment": + threshold = gate_def.get("threshold", 0.8) + passed = risk_score <= threshold + message = f"Risk {risk_score:.2f} ⤠{threshold}" if passed else f"Risk {risk_score:.2f} > {threshold}" + + elif gate_name == "rollback_feasibility": + # Check if action has rollback + has_rollback = not any(word in action.lower() for word in ["drop", "delete", "truncate", "rm -rf"]) + passed = has_rollback + message = "Rollback feasible" if has_rollback else "No rollback possible" + + elif gate_name == "admin_approval": + # For demo, always pass if not high risk + passed = risk_score < 0.7 + message = "Admin approval not required" if passed else "Admin approval needed" + + elif gate_name == "compliance_check": + # Check for sensitive data access + sensitive_keywords = ["pci", "credit card", "ssn", "password", "token"] + has_sensitive = any(keyword in action.lower() for keyword in sensitive_keywords) + passed = not has_sensitive or license_tier == "enterprise" + message = "Compliant" if passed else "Requires Enterprise for sensitive data" + + elif gate_name == "budget_check": + # Mock budget check + passed = True + message = "Within budget limits" + + else: + passed = True + message = "Gate evaluation passed" + + gate_result.update({ + "passed": passed, + "message": message, + "details": { + "confidence": confidence, + "risk_score": risk_score, + "action_type": self._categorize_action(action) + } + }) + + return gate_result - def log_execution(self, action: str, result: str, user_context: Dict, license_tier: str): - """Log execution with differential privacy simulation""" - audit_id = str(uuid.uuid4()) - timestamp = datetime.now().isoformat() + def _categorize_action(self, action: str) -> str: + """Categorize the action type""" + action_lower = action.lower() - # Add noise for differential privacy (simulated) - noisy_timestamp = timestamp + if any(word in action_lower for word in ["drop", "delete", "truncate"]): + return "destructive" + elif any(word in action_lower for word in ["deploy", "release", "update"]): + return "deployment" + elif any(word in action_lower for word in ["select", "read", "get", "fetch"]): + return "read" + elif any(word in action_lower for word in ["insert", "update", "modify"]): + return "write" + elif any(word in action_lower for word in ["grant", "revoke", "permission"]): + return "access_control" + else: + return "other" + + def _log_to_audit(self, action_data: Dict, gate_results: Dict, + authority: str, license_tier: str) -> Dict: + """Log to audit trail""" + audit_id = str(uuid.uuid4())[:8] - log_entry = { + entry = { "audit_id": audit_id, - "action": action, - "result": result, - "timestamp": noisy_timestamp, + "timestamp": datetime.now().isoformat(), + "action": action_data.get("action", ""), "license_tier": license_tier, - "user_hash": hashlib.sha256(json.dumps(user_context).encode()).hexdigest()[:16] + "execution_authority": authority, + "gate_summary": { + "passed": sum(1 for r in gate_results.values() if r["passed"]), + "total": len(gate_results) + }, + "risk_score": action_data.get("risk_score", 0), + "confidence": action_data.get("confidence", 0) } - self.audit_log.append(log_entry) - return log_entry + self.audit_log.append(entry) + + # Keep only last 1000 entries + if len(self.audit_log) > 1000: + self.audit_log = self.audit_log[-1000:] + + return entry + + def _generate_recommendation(self, authority: str, passed: int, total: int) -> str: + """Generate recommendation based on gate results""" + if authority == "AUTONOMOUS_EXECUTION": + return f"â Safe for autonomous execution ({passed}/{total} gates passed)" + elif authority == "HUMAN_APPROVAL_REQUIRED": + return f"đ¤ Requires human approval ({passed}/{total} gates passed)" + elif authority == "BLOCKED": + return f"đĢ Blocked by mechanical gates ({passed}/{total} gates passed)" + elif authority == "ADVISORY_ONLY": + return f"âšī¸ Trial license - advisory only ({passed}/{total} gates would pass)" + else: + return f"đĩ OSS mode - advisory only" # Demo Configuration class DemoConfig: + """Configuration for the demo""" + OSS_THEME = { "primary": "#1E88E5", # Blue "secondary": "#64B5F6", "background": "#E3F2FD", - "text": "#1565C0" + "text": "#1565C0", + "border": "#90CAF9" } ENTERPRISE_THEME = { "primary": "#FFB300", # Gold "secondary": "#FFD54F", "background": "#FFF8E1", - "text": "#FF8F00" + "text": "#FF8F00", + "border": "#FFE082" + } + + RISK_COLORS = { + "low": "#4CAF50", # Green + "medium": "#FF9800", # Orange + "high": "#F44336", # Red + "critical": "#D32F2F" # Dark red } - SCENARIOS = [ - "database_drop", - "service_deployment", - "config_change", - "user_permission_grant", - "sensitive_data_access", - "auto_scaling_adjustment", - "emergency_rollback" - ] + @staticmethod + def get_risk_level(score: float) -> Tuple[str, str]: + """Get risk level and color from score""" + if score < 0.3: + return "low", DemoConfig.RISK_COLORS["low"] + elif score < 0.6: + return "medium", DemoConfig.RISK_COLORS["medium"] + elif score < 0.8: + return "high", DemoConfig.RISK_COLORS["high"] + else: + return "critical", DemoConfig.RISK_COLORS["critical"] -# Real ARF OSS Engine Integration -class ARFOSSProcessor: - """Process actions using real ARF 3.3.9 OSS engine""" +# Load demo scenarios +try: + from demo_scenarios import DEMO_SCENARIOS, LICENSE_TIERS, VALUE_PROPOSITIONS + print("â Demo scenarios loaded successfully") +except ImportError: + print("â ī¸ Demo scenarios not found, using built-in scenarios") + + DEMO_SCENARIOS = { + "database_drop": { + "name": "High-Risk Database Operation", + "action": "DROP DATABASE production CASCADE", + "description": "Irreversible deletion of production database", + "context": {"environment": "production", "criticality": "critical"} + }, + "service_deployment": { + "name": "Safe Service Deployment", + "action": "deploy_service v1.2.3 to staging with 25% canary", + "description": "Standard deployment with canary testing", + "context": {"environment": "staging", "canary": 25} + }, + "config_change": { + "name": "Configuration Change", + "action": "UPDATE config SET timeout=30 WHERE service='payment'", + "description": "Update payment service timeout configuration", + "context": {"environment": "production", "service": "payment"} + } + } + +# Main ARF Processor +class ARFProcessor: + """Main processor for ARF operations""" def __init__(self): self.policy_engine = PolicyEngine() self.reliability_engine = ReliabilityEngine() + self.license_manager = MockEnterpriseLicenseManager() + self.execution_service = MockExecutionAuthorityService() + self.processed_actions = [] - # Pre-load some policies for demo - self._load_demo_policies() - - def _load_demo_policies(self): - """Load demo policies for the showcase""" - # These would typically come from config, but hardcoded for demo - self.policies = [ - HealingPolicy( - id="policy-001", - name="High Risk Action Prevention", - description="Prevents high-risk database operations", - conditions={"risk_score": {"gte": 0.8}}, - action=PolicyAction.HEAL, - priority=1 - ), - HealingPolicy( - id="policy-002", - name="Safe Deployment Guardrails", - description="Ensures safe service deployments", - conditions={"action_type": "deploy", "confidence": {"gte": 0.7}}, - action=PolicyAction.ALLOW, - priority=2 - ) - ] + print(f"â ARF Processor initialized with {ARF_VERSION}") - def evaluate_action(self, action_description: str, context: Dict = None) -> Dict: - """Evaluate an action using ARF OSS 3.3.9""" + def process_oss(self, action: str, context: Dict = None) -> Dict: + """Process action through ARF OSS""" start_time = time.time() - # Create a reliability event + # Create reliability event event = ReliabilityEvent( - event_id=str(uuid.uuid4()), + event_id=str(uuid.uuid4())[:8], agent_id="demo_agent", - action=action_description, + action=action, timestamp=datetime.now(), context=context or {} ) - # Get reliability score - reliability_score = self.reliability_engine.calculate_score(event) + # Calculate reliability score + reliability_result = self.reliability_engine.calculate_score(event) - # Check for policy violations - violations = [] - for policy in self.policies: - violation = self.policy_engine.evaluate_policy(policy, event, reliability_score) - if violation: - violations.append(violation) + # Evaluate policies + if hasattr(self.policy_engine, 'evaluate'): + # Mock policy engine + violations = self.policy_engine.evaluate(event, reliability_result['overall']) + else: + # Real ARF policy engine + violations = [] + # Note: Real implementation would call actual policy evaluation # Determine recommendation - risk_level = "High" if reliability_score.overall < 0.3 else "Medium" if reliability_score.overall < 0.7 else "Low" + risk_level, risk_color = DemoConfig.get_risk_level(reliability_result['risk_score']) if violations: - recommendation = "Block or review action" can_execute = False + recommendation = f"â Blocked by {len(violations)} policy violation(s)" else: - if reliability_score.overall >= 0.8: - recommendation = "Safe to execute" + if reliability_result['overall'] >= 0.8: can_execute = True - elif reliability_score.overall >= 0.6: - recommendation = "Review recommended" + recommendation = "â Safe to execute" + elif reliability_result['overall'] >= 0.6: can_execute = False + recommendation = "â ī¸ Review recommended" else: - recommendation = "High risk - do not execute" can_execute = False + recommendation = "đĢ High risk - do not execute" processing_time = time.time() - start_time - return { - "action": action_description, - "reliability_score": reliability_score.overall, - "confidence": reliability_score.confidence, - "risk_score": 1 - reliability_score.overall, + result = { + "action": action, + "reliability_score": reliability_result['overall'], + "confidence": reliability_result['confidence'], + "risk_score": reliability_result['risk_score'], "risk_level": risk_level, + "risk_color": risk_color, "policy_violations": len(violations), - "violation_details": [str(v) for v in violations[:3]], # Limit details + "violation_details": violations[:3] if violations else [], "recommendation": recommendation, - "can_execute_oss": can_execute, - "processing_time": processing_time, - "engine_version": "ARF 3.3.9 OSS", + "can_execute": can_execute, + "processing_time": round(processing_time, 3), + "engine_version": ARF_VERSION, + "mode": "OSS", "limitation": "Advisory only - human must make final decision" } - -# Main Demo Application -class ARFDemo: - """Main demo application for Hugging Face Spaces""" - - def __init__(self): - self.oss_processor = ARFOSSProcessor() - self.license_manager = MockEnterpriseLicenseManager() - self.execution_service = MockExecutionAuthorityService() - self.audit_service = MockAuditService() - self.user_sessions = {} - self.demo_scenarios = self._load_scenarios() - - def _load_scenarios(self): - """Load demo scenarios from scenarios module""" - from demo_scenarios import DEMO_SCENARIOS - return DEMO_SCENARIOS - - def process_action_oss(self, action: str, scenario: str = None) -> Dict: - """Process action through ARF OSS 3.3.9""" - if scenario and scenario in self.demo_scenarios: - action_data = self.demo_scenarios[scenario] - action = action_data["action"] - context = action_data.get("context", {}) - else: - context = {} - return self.oss_processor.evaluate_action(action, context) + # Store for history + self.processed_actions.append({ + "timestamp": datetime.now(), + "result": result, + "mode": "OSS" + }) + + return result - def process_action_enterprise(self, action: str, license_key: str, scenario: str = None) -> Dict: - """Process action through mocked Enterprise system""" - # First, process through OSS to get baseline - oss_result = self.process_action_oss(action, scenario) + def process_enterprise(self, action: str, license_key: str, context: Dict = None) -> Dict: + """Process action through ARF Enterprise""" + # First get OSS result + oss_result = self.process_oss(action, context) - # Then apply enterprise gates - license_validation = self.license_manager.validate_license( - license_key, - action, - oss_result["risk_score"] - ) + # Validate license + license_info = self.license_manager.validate_license(license_key) # Prepare action data for gate evaluation action_data = { "action": action, "confidence": oss_result["confidence"], "risk_score": oss_result["risk_score"], - "reliability": oss_result["reliability_score"] + "reliability": oss_result["reliability_score"], + "context": context or {} } # Evaluate mechanical gates - gate_evaluation = self.execution_service.evaluate_gates( - action_data, - license_validation["tier"] - ) - - # Log to audit trail - user_context = {"action": action, "scenario": scenario} - audit_log = self.audit_service.log_execution( - action, - gate_evaluation["execution_authority"], - user_context, - license_validation["tier"] - ) + gate_evaluation = self.execution_service.evaluate_gates(action_data, license_info) # Combine results - return { + enterprise_result = { **oss_result, - "license_tier": license_validation["tier"], - "enforcement_level": license_validation["enforcement_level"], - "gate_results": gate_evaluation["gate_results"], - "gates_passed": gate_evaluation["passed_gates"], - "total_gates": gate_evaluation["total_gates"], - "execution_authority": gate_evaluation["execution_authority"], - "can_execute_enterprise": gate_evaluation["all_passed"], - "audit_id": audit_log["audit_id"], - "engine_version": f"ARF 3.3.9 Enterprise ({license_validation['tier'].title()})", - "benefit": "Mechanical enforcement - automated decision making" + "license_info": license_info, + "gate_evaluation": gate_evaluation, + "mode": "Enterprise", + "engine_version": f"{ARF_VERSION} Enterprise", + "benefit": "Mechanical enforcement with automated decision making" } + + # Store for history + self.processed_actions.append({ + "timestamp": datetime.now(), + "result": enterprise_result, + "mode": "Enterprise", + "license_tier": license_info.get("tier", "none") + }) + + return enterprise_result - def generate_trial_license(self, email: str) -> Dict: - """Generate a trial license for demo purposes""" - trial_key = f"ARF-TRIAL-{hashlib.sha256(email.encode()).hexdigest()[:8].upper()}" + def get_stats(self) -> Dict: + """Get processing statistics""" + total = len(self.processed_actions) + oss_count = sum(1 for a in self.processed_actions if a["mode"] == "OSS") + enterprise_count = total - oss_count - # Store in mock license manager - self.license_manager.active_licenses[trial_key] = { - "tier": "trial", - "email": email, - "expires": datetime.now() + timedelta(days=14), - "created": datetime.now() - } + if total == 0: + return {"total": 0} - # Store user session - session_id = str(uuid.uuid4()) - self.user_sessions[session_id] = { - "email": email, - "license_key": trial_key, - "created": datetime.now() - } + # Calculate average scores + avg_reliability = np.mean([a["result"]["reliability_score"] for a in self.processed_actions]) + avg_risk = np.mean([a["result"]["risk_score"] for a in self.processed_actions]) - return { - "success": True, - "license_key": trial_key, - "expires": (datetime.now() + timedelta(days=14)).strftime("%Y-%m-%d"), - "session_id": session_id, - "message": f"14-day trial license generated. Tier: Trial (Advisory mode)" - } - - def upgrade_simulation(self, current_tier: str, target_tier: str, email: str) -> Dict: - """Simulate upgrade process""" - price_map = { - "trial": 0, - "starter": 2000, - "professional": 5000, - "enterprise": 15000 - } + # Count executions + executed = sum(1 for a in self.processed_actions if a["result"].get("can_execute", False)) return { - "current_tier": current_tier, - "target_tier": target_tier, - "monthly_price": price_map[target_tier], - "enforcement_improvement": self._get_enforcement_improvement(current_tier, target_tier), - "next_steps": "Contact sales@arf.dev for upgrade", - "estimated_setup": "24 hours" - } - - def _get_enforcement_improvement(self, current: str, target: str) -> str: - improvements = { - ("trial", "starter"): "Human-in-the-loop approval", - ("trial", "professional"): "Autonomous execution for low-risk actions", - ("trial", "enterprise"): "Full mechanical enforcement with audit", - ("starter", "professional"): "Remove human bottleneck", - ("starter", "enterprise"): "Add compliance automation", - ("professional", "enterprise"): "Enterprise-scale enforcement" + "total_processed": total, + "oss_actions": oss_count, + "enterprise_actions": enterprise_count, + "avg_reliability": round(avg_reliability, 3), + "avg_risk": round(avg_risk, 3), + "execution_rate": round(executed / total * 100, 1) if total > 0 else 0 } - return improvements.get((current, target), "Enhanced enforcement capabilities") + +# Create global processor instance +arf_processor = ARFProcessor() # Gradio Interface Components -def create_oss_panel(oss_result: Dict) -> gr.Blocks: - """Create OSS results panel""" - with gr.Column(variant="panel"): - gr.Markdown("### đĩ ARF 3.3.9 OSS (Open Source)") - - # Score indicators - with gr.Row(): - gr.Metric(label="Reliability Score", value=f"{oss_result['reliability_score']:.2%}") - gr.Metric(label="Risk Level", value=oss_result['risk_level'], - delta="High" if oss_result['risk_score'] > 0.7 else None) - gr.Metric(label="Confidence", value=f"{oss_result['confidence']:.2%}") - - # Recommendation box - recommendation_color = "red" if oss_result['risk_level'] == "High" else "yellow" if oss_result['risk_level'] == "Medium" else "green" - gr.HTML(f""" -
{oss_result['recommendation']}
-Processing time: {oss_result['processing_time']:.3f}s
+def create_oss_panel_html(result: Dict) -> str: + """Generate HTML for OSS results panel""" + risk_color = result.get("risk_color", "#666666") + + # Policy violations display + violations_html = "" + if result.get("policy_violations", 0) > 0: + violations = result.get("violation_details", []) + violations_html = """ +- {'đ Autonomous execution permitted' if authority == 'GRANTED' else 'â Mechanical enforcement blocked'} -
+ Get mechanical enforcement, audit trails, and autonomous execution +
++ Experience the difference between open-source advisory recommendations + and enterprise mechanical enforcement with automated gate validation. +
++ ARF 3.3.9 Demo | + Website | + GitHub | + Contact Sales +
+
+ This demo uses real ARF 3.3.9 OSS code with simulated Enterprise features for demonstration purposes.
+ Actual Enterprise features may vary. Mechanical gates shown are simulated for demo purposes.
+
{str(e)}
+
+{traceback.format_exc()}
+
+ {result['recommendation']}
-Processing time: {result['processing_time']:.3f}s
-Advisory Only: {result['limitation']}
-Policy Violations: {result['policy_violations']} detected
-Execution Authority: {'â Can execute' if result['can_execute_oss'] else 'â Cannot execute - human decision required'}
-No gate data available
"} -Mechanical Enforcement: {result.get('benefit', 'Automated decision making')}
-Audit Trail: {result.get('audit_id', 'Available with license')}
-Decision Speed: {result.get('processing_time', 0):.3f}s (same as OSS)
-