| |
| """ |
| Implementation of enumerative entropy coding as described in Han et al. (2008). |
| This is the actual "equiprobable partitioning" algorithm from the paper. |
| """ |
|
|
| |
| from enumerative_coding import EnumerativeEncoder, ExpGolombCoder |
|
|
| import numpy as np |
| from typing import Tuple, List, Dict |
| from collections import Counter |
| import heapq |
| from scipy.stats import entropy as scipy_entropy |
|
|
|
|
| |
| EquiprobablePartitioningEncoder = EnumerativeEncoder |
|
|
|
|
| class HuffmanEncoder: |
| """Standard Huffman coding for comparison.""" |
| |
| class Node: |
| def __init__(self, symbol=None, freq=0, left=None, right=None): |
| self.symbol = symbol |
| self.freq = freq |
| self.left = left |
| self.right = right |
| |
| def __lt__(self, other): |
| return self.freq < other.freq |
| |
| def __init__(self): |
| self.codes = {} |
| self.root = None |
| |
| def _build_tree(self, frequencies: Dict[int, int]): |
| """Build Huffman tree from symbol frequencies.""" |
| heap = [] |
| |
| for symbol, freq in frequencies.items(): |
| heapq.heappush(heap, self.Node(symbol=symbol, freq=freq)) |
| |
| while len(heap) > 1: |
| left = heapq.heappop(heap) |
| right = heapq.heappop(heap) |
| parent = self.Node(freq=left.freq + right.freq, left=left, right=right) |
| heapq.heappush(heap, parent) |
| |
| self.root = heap[0] |
| |
| def _generate_codes(self, node, code=''): |
| """Generate Huffman codes by traversing the tree.""" |
| if node.symbol is not None: |
| self.codes[node.symbol] = code if code else '0' |
| return |
| |
| if node.left: |
| self._generate_codes(node.left, code + '0') |
| if node.right: |
| self._generate_codes(node.right, code + '1') |
| |
| def encode(self, data: List[int]) -> Tuple[bytes, Dict]: |
| """Encode data using Huffman coding.""" |
| frequencies = Counter(data) |
| |
| if len(frequencies) == 1: |
| |
| symbol = list(frequencies.keys())[0] |
| self.codes = {symbol: '0'} |
| else: |
| self._build_tree(frequencies) |
| self._generate_codes(self.root) |
| |
| |
| encoded_bits = ''.join(self.codes[symbol] for symbol in data) |
| |
| |
| padding = (8 - len(encoded_bits) % 8) % 8 |
| encoded_bits += '0' * padding |
| |
| encoded_bytes = bytes(int(encoded_bits[i:i+8], 2) for i in range(0, len(encoded_bits), 8)) |
| |
| metadata = { |
| 'codes': self.codes, |
| 'padding': padding, |
| 'original_length': len(data) |
| } |
| |
| return encoded_bytes, metadata |
| |
| def decode(self, encoded_bytes: bytes, metadata: Dict) -> List[int]: |
| """Decode Huffman encoded data.""" |
| |
| reverse_codes = {code: symbol for symbol, code in metadata['codes'].items()} |
| |
| |
| bit_string = ''.join(format(byte, '08b') for byte in encoded_bytes) |
| |
| |
| if metadata['padding'] > 0: |
| bit_string = bit_string[:-metadata['padding']] |
| |
| decoded = [] |
| current_code = '' |
| |
| for bit in bit_string: |
| current_code += bit |
| if current_code in reverse_codes: |
| decoded.append(reverse_codes[current_code]) |
| current_code = '' |
| if len(decoded) >= metadata['original_length']: |
| break |
| |
| return decoded |
|
|
|
|
| def calculate_entropy(data: List[int]) -> float: |
| """Calculate Shannon entropy of data.""" |
| probabilities = list(Counter(data).values()) |
| return scipy_entropy(probabilities, base=2) * len(data) / 8 |
|
|
|
|
| def theoretical_minimum_size(data: List[int]) -> float: |
| """Calculate theoretical minimum compressed size in bytes.""" |
| return calculate_entropy(data) |