| | """Manually parse protobuf structure of extracted files.""" |
| | from pathlib import Path |
| |
|
# Directory holding the heap-extracted *.bin blobs this script analyzes
# (Windows-specific absolute path; adjust for other machines).
EXTRACT_DIR = Path(r"c:\Users\MattyMroz\Desktop\PROJECTS\ONEOCR\extracted_models")
| |
|
def read_varint(data, pos):
    """Decode a base-128 (protobuf-style) varint from ``data`` at ``pos``.

    Returns ``(value, next_pos)``.  If the buffer ends in the middle of a
    varint, the partially accumulated value is returned without raising —
    callers detect truncation by checking ``next_pos`` against the length.
    """
    result = 0
    shift = 0
    end = len(data)
    while pos < end:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if byte & 0x80 == 0:  # high bit clear -> final byte of the varint
            break
        shift += 7
    return result, pos
| |
|
def parse_protobuf_fields(data, max_fields=10):
    """Parse protobuf wire format and return up to ``max_fields`` field infos.

    Each entry is a tuple ``(field_number, wire_type_name, value, extra)``:
      - varint:           (n, 'varint', value, None)
      - length-delimited: (n, 'len-delim', length, first <=100 bytes), or
                          (n, 'len-delim', length, 'OVERFLOW') when the
                          declared length runs past the end of ``data``
                          (parsing stops there)
      - 64-bit fixed:     (n, '64bit', little-endian int, None)
      - 32-bit fixed:     (n, '32bit', little-endian int, None)
      - unknown wire type: (n, 'wireN', 0, 'UNKNOWN') and parsing stops
    """
    def _varint(buf, at):
        # Local base-128 varint decoder so this parser is self-contained.
        value = 0
        shift = 0
        while at < len(buf):
            b = buf[at]
            at += 1
            value |= (b & 0x7F) << shift
            if not (b & 0x80):
                break
            shift += 7
        return value, at

    pos = 0
    fields = []
    for _ in range(max_fields):
        if pos >= len(data):
            break
        # BUGFIX: the tag itself is a varint, not a single byte; reading one
        # byte mis-decodes any field number above 15 (tag >= 0x80).
        tag, pos = _varint(data, pos)
        field_num = tag >> 3
        wire_type = tag & 0x07

        if wire_type == 0:  # varint
            val, pos = _varint(data, pos)
            fields.append((field_num, 'varint', val, None))
        elif wire_type == 2:  # length-delimited
            length, pos = _varint(data, pos)
            # (dropped the dead `length < 0` check: varints are non-negative)
            if length > len(data) - pos:
                fields.append((field_num, 'len-delim', length, 'OVERFLOW'))
                break
            preview = data[pos:pos + min(length, 100)]
            pos += length
            fields.append((field_num, 'len-delim', length, preview))
        elif wire_type == 1:  # fixed 64-bit
            val = data[pos:pos + 8]
            pos += 8
            fields.append((field_num, '64bit', int.from_bytes(val, 'little'), None))
        elif wire_type == 5:  # fixed 32-bit
            val = data[pos:pos + 4]
            pos += 4
            fields.append((field_num, '32bit', int.from_bytes(val, 'little'), None))
        else:
            # wire types 3/4 (groups) and anything malformed: stop parsing
            fields.append((field_num, f'wire{wire_type}', 0, 'UNKNOWN'))
            break
    return fields
| |
|
| | |
# ---------------------------------------------------------------------------
# Dump the protobuf structure of the largest heap-extracted files.
# ---------------------------------------------------------------------------
files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin") if "0x271a" in f.name],
    key=lambda f: f.stat().st_size,
    reverse=True,
)

print("=" * 70)
print("PROTOBUF STRUCTURE ANALYSIS of largest heap files")
print("=" * 70)

for f in files[:10]:
    # BUGFIX: context manager — the original `open(f, 'rb').read(2048)`
    # leaked the file handle.
    with open(f, 'rb') as fh:
        data = fh.read(2048)
    size = f.stat().st_size
    print(f"\n{f.name} ({size//1024}KB):")
    print(f"  First 32 bytes: {data[:32].hex()}")

    fields = parse_protobuf_fields(data)
    for fn, wt, val, preview in fields:
        if wt == 'varint':
            print(f"  field={fn} {wt} value={val}")
        elif wt == 'len-delim':
            if preview == 'OVERFLOW':
                print(f"  field={fn} {wt} length={val} OVERFLOW!")
            elif val < 200 and preview:
                try:
                    txt = preview.decode('utf-8', errors='replace')
                    printable = all(c.isprintable() or c in '\n\r\t' for c in txt[:50])
                    if printable and len(txt) > 0:
                        print(f"  field={fn} {wt} length={val} text='{txt[:80]}'")
                    else:
                        print(f"  field={fn} {wt} length={val} hex={preview[:40].hex()}")
                # BUGFIX: no bare `except:` — it would also swallow
                # KeyboardInterrupt/SystemExit.
                except Exception:
                    print(f"  field={fn} {wt} length={val} hex={preview[:40].hex()}")
            else:
                if preview:
                    print(f"  field={fn} {wt} length={val} first_bytes={preview[:20].hex()}")
                else:
                    print(f"  field={fn} {wt} length={val}")
        else:
            print(f"  field={fn} {wt} value={val}")
| |
|
| | |
# ---------------------------------------------------------------------------
# Try loading mid-sized blobs (100KB - 2MB) as ONNX models.
# ---------------------------------------------------------------------------
print("\n" + "=" * 70)
print("CHECKING MID-SIZED FILES (100KB - 2MB range)")
print("=" * 70)

mid_files = sorted(
    [f for f in EXTRACT_DIR.glob("*.bin")
     if "0x271a" in f.name and 100*1024 < f.stat().st_size < 2*1024*1024],
    key=lambda f: f.stat().st_size,
    reverse=True,
)

import onnx  # third-party; kept local to this section as in the original script

valid_count = 0
for f in mid_files[:100]:
    try:
        m = onnx.load(str(f))
        valid_count += 1
        print(f"  VALID: {f.name} ({f.stat().st_size//1024}KB)")
        print(f"    ir={m.ir_version} producer='{m.producer_name}' "
              f"graph='{m.graph.name}' nodes={len(m.graph.node)}")
    # BUGFIX: narrowed the bare `except:`; failures to parse are expected for
    # arbitrary heap blobs, but Ctrl-C must still interrupt the loop.
    except Exception:
        pass

if valid_count == 0:
    print("  No valid ONNX models in mid-range files either.")
| |
|
| | |
# ---------------------------------------------------------------------------
# Scan the single largest blob for embedded ONNX-like headers and markers.
# ---------------------------------------------------------------------------
print("\n" + "=" * 70)
print("CHECKING FOR INTERNAL ONNX BOUNDARIES IN LARGEST FILE")
print("=" * 70)

biggest = files[0]
# BUGFIX: context manager — the original `open(biggest, 'rb').read()`
# leaked the file handle.
with open(biggest, 'rb') as fh:
    data = fh.read()
print(f"File: {biggest.name}, total size: {len(data)} bytes")

import re

# Heuristic: an ONNX model usually begins with field 1 (ir_version) as a
# small varint followed by tag 0x12 (field 2, len-delim) — presumably a
# good boundary signature; TODO confirm against the ONNX format spec.
pattern = re.compile(b'\\x08[\\x03-\\x09]\\x12')
matches = [(m.start(), data[m.start()+1]) for m in pattern.finditer(data[:1000])]
print(f"ONNX-like headers in first 1000 bytes: {len(matches)}")
for offset, ir in matches[:10]:
    print(f"  offset={offset}: ir_version={ir}")

# Look for common ONNX / operator-name strings near the start of the blob.
for needle in [b'ONNX', b'onnx', b'graph', b'Conv', b'Relu', b'BatchNorm', b'MatMul']:
    positions = [m.start() for m in re.finditer(re.escape(needle), data[:50000])]
    if positions:
        print(f"  Found '{needle.decode()}' at offsets: {positions[:5]}")
| |
|