| |
| """ |
| OneOCR Extraction Pipeline — Complete end-to-end extraction tool. |
| ================================================================= |
| |
| Single script that performs the entire extraction process: |
| 1. Decrypt .onemodel container (AES-256-CFB128) |
| 2. Extract 34 ONNX models + config data |
| 3. Unlock models 11-33 (replace OneOCRFeatureExtract custom op) |
| 4. Verify all models load in onnxruntime |
| |
| Usage: |
| python tools/extract_pipeline.py # defaults |
| python tools/extract_pipeline.py path/to/oneocr.onemodel # custom input |
| python tools/extract_pipeline.py --verify-only # just verify |
| |
| Requirements: |
| pip install pycryptodome onnx onnxruntime numpy |
| |
| Output structure: |
| oneocr_extracted/ |
| ├── onnx_models/ # 34 raw ONNX models (11-33 have custom ops) |
| ├── onnx_models_unlocked/ # 23 unlocked models (11-33, standard ops) |
| └── config_data/ # char maps, rnn_info, manifest, configs |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import copy |
| import hashlib |
| import struct |
| import sys |
| import time |
| from pathlib import Path |
|
|
| import numpy as np |
|
|
| try: |
| from Crypto.Cipher import AES |
| except ImportError: |
| print("ERROR: pycryptodome is required.") |
| print(" pip install pycryptodome") |
| sys.exit(1) |
|
|
| try: |
| import onnx |
| from onnx import helper, numpy_helper |
| except ImportError: |
| print("ERROR: onnx is required.") |
| print(" pip install onnx") |
| sys.exit(1) |
|
|
| try: |
| import onnxruntime as ort |
| except ImportError: |
| ort = None |
| print("WARNING: onnxruntime not installed — will skip runtime verification.") |
|
|
|
|
| |
| |
| |
|
|
# Key-derivation seed: derive_key(MASTER_KEY + file_hash) produces the AES key
# for the DX index block (see OneModelFile.decrypt_dx).
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Fixed 16-byte initialization vector used by every AES-CFB decryption here.
IV = b"Copyright @ OneO"
# 8-byte magic expected at the start of each decrypted chunk/config payload.
CONTAINER_MAGIC = bytes.fromhex("4a1a082b25000000")
|
|
|
|
| |
| |
| |
|
|
def aes_cfb128_decrypt(key: bytes, data: bytes, *, iv: bytes | None = None) -> bytes:
    """Decrypt *data* with AES in CFB mode using 128-bit segments.

    Args:
        key: AES key (32 bytes for AES-256, as used throughout this file).
        data: Ciphertext bytes.
        iv: Optional 16-byte initialization vector. Defaults to the
            container's fixed module-level ``IV`` (resolved at call time),
            so all existing callers behave exactly as before.

    Returns:
        Decrypted plaintext bytes.
    """
    return AES.new(key, AES.MODE_CFB, iv=IV if iv is None else iv,
                   segment_size=128).decrypt(data)
|
|
|
|
def derive_key(sha_input: bytes) -> bytes:
    """Derive a 32-byte AES key as the SHA-256 digest of *sha_input*."""
    hasher = hashlib.sha256()
    hasher.update(sha_input)
    return hasher.digest()
|
|
|
|
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a protobuf base-128 varint starting at *pos*.

    Returns (value, next_position). A *pos* already at or past the end of
    *data* returns (0, pos) unchanged; a truncated varint returns the bits
    accumulated so far — callers rely on this permissive behaviour.
    """
    result = 0
    shift = 0
    limit = len(data)
    while pos < limit:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte < 0x80:  # high bit clear → final byte
            break
        shift += 7
    return result, pos
|
|
|
|
def measure_protobuf(data: bytes) -> int:
    """Return the byte length of the valid ONNX ModelProto prefix of *data*.

    Walks top-level protobuf fields, stopping at the first field number a
    ModelProto cannot contain, an unsupported wire type, or a field that
    runs past the end of the buffer.
    """
    valid_fields = {1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 20}
    end = len(data)
    pos = 0
    while pos < end:
        field_start = pos
        tag, pos = read_varint(data, pos)
        if pos > end:
            return field_start
        field_no = tag >> 3
        wire_type = tag & 7
        if field_no not in valid_fields:
            return field_start
        if wire_type == 0:            # varint payload
            _, pos = read_varint(data, pos)
        elif wire_type == 1:          # fixed64
            pos += 8
        elif wire_type == 2:          # length-delimited
            length, pos = read_varint(data, pos)
            pos += length
        elif wire_type == 5:          # fixed32
            pos += 4
        else:                         # groups / unknown wire types end the scan
            return field_start
        if pos > end:
            return field_start
    return pos
|
|
|
|
class OneModelFile:
    """Parser for .onemodel encrypted containers.

    Container layout (little-endian), as read by this class:
        [0:8]    H      — u64 size field; the DX index spans bytes 24..H+12
        [8:24]   hash   — 16-byte file hash mixed into the DX key
        [24:...] DX     — encrypted index block (H - 12 bytes)
        [H+16:]  chunks — sequence of encrypted payload chunks
    """

    def __init__(self, filepath: str | Path):
        self.filepath = Path(filepath)
        # The entire container is read into memory up front.
        self.data = self.filepath.read_bytes()
        # First u64 drives the DX size and where the payload chunks begin.
        self.H = struct.unpack_from("<Q", self.data, 0)[0]
        self.file_hash = self.data[8:24]
        self.dx_offset = 24
        self.dx_size = self.H - 12
        self.payload_start = self.H + 16

    def decrypt_dx(self) -> bytes:
        """Decrypt the DX index block; key = SHA256(MASTER_KEY + file_hash)."""
        key = derive_key(MASTER_KEY + self.file_hash)
        return aes_cfb128_decrypt(key, self.data[self.dx_offset:self.dx_offset + self.dx_size])

    def decrypt_config(self, dx: bytes) -> bytes:
        """Decrypt the config block embedded in the decrypted DX index.

        Key material is dx[48:64] + dx[32:48]; the u64 at dx[48] doubles as
        the config payload size, with 8 extra bytes read for the magic prefix.
        """
        key = derive_key(dx[48:64] + dx[32:48])
        s1 = struct.unpack_from("<Q", dx, 48)[0]
        return aes_cfb128_decrypt(key, dx[64:64 + s1 + 8])

    def iter_chunks(self):
        """Yield (index, decrypted_payload) for each payload chunk.

        Each chunk is a 32-byte header (16-byte checksum plus two u64 sizes,
        where s2 must equal s1 + 24) followed by s1 + 8 encrypted bytes.
        Chunks whose decrypted magic does not match are reported and skipped
        but still consume an index; a malformed header ends iteration.
        """
        off = self.payload_start
        idx = 0
        while off + 32 <= len(self.data):
            checksum = self.data[off:off + 16]
            s1, s2 = struct.unpack_from("<QQ", self.data, off + 16)
            # s2 == s1 + 24 is the header sanity check; anything else means
            # we've walked off the end of the chunk list.
            if s2 != s1 + 24 or s1 == 0 or s1 > len(self.data): break
            enc_size = s1 + 8
            data_off = off + 32
            if data_off + enc_size > len(self.data): break
            # Per-chunk key: SHA256(size fields + checksum).
            key = derive_key(self.data[off + 16:off + 32] + checksum)
            dec = aes_cfb128_decrypt(key, self.data[data_off:data_off + enc_size])
            if dec[:8] == CONTAINER_MAGIC:
                yield idx, dec[8:]
            else:
                print(f" WARNING: chunk#{idx} magic mismatch — skipping")
            off = data_off + enc_size
            idx += 1
|
|
|
|
def classify_chunk(payload: bytes) -> str:
    """Classify a decrypted chunk by sniffing its leading bytes.

    ONNX models are recognised by their protobuf header bytes
    (0x08 followed by 0x06 or 0x07); other chunks are identified from a
    100-byte ASCII sample, with "binary_data" as the fallback for anything
    that is not printable text.
    """
    looks_like_onnx = (
        len(payload) > 100
        and payload[0] == 0x08
        and payload[1] in (0x06, 0x07)
    )
    if looks_like_onnx:
        return "onnx"

    try:
        sample = payload[:100].decode("ascii")
    except (UnicodeDecodeError, ValueError):
        return "binary_data"

    if not all(ch.isprintable() or ch in "\n\r\t" for ch in sample):
        return "binary_data"
    if "<LogPrior>" in sample:
        return "rnn_info"
    if sample.startswith(("! ", '" ')):
        has_digit = any(ch.isdigit() for ch in sample[:20])
        return "char2ind" if has_digit else "char2inschar"
    if sample.startswith("0."):
        return "score_calibration"
    if "text_script" in sample:
        return "ocr_config"
    if "//" in sample[:5]:
        return "composite_chars"
    return "text_data"
|
|
|
|
def decrypt_and_extract(input_file: Path, output_dir: Path) -> dict:
    """Step 1: Decrypt .onemodel and extract all chunks.

    Decrypts the DX index and config block, then walks every payload chunk,
    classifying each as an ONNX model or a config/text asset and writing it
    under *output_dir* (onnx_models/ and config_data/ are created).

    Args:
        input_file: Path to the encrypted ``.onemodel`` container.
        output_dir: Destination root directory.

    Returns:
        dict with 'onnx_models' and 'config_files' lists of written Paths.

    Raises:
        AssertionError: if the DX index or config block fails its magic check.
    """
    print("=" * 70)
    print(" STEP 1: DECRYPT & EXTRACT")
    print("=" * 70)

    model_file = OneModelFile(input_file)
    print(f" Input: {input_file} ({len(model_file.data):,} bytes)")
    print(f" Output: {output_dir}")

    # Decrypt the DX index; its first two bytes must read b"DX".
    dx = model_file.decrypt_dx()
    assert dx[:2] == b"DX", "DX magic mismatch!"
    print(f" DX index decrypted ({len(dx):,} bytes)")

    # The config block carries the same 8-byte container magic as chunks.
    config_dec = model_file.decrypt_config(dx)
    assert config_dec[:8] == CONTAINER_MAGIC
    config_payload = config_dec[8:]

    onnx_dir = output_dir / "onnx_models"
    config_dir = output_dir / "config_data"
    onnx_dir.mkdir(parents=True, exist_ok=True)
    config_dir.mkdir(parents=True, exist_ok=True)

    # The raw decrypted config payload is preserved verbatim as the manifest.
    manifest_path = config_dir / "manifest.bin"
    manifest_path.write_bytes(config_payload)
    print(f" Manifest: {len(config_payload):,} bytes")

    onnx_models = []
    config_files = [manifest_path]

    # Maps classify_chunk() labels to output file extensions.
    EXT_MAP = {
        "rnn_info": ".rnn_info", "char2ind": ".char2ind.txt",
        "char2inschar": ".char2inschar.txt", "score_calibration": ".calibration.txt",
        "ocr_config": ".config.txt", "composite_chars": ".composite.txt",
        "text_data": ".txt", "binary_data": ".bin",
    }

    print(f"\n {'#':>4} {'Type':18s} {'Size':>12} {'Filename'}")
    print(f" {'-'*66}")

    for idx, payload in model_file.iter_chunks():
        chunk_type = classify_chunk(payload)

        if chunk_type == "onnx":
            # Chunks may carry trailing padding; trim to the exact protobuf length.
            exact_size = measure_protobuf(payload)
            onnx_data = payload[:exact_size]
            info = _get_onnx_info(onnx_data)
            ir = info.get("ir_version", "?")
            prod = info.get("producer_version", "unknown")
            size_kb = len(onnx_data) // 1024
            # ONNX models are numbered by extraction order, not by chunk index.
            onnx_idx = len(onnx_models)
            fname = f"model_{onnx_idx:02d}_ir{ir}_{prod}_{size_kb}KB.onnx"
            (onnx_dir / fname).write_bytes(onnx_data)
            onnx_models.append(onnx_dir / fname)
            print(f" {idx:4d} {'ONNX':18s} {len(onnx_data):12,} {fname}")
        else:
            ext = EXT_MAP.get(chunk_type, ".bin")
            fname = f"chunk_{idx:02d}_{chunk_type}{ext}"
            (config_dir / fname).write_bytes(payload)
            config_files.append(config_dir / fname)
            print(f" {idx:4d} {chunk_type:18s} {len(payload):12,} {fname}")

    print(f"\n Extracted: {len(onnx_models)} ONNX models, {len(config_files)} config files")
    return {"onnx_models": onnx_models, "config_files": config_files}
|
|
|
|
def _get_onnx_info(data: bytes) -> dict:
    """Extract ir_version and producer_version from an ONNX protobuf header.

    Scans at most the first 500 bytes of *data* for ModelProto field 1
    (ir_version, varint) and field 4 (producer_version, bytes).

    Args:
        data: Raw serialized ModelProto bytes.

    Returns:
        dict containing whichever of the keys "ir_version" (int) and
        "producer_version" (str) could be decoded; possibly empty.
    """
    info: dict = {}
    pos = 0
    limit = min(len(data), 500)  # hoisted: the bound is loop-invariant
    while pos < limit:
        tag, pos = read_varint(data, pos)
        field, wire = tag >> 3, tag & 7
        if wire == 0:  # varint
            val, pos = read_varint(data, pos)
            if field == 1:
                info["ir_version"] = val
        elif wire == 2:  # length-delimited
            l, pos = read_varint(data, pos)
            raw = data[pos:pos + l]; pos += l
            if field == 4:
                # Narrowed from a bare `except: pass` — only a bad encoding
                # should be tolerated here, not arbitrary errors.
                try:
                    info["producer_version"] = raw.decode()
                except UnicodeDecodeError:
                    pass
        elif wire == 5:  # fixed32
            pos += 4
        elif wire == 1:  # fixed64
            pos += 8
        else:  # unknown wire type — stop scanning
            break
        if "ir_version" in info and "producer_version" in info:
            break
    return info
|
|
|
|
| |
| |
| |
|
|
def _extract_fe_weights(model) -> tuple[np.ndarray, np.ndarray, int, int]:
    """Extract W, b from OneOCRFeatureExtract config blob.

    The "feature/config" initializer holds big-endian float32 data laid out
    as W (in_dim * out_dim values) followed by b (out_dim values); the
    dimensions themselves are inferred by three successive heuristics.

    Args:
        model: onnx ModelProto expected to contain a OneOCRFeatureExtract node.

    Returns:
        (W, b, in_dim, out_dim) — W shaped (in_dim, out_dim), b shaped (out_dim,).

    Raises:
        ValueError: if the config blob or custom node is missing, or the
            dimensions cannot be determined.
    """
    config_blob = None
    for init in model.graph.initializer:
        if init.name == "feature/config":
            # The blob may live in string_data or raw_data depending on export.
            config_blob = bytes(init.string_data[0] if init.string_data else init.raw_data)
            break
    if config_blob is None:
        raise ValueError("No feature/config initializer")

    # Values are serialized big-endian; copy() detaches from the read-only buffer.
    be_arr = np.frombuffer(config_blob, dtype='>f4').copy()

    fe_node = next((n for n in model.graph.node if n.op_type == "OneOCRFeatureExtract"), None)
    if fe_node is None:
        raise ValueError("No OneOCRFeatureExtract node")

    # Heuristic 1: scan the last 10 floats for the pair (21.0, 50.0|51.0),
    # which appears to encode (in_dim, out_dim) — confirmed only empirically.
    in_dim = out_dim = None
    for i in range(len(be_arr) - 10, len(be_arr)):
        val = be_arr[i]
        if val == 21.0 and i + 1 < len(be_arr) and be_arr[i + 1] in [50.0, 51.0]:
            in_dim, out_dim = 21, int(be_arr[i + 1])
            break

    # Heuristic 2: fall back to the "data" graph input's second dimension.
    if in_dim is None:
        for gi in model.graph.input:
            if gi.name == "data":
                shape = [d.dim_value for d in gi.type.tensor_type.shape.dim]
                if len(shape) >= 2 and shape[1] > 0:
                    in_dim = shape[1]
                break

    # Heuristic 3: infer out_dim from the Gemm that consumes the node's output
    # (out_dim is that Gemm's weight row count for a 2-D weight).
    if out_dim is None:
        fe_out = fe_node.output[0]
        for node in model.graph.node:
            if node.op_type == "Gemm" and fe_out in node.input:
                wn = node.input[1]
                for init in model.graph.initializer:
                    if init.name == wn:
                        W = numpy_helper.to_array(init)
                        out_dim = W.shape[0] if len(W.shape) == 2 else W.shape[1]
                        break

    if in_dim is None or out_dim is None:
        raise ValueError(f"Cannot determine dims: in={in_dim}, out={out_dim}")

    # Blob layout: W first (row-major in_dim x out_dim), then b.
    W = be_arr[:in_dim * out_dim].reshape(in_dim, out_dim).astype(np.float32)
    b = be_arr[in_dim * out_dim:in_dim * out_dim + out_dim].astype(np.float32)
    return W, b, in_dim, out_dim
|
|
|
|
def unlock_gemm_model(model_path: Path, output_dir: Path) -> Path | None:
    """Replace the OneOCRFeatureExtract custom op with an equivalent Gemm.

    Used for models 11-32. Returns the path of the unlocked model, or None
    when the model contains no custom op.
    """
    original = onnx.load(str(model_path))
    has_custom = any(n.op_type == "OneOCRFeatureExtract" for n in original.graph.node)
    if not has_custom:
        return None

    W, b, in_dim, out_dim = _extract_fe_weights(original)
    patched = copy.deepcopy(original)

    # Swap the opaque config blob for explicit weight/bias initializers.
    keep = [init for init in patched.graph.initializer if init.name != "feature/config"]
    keep.append(numpy_helper.from_array(W.T, name="fe_weight"))
    keep.append(numpy_helper.from_array(b, name="fe_bias"))
    del patched.graph.initializer[:]
    patched.graph.initializer.extend(keep)

    # Rebuild the node list, substituting a Gemm for the custom op.
    # fe_weight stores W transposed, so transB=1 recovers y = x @ W + b.
    custom = next(n for n in patched.graph.node if n.op_type == "OneOCRFeatureExtract")
    src, dst = custom.input[0], custom.output[0]
    rebuilt = []
    for node in patched.graph.node:
        if node.op_type == "OneOCRFeatureExtract":
            rebuilt.append(helper.make_node("Gemm", [src, "fe_weight", "fe_bias"],
                                            [dst], alpha=1.0, beta=1.0, transB=1))
        else:
            rebuilt.append(node)
    del patched.graph.node[:]
    patched.graph.node.extend(rebuilt)

    # Drop the feature/config graph input and the custom-op opset import.
    del patched.graph.input[:]
    patched.graph.input.extend(i for i in original.graph.input if i.name != "feature/config")
    opsets = [op for op in patched.opset_import if op.domain != "com.microsoft.oneocr"]
    del patched.opset_import[:]
    patched.opset_import.extend(opsets)

    out_path = output_dir / (model_path.stem + "_unlocked.onnx")
    onnx.save(patched, str(out_path))
    return out_path
|
|
|
|
def unlock_conv_model(model_path: Path, output_dir: Path) -> Path | None:
    """Replace the OneOCRFeatureExtract custom op with an equivalent 1x1 Conv.

    Used for model 33 (LineLayout). Returns the path of the unlocked model,
    or None when the model has no custom op or no feature/config blob.
    """
    original = onnx.load(str(model_path))
    if all(n.op_type != "OneOCRFeatureExtract" for n in original.graph.node):
        return None

    # Pull the raw big-endian float blob holding the conv weights.
    blob = None
    for init in original.graph.initializer:
        if init.name != "feature/config":
            continue
        blob = bytes(init.string_data[0] if init.string_data else init.raw_data)
        break
    if blob is None:
        return None

    floats = np.frombuffer(blob, dtype='>f4').copy()
    in_ch, out_ch = 256, 16
    n_w = in_ch * out_ch
    # Stored as (in_ch, out_ch); Conv wants (out_ch, in_ch, kH, kW) with a 1x1 kernel.
    W = floats[:n_w].reshape(in_ch, out_ch).T.reshape(out_ch, in_ch, 1, 1).astype(np.float32)
    b = floats[n_w:n_w + out_ch].astype(np.float32)

    patched = copy.deepcopy(original)
    keep = [i for i in patched.graph.initializer if i.name != "feature/config"]
    keep.append(numpy_helper.from_array(W, name="fe_conv_weight"))
    keep.append(numpy_helper.from_array(b, name="fe_conv_bias"))
    del patched.graph.initializer[:]
    patched.graph.initializer.extend(keep)

    # Rebuild the node list, substituting the 1x1 Conv for the custom op.
    custom = next(n for n in patched.graph.node if n.op_type == "OneOCRFeatureExtract")
    src, dst = custom.input[0], custom.output[0]
    rebuilt = []
    for node in patched.graph.node:
        if node.op_type == "OneOCRFeatureExtract":
            rebuilt.append(helper.make_node(
                "Conv", [src, "fe_conv_weight", "fe_conv_bias"], [dst],
                kernel_shape=[1, 1], strides=[1, 1], pads=[0, 0, 0, 0]))
        else:
            rebuilt.append(node)
    del patched.graph.node[:]
    patched.graph.node.extend(rebuilt)

    # Drop the feature/config graph input and the custom-op opset import.
    del patched.graph.input[:]
    patched.graph.input.extend(i for i in original.graph.input if i.name != "feature/config")
    opsets = [op for op in patched.opset_import if op.domain != "com.microsoft.oneocr"]
    del patched.opset_import[:]
    patched.opset_import.extend(opsets)

    out_path = output_dir / (model_path.stem + "_unlocked.onnx")
    onnx.save(patched, str(out_path))
    return out_path
|
|
|
|
def unlock_all_models(onnx_dir: Path, output_dir: Path) -> dict:
    """Step 2: Unlock models 11-33 (replace custom ops).

    Dispatches model 33 to the Conv unlocker and 11-32 to the Gemm unlocker.

    Returns dict with 'unlocked', 'skipped', 'failed' lists of model indices.
    """
    print("\n" + "=" * 70)
    print(" STEP 2: UNLOCK MODELS (replace OneOCRFeatureExtract)")
    print("=" * 70)

    output_dir.mkdir(parents=True, exist_ok=True)
    results: dict = {"unlocked": [], "skipped": [], "failed": []}

    for idx in range(11, 34):
        candidates = list(onnx_dir.glob(f"model_{idx:02d}_*"))
        if not candidates:
            print(f" model_{idx:02d}: NOT FOUND")
            results["failed"].append(idx)
            continue

        # Model 33 (LineLayout) needs the Conv replacement; the rest use Gemm.
        unlock = unlock_conv_model if idx == 33 else unlock_gemm_model
        try:
            out = unlock(candidates[0], output_dir)
        except Exception as e:
            results["failed"].append(idx)
            print(f" model_{idx:02d}: ✗ FAILED — {e}")
            continue

        if out is None:
            results["skipped"].append(idx)
            print(f" model_{idx:02d}: skipped (no custom op)")
        else:
            results["unlocked"].append(idx)
            print(f" model_{idx:02d}: ✓ unlocked → {out.name}")

    n = len(results["unlocked"])
    print(f"\n Unlocked: {n}/23 models")
    return results
|
|
|
|
| |
| |
| |
|
|
def verify_models(onnx_dir: Path, unlocked_dir: Path) -> dict:
    """Step 3: Verify all models load in onnxruntime.

    Core models 0-10 get a load-only check (session creation); unlocked
    models 11-33 additionally run one inference on zero-filled dummy inputs.

    Args:
        onnx_dir: Directory containing the raw extracted models.
        unlocked_dir: Directory containing the unlocked models.

    Returns:
        dict with 'ok', 'custom_op', 'failed' index lists, or
        {"status": "skipped"} when onnxruntime is unavailable.
    """
    print("\n" + "=" * 70)
    print(" STEP 3: VERIFY (onnxruntime inference test)")
    print("=" * 70)

    if ort is None:
        print(" ⚠ onnxruntime not installed — skipping verification")
        return {"status": "skipped"}

    results = {"ok": [], "custom_op": [], "failed": []}

    # Models 0-10: session creation only (some still contain the custom op).
    print("\n Core models (0-10):")
    for idx in range(11):
        matches = list(onnx_dir.glob(f"model_{idx:02d}_*"))
        if not matches: continue
        try:
            sess = ort.InferenceSession(str(matches[0]),
                                        providers=["CPUExecutionProvider"])
            inputs = sess.get_inputs()
            shapes = {i.name: i.shape for i in inputs}
            results["ok"].append(idx)
            print(f" model_{idx:02d}: ✓ inputs={shapes}")
        except Exception as e:
            err = str(e)[:60]
            # Distinguish "needs the OneOCR custom op" from genuine failures
            # by matching the truncated exception message.
            if "custom ops" in err.lower() or "oneocr" in err.lower():
                results["custom_op"].append(idx)
                print(f" model_{idx:02d}: ⚠ custom_op ({err})")
            else:
                results["failed"].append(idx)
                print(f" model_{idx:02d}: ✗ {err}")

    # Models 11-33: full dummy inference on the unlocked versions.
    print("\n Unlocked models (11-33):")
    for idx in range(11, 34):
        matches = list(unlocked_dir.glob(f"model_{idx:02d}_*"))
        if not matches: continue
        try:
            sess = ort.InferenceSession(str(matches[0]),
                                        providers=["CPUExecutionProvider"])
            # Zero-filled feeds; dynamic/symbolic dims are pinned to 1.
            # NOTE(review): assumes every input is float32 — confirm for 11-33.
            feeds = {}
            for inp in sess.get_inputs():
                shape = [d if isinstance(d, int) and d > 0 else 1 for d in inp.shape]
                feeds[inp.name] = np.zeros(shape, dtype=np.float32)
            out = sess.run(None, feeds)
            results["ok"].append(idx)
            print(f" model_{idx:02d}: ✓ output_shapes={[o.shape for o in out]}")
        except Exception as e:
            results["failed"].append(idx)
            print(f" model_{idx:02d}: ✗ {str(e)[:60]}")

    ok = len(results["ok"])
    total = ok + len(results["custom_op"]) + len(results["failed"])
    print(f"\n Verification: {ok}/{total} models OK")
    return results
|
|
|
|
| |
| |
| |
|
|
def main():
    """Command-line entry point: decrypt → extract → unlock → verify.

    Parses CLI arguments, then either verifies existing output
    (--verify-only) or runs the full pipeline, honoring --skip-unlock and
    --skip-verify. Exits with status 1 when the input file is missing.
    """
    parser = argparse.ArgumentParser(
        description="OneOCR extraction pipeline: decrypt → extract → unlock → verify")
    parser.add_argument("input", nargs="?", default="ocr_data/oneocr.onemodel",
                        help="Path to .onemodel file (default: ocr_data/oneocr.onemodel)")
    parser.add_argument("--output", "-o", default="oneocr_extracted",
                        help="Output directory (default: oneocr_extracted)")
    parser.add_argument("--verify-only", action="store_true",
                        help="Only verify existing extracted models")
    parser.add_argument("--skip-unlock", action="store_true",
                        help="Skip model unlocking step")
    parser.add_argument("--skip-verify", action="store_true",
                        help="Skip verification step")
    args = parser.parse_args()

    input_file = Path(args.input)
    output_dir = Path(args.output)
    onnx_dir = output_dir / "onnx_models"
    unlocked_dir = output_dir / "onnx_models_unlocked"

    print()
    print("╔══════════════════════════════════════════════════════════════════════╗")
    print("║ OneOCR Extraction Pipeline ║")
    print("║ Decrypt → Extract → Unlock → Verify ║")
    print("╚══════════════════════════════════════════════════════════════════════╝")

    t_start = time.perf_counter()

    if args.verify_only:
        verify_models(onnx_dir, unlocked_dir)
    else:
        if not input_file.exists():
            print(f"\n ERROR: Input file not found: {input_file}")
            print(" Place oneocr.onemodel in ocr_data/ directory")
            sys.exit(1)

        # Step results are reported by each step itself; the previously
        # assigned-but-unused locals have been dropped.
        decrypt_and_extract(input_file, output_dir)

        if not args.skip_unlock:
            unlock_all_models(onnx_dir, unlocked_dir)
        else:
            print("\n (Skipping unlock step)")

        if not args.skip_verify:
            verify_models(onnx_dir, unlocked_dir)
        else:
            print("\n (Skipping verification)")

    elapsed = time.perf_counter() - t_start
    print(f"\n{'=' * 70}")
    print(f" DONE in {elapsed:.1f}s")
    print(f" Models: {onnx_dir}")
    print(f" Unlocked: {unlocked_dir}")
    print(f" Config: {output_dir / 'config_data'}")
    print(f"{'=' * 70}")


if __name__ == "__main__":
    main()
|
|