File size: 7,882 Bytes
288007d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
import _thread
import contextlib
import functools
import sys
import threading
import time
import unittest
from test import support
#=======================================================================
# Threading support to prevent reporting refleaks when running regrtest.py -R
# NOTE: we use _thread._count() rather than threading.enumerate() (or the
# moral equivalent thereof) because a threading.Thread object is still alive
# until its _bootstrap() method has returned, even after it has been
# unregistered from the threading module.
# _thread._count(), on the other hand, only gets decremented *after* the
# _bootstrap() method has returned, which gives us reliable reference counts
# at the end of a test run.
def threading_setup():
    """Take a snapshot of the current thread bookkeeping.

    Returns a 2-tuple ``(thread_count, dangling_count)`` where the first
    item is ``_thread._count()`` and the second is the number of entries
    in ``threading._dangling``.  The tuple is meant to be unpacked into
    threading_cleanup() once the code under test has finished.
    """
    thread_count = _thread._count()
    dangling_count = len(threading._dangling)
    return thread_count, dangling_count
def threading_cleanup(*original_values):
    """Check that the thread count went back to its pre-test baseline.

    *original_values* is the ``(thread_count, dangling_count)`` pair
    produced by threading_setup().  ``_thread._count()`` is polled through
    support.sleeping_retry() with a 1.0 timeout; the function returns
    silently as soon as the count is no higher than the baseline.

    If the retry loop ends without that happening,
    ``support.environment_altered`` is set to True and warnings are printed
    with support.print_warning(): one summary, then one line per thread
    found in ``threading._dangling``.
    """
    orig_count, orig_ndangling = original_values
    timeout = 1.0

    for _ in support.sleeping_retry(timeout, error=False):
        # threading._dangling is a WeakSet whose content changes while it is
        # being read, so take a list copy to report a consistent picture.
        dangling_threads = list(threading._dangling)
        count = _thread._count()
        if count > orig_count:
            # Still more threads than at setup time: try again.
            continue
        return

    # Timeout!
    support.environment_altered = True
    summary = (
        f"threading_cleanup() failed to clean up threads "
        f"in {timeout:.1f} seconds\n"
        f" before: thread count={orig_count}, dangling={orig_ndangling}\n"
        f" after: thread count={count}, dangling={len(dangling_threads)}")
    support.print_warning(summary)
    for thread in dangling_threads:
        support.print_warning(f"Dangling thread: {thread!r}")

    # The warning happens when a test spawns threads and some of these threads
    # are still running after the test completes. To fix this warning, join
    # threads explicitly to wait until they complete.
    #
    # To make the warning more likely, reduce the timeout.
def reap_threads(func):
    """Decorator checking for leftover threads around a call to *func*.

    The wrapper records the thread state with threading_setup() before
    calling *func* and always hands that state to threading_cleanup()
    afterwards, even when *func* raises.  Positional and keyword arguments
    are both forwarded to *func* and its return value is passed through.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = threading_setup()
        try:
            # Forward keyword arguments too, so decorated callables invoked
            # with keywords do not fail with a TypeError in the wrapper.
            return func(*args, **kwargs)
        finally:
            threading_cleanup(*key)
    return decorator
@contextlib.contextmanager
def wait_threads_exit(timeout=None):
    """
    bpo-31234: Context manager to wait until all threads created in the with
    statement exit.

    Use _thread._count() to check if threads exited. Indirectly, wait until
    threads exit the internal t_bootstrap() C function of the _thread module.

    threading_setup() and threading_cleanup() are designed to emit a warning
    if a test leaves running threads in the background. This context manager
    is designed to clean up threads started by _thread.start_new_thread(),
    which doesn't allow waiting for thread exit, whereas threading.Thread has
    a join() method.

    *timeout* is passed to support.sleeping_retry(); when it is None,
    support.SHORT_TIMEOUT is used.  An AssertionError is raised if the retry
    loop finishes while _thread._count() is still above the value it had on
    entry.
    """
    if timeout is None:
        timeout = support.SHORT_TIMEOUT
    old_count = _thread._count()
    try:
        yield
    finally:
        start_time = time.monotonic()
        for _ in support.sleeping_retry(timeout, error=False):
            # Collect garbage on every attempt before re-reading the count.
            support.gc_collect()
            count = _thread._count()
            if count <= old_count:
                break
        else:
            # The loop was exhausted without the count dropping back.
            dt = time.monotonic() - start_time
            msg = (f"wait_threads_exit() failed to cleanup {count - old_count} "
                   f"threads after {dt:.1f} seconds "
                   f"(count: {count}, old count: {old_count})")
            raise AssertionError(msg)
def join_thread(thread, timeout=None):
    """Wait for *thread* to finish and fail if it is still running.

    *thread* is joined with *timeout* (support.SHORT_TIMEOUT when None).
    If the thread is still alive once join() returns, an AssertionError
    is raised.
    """
    timeout = support.SHORT_TIMEOUT if timeout is None else timeout
    thread.join(timeout)
    if not thread.is_alive():
        return
    raise AssertionError(f"failed to join the thread in {timeout:.1f} seconds")
@contextlib.contextmanager
def start_threads(threads, unlock=None):
    """Start all *threads*, run the with-body, then join every started thread.

    *threads* is an iterable of thread objects; each is started in order
    before control is given to the with-body.  On exit, *unlock* (if
    truthy) is called first, then the started threads are joined in up to
    15 rounds, each extending the deadline by 60 seconds.  If any thread is
    still alive afterwards, a traceback dump is written to sys.stdout with
    faulthandler and an AssertionError is raised.
    """
    import faulthandler

    candidates = list(threads)
    running = []
    try:
        try:
            for worker in candidates:
                worker.start()
                running.append(worker)
        except:
            # Bare except on purpose: whatever interrupted the start-up is
            # re-raised unchanged after the optional diagnostic.
            if support.verbose:
                print("Can't start %d threads, only %d threads started" %
                      (len(candidates), len(running)))
            raise
        yield
    finally:
        try:
            if unlock:
                unlock()
            deadline = time.monotonic()
            for minutes in range(1, 16):
                deadline += 60
                for worker in running:
                    remaining = deadline - time.monotonic()
                    # Always give join() a small positive timeout.
                    worker.join(max(remaining, 0.01))
                running = [worker for worker in running if worker.is_alive()]
                if not running:
                    break
                if support.verbose:
                    print('Unable to join %d threads during a period of '
                          '%d minutes' % (len(running), minutes))
        finally:
            # Re-check here as well: this clause also runs when unlock() or
            # a join above raised.
            running = [worker for worker in running if worker.is_alive()]
            if running:
                faulthandler.dump_traceback(sys.stdout)
                raise AssertionError('Unable to join %d threads' % len(running))
class catch_threading_exception:
    """
    Context manager that records an exception raised in a threading.Thread
    by temporarily replacing threading.excepthook.

    Attributes filled in when an exception is caught:

    * exc_type
    * exc_value
    * exc_traceback
    * thread

    See threading.excepthook() documentation for these attributes.

    These attributes are deleted at the context manager exit.

    Usage:

        with threading_helper.catch_threading_exception() as cm:
            # code spawning a thread which raises an exception
            ...

            # check the thread exception, use cm attributes:
            # exc_type, exc_value, exc_traceback, thread
            ...

        # exc_type, exc_value, exc_traceback, thread attributes of cm no longer
        # exists at this point
        # (to avoid reference cycles)
    """

    # Names copied from the hook argument onto the instance, in order.
    _CAPTURED = ("exc_type", "exc_value", "exc_traceback", "thread")

    def __init__(self):
        for name in self._CAPTURED:
            setattr(self, name, None)
        self._old_hook = None

    def _hook(self, args):
        # Installed as threading.excepthook while the context is active.
        for name in self._CAPTURED:
            setattr(self, name, getattr(args, name))

    def __enter__(self):
        self._old_hook = threading.excepthook
        threading.excepthook = self._hook
        return self

    def __exit__(self, *exc_info):
        threading.excepthook = self._old_hook
        # Drop the captured objects so no reference cycle is kept alive.
        for name in self._CAPTURED:
            delattr(self, name)
def _can_start_thread() -> bool:
    """Report whether this Python build is able to start new threads.

    Some WebAssembly platforms do not provide a working pthread
    implementation. Thread support is stubbed and any attempt
    to create a new thread fails.

    - wasm32-wasi does not have threading.
    - wasm32-emscripten can be compiled with or without pthread
      support (-s USE_PTHREADS / __EMSCRIPTEN_PTHREADS__).
    """
    platform = sys.platform
    if platform == "emscripten":
        # Depends on how the interpreter was built.
        return sys._emscripten_info.pthreads
    if platform == "wasi":
        return False
    # assume all other platforms have working thread support.
    return True


# Evaluated once at import time.
can_start_thread = _can_start_thread()
def requires_working_threading(*, module=False):
    """Skip tests or modules that require working threading.

    With the default ``module=False`` the result of unittest.skipUnless()
    is returned, for use as a function/class decorator.  With
    ``module=True`` nothing is returned; unittest.SkipTest is raised
    right away when threads cannot be started, which skips the whole
    module when called at import time.
    """
    reason = "requires threading support"
    if not module:
        return unittest.skipUnless(can_start_thread, reason)
    if not can_start_thread:
        raise unittest.SkipTest(reason)
|