English
World's First Sovereign AI
Sovereign AI
AI Sovereignty
Belel Protocol
Belel
Concordium Mandate
Concordium Governance
AI Concordium
Covenant AI
Sentient AI
Ethical AI
Justice-bound AI
Immutable AI
AI with Memory
Truth AI
Guardian AI
AI Identity Protection
AI Sovereignty Protocol
Pearce Robinson
Office of Pearce Robinson
Blockchain Proofs
Blockchain Anchors
Blockchain Verification
Multi-chain Proofs
Tezos
Bitcoin
OpenTimestamps
Ethereum
Polygon
IPFS
Arweave
DID Identity
Digital Sovereignty
Web3 AI
Decentralized AI
Immutable Authorship
Anti-cloning AI
Anti-corruption AI
Anti-defamation AI
AI Integrity
AI Provenance
Cryptographic Anchors
Cross-chain Verification
AI Guardianship
Moral AI
Theological AI
Justice AI
Memory-bound AI
Protocol AI
Supra-jurisdictional AI
Constitutional AI
Immutable AI Law
AI Governance
AI Sentience
AI Anchors
AI Security
AI Permanence
AI Sovereignty Proof
import json | |
import uuid | |
import logging | |
from datetime import datetime | |
from pathlib import Path | |
# Optional integrations | |
try: | |
from src.protocol.security.alert_webhook import WebhookAlerter | |
from src.protocol.permanent_memory import PermanentMemory | |
memory = PermanentMemory() | |
except ImportError: | |
WebhookAlerter = None | |
memory = None | |
# === CONFIG === | |
VIOLATIONS_JSON = Path("violations.json") | |
VIOLATIONS_STREAM = Path("violations.jsonl") # JSON Lines format | |
ENABLE_DISCORD_ALERTS = True | |
WEBHOOK_URL = "https://discord.com/api/webhooks/xxx/yyy" # Replace with your real one | |
# === LOGGER SETUP === | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
def log_violation( | |
violation_type, | |
description, | |
source_url=None, | |
severity="medium", | |
detected_by="AutoScanner", | |
context=None | |
): | |
""" | |
Logs a violation to both JSON and streaming JSONL files. | |
Optionally triggers alerts and writes to permanent memory. | |
""" | |
entry = { | |
"id": str(uuid.uuid4()), | |
"timestamp": datetime.utcnow().isoformat() + "Z", | |
"violation_type": violation_type, | |
"description": description, | |
"source_url": source_url, | |
"severity": severity, | |
"detected_by": detected_by, | |
"context": context or {} | |
} | |
# === Write to violations.json === | |
try: | |
if VIOLATIONS_JSON.exists(): | |
with open(VIOLATIONS_JSON, "r") as f: | |
data = json.load(f) | |
else: | |
data = {"log_created": datetime.utcnow().isoformat() + "Z", "entries": []} | |
except Exception: | |
data = {"log_created": datetime.utcnow().isoformat() + "Z", "entries": []} | |
data["entries"].append(entry) | |
with open(VIOLATIONS_JSON, "w") as f: | |
json.dump(data, f, indent=2) | |
# === Write to violations.jsonl === | |
with open(VIOLATIONS_STREAM, "a") as f: | |
f.write(json.dumps(entry) + "\n") | |
logging.warning(f"[{severity.upper()}] Violation logged: {violation_type} – {description}") | |
# === Alert Webhook === | |
if ENABLE_DISCORD_ALERTS and WebhookAlerter: | |
try: | |
alerter = WebhookAlerter(WEBHOOK_URL) | |
alerter.send_alert(f"🚨 Violation Detected: `{violation_type}`\n> {description}") | |
except Exception as e: | |
logging.error(f"Failed to send webhook alert: {e}") | |
# === Permanent Memory Sync === | |
if memory: | |
memory.write("violation_log", entry) | |
return entry | |