# CodexFlow_TM / app.py — Hugging Face Space by LordXido (commit 4c2f227, verified)
# app.py
# CodexFlow / GVBDMS v3 β€’ January 11, 2026
# Persistent provenance ledger + first gentle IRE influence (coherence check + Ξ© smoothing)
import gradio as gr
import requests
import time
import json
import hashlib
import sqlite3
import numpy as np
from typing import Dict, Any, List, Optional, Tuple
from collections import deque
# ──────────────────────────────────────────────────────────────
# CONFIGURATION
# ──────────────────────────────────────────────────────────────
DB_PATH = "codexflow_gvbdms_v3.db"  # SQLite ledger file, created in the working directory
WORLD_BANK_BASE = "https://api.worldbank.org/v2"  # REST base URL for live macro data
DEFAULT_YEAR = "2023"  # default year queried from the World Bank API
# World Bank indicator codes fetched on each tick (name -> series code).
INDICATORS = {
    "GDP": "NY.GDP.MKTP.CD",
    "INFLATION": "FP.CPI.TOTL.ZG",
    "POPULATION": "SP.POP.TOTL",
}
# Very simple toy intent anchor for first coherence check
INTENT_ANCHOR = {"stability": 0.92, "transparency": 0.88}
COHERENCE_THRESHOLD = 0.65  # records below this are refused
OMEGA_MEMORY = deque(maxlen=8)  # very light causal smoothing buffer
# ──────────────────────────────────────────────────────────────
# DATABASE LAYER
# ──────────────────────────────────────────────────────────────
def init_db():
    """Create the provenance-ledger table and its lookup indexes if absent."""
    ddl_statements = (
        """
CREATE TABLE IF NOT EXISTS records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts REAL NOT NULL,
hash TEXT UNIQUE NOT NULL,
prev_hash TEXT NOT NULL,
schema TEXT NOT NULL,
country TEXT NOT NULL,
source TEXT NOT NULL,
reliability REAL NOT NULL,
latency_s REAL NOT NULL,
payload_json TEXT NOT NULL,
metadata_json TEXT NOT NULL,
coherence_score REAL,
bytes INTEGER NOT NULL,
entropy_proxy REAL NOT NULL
)
""",
        "CREATE INDEX IF NOT EXISTS idx_schema_country ON records(schema, country)",
        "CREATE INDEX IF NOT EXISTS idx_ts ON records(ts)",
    )
    with sqlite3.connect(DB_PATH) as con:
        for statement in ddl_statements:
            con.execute(statement)
    print("Database initialized.")


# Ensure the ledger exists before any handler runs.
init_db()
def get_tip_hash() -> str:
    """Return the hash of the newest ledger record, or "GENESIS" when empty."""
    with sqlite3.connect(DB_PATH) as con:
        row = con.execute(
            "SELECT hash FROM records ORDER BY id DESC LIMIT 1"
        ).fetchone()
    if row is None:
        return "GENESIS"
    return row[0]
def insert_record(rec: Dict) -> bool:
    """Append one row to the ledger.

    Returns False (instead of raising) when the record's hash collides with
    an existing row, i.e. the chain already contains this exact payload.
    """
    row = (
        rec["ts"], rec["hash"], rec["prev_hash"], rec["schema"], rec["country"],
        rec["source"], rec["reliability"], rec["latency_s"],
        rec["payload_json"], rec["metadata_json"], rec.get("coherence_score"),
        rec["bytes"], rec["entropy_proxy"],
    )
    sql = (
        "INSERT INTO records ("
        "ts, hash, prev_hash, schema, country, source, reliability, latency_s, "
        "payload_json, metadata_json, coherence_score, bytes, entropy_proxy"
        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )
    try:
        with sqlite3.connect(DB_PATH) as con:
            con.execute(sql, row)
        return True
    except sqlite3.IntegrityError:
        # UNIQUE constraint on `hash`: duplicate record, chain left unchanged.
        return False
# ──────────────────────────────────────────────────────────────
# UTILITIES
# ──────────────────────────────────────────────────────────────
def canonical_bytes(obj: Any) -> bytes:
    """Serialize *obj* to deterministic, whitespace-free UTF-8 JSON bytes."""
    encoded = json.dumps(obj, sort_keys=True, separators=(",", ":"))
    return encoded.encode("utf-8")
def compute_bit_stats(payload: Dict) -> Tuple[int, float]:
    """Return (byte length, distinct-byte ratio) of the canonical encoding."""
    encoded = canonical_bytes(payload)
    size = len(encoded)
    distinct_ratio = len(set(encoded)) / max(size, 1)
    return size, round(distinct_ratio, 6)
def hash_chain(payload: Dict, prev: str) -> str:
    """SHA-256 of the canonical payload fused with the previous tip hash."""
    linked = {"payload": payload, "prev": prev}
    return hashlib.sha256(canonical_bytes(linked)).hexdigest()
def toy_coherence_score(values: Dict[str, float]) -> float:
    """Score *values* against INTENT_ANCHOR: 1.0 = exact match, 0.0 = far off.

    Only keys present in both mappings contribute; with no overlap the
    score defaults to a neutral 0.5.
    """
    scores = []
    for key, target in INTENT_ANCHOR.items():
        observed = values.get(key)
        if observed is None:
            continue
        rel_diff = abs(observed - target) / max(abs(target), 0.01)
        scores.append(max(0.0, 1.0 - min(1.0, rel_diff)))
    mean_score = np.mean(scores) if scores else 0.5
    return round(mean_score, 4)
def omega_smooth(key: str, value: float) -> float:
    """Exponentially smooth *value* against the last remembered value for *key*.

    Fix: the buffer now carries a merged snapshot of every smoothed key.
    Previously each call appended a single-key dict, so interleaving keys
    (e.g. "price_index" then "confidence", as generate_signals does) made
    `OMEGA_MEMORY[-1].get(key, value)` miss the other key's history and
    smoothing silently never happened across ticks.

    The first observation of a key is returned unchanged and seeds the
    buffer; later observations blend 25% new signal with 75% history.
    """
    state = dict(OMEGA_MEMORY[-1]) if OMEGA_MEMORY else {}
    prev = state.get(key)
    if prev is None:
        # First sighting of this key: nothing to smooth against yet.
        state[key] = value
        OMEGA_MEMORY.append(state)
        return value
    smoothed = 0.25 * value + 0.75 * prev
    state[key] = smoothed
    OMEGA_MEMORY.append(state)
    return round(smoothed, 6)
# ──────────────────────────────────────────────────────────────
# DATA INGEST & SIGNAL GENERATION
# ──────────────────────────────────────────────────────────────
def fetch_macro(country: str, year: str) -> Tuple[Dict, float]:
    """Fetch GDP / inflation / population for *country* from the World Bank API.

    Returns (result dict, latency in seconds). Each indicator value is None
    when the request fails or the API returns no data point for *year*.

    Fixes: the return annotation now matches the (dict, latency) tuple that
    was always returned, and the bare `except:` is narrowed so it no longer
    swallows KeyboardInterrupt/SystemExit.
    """
    result = {"type": "macro", "country": country, "year": year}
    t0 = time.time()
    for name, code in INDICATORS.items():
        url = (
            f"{WORLD_BANK_BASE}/country/{country}/indicator/{code}"
            f"?format=json&date={year}&per_page=1"
        )
        try:
            body = requests.get(url, timeout=7).json()
            # World Bank payload shape: [metadata, [data points]]; a missing
            # series comes back with an absent/empty second element.
            result[name.lower()] = body[1][0]["value"] if len(body) > 1 and body[1] else None
        except (requests.RequestException, ValueError, KeyError, IndexError, TypeError):
            # Network / JSON-shape failures degrade to a missing value.
            result[name.lower()] = None
    latency = time.time() - t0
    return result, latency
def generate_signals(commodity: str, anchor: float, macro: Dict, lag_days: int) -> List[Tuple[Dict, str]]:
    """Derive the five toy downstream signals from one macro observation.

    Returns (payload, schema-tag) pairs for the commodity, logistics,
    energy, sentiment and feature channels, in that order.
    """
    gdp = macro.get("gdp")
    gdp_scale = gdp / 1e14 if gdp else 1.0
    supply = anchor * gdp_scale
    price = omega_smooth("price_index", round((supply / 11.0) * gdp_scale, 6))

    econ = {
        "type": "commodity",
        "commodity": commodity,
        "supply": round(supply, 4),
        "demand": round(supply * 0.95, 4),
        "price_index": price,
        # Flow uses the *unrounded* demand term, matching the ledger history.
        "flow": round(supply * 0.95 * price, 4),
    }
    # Friction is the relative supply/demand gap of the rounded figures.
    gap = abs(econ["supply"] - econ["demand"])
    logi = {"type": "logistics", "friction": round(gap / max(econ["supply"], 1e-9), 6)}
    ener = {
        "type": "energy",
        "cost_index": round(price * 0.42, 4),
        "dependency": "high" if commodity.lower() in ["oil", "gas"] else "moderate",
    }
    sent = {
        "type": "sentiment",
        "confidence": omega_smooth("confidence", np.random.uniform(0.62, 0.91)),
    }
    feat = {
        "type": "features",
        "lag_days": lag_days,
        "projected_price": round(price * (1 + (1 - sent["confidence"]) * 0.07), 6),
        "volatility": round(0.012 * lag_days, 6),
    }
    return [
        (econ, "commodity.v1"),
        (logi, "logistics.v1"),
        (ener, "energy.v1"),
        (sent, "sentiment.v1"),
        (feat, "features.v1"),
    ]
# ──────────────────────────────────────────────────────────────
# CORE TICK FUNCTION (with coherence refusal)
# ──────────────────────────────────────────────────────────────
def run_tick(commodity: str, anchor: float, country: str, lag_days: int, use_live: bool, year: str):
    """Run one ingest tick: obtain macro data, gate it on coherence, append
    it to the hash-chained ledger, then derive and append downstream signals.

    Returns (status dict, tip hash or None). Status is "refused" when macro
    coherence falls below COHERENCE_THRESHOLD (nothing written), "error" on
    a duplicate macro hash, otherwise "ok" with the count of derived records
    accepted.
    """
    # Live path hits the World Bank API; offline path uses an all-None stub.
    macro, latency = fetch_macro(country, year) if use_live else (
        {"type": "macro", "country": country, "year": year, "gdp": None, "inflation": None, "population": None}, 0.0
    )
    # Map inflation (percent) onto a rough "stability" proxy for the anchor check.
    macro_coh = toy_coherence_score({
        "stability": 1.0 - abs(macro.get("inflation", 0) or 0) / 10,
    })
    if macro_coh < COHERENCE_THRESHOLD:
        # Refusal path: the ledger is left untouched.
        return {"status": "refused", "reason": f"Macro coherence too low: {macro_coh:.3f}", "tip": get_tip_hash()}, None
    macro_rec = {
        "ts": time.time(),
        # NOTE(review): get_tip_hash() is called twice here; fine in this
        # single-threaded app, but concurrent ticks could chain `hash` and
        # `prev_hash` to different tips — confirm before adding concurrency.
        "hash": hash_chain(macro, get_tip_hash()),
        "prev_hash": get_tip_hash(),
        "schema": "macro.v1",
        "country": country,
        "source": "world_bank" if use_live else "synthetic",
        # Synthetic data is trusted less than live API data.
        "reliability": 0.88 if use_live else 0.65,
        "latency_s": round(latency, 4),
        "payload_json": json.dumps(macro, sort_keys=True),
        "metadata_json": json.dumps({"note": "anchor ingest"}, sort_keys=True),
        "coherence_score": macro_coh,
        "bytes": len(canonical_bytes(macro)),
        "entropy_proxy": compute_bit_stats(macro)[1]
    }
    if not insert_record(macro_rec):
        return {"status": "error", "reason": "duplicate hash"}, None
    signals = generate_signals(commodity, anchor, macro, lag_days)
    added = 0
    for payload, schema in signals:
        # Only the logistics payload carries "friction"; other signals score
        # against stability = 1.0 and so pass the threshold by construction.
        coh = toy_coherence_score({"stability": 1.0 - payload.get("friction", 0)})
        if coh < COHERENCE_THRESHOLD:
            continue # refuse low-coherence derived signal
        rec = {
            "ts": time.time(),
            # Each accepted record becomes the new tip, so the chain extends
            # one link per insert (get_tip_hash re-reads the DB each pass).
            "hash": hash_chain(payload, get_tip_hash()),
            "prev_hash": get_tip_hash(),
            "schema": schema,
            "country": country,
            "source": "derived",
            "reliability": 0.92,
            "latency_s": 0.0,
            "payload_json": json.dumps(payload, sort_keys=True),
            # Back-reference to the macro record this signal was derived from.
            "metadata_json": json.dumps({"linked_macro": macro_rec["hash"]}, sort_keys=True),
            "coherence_score": coh,
            "bytes": len(canonical_bytes(payload)),
            "entropy_proxy": compute_bit_stats(payload)[1]
        }
        if insert_record(rec):
            added += 1
    tip = get_tip_hash()
    return {
        "status": "ok",
        "added": added,
        "tip_hash": tip,
        "macro_coherence": macro_coh
    }, tip
# ──────────────────────────────────────────────────────────────
# QUERY & CHAT
# ──────────────────────────────────────────────────────────────
def query_records(limit: int = 50, schema: str = "ANY", country: str = "ANY") -> List[Dict]:
    """Return up to *limit* newest ledger rows, optionally filtered.

    *limit* is clamped to [1, 300]; "ANY" disables the corresponding filter.
    """
    limit = max(1, min(int(limit), 300))
    filters: List[str] = []
    params: List = []
    if schema != "ANY":
        filters.append("schema = ?")
        params.append(schema)
    if country != "ANY":
        filters.append("country = ?")
        params.append(country)
    sql = "SELECT ts, hash, prev_hash, schema, country, coherence_score FROM records"
    if filters:
        sql += " WHERE " + " AND ".join(filters)
    sql += " ORDER BY id DESC LIMIT ?"
    params.append(limit)
    with sqlite3.connect(DB_PATH) as con:
        rows = con.execute(sql, params).fetchall()
    keys = ("ts", "hash", "prev", "schema", "country", "coherence")
    return [dict(zip(keys, row)) for row in rows]
def jarvis(message: str, history):
    """Tiny chat command handler: understands 'latest'/'tip' and 'coherence'."""
    text = message.lower().strip()
    if "latest" in text or "tip" in text:
        newest = query_records(1)
        payload = newest[0] if newest else {"status": "empty"}
        return json.dumps(payload, indent=2)
    if "coherence" in text:
        recent = query_records(20)
        values = [r["coherence"] for r in recent if r["coherence"] is not None]
        if not values:
            return "No coherence data yet"
        return f"Recent coherence (last {len(values)}): mean = {np.mean(values):.3f}"
    return "Commands: latest, tip, coherence"
# ──────────────────────────────────────────────────────────────
# GRADIO INTERFACE
# ──────────────────────────────────────────────────────────────
# Build the two-column UI: tick/query controls on the left, chat on the right.
with gr.Blocks(title="CodexFlow v3 β€’ IRE Influence") as app:
    gr.Markdown("# CodexFlow v3 β€’ Provenance Ledger + First IRE Touch")
    gr.Markdown("SQLite β€’ Hash chain β€’ Bit stats β€’ Simple coherence refusal & smoothing")
    with gr.Row():
        with gr.Column(scale=2):
            # NOTE(review): the second positional argument is the default
            # value; in Gradio 4 `value` is keyword-only on Dropdown —
            # confirm the pinned Gradio version accepts this call shape.
            comm = gr.Dropdown(["Gold","Oil","Gas","Wheat","Copper"], "Gold", label="Commodity")
            anch = gr.Number(950, label="Anchor")
            cntry = gr.Textbox("WLD", label="Country")
            lag_d = gr.Slider(1, 365, 7, label="Lag days")
            yr = gr.Textbox(DEFAULT_YEAR, label="Year")
            live = gr.Checkbox(True, label="Live World Bank")
            btn = gr.Button("Run Tick", variant="primary")
            res = gr.JSON(label="Result")
            tip = gr.Textbox(label="Tip Hash", interactive=False)
            # Input order matches run_tick(commodity, anchor, country, lag_days, use_live, year).
            btn.click(run_tick, [comm, anch, cntry, lag_d, live, yr], [res, tip])
            gr.Markdown("### Query")
            lim = gr.Slider(5, 200, 30, label="Limit")
            sch = gr.Dropdown(["ANY", "macro.v1", "commodity.v1", "features.v1"], "ANY", label="Schema")
            qry_btn = gr.Button("Query")
            out = gr.JSON(label="Records")
            # Reuses the tick form's country textbox as the query filter.
            qry_btn.click(query_records, [lim, sch, cntry], out)
        with gr.Column(scale=1):
            gr.Markdown("### Jarvis X")
            gr.ChatInterface(jarvis, chatbot=gr.Chatbot(height=400))
    gr.Markdown("**v3** β€’ First coherence check & Ξ© smoothing β€’ Still toy-level IRE influence")

if __name__ == "__main__":
    app.launch()