File size: 1,997 Bytes
c72e80d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
from time import perf_counter
import logging
logger = logging.getLogger(__name__)
class BaseHandler:
"""
Base class for pipeline parts. Each part of the pipeline has an input and an output queue.
The `setup` method along with `setup_args` and `setup_kwargs` can be used to address the specific requirements of the implemented pipeline part.
To stop a handler properly, set the stop_event and, to avoid queue deadlocks, place b"END" in the input queue.
Objects placed in the input queue will be processed by the `process` method, and the yielded results will be placed in the output queue.
The cleanup method handles stopping the handler, and b"END" is placed in the output queue.
"""
def __init__(self, stop_event, queue_in, queue_out, setup_args=(), setup_kwargs={}):
self.stop_event = stop_event
self.queue_in = queue_in
self.queue_out = queue_out
self.setup(*setup_args, **setup_kwargs)
self._times = []
def setup(self):
pass
def process(self):
raise NotImplementedError
def run(self):
while not self.stop_event.is_set():
input = self.queue_in.get()
if isinstance(input, bytes) and input == b"END":
# sentinelle signal to avoid queue deadlock
logger.debug("Stopping thread")
break
start_time = perf_counter()
for output in self.process(input):
self._times.append(perf_counter() - start_time)
if self.last_time > self.min_time_to_debug:
logger.debug(f"{self.__class__.__name__}: {self.last_time: .3f} s")
self.queue_out.put(output)
start_time = perf_counter()
self.cleanup()
self.queue_out.put(b"END")
@property
def last_time(self):
return self._times[-1]
@property
def min_time_to_debug(self):
return 0.001
def cleanup(self):
pass
|