Buckets:
MisterAI/LocalAI_Demo_backends / cpu-diffusers.upgrade-tmp /python /lib /python3.10 /unittest /signals.py
| import signal | |
| import weakref | |
| from functools import wraps | |
| __unittest = True | |
class _InterruptHandler(object):
    """Callable SIGINT handler that asks registered results to stop.

    The handler that was in place before this one is kept in two forms:
    ``original_handler`` is the value exactly as it was passed in, while
    ``default_handler`` is always something that can be called with
    ``(signum, frame)``.
    """

    def __init__(self, default_handler):
        """Remember the previous handler and derive a callable delegate.

        Raises TypeError when *default_handler* is an integer other than
        signal.SIG_DFL or signal.SIG_IGN.
        """
        self.called = False
        self.original_handler = default_handler

        delegate = default_handler
        if isinstance(delegate, int):
            if delegate == signal.SIG_DFL:
                # SIG_DFL is not callable; substitute the interpreter's
                # default_int_handler for it.
                delegate = signal.default_int_handler
            elif delegate == signal.SIG_IGN:
                # SIG_IGN is not callable either; the nearest callable
                # equivalent is a handler that does nothing.
                def delegate(unused_signum, unused_frame):
                    pass
            else:
                raise TypeError("expected SIGINT signal handler to be "
                                "signal.SIG_IGN, signal.SIG_DFL, or a "
                                "callable object")
        self.default_handler = delegate

    def __call__(self, signum, frame):
        """Handle a signal delivery.

        The first call marks the handler as called and invokes ``stop()``
        on every registered result; any later call is forwarded to the
        default handler first.
        """
        if signal.getsignal(signal.SIGINT) is not self:
            # Something else is installed for SIGINT right now, so hand the
            # signal to the default handler.  There is deliberately no
            # early return: if that handler returns normally, the logic
            # below still runs.
            self.default_handler(signum, frame)

        if self.called:
            # Second (or later) delivery: forward to the default handler.
            self.default_handler(signum, frame)
        self.called = True
        for result in _results.keys():
            result.stop()
# Registry of result objects that the interrupt handler will stop.  Keys are
# held weakly, so being registered here does not keep a result alive.
_results = weakref.WeakKeyDictionary()


def registerResult(result):
    """Add *result* to the registry of results stopped on interrupt."""
    # Only the key matters; 1 is a truthy placeholder value.
    _results[result] = 1
def removeResult(result):
    """Drop *result* from the registry.

    Returns True when the result was registered (with a truthy value) and
    False when it was not present.
    """
    previous = _results.pop(result, None)
    return bool(previous)
# The single installed _InterruptHandler, or None until installHandler() runs.
_interrupt_handler = None


def installHandler():
    """Install the interrupt handler for SIGINT, at most once.

    The handler that is current at the time of the first call is captured
    and passed to the new _InterruptHandler.  Later calls do nothing.
    """
    global _interrupt_handler
    if _interrupt_handler is not None:
        return

    previous = signal.getsignal(signal.SIGINT)
    _interrupt_handler = _InterruptHandler(previous)
    signal.signal(signal.SIGINT, _interrupt_handler)
def removeHandler(method=None):
    """Restore the original SIGINT handler, directly or as a decorator.

    Called with no argument, it re-installs the handler that was in place
    before installHandler() ran; it does nothing if no handler was ever
    installed.

    Called with a callable (typically as ``@removeHandler``), it returns a
    wrapper that restores the original handler for the duration of each
    call and afterwards puts back whatever handler was active on entry.
    The wrapper keeps the wrapped callable's metadata (``__name__``,
    ``__doc__``, ``__wrapped__``).
    """
    if method is not None:
        @wraps(method)
        def inner(*args, **kwargs):
            # Remember the handler active right now so that it can be put
            # back even if the wrapped callable raises.
            initial = signal.getsignal(signal.SIGINT)
            removeHandler()
            try:
                return method(*args, **kwargs)
            finally:
                signal.signal(signal.SIGINT, initial)
        return inner

    # The module-level handler is only read here, so no ``global``
    # declaration is needed.
    if _interrupt_handler is not None:
        signal.signal(signal.SIGINT, _interrupt_handler.original_handler)
Xet Storage Details
- Size:
- 2.4 kB
- Xet hash:
- 56b5dad5e585073318d5e18f3c9d7fae406894389e2b9c517bb2d73fc80551a2
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.