Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import hashlib | |
| import json | |
| import time | |
| import io | |
| import zipfile | |
| from dataclasses import dataclass, asdict | |
| from typing import List, Dict, Any, Tuple | |
| # ============================================================ | |
| # CodexByte ΩΞ Runtime — Single-File, HF-Safe, Proof-First | |
| # ============================================================ | |
| # ---------------------------- | |
| # ISA (stable core) | |
| # ---------------------------- | |
| OPCODES = { | |
| "HALT": 0x00, | |
| "LOAD_IMM": 0x01, # r, v | |
| "LOAD_MEM": 0x02, # r, a | |
| "STORE": 0x03, # r, a | |
| "ADD": 0x04, # r1, r2 | |
| "SUB": 0x05, # r1, r2 | |
| "MUL": 0x06, # r1, r2 | |
| "DIV": 0x07, # r1, r2 | |
| "CMP": 0x08, # r1, r2 -> sets Z | |
| "JMP": 0x09, # a | |
| "JZ": 0x0A, # a | |
| "JNZ": 0x0B, # a | |
| "HASH": 0x0C, # r -> r becomes 64-bit int derived from sha256 | |
| "TIME": 0x10, # r -> monotonic ms (deterministic-ish per run; used for trace only) | |
| "COMMIT": 0x0F, # -> append chained hash to Ω-ledger | |
| "EMIT": 0x11, # r -> LAST_EMIT in mem | |
| } | |
| OPNAMES = {v: k for k, v in OPCODES.items()} | |
| # ---------------------------- | |
| # Utilities | |
| # ---------------------------- | |
| def sha256_hex(b: bytes) -> str: | |
| return hashlib.sha256(b).hexdigest() | |
| def stable_json(obj: Any) -> bytes: | |
| return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") | |
| def clamp_int64(x: int) -> int: | |
| # Keep values bounded (prevents unbounded growth) | |
| return int(max(min(x, (1 << 63) - 1), -(1 << 63))) | |
| def parse_int(token: str) -> int: | |
| # Accept decimal or 0x.. forms | |
| return int(token, 0) | |
| # ---------------------------- | |
| # Compiler: Text -> Bytecode | |
| # ---------------------------- | |
| def compile_codexbyte(source: str) -> List[int]: | |
| """ | |
| Assembles a simple assembly-like form into a flat integer bytecode list. | |
| Each instruction is encoded as: [opcode, operand1, operand2, ...] with fixed arity per op. | |
| """ | |
| lines = source.splitlines() | |
| bytecode: List[int] = [] | |
| for raw in lines: | |
| line = raw.strip() | |
| if not line or line.startswith(";") or line.startswith("#"): | |
| continue | |
| # strip inline comments | |
| if ";" in line: | |
| line = line.split(";", 1)[0].strip() | |
| if "#" in line: | |
| line = line.split("#", 1)[0].strip() | |
| if not line: | |
| continue | |
| parts = line.split() | |
| op = parts[0].upper() | |
| if op not in OPCODES: | |
| raise ValueError(f"Unknown opcode: {op}") | |
| bytecode.append(OPCODES[op]) | |
| # encode operands | |
| for tok in parts[1:]: | |
| bytecode.append(parse_int(tok)) | |
| return bytecode | |
| # ---------------------------- | |
| # Trace record | |
| # ---------------------------- | |
| class TraceStep: | |
| step: int | |
| pc_before: int | |
| op: str | |
| operands: List[int] | |
| reg_before: List[int] | |
| reg_after: List[int] | |
| flags_before: Dict[str, bool] | |
| flags_after: Dict[str, bool] | |
| mem_writes: List[Tuple[str, int]] # (addr, value) | |
| ledger_added: str | None | |
| # ---------------------------- | |
| # VM: Bytecode -> State/Proof | |
| # ---------------------------- | |
| class CodexByteVM: | |
| def __init__(self): | |
| self.reset() | |
| def reset(self): | |
| self.reg = [0] * 8 | |
| self.mem: Dict[Any, int] = {} | |
| self.flags = {"Z": False} | |
| self.pc = 0 | |
| self.omega_ledger: List[str] = [] | |
| self._ledger_head = "0" * 64 # chained hash head | |
| self.last_trace: List[TraceStep] = [] | |
| def _state_digest(self) -> str: | |
| # Deterministic digest of current state (regs + mem + flags + pc) | |
| obj = { | |
| "pc": self.pc, | |
| "reg": self.reg, | |
| "mem": self.mem, | |
| "flags": self.flags, | |
| "ledger_head": self._ledger_head, | |
| } | |
| return sha256_hex(stable_json(obj)) | |
| def _commit(self) -> str: | |
| # Chain: head <- sha256(head || state_digest) | |
| sd = self._state_digest() | |
| new_head = sha256_hex((self._ledger_head + sd).encode("utf-8")) | |
| self._ledger_head = new_head | |
| self.omega_ledger.append(new_head) | |
| return new_head | |
| def run(self, prog: List[int], step_limit: int = 20000, trace: bool = True) -> Dict[str, Any]: | |
| self.last_trace = [] | |
| steps = 0 | |
| while True: | |
| if steps >= step_limit: | |
| raise RuntimeError(f"Step limit exceeded ({step_limit}). Possible infinite loop.") | |
| if self.pc < 0 or self.pc >= len(prog): | |
| raise RuntimeError(f"PC out of bounds: pc={self.pc}, program_len={len(prog)}") | |
| pc_before = self.pc | |
| op = prog[self.pc] | |
| self.pc += 1 | |
| opname = OPNAMES.get(op, f"OP_{op:02X}") | |
| reg_before = self.reg.copy() | |
| flags_before = dict(self.flags) | |
| mem_writes: List[Tuple[str, int]] = [] | |
| ledger_added = None | |
| def need(n: int): | |
| if self.pc + n > len(prog): | |
| raise RuntimeError(f"Truncated operands for {opname} at pc={pc_before}") | |
| def read1() -> int: | |
| nonlocal prog | |
| v = prog[self.pc] | |
| self.pc += 1 | |
| return v | |
| def read2() -> Tuple[int, int]: | |
| need(2) | |
| a = read1() | |
| b = read1() | |
| return a, b | |
| # ---- Execute ---- | |
| if op == 0x00: # HALT | |
| pass | |
| elif op == 0x01: # LOAD_IMM r v | |
| r, v = read2() | |
| self._check_reg(r) | |
| self.reg[r] = clamp_int64(v) | |
| elif op == 0x02: # LOAD_MEM r a | |
| r, a = read2() | |
| self._check_reg(r) | |
| self.reg[r] = clamp_int64(self.mem.get(self._addr(a), 0)) | |
| elif op == 0x03: # STORE r a | |
| r, a = read2() | |
| self._check_reg(r) | |
| addr = self._addr(a) | |
| self.mem[addr] = clamp_int64(self.reg[r]) | |
| mem_writes.append((str(addr), self.mem[addr])) | |
| elif op == 0x04: # ADD r1 r2 | |
| r1, r2 = read2() | |
| self._check_reg(r1); self._check_reg(r2) | |
| self.reg[r1] = clamp_int64(self.reg[r1] + self.reg[r2]) | |
| elif op == 0x05: # SUB r1 r2 | |
| r1, r2 = read2() | |
| self._check_reg(r1); self._check_reg(r2) | |
| self.reg[r1] = clamp_int64(self.reg[r1] - self.reg[r2]) | |
| elif op == 0x06: # MUL r1 r2 | |
| r1, r2 = read2() | |
| self._check_reg(r1); self._check_reg(r2) | |
| self.reg[r1] = clamp_int64(self.reg[r1] * self.reg[r2]) | |
| elif op == 0x07: # DIV r1 r2 | |
| r1, r2 = read2() | |
| self._check_reg(r1); self._check_reg(r2) | |
| if self.reg[r2] == 0: | |
| raise ZeroDivisionError("DIV by zero") | |
| self.reg[r1] = clamp_int64(int(self.reg[r1] / self.reg[r2])) | |
| elif op == 0x08: # CMP r1 r2 | |
| r1, r2 = read2() | |
| self._check_reg(r1); self._check_reg(r2) | |
| self.flags["Z"] = (self.reg[r1] == self.reg[r2]) | |
| elif op == 0x09: # JMP a | |
| a = read1() | |
| self.pc = self._pc(a, len(prog)) | |
| elif op == 0x0A: # JZ a | |
| a = read1() | |
| if self.flags["Z"]: | |
| self.pc = self._pc(a, len(prog)) | |
| elif op == 0x0B: # JNZ a | |
| a = read1() | |
| if not self.flags["Z"]: | |
| self.pc = self._pc(a, len(prog)) | |
| elif op == 0x0C: # HASH r (store as 64-bit int) | |
| r = read1() | |
| self._check_reg(r) | |
| h = sha256_hex(str(self.reg[r]).encode("utf-8")) | |
| self.reg[r] = int(h[:16], 16) # 64-bit-ish | |
| elif op == 0x10: # TIME r | |
| r = read1() | |
| self._check_reg(r) | |
| # Note: time is not strictly deterministic across runs; use only for observability. | |
| self.reg[r] = int(time.time() * 1000) | |
| elif op == 0x0F: # COMMIT | |
| ledger_added = self._commit() | |
| elif op == 0x11: # EMIT r | |
| r = read1() | |
| self._check_reg(r) | |
| self.mem["LAST_EMIT"] = clamp_int64(self.reg[r]) | |
| mem_writes.append(("LAST_EMIT", self.mem["LAST_EMIT"])) | |
| else: | |
| raise RuntimeError(f"Unsupported opcode: 0x{op:02X} at pc={pc_before}") | |
| reg_after = self.reg.copy() | |
| flags_after = dict(self.flags) | |
| # Trace | |
| if trace: | |
| # operands captured approximately: from prog slice | |
| # best-effort decode: since pc moved, reconstruct from pc_before+1 to current pc | |
| op_slice = prog[pc_before+1:self.pc] | |
| self.last_trace.append(TraceStep( | |
| step=steps, | |
| pc_before=pc_before, | |
| op=opname, | |
| operands=op_slice, | |
| reg_before=reg_before, | |
| reg_after=reg_after, | |
| flags_before=flags_before, | |
| flags_after=flags_after, | |
| mem_writes=mem_writes, | |
| ledger_added=ledger_added | |
| )) | |
| steps += 1 | |
| if op == 0x00: # HALT | |
| break | |
| return self.snapshot() | |
| def snapshot(self) -> Dict[str, Any]: | |
| return { | |
| "registers": self.reg, | |
| "memory": self.mem, | |
| "flags": self.flags, | |
| "omega_ledger": self.omega_ledger, | |
| "ledger_head": self._ledger_head, | |
| "state_digest": self._state_digest(), | |
| } | |
| def _check_reg(self, r: int): | |
| if not (0 <= r < 8): | |
| raise ValueError(f"Invalid register index r={r}, expected 0..7") | |
| def _addr(self, a: int) -> str: | |
| # normalize address into stable string | |
| return hex(a) if isinstance(a, int) else str(a) | |
| def _pc(self, a: int, n: int) -> int: | |
| if not (0 <= a < n): | |
| raise ValueError(f"Invalid jump target {a}, program_len={n}") | |
| return a | |
| # ---------------------------- | |
| # Proof bundle (ZIP export) | |
| # ---------------------------- | |
| def build_proof_bundle( | |
| source: str, | |
| bytecode: List[int], | |
| result: Dict[str, Any], | |
| trace_steps: List[TraceStep], | |
| ) -> bytes: | |
| trace_json = [asdict(s) for s in trace_steps] | |
| bundle = { | |
| "meta": { | |
| "system": "CodexByte_Runtime", | |
| "proof_format": "OmegaTraceBundle.v1", | |
| }, | |
| "source": source, | |
| "bytecode": bytecode, | |
| "result": result, | |
| "trace": trace_json, | |
| } | |
| verifier_py = r''' | |
| import json, hashlib | |
| def sha256_hex(b: bytes) -> str: | |
| return hashlib.sha256(b).hexdigest() | |
| def stable_json(obj): | |
| return json.dumps(obj, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8") | |
| def replay(bundle_path: str): | |
| with open(bundle_path, "rb") as f: | |
| z = f.read() | |
| # This verifier expects you to open the ZIP and inspect bundle.json. | |
| print("Open the ZIP, extract bundle.json, and use your runtime to replay bytecode.") | |
| print("Bundle bytes sha256:", sha256_hex(z)) | |
| if __name__ == "__main__": | |
| replay("CodexByte_ProofBundle.zip") | |
| '''.lstrip() | |
| mem = io.BytesIO() | |
| with zipfile.ZipFile(mem, "w", compression=zipfile.ZIP_DEFLATED) as z: | |
| z.writestr("bundle.json", json.dumps(bundle, indent=2, ensure_ascii=False)) | |
| z.writestr("verifier.py", verifier_py) | |
| z.writestr("README.txt", | |
| "CodexByte Proof Bundle\n" | |
| "- bundle.json contains source, bytecode, trace, result\n" | |
| "- verifier.py provides a minimal integrity hook\n" | |
| "Replay verification: run the same bytecode in the runtime; compare ledger_head/state_digest.\n") | |
| return mem.getvalue() | |
| # ---------------------------- | |
| # Samples | |
| # ---------------------------- | |
| SAMPLES: Dict[str, str] = { | |
| "Contract: obligation satisfied (commit+emit)": """\ | |
| ; If mem[0x20] == 1000 -> satisfied (emit 0), else breach (emit 1) | |
| LOAD_IMM 0 1000 | |
| LOAD_MEM 1 0x20 | |
| CMP 1 0 | |
| JZ 18 | |
| ; breach | |
| LOAD_IMM 2 1 | |
| STORE 2 0x30 | |
| COMMIT | |
| EMIT 2 | |
| HALT | |
| ; satisfied | |
| LOAD_IMM 2 0 | |
| STORE 2 0x30 | |
| COMMIT | |
| EMIT 2 | |
| HALT | |
| """, | |
| "Ledger integrity demo (multi-commit)": """\ | |
| LOAD_IMM 0 7 | |
| COMMIT | |
| ADD 0 0 | |
| COMMIT | |
| HASH 0 | |
| COMMIT | |
| EMIT 0 | |
| HALT | |
| """, | |
| "Loop demo (safe with step limit)": """\ | |
| LOAD_IMM 0 0 | |
| LOAD_IMM 1 1 | |
| ADD 0 1 | |
| JMP 4 | |
| HALT | |
| """, | |
| } | |
| # ============================================================ | |
| # Gradio App | |
| # ============================================================ | |
| vm = CodexByteVM() | |
| def load_sample(name: str) -> str: | |
| return SAMPLES.get(name, "") | |
| def run_program(source: str, step_limit: int, enable_trace: bool, preload_mem_0x20: int): | |
| vm.reset() | |
| # preload for common contract patterns | |
| vm.mem[hex(0x20)] = int(preload_mem_0x20) | |
| bytecode = compile_codexbyte(source) | |
| result = vm.run(bytecode, step_limit=step_limit, trace=enable_trace) | |
| trace = [asdict(s) for s in vm.last_trace] if enable_trace else [] | |
| proof_zip = build_proof_bundle(source, bytecode, result, vm.last_trace if enable_trace else []) | |
| # gr.File expects a path OR a tuple (name, bytes) in newer versions. | |
| # Use (filename, bytes) which HF Gradio accepts. | |
| return ( | |
| { | |
| "status": "OK", | |
| "result": result, | |
| "bytecode_len": len(bytecode), | |
| "preloaded_memory": {"0x20": preload_mem_0x20}, | |
| }, | |
| trace, | |
| ("CodexByte_ProofBundle.zip", proof_zip), | |
| ) | |
| def replay_verify(source: str, step_limit: int, preload_mem_0x20: int): | |
| """ | |
| Replay check: run twice, compare final ledger_head & state_digest. | |
| """ | |
| # run 1 | |
| vm1 = CodexByteVM() | |
| vm1.mem[hex(0x20)] = int(preload_mem_0x20) | |
| bc = compile_codexbyte(source) | |
| r1 = vm1.run(bc, step_limit=step_limit, trace=False) | |
| # run 2 | |
| vm2 = CodexByteVM() | |
| vm2.mem[hex(0x20)] = int(preload_mem_0x20) | |
| r2 = vm2.run(bc, step_limit=step_limit, trace=False) | |
| ok = (r1["ledger_head"] == r2["ledger_head"]) and (r1["state_digest"] == r2["state_digest"]) | |
| return { | |
| "replay_ok": ok, | |
| "run1": {"ledger_head": r1["ledger_head"], "state_digest": r1["state_digest"]}, | |
| "run2": {"ledger_head": r2["ledger_head"], "state_digest": r2["state_digest"]}, | |
| "note": "TIME opcode makes replay non-deterministic. Avoid TIME in proofs." | |
| } | |
| with gr.Blocks(title="CodexByte ΩΞ Runtime") as demo: | |
| gr.Markdown( | |
| "# CodexByte ΩΞ Runtime\n" | |
| "**Programs = contracts • Execution = enforcement • Trace = Ω-proof**\n\n" | |
| "This runtime executes CodexByte deterministically and emits an intrinsic proof bundle." | |
| ) | |
| with gr.Row(): | |
| sample = gr.Dropdown(list(SAMPLES.keys()), value="Contract: obligation satisfied (commit+emit)", label="Sample Programs") | |
| load_btn = gr.Button("Load Sample") | |
| program = gr.Textbox(lines=18, label="CodexByte Program") | |
| load_btn.click(load_sample, inputs=sample, outputs=program) | |
| with gr.Row(): | |
| preload = gr.Number(value=1000, label="Preload mem[0x20] value (common contract input)") | |
| step_limit = gr.Slider(100, 50000, value=20000, step=100, label="Step limit (safety)") | |
| trace_on = gr.Checkbox(value=True, label="Enable trace (Ω-step log)") | |
| run_btn = gr.Button("Execute") | |
| with gr.Tabs(): | |
| with gr.Tab("Ω-State Result"): | |
| result_out = gr.JSON() | |
| proof_file = gr.File(label="Download Proof Bundle (ZIP)") | |
| with gr.Tab("Execution Trace"): | |
| trace_out = gr.JSON() | |
| with gr.Tab("Replay Verification"): | |
| verify_btn = gr.Button("Replay verify (run twice)") | |
| verify_out = gr.JSON() | |
| run_btn.click( | |
| fn=run_program, | |
| inputs=[program, step_limit, trace_on, preload], | |
| outputs=[result_out, trace_out, proof_file], | |
| ) | |
| verify_btn.click( | |
| fn=replay_verify, | |
| inputs=[program, step_limit, preload], | |
| outputs=verify_out | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |