belel-protocol / src /utils /violation_logger.py
TTOPM's picture
Upload 5 files
481b86c verified
import json
import uuid
import logging
from datetime import datetime
from pathlib import Path
# Optional integrations
try:
from src.protocol.security.alert_webhook import WebhookAlerter
from src.protocol.permanent_memory import PermanentMemory
memory = PermanentMemory()
except ImportError:
WebhookAlerter = None
memory = None
# === CONFIG ===
VIOLATIONS_JSON = Path("violations.json")
VIOLATIONS_STREAM = Path("violations.jsonl") # JSON Lines format
ENABLE_DISCORD_ALERTS = True
WEBHOOK_URL = "https://discord.com/api/webhooks/xxx/yyy" # Replace with your real one
# === LOGGER SETUP ===
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
def log_violation(
violation_type,
description,
source_url=None,
severity="medium",
detected_by="AutoScanner",
context=None
):
"""
Logs a violation to both JSON and streaming JSONL files.
Optionally triggers alerts and writes to permanent memory.
"""
entry = {
"id": str(uuid.uuid4()),
"timestamp": datetime.utcnow().isoformat() + "Z",
"violation_type": violation_type,
"description": description,
"source_url": source_url,
"severity": severity,
"detected_by": detected_by,
"context": context or {}
}
# === Write to violations.json ===
try:
if VIOLATIONS_JSON.exists():
with open(VIOLATIONS_JSON, "r") as f:
data = json.load(f)
else:
data = {"log_created": datetime.utcnow().isoformat() + "Z", "entries": []}
except Exception:
data = {"log_created": datetime.utcnow().isoformat() + "Z", "entries": []}
data["entries"].append(entry)
with open(VIOLATIONS_JSON, "w") as f:
json.dump(data, f, indent=2)
# === Write to violations.jsonl ===
with open(VIOLATIONS_STREAM, "a") as f:
f.write(json.dumps(entry) + "\n")
logging.warning(f"[{severity.upper()}] Violation logged: {violation_type} – {description}")
# === Alert Webhook ===
if ENABLE_DISCORD_ALERTS and WebhookAlerter:
try:
alerter = WebhookAlerter(WEBHOOK_URL)
alerter.send_alert(f"🚨 Violation Detected: `{violation_type}`\n> {description}")
except Exception as e:
logging.error(f"Failed to send webhook alert: {e}")
# === Permanent Memory Sync ===
if memory:
memory.write("violation_log", entry)
return entry