"""
OneOCR .onemodel Static Decryptor
=================================
Cross-platform tool to extract ONNX models and config data from
Windows OneOCR's encrypted .onemodel container files.

No Windows APIs, DLLs, or runtime hooking required.
Only dependency: pycryptodome (pip install pycryptodome)

Crypto scheme (fully reverse-engineered):
  - Algorithm: AES-256-CFB128
  - Master key: hardcoded 32-byte ASCII string
  - IV: "Copyright @ OneO" (16 bytes, the same for all chunks)
  - DX index key: SHA256(master_key + file[8:24])
  - Config key: SHA256(DX[48:64] + DX[32:48]) (sizes + checksum)
  - Per-chunk key: SHA256(chunk_header[16:32] + chunk_header[0:16])
  - Chunk header in file: checksum(16) + size1(8) + size2(8) = 32 bytes
  - On-disk encrypted data follows immediately: size1 + 8 bytes

File structure:
  [0:8]        uint64 LE H (header value)
  [8:24]       16-byte file_hash (used in DX key derivation)
  [24:H+12]    encrypted DX index
  [H+12:H+16]  4 zero bytes (gap)
  [H+16:]      payload chunks (checksum(16) + sizes(16) + encrypted_data)

Usage:
    python onemodel_decrypt.py [onemodel_file] [output_dir]
    python onemodel_decrypt.py              # uses defaults
"""
| |
|
| | import struct |
| | import hashlib |
| | import sys |
| | import os |
| | from pathlib import Path |
| |
|
| | try: |
| | from Crypto.Cipher import AES |
| | except ImportError: |
| | print("ERROR: pycryptodome is required. Install with: pip install pycryptodome") |
| | sys.exit(1) |
| |
|
| | |
# 32-byte ASCII master key hardcoded in the OneOCR binary (AES-256 key input).
MASTER_KEY = b'kj)TGtrK>f]b[Piow.gU+nC@s""""""4'
# Fixed 16-byte CFB IV, reused for every encrypted region in the container.
IV = b"Copyright @ OneO"
# 8-byte magic expected at the start of every decrypted container blob.
CONTAINER_MAGIC = bytes.fromhex("4a1a082b25000000")
| |
|
| |
|
| | |
def aes_cfb128_decrypt(key: bytes, data: bytes) -> bytes:
    """Return *data* decrypted with AES-256 in CFB mode (128-bit segments).

    Every encrypted region in a .onemodel file uses the same module-level
    constant IV, so only the key varies between calls.
    """
    decryptor = AES.new(key, AES.MODE_CFB, iv=IV, segment_size=128)
    return decryptor.decrypt(data)
| |
|
| |
|
def derive_key(sha256_input: bytes) -> bytes:
    """Derive a 32-byte AES-256 key as the SHA-256 digest of *sha256_input*."""
    hasher = hashlib.sha256()
    hasher.update(sha256_input)
    return hasher.digest()
| |
|
| |
|
| | |
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode a protobuf base-128 varint at *pos*.

    Returns (value, position just past the varint). If the data runs out
    before a terminating byte (high bit clear) is seen, returns whatever
    has been accumulated so far together with len(data).
    """
    value = 0
    shift = 0
    while pos < len(data):
        byte = data[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if byte < 0x80:  # high bit clear -> this was the final byte
            break
        shift += 7
    return value, pos
| |
|
| |
|
def measure_protobuf(data: bytes) -> int:
    """Walk ONNX ModelProto protobuf fields; return the valid prefix length.

    ONNX ModelProto only uses field numbers 1-9, 14 and 20; the walk stops
    (returning the offset of the offending field) at the first unknown
    field number, unsupported wire type, or field that overruns the data.
    """
    allowed_fields = frozenset((1, 2, 3, 4, 5, 6, 7, 8, 9, 14, 20))
    size = len(data)

    def _varint(p: int) -> tuple[int, int]:
        # Local base-128 varint decoder: (value, position past the varint).
        v = 0
        shift = 0
        while p < size:
            b = data[p]
            p += 1
            v |= (b & 0x7F) << shift
            if b < 0x80:
                break
            shift += 7
        return v, p

    pos = 0
    while pos < size:
        field_start = pos
        tag, pos = _varint(pos)
        if pos > size:
            return field_start

        number = tag >> 3
        wire = tag & 7
        if number not in allowed_fields:
            return field_start

        if wire == 0:        # varint
            _, pos = _varint(pos)
        elif wire == 1:      # 64-bit fixed
            pos += 8
        elif wire == 2:      # length-delimited
            length, pos = _varint(pos)
            pos += length
        elif wire == 5:      # 32-bit fixed
            pos += 4
        else:                # groups / reserved wire types never appear here
            return field_start

        if pos > size:
            return field_start
    return pos
| |
|
| |
|
| | |
class OneModelFile:
    """Parser for .onemodel encrypted containers.

    Attributes:
        data: Entire file contents (read once in __init__).
        filepath: Path the container was loaded from.
        H: uint64 LE header value from bytes [0:8].
        file_hash: 16-byte hash from bytes [8:24], mixed into the DX key.
        dx_offset / dx_size: Location of the encrypted DX index.
        payload_start: Offset of the first payload chunk (H + 16).
    """

    def __init__(self, filepath: str):
        # The whole file is held in memory; all parsing below is slicing.
        with open(filepath, "rb") as f:
            self.data = f.read()
        self.filepath = filepath

        # Header: [0:8] uint64 LE value H, [8:24] 16-byte file hash used
        # for DX key derivation (see decrypt_dx).
        self.H = struct.unpack_from("<Q", self.data, 0)[0]
        self.file_hash = self.data[8:24]

        # Per the container layout: encrypted DX index spans [24 : H+12],
        # a 4-byte zero gap follows, and payload chunks begin at H+16.
        self.dx_offset = 24
        self.dx_size = self.H - 12
        self.payload_start = self.H + 16

    def decrypt_dx(self) -> bytes:
        """Decrypt the DX index.

        Key = SHA256(master_key || file_hash); same fixed IV as everywhere.
        """
        key = derive_key(MASTER_KEY + self.file_hash)
        dx_enc = self.data[self.dx_offset : self.dx_offset + self.dx_size]
        return aes_cfb128_decrypt(key, dx_enc)

    def decrypt_config(self, dx: bytes) -> bytes:
        """Decrypt the config chunk embedded in the decrypted DX index.

        Key = SHA256(dx[48:64] || dx[32:48]) — per the container scheme
        these are the size fields followed by the checksum, mirroring the
        per-chunk derivation (sizes || checksum).
        """
        sha_input = dx[48:64] + dx[32:48]
        key = derive_key(sha_input)
        # dx[48:56] is the config data size (uint64 LE); the encrypted blob
        # starts at offset 64 and carries an extra 8-byte container header.
        config_s1 = struct.unpack_from("<Q", dx, 48)[0]
        config_enc = dx[64 : 64 + config_s1 + 8]
        return aes_cfb128_decrypt(key, config_enc)

    def iter_payload_chunks(self):
        """Iterate over all payload chunks, yielding (index, metadata, decrypted_payload).

        Each payload chunk in file:
            [16 bytes]      checksum
            [8 bytes]       uint64 LE size1 (data size excl. 8-byte container header)
            [8 bytes]       uint64 LE size2 (always size1 + 24)
            [size1+8 bytes] encrypted data
        """
        off = self.payload_start
        idx = 0

        while off + 32 <= len(self.data):
            checksum = self.data[off : off + 16]
            s1, s2 = struct.unpack_from("<QQ", self.data, off + 16)

            # The s2 == s1 + 24 invariant doubles as an end-of-chunks /
            # corruption detector: stop at the first header that breaks it.
            if s2 != s1 + 24 or s1 == 0 or s1 > len(self.data):
                break

            enc_size = s1 + 8  # encrypted data includes 8-byte container header
            data_off = off + 32

            if data_off + enc_size > len(self.data):
                break  # truncated final chunk — nothing more to decrypt

            # Per-chunk key = SHA256(size fields || checksum), i.e. the
            # 32-byte on-disk header with its two halves swapped.
            sha_input = self.data[off + 16 : off + 32] + checksum
            key = derive_key(sha_input)

            dec = aes_cfb128_decrypt(key, self.data[data_off : data_off + enc_size])

            # Wrong magic usually means a bad key or corrupted data; warn
            # but keep walking so later chunks are still extracted.
            if dec[:8] != CONTAINER_MAGIC:
                print(f" WARNING: chunk#{idx} container magic mismatch!")

            payload = dec[8:]  # strip the 8-byte container magic

            meta = {
                "index": idx,
                "file_offset": off,
                "size1": s1,
                "size2": s2,
                "checksum": checksum.hex(),
            }

            yield idx, meta, payload

            off = data_off + enc_size
            idx += 1
| |
|
| |
|
| | |
def classify_chunk(payload: bytes) -> str:
    """Heuristically label a decrypted chunk payload by inspecting its bytes."""
    # ONNX ModelProto starts with field 1 (ir_version) as a varint:
    # bytes 08 06 or 08 07 for IR versions 6/7.
    if len(payload) > 100 and payload[0] == 0x08 and payload[1] in (0x06, 0x07):
        return "onnx"

    # Anything that decodes as printable ASCII is one of the text formats.
    try:
        sample = payload[:100].decode("ascii")
    except (UnicodeDecodeError, ValueError):
        return "binary_data"

    if not all(ch.isprintable() or ch in "\n\r\t" for ch in sample):
        return "binary_data"

    if "<LogPrior>" in sample:
        return "rnn_info"
    if sample.startswith(("! ", '" ')):
        # Character tables: digit-bearing ones map chars to indices.
        if any(ch.isdigit() for ch in sample[:20]):
            return "char2ind"
        return "char2inschar"
    if sample.startswith("0."):
        return "score_calibration"
    if "text_script" in sample:
        return "ocr_config"
    if "//" in sample[:5]:
        return "composite_chars"
    return "text_data"
| |
|
| |
|
def get_onnx_info(data: bytes) -> dict:
    """Get basic ONNX model info from raw protobuf bytes.

    Scans at most the first 500 bytes of the ModelProto for field 1
    (ir_version), field 3 (producer_name) and field 4 (producer_version),
    stopping early once both ir_version and producer are found.

    Returns a dict with any of the keys "ir_version", "producer",
    "producer_version" that could be decoded; missing on parse failure.
    """
    info: dict = {}
    pos = 0
    limit = min(len(data), 500)  # metadata fields sit at the front
    while pos < limit:
        tag, pos = read_varint(data, pos)
        field_num = tag >> 3
        wire_type = tag & 7

        if wire_type == 0:  # varint
            val, pos = read_varint(data, pos)
            if field_num == 1:
                info["ir_version"] = val
        elif wire_type == 2:  # length-delimited (strings/submessages)
            length, pos = read_varint(data, pos)
            payload_bytes = data[pos : pos + length]
            # Only catch decode failures; a bare `except:` here previously
            # also swallowed KeyboardInterrupt/SystemExit.
            if field_num == 3:
                try:
                    info["producer"] = payload_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    pass  # non-UTF-8 producer name: skip, keep scanning
            elif field_num == 4:
                try:
                    info["producer_version"] = payload_bytes.decode("utf-8")
                except UnicodeDecodeError:
                    pass
            pos += length
        elif wire_type == 5:  # 32-bit fixed
            pos += 4
        elif wire_type == 1:  # 64-bit fixed
            pos += 8
        else:
            break  # unknown wire type: stop scanning

        if "producer" in info and "ir_version" in info:
            break  # have everything callers use

    return info
| |
|
| |
|
def extract_all(input_file: str, output_dir: str, verify: bool = True):
    """Extract all content from a .onemodel file.

    Decrypts the DX index and config manifest, then walks every payload
    chunk, classifying each as an ONNX model or config/text data and
    writing it under *output_dir* (onnx_models/ and config_data/).

    Args:
        input_file: Path to the encrypted .onemodel container.
        output_dir: Destination directory (created if missing).
        verify: When True, validate extracted models with the optional
            ``onnx`` / ``onnxruntime`` packages if they are installed.

    Raises:
        ValueError: On DX or config magic mismatch (wrong key, corrupted
            file, or not a .onemodel container).
    """
    model_file = OneModelFile(input_file)

    print(f"File: {input_file}")
    print(f"Size: {len(model_file.data):,} bytes")
    print(f"Header value: {model_file.H}")
    print(f"DX size: {model_file.dx_size:,} bytes")

    # Decrypt and sanity-check the DX index. Use explicit raises rather
    # than `assert` so validation survives `python -O`.
    dx = model_file.decrypt_dx()
    valid_size = struct.unpack_from("<Q", dx, 8)[0]
    print(f"DX valid size: {valid_size:,}")
    if dx[:2] != b"DX":
        raise ValueError("DX magic mismatch!")

    # Decrypt the config manifest embedded in the DX index.
    config_dec = model_file.decrypt_config(dx)
    if config_dec[:8] != CONTAINER_MAGIC:
        raise ValueError("Config magic mismatch!")
    config_payload = config_dec[8:]

    out = Path(output_dir)
    onnx_dir = out / "onnx_models"
    config_dir = out / "config_data"
    onnx_dir.mkdir(parents=True, exist_ok=True)
    config_dir.mkdir(parents=True, exist_ok=True)

    config_path = config_dir / "manifest.bin"
    config_path.write_bytes(config_payload)
    print(f"\nConfig manifest saved: {config_path} ({len(config_payload):,} bytes)")

    onnx_models = []
    config_files = []

    print(f"\n{'='*70}")
    print(f"{'#':>4} {'Type':<18} {'Size':>12} {'Filename':<40}")
    print(f"{'='*70}")

    for idx, meta, payload in model_file.iter_payload_chunks():
        chunk_type = classify_chunk(payload)

        if chunk_type == "onnx":
            # Chunks carry trailing padding; trim to the exact protobuf length.
            exact_size = measure_protobuf(payload)
            onnx_data = payload[:exact_size]

            info = get_onnx_info(onnx_data)
            ir = info.get("ir_version", "?")
            producer = info.get("producer", "unknown")
            size_kb = len(onnx_data) // 1024

            # Short producer tag for the output filename.
            producer_lc = producer.lower()
            if "quantize" in producer_lc or "onnx" in producer_lc:
                prod_tag = "onnx_quantize"
            elif "pytorch" in producer_lc or "torch" in producer_lc:
                prod_tag = "pytorch_small" if size_kb < 50 else "pytorch"
            else:
                prod_tag = producer.replace(" ", "_")

            onnx_idx = len(onnx_models)
            fname = f"model_{onnx_idx:02d}_ir{ir}_{prod_tag}_{size_kb}KB.onnx"
            fpath = onnx_dir / fname

            fpath.write_bytes(onnx_data)
            onnx_models.append(fpath)
            print(f"{idx:4d} {'ONNX':18s} {len(onnx_data):12,} {fname}")

        else:
            # Non-ONNX chunks: pick an extension from the classification.
            ext_map = {
                "rnn_info": ".rnn_info",
                "char2ind": ".char2ind.txt",
                "char2inschar": ".char2inschar.txt",
                "score_calibration": ".calibration.txt",
                "ocr_config": ".config.txt",
                "composite_chars": ".composite.txt",
                "text_data": ".txt",
                "binary_data": ".bin",
            }
            ext = ext_map.get(chunk_type, ".bin")
            fname = f"chunk_{idx:02d}_{chunk_type}{ext}"
            fpath = config_dir / fname

            fpath.write_bytes(payload)
            config_files.append(fpath)
            print(f"{idx:4d} {chunk_type:18s} {len(payload):12,} {fname}")

    print(f"\n{'='*70}")
    print(f"ONNX models extracted: {len(onnx_models)}")
    print(f"Config files extracted: {len(config_files)}")

    if verify:
        print(f"\n{'='*70}")
        print("ONNX Verification")
        print(f"{'='*70}")

        try:
            import onnx
            onnx_ok = 0
            onnx_fail = 0
            for fpath in onnx_models:
                try:
                    model = onnx.load(str(fpath))
                    onnx.checker.check_model(model)
                    onnx_ok += 1
                    print(f" OK {fpath.name}")
                except Exception as e:
                    try:
                        # Some models fail the strict checker (e.g. custom
                        # ops) but still load; count those as usable.
                        onnx.load(str(fpath))
                        onnx_ok += 1
                        print(f" OK* {fpath.name} (loads but checker warning: {str(e)[:50]})")
                    except Exception as e2:
                        onnx_fail += 1
                        print(f" FAIL {fpath.name}: {e2}")
            print(f"\nVerification: {onnx_ok}/{len(onnx_models)} models load successfully")
        except ImportError:
            print(" (onnx package not installed, skipping verification)")

        try:
            import onnxruntime as ort
            rt_ok = 0
            rt_custom_ops = 0
            for fpath in onnx_models:
                try:
                    ort.InferenceSession(str(fpath))
                    rt_ok += 1
                except Exception as e:
                    # Models needing OneOCR's custom ops can't build a
                    # vanilla session; count them separately and ignore
                    # other failures (best-effort verification only).
                    if "custom ops" in str(e).lower() or "oneocr" in str(e).lower():
                        rt_custom_ops += 1
            print(f" onnxruntime: {rt_ok} standard, {rt_custom_ops} need custom ops")
        except ImportError:
            pass

    print(f"\nDone! All files saved to: {out.resolve()}")
| |
|
| |
|
| | |
if __name__ == "__main__":
    # CLI: optional positional args override the default input/output paths.
    args = sys.argv[1:]
    input_file = args[0] if args else "ocr_data/oneocr.onemodel"
    output_dir = args[1] if len(args) > 1 else "oneocr_extracted"

    if not os.path.exists(input_file):
        print(f"ERROR: Input file not found: {input_file}")
        sys.exit(1)

    extract_all(input_file, output_dir)
| |
|