|
|
|
|
|
|
|
|
|
|
|
|
|
import logging |
|
import os |
|
from queue import Queue, Empty |
|
import signal |
|
import sys |
|
import threading |
|
import traceback |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
class DeadlockDetect: |
|
def __init__(self, use: bool = False, timeout: float = 120.): |
|
self.use = use |
|
self.timeout = timeout |
|
self._queue: Queue = Queue() |
|
|
|
def update(self, stage: str): |
|
if self.use: |
|
self._queue.put(stage) |
|
|
|
def __enter__(self): |
|
if self.use: |
|
self._thread = threading.Thread(target=self._detector_thread) |
|
self._thread.start() |
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
if self.use: |
|
self._queue.put(None) |
|
self._thread.join() |
|
|
|
def _detector_thread(self): |
|
logger.debug("Deadlock detector started") |
|
last_stage = "init" |
|
while True: |
|
try: |
|
stage = self._queue.get(timeout=self.timeout) |
|
except Empty: |
|
break |
|
if stage is None: |
|
logger.debug("Exiting deadlock detector thread") |
|
return |
|
else: |
|
last_stage = stage |
|
logger.error("Deadlock detector timed out, last stage was %s", last_stage) |
|
for th in threading.enumerate(): |
|
print(th, file=sys.stderr) |
|
traceback.print_stack(sys._current_frames()[th.ident]) |
|
print(file=sys.stderr) |
|
sys.stdout.flush() |
|
sys.stderr.flush() |
|
os.kill(os.getpid(), signal.SIGKILL) |
|
|