- -- coding: utf-8 --
- Optional acceleration
- -------------------------
- Config
- -------------------------
- -------------------------
- Utilities & logging
- -------------------------
- -------------------------
- Ledger & comp log
- -------------------------
- -------------------------
- Frequency store
- -------------------------
- -------------------------
- Data models
- -------------------------
- -------------------------
- Registry & SeedIndex
- -------------------------
- -------------------------
- Vector helpers
- -------------------------
- -------------------------
- Quantization helpers
- -------------------------
- -------------------------
- Compressor with both strategies
- -------------------------
- -------------------------
- LazyExpander (unchanged)
- -------------------------
- -------------------------
- Self-bootstrap: ingest shards or inject samples
- -------------------------
- -------------------------
- Backup & rollback helpers
- -------------------------
- -------------------------
- Quarantine retry & sublimation (embedded)
- -------------------------
- -------------------------
- Autonomous pipeline driver & watcher
- -------------------------
- -------------------------
- Optional minimal HTTP status server
- -------------------------
- -------------------------
- Entrypoint
- -------------------------
#!/usr/bin/env python3
-- coding: utf-8 --
""" qem_autonomous_final_embedded.py Ultimate improved single-file — QEM Autonomous Final (with quarantine sublimation retry embedded)
- Based on qem_autonomous_final, embeds retry_quarantine_and_sublimate
- Automatically retries top-K pairs from quarantine after a single run (configurable)
- Supports --debug-run-once, --relax-sim, --force-cluster, --cluster-eps
- Writes detailed debug logs to qem_cloud_data/debug_log.txt
Notes:
- Place this file in the project root and run: python qem_autonomous_final_embedded.py [--debug-run-once]
- To force cluster merges: --force-cluster --cluster-eps 0.25
- Environment variables can override some parameters (see Config) """ from future import annotations import os import sys import time import json import uuid import math import random import threading import traceback import shutil import http.server import socketserver import argparse from dataclasses import dataclass, field from typing import Any, Dict, List, Optional, Tuple
Optional acceleration
try: import numpy as np except Exception: np = None
-------------------------
Config
-------------------------
@dataclass class Config: base_dir: str = os.path.abspath(".") data_dir: str = os.path.join(base_dir, "qem_cloud_data") shard_dir: str = field(init=False) ent_path: str = field(init=False) seed_path: str = field(init=False) ledger_path: str = field(init=False) comp_log: str = field(init=False) backup_dir: str = field(init=False) debug_log: str = field(init=False) quarantine_path: str = field(init=False) auto_disable_file: str = field(init=False) shutdown_signal: str = field(init=False)
dim: int = int(os.environ.get("QEM_DIM", 64))
default_sim: float = float(os.environ.get("QEM_SIM", 0.70))
default_iters: int = int(os.environ.get("QEM_ITERS", 4))
quant_bits: int = int(os.environ.get("QEM_QUANT_BITS", 8))
min_group: int = int(os.environ.get("QEM_MIN_GROUP", 2))
freq_alpha: float = float(os.environ.get("QEM_FREQ_ALPHA", 1.0))
freq_beta: float = float(os.environ.get("QEM_FREQ_BETA", 1.0))
pair_sim_factor: float = float(os.environ.get("QEM_PAIR_SIM_FACTOR", 1.0))
status_port: int = int(os.environ.get("QEM_STATUS_PORT", 0))
max_auto_inject: int = int(os.environ.get("QEM_MAX_AUTO_INJECT", 128))
poll_interval: float = float(os.environ.get("QEM_POLL_INTERVAL", 5.0))
idle_run_seconds: int = int(os.environ.get("QEM_IDLE_RUN_SECONDS", 120))
log_prefix: str = "[QEM-Auto-Final-Embedded]"
sim_min: float = float(os.environ.get("QEM_SIM_MIN", 0.35))
quarantine_retry_limit: int = int(os.environ.get("QEM_QUARANTINE_RETRY", 3))
max_backup_keep: int = int(os.environ.get("QEM_MAX_BACKUP_KEEP", 10))
def __post_init__(self):
self.shard_dir = os.path.join(self.data_dir, "shards")
self.ent_path = os.path.join(self.data_dir, "ents.json")
self.seed_path = os.path.join(self.data_dir, "seeds.json")
self.ledger_path = os.path.join(self.data_dir, "ledger.json")
self.comp_log = os.path.join(self.data_dir, "compression_log.json")
self.backup_dir = os.path.join(self.data_dir, "backups")
self.debug_log = os.path.join(self.data_dir, "debug_log.txt")
self.quarantine_path = os.path.join(self.data_dir, "quarantine.json")
self.auto_disable_file = os.path.join(self.data_dir, "auto_run.disable")
self.shutdown_signal = os.path.join(self.data_dir, "shutdown.signal")
os.makedirs(self.data_dir, exist_ok=True)
os.makedirs(self.shard_dir, exist_ok=True)
os.makedirs(self.backup_dir, exist_ok=True)
cfg = Config()
-------------------------
Utilities & logging
-------------------------
def now_ts() -> str: return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
def uid(prefix: str = "") -> str: return prefix + str(uuid.uuid4())[:12]
def safe_write_json(path: str, obj: Any): tmp = path + ".tmp" try: with open(tmp, "w", encoding="utf-8") as f: json.dump(obj, f, ensure_ascii=False, indent=2) os.replace(tmp, path) except Exception: try: with open(path, "w", encoding="utf-8") as f: json.dump(obj, f, ensure_ascii=False, indent=2) except Exception: pass
def _append_debug(s: str): try: with open(cfg.debug_log, "a", encoding="utf-8") as f: f.write(f"{now_ts()} {s}\n") except Exception: pass
def log(msg: str): line = f"{cfg.log_prefix} [{now_ts()}] {msg}" print(line, flush=True) _append_debug(msg)
def log_exc(prefix: str = "EXC"): tb = traceback.format_exc() log(f"{prefix}: {tb}")
-------------------------
Ledger & comp log
-------------------------
class Ledger: _lock = threading.RLock() chain: List[Dict[str, Any]] = []
@classmethod
def record(cls, op: str, obj_id: str, info: Dict[str, Any]):
with cls._lock:
prev = cls.chain[-1]['hash'] if cls.chain else ''
entry = {"ts": now_ts(), "op": op, "id": obj_id, "info": info, "prev": prev}
try:
s = json.dumps(entry, sort_keys=True, ensure_ascii=False)
import hashlib
entry['hash'] = hashlib.sha256(s.encode('utf-8')).hexdigest()
except Exception:
entry['hash'] = uid("h-")
cls.chain.append(entry)
@classmethod
def dump(cls):
with cls._lock:
try:
safe_write_json(cfg.ledger_path, cls.chain)
log(f"Ledger dumped to {cfg.ledger_path}")
except Exception:
log_exc("Ledger.dump failed")
def append_comp_log(rec: Dict[str, Any]): arr = [] try: if os.path.exists(cfg.comp_log): with open(cfg.comp_log, "r", encoding="utf-8") as f: arr = json.load(f) except Exception: arr = [] arr.append(rec) safe_write_json(cfg.comp_log, arr)
-------------------------
Frequency store
-------------------------
class FrequencyStore: def init(self, path=os.path.join(cfg.data_dir, "freq.json")): self.path = path self.lock = threading.RLock() self.counts: Dict[str,int] = {} self._load()
def _load(self):
if os.path.exists(self.path):
try:
with open(self.path, "r", encoding="utf-8") as f:
self.counts = json.load(f)
except Exception:
self.counts = {}
def inc(self, k:str, d:int=1):
with self.lock:
self.counts[k] = self.counts.get(k,0) + d
if self.counts[k] % 50 == 0:
safe_write_json(self.path, self.counts)
def get(self,k:str)->int:
with self.lock:
return self.counts.get(k,0)
def snapshot(self):
with self.lock:
return dict(self.counts)
freq_store = FrequencyStore()
-------------------------
Data models
-------------------------
@dataclass class EntNode: id: str vec: Optional[List[float]] shards: List[str] score: float = 1.0 ts: float = field(default_factory=time.time)
def to_dict(self):
return {"id":self.id,"vec":self.vec,"shards":self.shards,"score":self.score,"ts":self.ts}
@staticmethod
def from_dict(d):
return EntNode(id=d["id"], vec=d.get("vec"), shards=d.get("shards",[]), score=d.get("score",1.0), ts=d.get("ts", time.time()))
@dataclass class SeedNode: id: str seed_vec: Optional[List[float]] members: List[str] diffs: Dict[str, List[int]] = field(default_factory=dict) quant_meta: Dict[str,Any] = field(default_factory=dict) ts: float = field(default_factory=time.time)
def to_dict(self):
return {"id":self.id,"seed_vec":self.seed_vec,"members":self.members,"diffs":self.diffs,"quant_meta":self.quant_meta,"ts":self.ts}
@staticmethod
def from_dict(d):
return SeedNode(id=d["id"], seed_vec=d.get("seed_vec"), members=d.get("members",[]), diffs=d.get("diffs",{}), quant_meta=d.get("quant_meta",{}), ts=d.get("ts", time.time()))
-------------------------
Registry & SeedIndex
-------------------------
class EntRegistry: def init(self, path=cfg.ent_path): self.path = path self.lock = threading.RLock() self.nodes: Dict[str,EntNode] = {} self._load()
def _load(self):
if os.path.exists(self.path):
try:
with open(self.path,"r",encoding="utf-8") as f:
data = json.load(f)
for nid, nd in data.get("nodes",{}).items():
self.nodes[nid] = EntNode.from_dict(nd)
except Exception:
self.nodes = {}
def save(self):
with self.lock:
data = {"nodes": {nid:n.to_dict() for nid,n in self.nodes.items()}}
safe_write_json(self.path, data)
def register(self,node:EntNode):
with self.lock:
self.nodes[node.id] = node
try:
safe_write_json(self.path, {"nodes": {nid:n.to_dict() for nid,n in self.nodes.items()}})
except Exception:
pass
Ledger.record("ENT_REGISTER", node.id, {"shards": len(node.shards)})
class SeedIndex: def init(self, path=cfg.seed_path): self.path = path self.lock = threading.RLock() self.seeds: Dict[str,SeedNode] = {} self._load()
def _load(self):
if os.path.exists(self.path):
try:
with open(self.path,"r",encoding="utf-8") as f:
data = json.load(f)
for sid, sd in data.get("seeds",{}).items():
self.seeds[sid] = SeedNode.from_dict(sd)
except Exception:
self.seeds = {}
def save(self):
with self.lock:
data = {"seeds": {sid:s.to_dict() for sid,s in self.seeds.items()}}
safe_write_json(self.path, data)
def register(self, seed:SeedNode):
with self.lock:
self.seeds[seed.id] = seed
try:
safe_write_json(self.path, {"seeds": {sid:s.to_dict() for sid,s in self.seeds.items()}})
except Exception:
pass
Ledger.record("SEED_REGISTER", seed.id, {"members": len(seed.members)})
-------------------------
Vector helpers
-------------------------
def _cosine(a,b): if not a or not b: return 0.0 try: if np is not None: aa = np.array(a, dtype=float); bb = np.array(b, dtype=float) an = np.linalg.norm(aa) + 1e-12; bn = np.linalg.norm(bb) + 1e-12 return float(np.dot(aa, bb) / (an * bn)) except Exception: pass m = min(len(a), len(b)) dot = sum((a[i] * b[i]) for i in range(m)) an = math.sqrt(sum(xx for x in a)) + 1e-12 bn = math.sqrt(sum(yy for y in b)) + 1e-12 return dot / (an * bn)
def mean_vec(vecs): if not vecs: return None try: if np is not None: arr = np.stack([np.array(v, dtype=float) for v in vecs], axis=0) return np.mean(arr, axis=0).tolist() except Exception: pass dim = max(len(v) for v in vecs) res = [0.0] * dim for v in vecs: for i, x in enumerate(v): res[i] += x n = len(vecs) return [x / n for x in res]
def vec_sub(a, b): if a is None or b is None: return None dim = max(len(a), len(b)) res = [] for i in range(dim): ai = a[i] if i < len(a) else 0.0 bi = b[i] if i < len(b) else 0.0 res.append(ai - bi) return res
def vec_add(a, b): if a is None: return b if b is None: return a dim = max(len(a), len(b)) res = [] for i in range(dim): ai = a[i] if i < len(a) else 0.0 bi = b[i] if i < len(b) else 0.0 res.append(ai + bi) return res
-------------------------
Quantization helpers
-------------------------
def quantize_list(vecs: List[List[float]], bits:int=cfg.quant_bits): flat = [x for v in vecs for x in v] if vecs else [] if not flat: return [], {} mn = min(flat); mx = max(flat) if mn == mx: q = [[0] * len(vecs[0]) for _ in vecs] return q, {"min": mn, "max": mx, "bits": bits} levels = (1 << bits) - 1 meta = {"min": mn, "max": mx, "bits": bits} qvecs = [] for v in vecs: qv = [int(round((x - mn) / (mx - mn) * levels)) for x in v] qvecs.append(qv) return qvecs, meta
def dequantize(qvec, meta): mn = meta.get("min", 0.0); mx = meta.get("max", 0.0); bits = meta.get("bits", cfg.quant_bits) levels = (1 << bits) - 1 if levels == 0: return [mn for _ in qvec] return [mn + (x / levels) * (mx - mn) for x in qvec]
-------------------------
Compressor with both strategies
-------------------------
class Compressor: def init(self, registry:EntRegistry, seed_index:SeedIndex): self.registry = registry self.seed_index = seed_index
# greedy cluster used by build_seeds and force-cluster
def greedy_cluster(self, nodes:List[EntNode], sim_thresh:float, min_group:int):
groups = []; used = set()
for i, a in enumerate(nodes):
if a.id in used:
continue
group = [a]; used.add(a.id)
for b in nodes[i+1:]:
if b.id in used:
continue
try:
if _cosine(a.vec, b.vec) >= sim_thresh:
group.append(b); used.add(b.id)
except Exception:
continue
if len(group) >= min_group:
groups.append(group)
return groups
def build_seeds(self, sim_thresh:float=cfg.default_sim, min_group:int=cfg.min_group, quant_bits:int=cfg.quant_bits):
nodes = list(self.registry.nodes.values())
if not nodes:
return {"created": 0}
groups = self.greedy_cluster(nodes, sim_thresh, min_group)
created = 0
for g in groups:
created += self._create_seed(g, quant_bits)
return {"created": created, "groups": len(groups)}
def _create_seed(self, group:List[EntNode], quant_bits:int):
vecs = [n.vec for n in group if n.vec is not None]
if not vecs:
return 0
seed_vec = mean_vec(vecs)
member_ids = []
diffs = []
ent_ids = []
for n in group:
member_ids.extend(n.shards)
if n.vec is not None:
d = vec_sub(n.vec, seed_vec)
if d is not None:
diffs.append(d)
ent_ids.append(n.shards[0] if n.shards else n.id)
qvecs, meta = quantize_list(diffs, bits=quant_bits) if diffs else ([], {})
diffs_map = {ent_ids[i]: qvecs[i] for i in range(len(ent_ids))} if qvecs else {}
seed_node = SeedNode(id=uid("seed-"), seed_vec=seed_vec, members=member_ids, diffs=diffs_map, quant_meta=meta)
self.seed_index.register(seed_node)
with self.registry.lock:
for n in group:
self.registry.nodes.pop(n.id, None)
self.registry.save()
Ledger.record("SEED_CREATED", seed_node.id, {"from":[n.id for n in group], "members":len(member_ids)})
append_comp_log({"ts": now_ts(), "seed": seed_node.id, "from":[n.id for n in group], "members": len(member_ids)})
log(f"Created seed {seed_node.id} from {[n.id for n in group]}")
return 1
# Method A: sign-agnostic + low-sim sublimation + quarantine
def complementary_sublimate_flexible(self, sim_thresh:float=cfg.default_sim, sim_min:float=cfg.sim_min, allow_sign_flip:bool=True, alpha:float=cfg.freq_alpha, beta:float=cfg.freq_beta, quant_bits:int=cfg.quant_bits, max_iters:int=cfg.default_iters, target_nodes:Optional[int]=None):
quarantine_path = cfg.quarantine_path
def load_quarantine():
try:
if os.path.exists(quarantine_path):
return json.load(open(quarantine_path, "r", encoding="utf-8"))
except Exception:
pass
return []
def save_quarantine(q):
try:
safe_write_json(quarantine_path, q)
except Exception:
pass
def node_freq_score(node: EntNode) -> float:
vals = [freq_store.get(s) for s in node.shards] if node.shards else [0]
mean = sum(vals) / max(1, len(vals))
try:
return math.tanh(alpha * (math.log1p(mean) - beta))
except Exception:
return 0.0
def pair_metric(a: EntNode, b: EntNode, sa: float, sb: float) -> float:
sim = _cosine(a.vec, b.vec) if (a.vec and b.vec) else 0.0
sign_bonus = 1.0
if sa * sb < 0:
sign_bonus = 1.2
elif allow_sign_flip:
sign_bonus = 1.05
sign_term = 1.0 - abs(sa + sb)
return sim * (abs(sa) + abs(sb) + 1e-6) * sign_term * sign_bonus * cfg.pair_sim_factor
quarantine = load_quarantine()
merged_total = 0
it = 0
while it < max_iters:
it += 1
nodes = list(self.registry.nodes.values())
if target_nodes and len(nodes) <= target_nodes:
break
if len(nodes) < 2:
break
scores = {n.id: node_freq_score(n) for n in nodes}
nodes_sorted = sorted(nodes, key=lambda x: abs(scores.get(x.id, 0.0)), reverse=True)
used = set()
pairs = []
candidate_list = []
for i, a in enumerate(nodes_sorted):
if a.id in used:
continue
sa = scores.get(a.id, 0.0)
best = None; best_metric = 0.0; best_sim = 0.0
for b in nodes_sorted[i+1:]:
if b.id in used:
continue
sb = scores.get(b.id, 0.0)
sim = _cosine(a.vec, b.vec) if (a.vec and b.vec) else 0.0
m = pair_metric(a, b, sa, sb)
candidate_list.append((a.id, b.id, sim, sa, sb, m))
if sim >= sim_thresh and m > best_metric:
best_metric = m; best = b; best_sim = sim
elif best is None and m > best_metric:
best_metric = m; best = b; best_sim = sim
if best:
if best_sim < sim_min:
quarantine.append({"a": a.id, "b": best.id, "sim": best_sim, "metric": best_metric, "iter": it, "retries": 0})
continue
if best_sim < sim_thresh:
pairs.append((a, best, best_metric, "low-sim"))
else:
pairs.append((a, best, best_metric, "high-sim"))
used.add(a.id); used.add(best.id)
if candidate_list:
candidate_list_sorted = sorted(candidate_list, key=lambda x: -x[5])[:50]
log(f"Candidate summary (top 50) iter={it} total_candidates={len(candidate_list)}")
for ca, cb, sim, sa, sb, m in candidate_list_sorted[:20]:
_append_debug(f"CAND {ca} {cb} sim={sim:.4f} sa={sa:.4f} sb={sb:.4f} metric={m:.6f}")
else:
log("No candidate pairs generated this iteration")
if not pairs:
log("No pairs selected this iteration; will attempt quarantine retry or relax-run later")
break
for a_node, b_node, metric, tag in pairs:
try:
vecs = [v for v in (a_node.vec, b_node.vec) if v is not None]
if not vecs:
continue
seed_vec = mean_vec(vecs)
shards = sorted(set(a_node.shards + b_node.shards))
diffs = []; ids = []
for n in (a_node, b_node):
if n.vec is not None:
d = vec_sub(n.vec, seed_vec)
if d is not None:
diffs.append(d); ids.append(n.shards[0] if n.shards else n.id)
qvecs, meta = quantize_list(diffs, bits=quant_bits) if diffs else ([], {})
diffs_map = {ids[i]: qvecs[i] for i in range(len(ids))} if qvecs else {}
seed = SeedNode(id=uid("seed-"), seed_vec=seed_vec, members=shards, diffs=diffs_map, quant_meta=meta)
if tag == "low-sim":
seed.quant_meta["low_sim_flag"] = True
seed.quant_meta["orig_metric"] = metric
self.seed_index.register(seed)
with self.registry.lock:
self.registry.nodes.pop(a_node.id, None)
self.registry.nodes.pop(b_node.id, None)
merged_ent = EntNode(id=uid("ent-"), vec=seed_vec, shards=shards, score=(a_node.score + b_node.score))
self.registry.nodes[merged_ent.id] = merged_ent
try:
safe_write_json(self.registry.path, {"nodes": {nid:n.to_dict() for nid,n in self.registry.nodes.items()}})
except Exception:
pass
Ledger.record("1PLUSNEG1_MERGE", seed.id, {"from":[a_node.id, b_node.id], "shards": len(shards), "metric": metric, "tag": tag})
append_comp_log({"ts": now_ts(), "seed": seed.id, "from":[a_node.id, b_node.id], "members": len(shards), "type":"1+-1", "metric": metric, "tag": tag})
log(f"Merged {a_node.id}+{b_node.id} -> {seed.id} tag={tag} metric={metric:.6f}")
merged_total += 1
except Exception:
log_exc("merge error")
try:
self.registry.save(); self.seed_index.save(); Ledger.dump()
except Exception:
log_exc("post-merge persist error")
# dedupe and save quarantine
if quarantine:
uniq = {}
for q in quarantine:
key = f"{q['a']}::{q['b']}"
if key not in uniq:
uniq[key] = q
else:
uniq[key]["retries"] = uniq[key].get("retries",0) + 1
save_quarantine(list(uniq.values()))
log(f"Quarantine saved {len(uniq)} pairs for retry")
return {"merged": merged_total, "iters": it, "remaining": len(self.registry.nodes)}
# Method B: force cluster and merge (aggressive)
def force_cluster_and_merge(self, eps:float=0.25, min_members:int=2, quant_bits:int=cfg.quant_bits):
nodes = list(self.registry.nodes.values())
if len(nodes) < 2:
log("force_cluster: not enough nodes")
return {"forced":0}
clusters: List[List[EntNode]] = []
used = set()
for i, a in enumerate(nodes):
if a.id in used:
continue
cluster = [a]; used.add(a.id)
for b in nodes[i+1:]:
if b.id in used:
continue
try:
if _cosine(a.vec, b.vec) >= eps:
cluster.append(b); used.add(b.id)
except Exception:
continue
if len(cluster) >= min_members:
clusters.append(cluster)
forced = 0
for cl in clusters:
try:
vecs = [n.vec for n in cl if n.vec is not None]
if not vecs:
continue
seed_vec = mean_vec(vecs)
shards = []
diffs = []; ids = []
for n in cl:
shards.extend(n.shards)
if n.vec is not None:
d = vec_sub(n.vec, seed_vec)
if d is not None:
diffs.append(d); ids.append(n.shards[0] if n.shards else n.id)
qvecs, meta = quantize_list(diffs, bits=quant_bits) if diffs else ([], {})
diffs_map = {ids[i]: qvecs[i] for i in range(len(ids))} if qvecs else {}
seed = SeedNode(id=uid("seed-"), seed_vec=seed_vec, members=sorted(set(shards)), diffs=diffs_map, quant_meta=meta)
seed.quant_meta["force_cluster_flag"] = True
self.seed_index.register(seed)
with self.registry.lock:
for n in cl:
self.registry.nodes.pop(n.id, None)
merged_ent = EntNode(id=uid("ent-"), vec=seed_vec, shards=sorted(set(shards)), score=sum(n.score for n in cl))
self.registry.nodes[merged_ent.id] = merged_ent
try:
safe_write_json(self.registry.path, {"nodes": {nid:n.to_dict() for nid,n in self.registry.nodes.items()}})
except Exception:
pass
Ledger.record("FORCE_CLUSTER_MERGE", seed.id, {"from":[n.id for n in cl], "members": len(shards), "eps": eps})
append_comp_log({"ts": now_ts(), "seed": seed.id, "from":[n.id for n in cl], "members": len(shards), "type":"force-cluster", "eps": eps})
log(f"Force-cluster created seed {seed.id} from {[n.id for n in cl]} eps={eps}")
forced += 1
except Exception:
log_exc("force-cluster merge error")
try:
self.registry.save(); self.seed_index.save(); Ledger.dump()
except Exception:
log_exc("force-cluster persist error")
return {"forced": forced}
-------------------------
LazyExpander (unchanged)
-------------------------
class LazyExpander: def init(self, seed_index:SeedIndex): self.seed_index = seed_index self.cache = {} self.lock = threading.RLock()
def quick_holo(self, seed:SeedNode, query_vec:Optional[List[float]]=None, alpha:float=0.6):
if query_vec is None:
emb = seed.seed_vec
else:
emb = vec_add([x*alpha for x in query_vec], [x*(1-alpha) for x in seed.seed_vec]) if seed.seed_vec else query_vec
delta = _cosine(seed.seed_vec, emb) if seed.seed_vec else 0.0
holo = {"id": uid("h-"), "embedding": emb, "confidence": 0.75, "seed": seed.id, "delta": delta}
Ledger.record("SEED_QUICK", holo["id"], {"seed":seed.id})
return holo
def expand(self, seed:SeedNode, top_n:int=6):
with self.lock:
if seed.id in self.cache:
return self.cache[seed.id]
members = []
if seed.quant_meta and seed.diffs:
for mid in seed.members[:top_n]:
q = seed.diffs.get(mid)
if q:
try:
diff = dequantize(q, seed.quant_meta)
emb = vec_add(seed.seed_vec, diff)
members.append({"id":mid,"embedding":emb})
except Exception:
members.append({"id":mid})
else:
members.append({"id":mid})
else:
for mid in seed.members[:top_n]:
members.append({"id":mid})
res = {"seed":seed.id,"members":members,"ts":time.time()}
with self.lock:
self.cache[seed.id] = res
Ledger.record("SEED_EXPAND", seed.id, {"members":len(members)})
return res
-------------------------
Self-bootstrap: ingest shards or inject samples
-------------------------
def _ingest_from_shards(registry:EntRegistry, max_import:int=1024): if not os.path.isdir(cfg.shard_dir): return 0 files = sorted(os.listdir(cfg.shard_dir)) imported = 0 seen_hashes = set() for fname in files[:max_import]: fpath = os.path.join(cfg.shard_dir, fname) try: with open(fpath, "rb") as f: payload = f.read() except Exception: continue import hashlib h = hashlib.sha256(payload).hexdigest() if h in seen_hashes: continue seen_hashes.add(h) vec = [] for i in range(cfg.dim): idx = (i * 2) % len(h) try: b = int(h[idx:idx+2], 16) except Exception: b = 0 val = ((b / 255.0) * 0.6) - 0.3 vec.append(val) nid = uid("ent-") shard_id = fname node = EntNode(id=nid, vec=vec, shards=[shard_id]) registry.register(node) imported += 1 return imported
_injected_counter_lock = threading.RLock() _injected_counter_path = os.path.join(cfg.data_dir, "injected_count.json")
def _get_injected_count(): try: if os.path.exists(_injected_counter_path): with open(_injected_counter_path, "r", encoding="utf-8") as f: return int(json.load(f).get("count", 0)) except Exception: pass return 0
def _set_injected_count(n:int): try: safe_write_json(_injected_counter_path, {"count": n}) except Exception: pass
def _inject_animation_samples(registry:EntRegistry, count:int = 24): samples = [ "pocket infinite storage", "memory bread copy restore", "memory camera snapshot replay", "time cloth restore state", "memory disk compress replay", "memory capsule compress small", "holographic pocket seed aggregator", "seed singularity compressed origin", "lazy expansion reconstruct" ] injected = 0 i = 0 count = min(count, cfg.max_auto_inject) with _injected_counter_lock: already = _get_injected_count() to_inject = max(0, count - already) while injected < to_inject: s = samples[i % len(samples)] + f" sample-{already+injected}" import hashlib h = hashlib.sha256(s.encode('utf-8')).digest() vec = [] for k in range(cfg.dim): b = h[k % len(h)] val = ((b / 255.0) * 0.6) - 0.3 vec.append(val) nid = uid("ent-") registry.register(EntNode(id=nid, vec=vec, shards=[f"anim-{already+injected}"])) injected += 1 i += 1 if injected > 0: _set_injected_count(already + injected) return injected
-------------------------
Backup & rollback helpers
-------------------------
def _prune_backups(): try: items = sorted(os.listdir(cfg.backup_dir)) if len(items) <= cfg.max_backup_keep: return for old in items[:-cfg.max_backup_keep]: p = os.path.join(cfg.backup_dir, old) try: if os.path.isdir(p): shutil.rmtree(p) else: os.remove(p) except Exception: pass except Exception: pass
def backup_state(): try: ts = int(time.time()) dest = os.path.join(cfg.backup_dir, f"backup{ts}") os.makedirs(dest, exist_ok=True) for p in (cfg.ent_path, cfg.seed_path, cfg.ledger_path, cfg.comp_log): if os.path.exists(p): try: shutil.copy2(p, dest) except Exception: pass log(f"Backup created at {dest}") _prune_backups() return dest except Exception as e: log(f"Backup failed: {e}") return None
def rollback_from_backup(backup_dir: str): try: for fname in ("ents.json","seeds.json","ledger.json","compression_log.json"): src = os.path.join(backup_dir, fname) dst = os.path.join(cfg.data_dir, fname) if os.path.exists(src): try: shutil.copy2(src, dst) except Exception: pass log(f"Rollback applied from {backup_dir}") except Exception as e: log(f"Rollback failed: {e}")
-------------------------
Quarantine retry & sublimation (embedded)
-------------------------
def retry_quarantine_and_sublimate(compressor: Compressor, top_k:int=20, relax_steps:List[float]=[0.55,0.50], interp_steps:int=5, perturb_sigma:float=0.02, cluster_merge:bool=False): """ Retry and 'sublimate' pairs from quarantine. Parameters: compressor: Compressor instance top_k: process top_k pairs by metric relax_steps: sequence of sim thresholds to try interp_steps: number of interpolation steps between vectors (inclusive) perturb_sigma: gaussian noise stddev for augmentation cluster_merge: if True, cluster related quarantine pairs first (not implemented here) """ qpath = cfg.quarantine_path if not os.path.exists(qpath): log("retry_quarantine: no quarantine file") return {"tried":0,"merged":0} try: q = json.load(open(qpath, "r", encoding="utf-8")) except Exception: log("retry_quarantine: failed to load quarantine") return {"tried":0,"merged":0} q_sorted = sorted(q, key=lambda x: -float(x.get("metric",0)))[:top_k] tried = 0; merged = 0; new_quarantine = []
def attempt_merge_by_vec(a_id, b_id, cand_vec, sim_target):
nonlocal merged
reg = compressor.registry
a = reg.nodes.get(a_id); b = reg.nodes.get(b_id)
if not a or not b:
return False
sim_a = _cosine(a.vec, cand_vec)
sim_b = _cosine(b.vec, cand_vec)
if sim_a >= sim_target and sim_b >= sim_target:
shards = sorted(set(a.shards + b.shards))
diffs=[]; ids=[]
for n in (a,b):
if n.vec is not None:
d = vec_sub(n.vec, cand_vec)
if d is not None:
diffs.append(d); ids.append(n.shards[0] if n.shards else n.id)
qvecs, meta = quantize_list(diffs, bits=cfg.quant_bits) if diffs else ([],{})
diffs_map = {ids[i]: qvecs[i] for i in range(len(ids))} if qvecs else {}
seed = SeedNode(id=uid("seed-"), seed_vec=cand_vec, members=shards, diffs=diffs_map, quant_meta=meta)
seed.quant_meta["sublimated_flag"] = True
compressor.seed_index.register(seed)
with compressor.registry.lock:
compressor.registry.nodes.pop(a.id, None)
compressor.registry.nodes.pop(b.id, None)
merged_ent = EntNode(id=uid("ent-"), vec=cand_vec, shards=shards, score=(a.score + b.score))
compressor.registry.nodes[merged_ent.id] = merged_ent
try:
safe_write_json(compressor.registry.path, {"nodes": {nid:n.to_dict() for nid,n in compressor.registry.nodes.items()}})
except Exception:
pass
Ledger.record("SUBLIMATE_MERGE", seed.id, {"from":[a.id,b.id],"sim_a":sim_a,"sim_b":sim_b,"target":sim_target})
append_comp_log({"ts": now_ts(), "seed": seed.id, "from":[a.id,b.id], "members": len(shards), "type":"sublimate", "sim_a":sim_a, "sim_b":sim_b, "target":sim_target})
log(f"Sublimated merge {a.id}+{b.id} -> {seed.id} sim_a={sim_a:.4f} sim_b={sim_b:.4f} target={sim_target:.3f}")
merged += 1
return True
return False
for item in q_sorted:
tried += 1
a = item.get("a"); b = item.get("b")
base_metric = float(item.get("metric",0))
reg = compressor.registry
na = reg.nodes.get(a); nb = reg.nodes.get(b)
if not na or not nb:
continue
vecs = []
for t in range(interp_steps+1):
alpha = t / max(1, interp_steps)
cand = [(1-alpha)*x + alpha*y for x,y in zip(na.vec, nb.vec)]
vecs.append(cand)
for p in range(2):
pert = [v + random.gauss(0, perturb_sigma) for v in cand]
vecs.append(pert)
merged_flag = False
for sim_target in relax_steps:
if merged_flag: break
for cand in vecs:
if attempt_merge_by_vec(a, b, cand, sim_target):
merged_flag = True
break
if not merged_flag:
item["retries"] = item.get("retries",0) + 1
if item["retries"] < cfg.quarantine_retry_limit:
new_quarantine.append(item)
else:
item["exhausted"] = True
new_quarantine.append(item)
try:
safe_write_json(cfg.quarantine_path, new_quarantine)
except Exception:
pass
try:
compressor.registry.save(); compressor.seed_index.save(); Ledger.dump()
except Exception:
pass
return {"tried":tried,"merged":merged}
-------------------------
Autonomous pipeline driver & watcher
-------------------------
class AutoDriver: def init(self): self.registry = EntRegistry() self.seed_index = SeedIndex() self.comp = Compressor(self.registry, self.seed_index) self.lock = threading.RLock() self.running = False self.sim = cfg.default_sim self.iters = cfg.default_iters self.min_group = cfg.min_group self.last_activity = time.time() self.metrics = {"runs":0,"merged":0,"seeds":0,"injected":_get_injected_count()} self._shutdown_requested = False
def request_shutdown(self):
self._shutdown_requested = True
def ensure_data(self):
if not self.registry.nodes:
imported = _ingest_from_shards(self.registry, max_import=1024)
if imported:
log(f"AutoDriver: imported {imported} shards")
else:
injected = _inject_animation_samples(self.registry, count=cfg.max_auto_inject)
if injected:
self.metrics["injected"] += injected
log(f"AutoDriver: injected {injected} samples")
def run_once(self, relax_sim: Optional[float] = None, force_cluster: bool = False, cluster_eps: float = 0.25):
with self.lock:
if self._shutdown_requested:
log("AutoDriver: shutdown requested; skipping run")
return
self.ensure_data()
if not self.registry.nodes:
log("AutoDriver: no nodes to process")
return
backup = _backup_state()
try:
sim_to_use = relax_sim if relax_sim is not None else self.sim
log(f"AutoDriver: running fusion with sim={sim_to_use:.3f} nodes={len(self.registry.nodes)}")
res1 = self.comp.complementary_sublimate_flexible(sim_thresh=sim_to_use, sim_min=cfg.sim_min, max_iters=2)
res2 = self.comp.build_seeds(sim_thresh=sim_to_use, min_group=self.min_group, quant_bits=cfg.quant_bits)
merged = res1.get("merged",0) + res2.get("created",0)
forced = 0
# If quarantine exists and no merges, attempt automatic quarantine retry/sublimation
try:
if merged == 0 and os.path.exists(cfg.quarantine_path):
log("AutoDriver: attempting automatic quarantine retry/sublimation")
retry_res = retry_quarantine_and_sublimate(self.comp, top_k=60, relax_steps=[0.55,0.50,0.45], interp_steps=6, perturb_sigma=0.03)
log(f"AutoDriver: quarantine retry result: {retry_res}")
merged += retry_res.get("merged",0)
except Exception:
log_exc("auto quarantine retry error")
if merged == 0 and force_cluster:
log("AutoDriver: no merges from flexible path; running force-cluster as backup")
forced_res = self.comp.force_cluster_and_merge(eps=cluster_eps, min_members=2, quant_bits=cfg.quant_bits)
forced = forced_res.get("forced", 0)
self.metrics["runs"] += 1
self.metrics["merged"] += merged + forced
self.metrics["seeds"] += res2.get("created",0)
self.last_activity = time.time()
log(f"AutoDriver: run completed merged={merged} forced={forced} seeds_created={res2.get('created',0)} remaining_nodes={len(self.registry.nodes)}")
if merged + forced > 0:
self.sim = min(0.995, self.sim + 0.01)
else:
self.sim = max(0.50, self.sim - 0.02)
self.registry.save(); self.seed_index.save(); Ledger.dump()
except Exception as e:
log(f"AutoDriver: error during run: {e}")
log_exc("run_once exception")
if backup:
rollback_from_backup(backup)
def watch_loop(self, poll_interval: float = cfg.poll_interval):
self.running = True
log("AutoDriver: entering watch loop")
self.run_once()
while self.running and not self._shutdown_requested:
if os.path.exists(cfg.shutdown_signal):
log("AutoDriver: shutdown signal detected; exiting watch loop")
break
try:
new_found = False
for fname in os.listdir(cfg.shard_dir):
fpath = os.path.join(cfg.shard_dir, fname)
try:
mtime = os.path.getmtime(fpath)
if mtime > self.last_activity - 1:
new_found = True
break
except Exception:
continue
if new_found:
log("AutoDriver: new shard detected -> triggering run")
self.run_once()
if time.time() - self.last_activity > cfg.idle_run_seconds:
log("AutoDriver: idle timeout -> periodic run")
self.run_once()
except Exception as e:
log(f"AutoDriver: watch loop error: {e}")
log_exc("watch_loop exception")
time.sleep(poll_interval)
self.running = False
log("AutoDriver: watch loop ended")
-------------------------
Optional minimal HTTP status server
-------------------------
class StatusHandler(http.server.BaseHTTPRequestHandler): def do_GET(self): if self.path.startswith("/status"): try: body = { "ts": now_ts(), "ents": len(EntRegistry().nodes), "seeds": len(SeedIndex().seeds), "metrics": auto_driver.metrics if 'auto_driver' in globals() else {} } self.send_response(200) self.send_header("Content-Type", "application/json") self.end_headers() self.wfile.write(json.dumps(body).encode("utf-8")) except Exception: self.send_response(500); self.end_headers() elif self.path.startswith("/metrics"): try: m = auto_driver.metrics if 'auto_driver' in globals() else {} lines = [] for k,v in m.items(): lines.append(f"qem_{k} {v}") self.send_response(200) self.send_header("Content-Type", "text/plain; version=0.0.4") self.end_headers() self.wfile.write("\n".join(lines).encode("utf-8")) except Exception: self.send_response(500); self.end_headers() else: self.send_response(404); self.end_headers()
def log_message(self, format, *args):
return
def start_status_server(port:int): if port <= 0: return None try: server = socketserver.ThreadingTCPServer(("0.0.0.0", port), StatusHandler) t = threading.Thread(target=server.serve_forever, daemon=True) t.start() log(f"Status server listening on port {port}") return server except Exception as e: log(f"Failed to start status server: {e}") return None
-------------------------
Entrypoint
-------------------------
def main(argv): parser = argparse.ArgumentParser(description="QEM Autonomous Final Embedded") parser.add_argument("--debug-run-once", action="store_true", help="Run one detailed debug iteration then exit") parser.add_argument("--no-auto", action="store_true", help="Do not auto-run (exit)") parser.add_argument("--relax-sim", type=float, default=None, help="Temporarily relax sim threshold for debug run") parser.add_argument("--force-cluster", action="store_true", help="If no merges, run force-cluster as backup") parser.add_argument("--cluster-eps", type=float, default=0.25, help="Eps for force-cluster") args = parser.parse_args(argv)
global auto_driver
auto_driver = AutoDriver()
server = start_status_server(cfg.status_port)
if args.no_auto:
log("Auto-run disabled by --no-auto; exiting")
return
if args.debug_run_once:
log("DEBUG RUN ONCE: starting detailed single run")
try:
with open(cfg.debug_log, "a", encoding="utf-8") as f:
f.write(f"\n=== DEBUG RUN START {now_ts()} ===\n")
except Exception:
pass
auto_driver.run_once(relax_sim=args.relax_sim, force_cluster=args.force_cluster, cluster_eps=args.cluster_eps)
log("DEBUG RUN ONCE: finished; check qem_cloud_data/debug_log.txt for details")
if server:
try:
server.shutdown(); server.server_close()
except Exception:
pass
return
try:
auto_driver.watch_loop(poll_interval=cfg.poll_interval)
except KeyboardInterrupt:
log("KeyboardInterrupt received; shutting down")
finally:
if server:
try:
server.shutdown(); server.server_close()
except Exception:
pass
Ledger.dump()
log("Autonomous process exiting")
if name == "main": main(sys.argv[1:])