|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import hashlib |
|
import os |
|
import random |
|
import threading |
|
import time |
|
from typing import Any, Optional |
|
|
|
|
|
class EntropyPool: |
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, seed: Optional[bytes] = None): |
|
self.pool_index = 0 |
|
self.digest: Optional[bytearray] = None |
|
self.next_byte = 0 |
|
self.lock = threading.Lock() |
|
self.hash = hashlib.sha1() |
|
self.hash_len = 20 |
|
self.pool = bytearray(b"\0" * self.hash_len) |
|
if seed is not None: |
|
self._stir(seed) |
|
self.seeded = True |
|
self.seed_pid = os.getpid() |
|
else: |
|
self.seeded = False |
|
self.seed_pid = 0 |
|
|
|
def _stir(self, entropy: bytes) -> None: |
|
for c in entropy: |
|
if self.pool_index == self.hash_len: |
|
self.pool_index = 0 |
|
b = c & 0xFF |
|
self.pool[self.pool_index] ^= b |
|
self.pool_index += 1 |
|
|
|
def stir(self, entropy: bytes) -> None: |
|
with self.lock: |
|
self._stir(entropy) |
|
|
|
def _maybe_seed(self) -> None: |
|
if not self.seeded or self.seed_pid != os.getpid(): |
|
try: |
|
seed = os.urandom(16) |
|
except Exception: |
|
try: |
|
with open("/dev/urandom", "rb", 0) as r: |
|
seed = r.read(16) |
|
except Exception: |
|
seed = str(time.time()).encode() |
|
self.seeded = True |
|
self.seed_pid = os.getpid() |
|
self.digest = None |
|
seed = bytearray(seed) |
|
self._stir(seed) |
|
|
|
def random_8(self) -> int: |
|
with self.lock: |
|
self._maybe_seed() |
|
if self.digest is None or self.next_byte == self.hash_len: |
|
self.hash.update(bytes(self.pool)) |
|
self.digest = bytearray(self.hash.digest()) |
|
self._stir(self.digest) |
|
self.next_byte = 0 |
|
value = self.digest[self.next_byte] |
|
self.next_byte += 1 |
|
return value |
|
|
|
def random_16(self) -> int: |
|
return self.random_8() * 256 + self.random_8() |
|
|
|
def random_32(self) -> int: |
|
return self.random_16() * 65536 + self.random_16() |
|
|
|
def random_between(self, first: int, last: int) -> int: |
|
size = last - first + 1 |
|
if size > 4294967296: |
|
raise ValueError("too big") |
|
if size > 65536: |
|
rand = self.random_32 |
|
max = 4294967295 |
|
elif size > 256: |
|
rand = self.random_16 |
|
max = 65535 |
|
else: |
|
rand = self.random_8 |
|
max = 255 |
|
return first + size * rand() // (max + 1) |
|
|
|
|
|
pool = EntropyPool() |
|
|
|
system_random: Optional[Any] |
|
try: |
|
system_random = random.SystemRandom() |
|
except Exception: |
|
system_random = None |
|
|
|
|
|
def random_16() -> int: |
|
if system_random is not None: |
|
return system_random.randrange(0, 65536) |
|
else: |
|
return pool.random_16() |
|
|
|
|
|
def between(first: int, last: int) -> int: |
|
if system_random is not None: |
|
return system_random.randrange(first, last + 1) |
|
else: |
|
return pool.random_between(first, last) |
|
|