| |
| """ |
| Test entropy coding with equiprobable partitioning on various datasets. |
| Compares against Huffman coding and theoretical limits. |
| """ |
|
|
| import numpy as np |
| from datasets import load_dataset |
| import os |
| from typing import List, Dict, Tuple, Optional |
| from enumerative_coding import EnumerativeEncoder |
| from entropy_coding import HuffmanEncoder, theoretical_minimum_size |
| import json |
| import signal |
| from contextlib import contextmanager |
|
|
|
|
class TimeoutError(Exception):
    """Signals that a guarded operation exceeded its allotted time budget."""
|
|
|
|
@contextmanager
def timeout(seconds):
    """Context manager that raises TimeoutError if the body runs longer than `seconds`.

    Relies on SIGALRM, so it is Unix-only and must be used from the main
    thread. `seconds` must be a whole number of seconds (signal.alarm takes
    an int).

    Args:
        seconds: Timeout budget in whole seconds.

    Raises:
        TimeoutError: If the managed block does not finish in time.
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds} seconds")

    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)

    try:
        yield
    finally:
        # Cancel the pending alarm BEFORE restoring the previous handler;
        # the original code restored the handler first, leaving a window in
        # which a late-firing alarm would invoke the old handler instead.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)
|
|
|
|
def generate_iid_data(size: int, vocab_size: int, distribution: str = 'uniform') -> List[int]:
    """
    Generate i.i.d. data with specified vocabulary size and distribution.

    Args:
        size: Number of symbols to generate
        vocab_size: Size of the vocabulary (number of unique symbols)
        distribution: 'uniform', 'zipf', or 'geometric'

    Returns:
        A list of `size` native Python ints in [0, vocab_size). The original
        implementation returned numpy scalar types (np.int64) inside the
        list; `.tolist()` converts to plain ints, matching the annotation.

    Raises:
        ValueError: If `distribution` is not one of the supported names.
    """
    if distribution == 'uniform':
        return np.random.randint(0, vocab_size, size).tolist()
    elif distribution == 'zipf':
        # P(symbol i) proportional to 1/(i+1) — classic Zipf with exponent 1.
        probabilities = 1 / np.arange(1, vocab_size + 1)
        probabilities /= probabilities.sum()
        return np.random.choice(vocab_size, size, p=probabilities).tolist()
    elif distribution == 'geometric':
        # Truncated geometric: the last symbol absorbs the remaining tail mass.
        p = 0.3
        probabilities = [(1 - p) ** i * p for i in range(vocab_size - 1)]
        probabilities.append((1 - p) ** (vocab_size - 1))
        probabilities = np.array(probabilities)
        probabilities /= probabilities.sum()
        return np.random.choice(vocab_size, size, p=probabilities).tolist()
    else:
        raise ValueError(f"Unknown distribution: {distribution}")
|
|
|
|
def download_english_text() -> str:
    """Fetch a slice of wikitext-2 from Hugging Face and return it as one string.

    Takes the first 10% of the training split, joins the first 1000 records,
    and collapses all whitespace runs to single spaces.
    """
    print("Downloading English text dataset...")

    dataset = load_dataset("wikitext", "wikitext-2-raw-v1", split="train[:10%]")

    joined = " ".join(dataset['text'][:1000])

    # Normalize: any run of whitespace becomes a single space.
    normalized = " ".join(joined.split())
    return normalized
|
|
|
|
def text_to_symbols(text: str) -> List[int]:
    """Map a string to the sequence of its UTF-8 byte values."""
    encoded = text.encode('utf-8')
    return [byte for byte in encoded]
|
|
|
|
def symbols_to_text(symbols: List[int]) -> str:
    """Decode a sequence of byte values back into text, dropping invalid UTF-8."""
    raw = bytes(symbols)
    return raw.decode('utf-8', errors='ignore')
|
|
|
|
def compress_and_compare(data: List[int], name: str, timeout_seconds: int = 30) -> Dict:
    """
    Compress data using different methods and compare results.

    Args:
        data: Input data as list of integers
        name: Name for this dataset
        timeout_seconds: Timeout in seconds for enumerative coding

    Returns:
        Dict with dataset statistics and a 'methods' mapping. A method that
        fails or times out gets None sizes plus 'timed_out'/'error' markers.
    """
    # Single local import (the original imported `time` twice in this body).
    import time

    original_size = len(data)

    results = {
        'name': name,
        'original_size': original_size,
        'vocabulary_size': len(set(data)),
        'theoretical_minimum': theoretical_minimum_size(data),
        'methods': {}
    }

    # --- Huffman baseline ---
    print("  Compressing with Huffman coding...")
    huffman_start_time = time.time()
    huffman = HuffmanEncoder()
    huffman_encoded, huffman_metadata = huffman.encode(data)
    huffman_size = len(huffman_encoded)

    # Round-trip to verify losslessness.
    huffman_decoded = huffman.decode(huffman_encoded, huffman_metadata)
    huffman_correct = huffman_decoded == data
    huffman_encoding_time = time.time() - huffman_start_time

    results['methods']['huffman'] = {
        'compressed_size': huffman_size,
        'compression_ratio': original_size / huffman_size,
        'bits_per_symbol': huffman_size * 8 / len(data),
        'correct': huffman_correct,
        'encoding_time': huffman_encoding_time
    }

    # --- Enumerative coding, guarded by a SIGALRM-based timeout ---
    print(f"  Compressing with enumerative entropy coding (timeout: {timeout_seconds}s)...")
    start_time = time.time()
    try:
        with timeout(timeout_seconds):
            ep_encoder = EnumerativeEncoder()
            ep_encoded = ep_encoder.encode(data)
            ep_size = len(ep_encoded)

            # Round-trip to verify losslessness.
            ep_decoded = ep_encoder.decode(ep_encoded)
            ep_correct = ep_decoded == data

        encoding_time = time.time() - start_time
        results['methods']['enumerative'] = {
            'compressed_size': ep_size,
            'compression_ratio': original_size / ep_size,
            'bits_per_symbol': ep_size * 8 / len(data),
            'correct': ep_correct,
            'encoding_time': encoding_time
        }
    except TimeoutError as e:
        encoding_time = time.time() - start_time
        print(f"  Enumerative coding timed out: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': True
        }
    except Exception as e:
        # The original had separate, byte-identical handlers for ValueError
        # and Exception; a single handler covers both with the same behavior.
        encoding_time = time.time() - start_time
        print(f"  Enumerative coding failed: {e}")
        results['methods']['enumerative'] = {
            'compressed_size': None,
            'compression_ratio': None,
            'bits_per_symbol': None,
            'correct': False,
            'encoding_time': encoding_time,
            'timed_out': False,
            'error': str(e)
        }

    return results
|
|
|
|
def main():
    """Run compression tests on various datasets.

    Pipeline: synthetic i.i.d. datasets -> English text sample ->
    JSON dump of all results -> console summary table.
    """
    # Fixed seed so the synthetic datasets are reproducible across runs.
    np.random.seed(42)

    all_results = []

    # (name, num_symbols, vocab_size, distribution) for each synthetic dataset.
    test_configs = [
        # Uniform baseline
        ("uniform_10k_v256", 10000, 256, 'uniform'),

        # Zipf-distributed data at several sizes/vocabularies
        ("zipf_10k_v16", 10000, 16, 'zipf'),
        ("zipf_10k_v64", 10000, 64, 'zipf'),
        ("zipf_10k_v256", 10000, 256, 'zipf'),
        ("zipf_5k_v16", 5000, 16, 'zipf'),
        ("zipf_5k_v64", 5000, 64, 'zipf'),
        ("zipf_5k_v256", 5000, 256, 'zipf'),

        # Geometric-distributed data
        ("geometric_10k_v16", 10000, 16, 'geometric'),
        ("geometric_10k_v64", 10000, 64, 'geometric'),
        ("geometric_10k_v256", 10000, 256, 'geometric'),
        ("geometric_5k_v16", 5000, 16, 'geometric'),
        ("geometric_5k_v64", 5000, 64, 'geometric'),
        ("geometric_5k_v256", 5000, 256, 'geometric'),

        # One large dataset
        ("zipf_100k_v256", 100000, 256, 'zipf'),
    ]

    print("Testing i.i.d. datasets...")
    for name, size, vocab_size, distribution in test_configs:
        print(f"\nTesting {name} (size={size}, vocab={vocab_size}, dist={distribution})...")
        data = generate_iid_data(size, vocab_size, distribution)
        results = compress_and_compare(data, name)
        all_results.append(results)
        print_results(results)

    # Real-world data: English text (downloaded, so network access required).
    print("\nTesting English text...")
    english_text = download_english_text()
    print(f"Text length: {len(english_text)} characters")

    text_symbols = text_to_symbols(english_text)
    # Remember the full length before truncating (the original recomputed
    # text_to_symbols a second time just to print this number).
    full_symbol_count = len(text_symbols)

    # Cap the text size to keep enumerative coding tractable.
    max_text_length = 2000
    if len(text_symbols) > max_text_length:
        text_symbols = text_symbols[:max_text_length]
        print(f"Using text subset of {len(text_symbols)} symbols (original: {full_symbol_count})")

    text_results = compress_and_compare(text_symbols, "english_text")
    all_results.append(text_results)
    print_results(text_results)

    # Persist everything for later analysis.
    with open('compression_results.json', 'w') as f:
        json.dump(all_results, f, indent=2)

    print("\nResults saved to compression_results.json")

    # Console summary table.
    print("\n" + "="*80)
    print("COMPRESSION SUMMARY")
    print("="*80)
    print(f"{'Dataset':<20} {'Original':<10} {'Theoretical':<12} {'Huffman':<10} {'Enumerative':<12}")
    print("-"*70)

    for result in all_results:
        name = result['name'][:20]
        original = result['original_size']
        theoretical = result['theoretical_minimum']
        huffman = result['methods']['huffman']['compressed_size']
        # 'enumerative' is always a dict here; a None compressed_size means
        # the method timed out or failed.
        enumerative_method = result['methods']['enumerative']
        if enumerative_method.get('compressed_size') is not None:
            enumerative = enumerative_method['compressed_size']
        else:
            enumerative = "TIMEOUT"

        row = f"{name:<20} {original:<10} {theoretical:<12.1f} {huffman:<10} {enumerative:<12}"
        print(row)
|
|
|
|
def print_results(results: Dict):
    """Print a human-readable report for one dataset produced by compress_and_compare."""
    print(f"\nResults for {results['name']}:")
    print(f"  Original size: {results['original_size']} bytes")
    print(f"  Vocabulary size: {results['vocabulary_size']}")
    print(f"  Theoretical minimum: {results['theoretical_minimum']:.1f} bytes")

    for method, stats in results['methods'].items():
        print(f"\n  {method}:")
        succeeded = stats is not None and stats.get('compressed_size') is not None
        if succeeded:
            print(f"    Compressed size: {stats['compressed_size']} bytes")
            print(f"    Compression ratio: {stats['compression_ratio']:.2f}")
            print(f"    Bits per symbol: {stats['bits_per_symbol']:.2f}")
            print(f"    Correctly decoded: {stats['correct']}")
            if 'encoding_time' in stats:
                print(f"    Encoding time: {stats['encoding_time']:.3f}s")
        elif stats and stats.get('timed_out'):
            print(f"    Status: TIMEOUT after {stats.get('encoding_time', 0):.2f}s - computational complexity too high")
        elif stats and stats.get('error'):
            print(f"    Status: FAILED - {stats.get('error')}")
        else:
            print(f"    Status: FAILED - computational complexity too high")
|
|
|
|
| if __name__ == "__main__": |
| main() |