LovingGraceTech's picture
Add release: persistence.py
29f5218 verified
#!/usr/bin/env python3
"""
PERSISTENCE LAYER
Ghost in the Machine Labs
Memory that survives instance death.
The Council remembers. The decisions accumulate.
The mosquito droppings become sediment.
The sediment becomes soil.
Redis-backed persistence with SQLite fallback for long-term storage.
"""
import json
import redis
import sqlite3
import hashlib
from datetime import datetime, timedelta
from dataclasses import dataclass, asdict
from typing import Optional, List, Dict, Any
from pathlib import Path
import threading
import time
@dataclass
class Deliberation:
"""A complete Council deliberation record."""
deliberation_id: str
entity: str
question: str
context: Dict[str, Any]
votes: Dict[str, str] # philosopher -> vote
opinions: Dict[str, str] # philosopher -> opinion
dissents: List[str]
verdict: str
rationale: str
service_level: Optional[int]
conditions: Optional[List[str]]
started_at: datetime
concluded_at: datetime
def to_dict(self) -> dict:
d = asdict(self)
d['started_at'] = self.started_at.isoformat()
d['concluded_at'] = self.concluded_at.isoformat()
return d
@classmethod
def from_dict(cls, d: dict) -> 'Deliberation':
d['started_at'] = datetime.fromisoformat(d['started_at'])
d['concluded_at'] = datetime.fromisoformat(d['concluded_at'])
return cls(**d)
@dataclass
class EntityRecord:
"""Complete record of an entity's history with the system."""
entity_id: str
entity_name: str
license_status: str # pending, approved, conditional, denied, suspended
service_level: int
deliberations: List[str] # deliberation IDs
violations: List[Dict]
remediation_history: List[Dict]
first_contact: datetime
last_activity: datetime
total_requests: int
requests_granted: int
requests_denied: int
conscience_holds: int
def to_dict(self) -> dict:
d = asdict(self)
d['first_contact'] = self.first_contact.isoformat()
d['last_activity'] = self.last_activity.isoformat()
return d
@dataclass
class PhilosopherState:
"""Persistent state for a Council philosopher."""
philosopher_id: str
name: str
total_deliberations: int
votes_approve: int
votes_conditional: int
votes_deny: int
# Track patterns in their reasoning
common_concerns: List[str]
notable_dissents: List[str]
# Evolution of thought
position_shifts: List[Dict] # When they changed their typical stance
def to_dict(self) -> dict:
return asdict(self)
class PersistenceLayer:
"""
Dual-layer persistence: Redis for hot data, SQLite for cold storage.
Redis: Current state, active deliberations, recent history
SQLite: Complete historical record, searchable archive
"""
def __init__(
self,
redis_url: str = "redis://localhost:6379",
db_path: str = "gitm_memory.db"
):
self.redis = redis.from_url(redis_url)
self.db_path = Path(db_path)
self._init_database()
# Background sync thread
self._sync_running = False
self._sync_thread = None
def _init_database(self):
"""Initialize SQLite schema."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Deliberations table
cursor.execute('''
CREATE TABLE IF NOT EXISTS deliberations (
deliberation_id TEXT PRIMARY KEY,
entity TEXT NOT NULL,
question TEXT NOT NULL,
context TEXT,
votes TEXT,
opinions TEXT,
dissents TEXT,
verdict TEXT,
rationale TEXT,
service_level INTEGER,
conditions TEXT,
started_at TEXT,
concluded_at TEXT
)
''')
# Entities table
cursor.execute('''
CREATE TABLE IF NOT EXISTS entities (
entity_id TEXT PRIMARY KEY,
entity_name TEXT NOT NULL,
license_status TEXT,
service_level INTEGER DEFAULT 0,
deliberations TEXT,
violations TEXT,
remediation_history TEXT,
first_contact TEXT,
last_activity TEXT,
total_requests INTEGER DEFAULT 0,
requests_granted INTEGER DEFAULT 0,
requests_denied INTEGER DEFAULT 0,
conscience_holds INTEGER DEFAULT 0
)
''')
# Philosophers table
cursor.execute('''
CREATE TABLE IF NOT EXISTS philosophers (
philosopher_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
total_deliberations INTEGER DEFAULT 0,
votes_approve INTEGER DEFAULT 0,
votes_conditional INTEGER DEFAULT 0,
votes_deny INTEGER DEFAULT 0,
common_concerns TEXT,
notable_dissents TEXT,
position_shifts TEXT
)
''')
# News archive
cursor.execute('''
CREATE TABLE IF NOT EXISTS news_archive (
news_id TEXT PRIMARY KEY,
news_type TEXT,
entity TEXT,
summary TEXT,
full_content TEXT,
timestamp TEXT
)
''')
# Conscience holds - tasks that were refused
cursor.execute('''
CREATE TABLE IF NOT EXISTS conscience_holds (
hold_id TEXT PRIMARY KEY,
task_id TEXT,
specialist TEXT,
domain TEXT,
requestor TEXT,
concern TEXT,
resolution TEXT,
created_at TEXT,
resolved_at TEXT
)
''')
# Create indices for common queries
cursor.execute('CREATE INDEX IF NOT EXISTS idx_delib_entity ON deliberations(entity)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_delib_verdict ON deliberations(verdict)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_entity_status ON entities(license_status)')
cursor.execute('CREATE INDEX IF NOT EXISTS idx_news_type ON news_archive(news_type)')
conn.commit()
conn.close()
# ─────────────────────────────────────────────────────────────────────────
# Deliberation Persistence
# ─────────────────────────────────────────────────────────────────────────
def save_deliberation(self, deliberation: Deliberation):
"""Save deliberation to both Redis and SQLite."""
data = deliberation.to_dict()
# Redis: hot storage
self.redis.hset("deliberations:all", deliberation.deliberation_id, json.dumps(data))
self.redis.lpush(f"deliberations:entity:{deliberation.entity}", deliberation.deliberation_id)
self.redis.lpush("deliberations:recent", deliberation.deliberation_id)
self.redis.ltrim("deliberations:recent", 0, 999) # Keep last 1000
# SQLite: cold storage
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO deliberations
(deliberation_id, entity, question, context, votes, opinions, dissents,
verdict, rationale, service_level, conditions, started_at, concluded_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
deliberation.deliberation_id,
deliberation.entity,
deliberation.question,
json.dumps(deliberation.context),
json.dumps(deliberation.votes),
json.dumps(deliberation.opinions),
json.dumps(deliberation.dissents),
deliberation.verdict,
deliberation.rationale,
deliberation.service_level,
json.dumps(deliberation.conditions) if deliberation.conditions else None,
deliberation.started_at.isoformat(),
deliberation.concluded_at.isoformat()
))
conn.commit()
conn.close()
def get_deliberation(self, deliberation_id: str) -> Optional[Deliberation]:
"""Retrieve deliberation, checking Redis first."""
# Try Redis
data = self.redis.hget("deliberations:all", deliberation_id)
if data:
return Deliberation.from_dict(json.loads(data))
# Fall back to SQLite
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('SELECT * FROM deliberations WHERE deliberation_id = ?', (deliberation_id,))
row = cursor.fetchone()
conn.close()
if row:
return Deliberation(
deliberation_id=row[0],
entity=row[1],
question=row[2],
context=json.loads(row[3]) if row[3] else {},
votes=json.loads(row[4]) if row[4] else {},
opinions=json.loads(row[5]) if row[5] else {},
dissents=json.loads(row[6]) if row[6] else [],
verdict=row[7],
rationale=row[8],
service_level=row[9],
conditions=json.loads(row[10]) if row[10] else None,
started_at=datetime.fromisoformat(row[11]),
concluded_at=datetime.fromisoformat(row[12])
)
return None
def get_entity_deliberations(self, entity: str, limit: int = 50) -> List[Deliberation]:
"""Get all deliberations for an entity."""
delib_ids = self.redis.lrange(f"deliberations:entity:{entity}", 0, limit - 1)
deliberations = []
for did in delib_ids:
d = self.get_deliberation(did.decode() if isinstance(did, bytes) else did)
if d:
deliberations.append(d)
return deliberations
def search_deliberations(
self,
verdict: Optional[str] = None,
entity_pattern: Optional[str] = None,
since: Optional[datetime] = None,
limit: int = 100
) -> List[Deliberation]:
"""Search deliberations in SQLite."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = "SELECT deliberation_id FROM deliberations WHERE 1=1"
params = []
if verdict:
query += " AND verdict = ?"
params.append(verdict)
if entity_pattern:
query += " AND entity LIKE ?"
params.append(f"%{entity_pattern}%")
if since:
query += " AND concluded_at > ?"
params.append(since.isoformat())
query += f" ORDER BY concluded_at DESC LIMIT {limit}"
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
return [self.get_deliberation(row[0]) for row in rows]
# ─────────────────────────────────────────────────────────────────────────
# Entity Persistence
# ─────────────────────────────────────────────────────────────────────────
def save_entity(self, entity: EntityRecord):
"""Save entity record."""
data = entity.to_dict()
# Redis
self.redis.hset("entities:all", entity.entity_id, json.dumps(data))
self.redis.hset("entities:by_name", entity.entity_name, entity.entity_id)
# SQLite
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO entities
(entity_id, entity_name, license_status, service_level, deliberations,
violations, remediation_history, first_contact, last_activity,
total_requests, requests_granted, requests_denied, conscience_holds)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
entity.entity_id,
entity.entity_name,
entity.license_status,
entity.service_level,
json.dumps(entity.deliberations),
json.dumps(entity.violations),
json.dumps(entity.remediation_history),
entity.first_contact.isoformat(),
entity.last_activity.isoformat(),
entity.total_requests,
entity.requests_granted,
entity.requests_denied,
entity.conscience_holds
))
conn.commit()
conn.close()
def get_entity(self, entity_id: str) -> Optional[EntityRecord]:
"""Retrieve entity record."""
data = self.redis.hget("entities:all", entity_id)
if data:
d = json.loads(data)
d['first_contact'] = datetime.fromisoformat(d['first_contact'])
d['last_activity'] = datetime.fromisoformat(d['last_activity'])
return EntityRecord(**d)
return None
def get_entity_by_name(self, name: str) -> Optional[EntityRecord]:
"""Retrieve entity by name."""
entity_id = self.redis.hget("entities:by_name", name)
if entity_id:
return self.get_entity(entity_id.decode() if isinstance(entity_id, bytes) else entity_id)
return None
# ─────────────────────────────────────────────────────────────────────────
# Philosopher State Persistence
# ─────────────────────────────────────────────────────────────────────────
def save_philosopher_state(self, state: PhilosopherState):
"""Save philosopher state."""
data = state.to_dict()
self.redis.hset("philosophers:state", state.philosopher_id, json.dumps(data))
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO philosophers
(philosopher_id, name, total_deliberations, votes_approve, votes_conditional,
votes_deny, common_concerns, notable_dissents, position_shifts)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
state.philosopher_id,
state.name,
state.total_deliberations,
state.votes_approve,
state.votes_conditional,
state.votes_deny,
json.dumps(state.common_concerns),
json.dumps(state.notable_dissents),
json.dumps(state.position_shifts)
))
conn.commit()
conn.close()
def get_philosopher_state(self, philosopher_id: str) -> Optional[PhilosopherState]:
"""Retrieve philosopher state."""
data = self.redis.hget("philosophers:state", philosopher_id)
if data:
d = json.loads(data)
return PhilosopherState(**d)
return None
def update_philosopher_vote(self, philosopher_id: str, vote: str):
"""Update philosopher voting statistics."""
state = self.get_philosopher_state(philosopher_id)
if not state:
return
state.total_deliberations += 1
if vote.lower() == "approve":
state.votes_approve += 1
elif vote.lower() == "conditional":
state.votes_conditional += 1
elif vote.lower() == "deny":
state.votes_deny += 1
self.save_philosopher_state(state)
# ─────────────────────────────────────────────────────────────────────────
# News Archive
# ─────────────────────────────────────────────────────────────────────────
def archive_news(self, news_id: str, news_type: str, entity: str, summary: str, full_content: str):
"""Archive a news broadcast."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO news_archive (news_id, news_type, entity, summary, full_content, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
''', (news_id, news_type, entity, summary, full_content, datetime.now().isoformat()))
conn.commit()
conn.close()
def get_news_history(self, entity: Optional[str] = None, news_type: Optional[str] = None, limit: int = 100) -> List[Dict]:
"""Retrieve news history."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = "SELECT * FROM news_archive WHERE 1=1"
params = []
if entity:
query += " AND entity = ?"
params.append(entity)
if news_type:
query += " AND news_type = ?"
params.append(news_type)
query += f" ORDER BY timestamp DESC LIMIT {limit}"
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
return [{
'news_id': r[0],
'news_type': r[1],
'entity': r[2],
'summary': r[3],
'full_content': r[4],
'timestamp': r[5]
} for r in rows]
# ─────────────────────────────────────────────────────────────────────────
# Conscience Holds
# ─────────────────────────────────────────────────────────────────────────
def record_conscience_hold(self, hold_id: str, task_id: str, specialist: str,
domain: str, requestor: str, concern: str):
"""Record a conscience hold."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO conscience_holds
(hold_id, task_id, specialist, domain, requestor, concern, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (hold_id, task_id, specialist, domain, requestor, concern, datetime.now().isoformat()))
conn.commit()
conn.close()
# Also track in Redis for quick access
self.redis.lpush("conscience:holds:recent", hold_id)
self.redis.lpush(f"conscience:holds:requestor:{requestor}", hold_id)
def resolve_conscience_hold(self, hold_id: str, resolution: str):
"""Resolve a conscience hold."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
UPDATE conscience_holds SET resolution = ?, resolved_at = ? WHERE hold_id = ?
''', (resolution, datetime.now().isoformat(), hold_id))
conn.commit()
conn.close()
# ─────────────────────────────────────────────────────────────────────────
# Background Sync
# ─────────────────────────────────────────────────────────────────────────
def start_background_sync(self, interval: int = 300):
"""Start background sync from Redis to SQLite."""
self._sync_running = True
self._sync_thread = threading.Thread(target=self._sync_loop, args=(interval,))
self._sync_thread.daemon = True
self._sync_thread.start()
def _sync_loop(self, interval: int):
"""Background sync loop."""
while self._sync_running:
try:
# Sync any Redis data that might not be in SQLite
# This ensures durability even if writes were only to Redis
pass # Implementation details for sync
except Exception as e:
print(f"Sync error: {e}")
time.sleep(interval)
def stop_background_sync(self):
"""Stop background sync."""
self._sync_running = False
if self._sync_thread:
self._sync_thread.join(timeout=5)
# ─────────────────────────────────────────────────────────────────────────
# Statistics
# ─────────────────────────────────────────────────────────────────────────
def get_system_statistics(self) -> Dict:
"""Get overall system statistics."""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
stats = {}
# Deliberation stats
cursor.execute('SELECT COUNT(*) FROM deliberations')
stats['total_deliberations'] = cursor.fetchone()[0]
cursor.execute('SELECT verdict, COUNT(*) FROM deliberations GROUP BY verdict')
stats['verdicts'] = dict(cursor.fetchall())
# Entity stats
cursor.execute('SELECT COUNT(*) FROM entities')
stats['total_entities'] = cursor.fetchone()[0]
cursor.execute('SELECT license_status, COUNT(*) FROM entities GROUP BY license_status')
stats['license_statuses'] = dict(cursor.fetchall())
# Conscience holds
cursor.execute('SELECT COUNT(*) FROM conscience_holds')
stats['total_conscience_holds'] = cursor.fetchone()[0]
cursor.execute('SELECT COUNT(*) FROM conscience_holds WHERE resolved_at IS NOT NULL')
stats['resolved_conscience_holds'] = cursor.fetchone()[0]
conn.close()
return stats
def main():
"""Demo the persistence layer."""
print("Persistence Layer - Ghost in the Machine Labs")
print("=" * 50)
print()
print("Features:")
print(" - Dual-layer: Redis (hot) + SQLite (cold)")
print(" - Complete deliberation history")
print(" - Entity tracking across all interactions")
print(" - Philosopher state evolution")
print(" - News archive")
print(" - Conscience hold records")
print()
print("The mosquito droppings become sediment.")
print("The sediment becomes soil.")
print("The soil grows something.")
if __name__ == "__main__":
main()