|
from __future__ import annotations |
|
|
|
import asyncio |
|
import contextvars |
|
import sys |
|
import time |
|
from asyncio import get_running_loop |
|
from types import TracebackType |
|
from typing import Any, Awaitable, Callable, TypeVar, cast |
|
|
|
__all__ = [ |
|
"run_in_executor_with_context", |
|
"call_soon_threadsafe", |
|
"get_traceback_from_context", |
|
] |
|
|
|
_T = TypeVar("_T") |
|
|
|
|
|
def run_in_executor_with_context( |
|
func: Callable[..., _T], |
|
*args: Any, |
|
loop: asyncio.AbstractEventLoop | None = None, |
|
) -> Awaitable[_T]: |
|
""" |
|
Run a function in an executor, but make sure it uses the same contextvars. |
|
This is required so that the function will see the right application. |
|
|
|
See also: https://bugs.python.org/issue34014 |
|
""" |
|
loop = loop or get_running_loop() |
|
ctx: contextvars.Context = contextvars.copy_context() |
|
|
|
return loop.run_in_executor(None, ctx.run, func, *args) |
|
|
|
|
|
def call_soon_threadsafe( |
|
func: Callable[[], None], |
|
max_postpone_time: float | None = None, |
|
loop: asyncio.AbstractEventLoop | None = None, |
|
) -> None: |
|
""" |
|
Wrapper around asyncio's `call_soon_threadsafe`. |
|
|
|
This takes a `max_postpone_time` which can be used to tune the urgency of |
|
the method. |
|
|
|
Asyncio runs tasks in first-in-first-out. However, this is not what we |
|
want for the render function of the prompt_toolkit UI. Rendering is |
|
expensive, but since the UI is invalidated very often, in some situations |
|
we render the UI too often, so much that the rendering CPU usage slows down |
|
the rest of the processing of the application. (Pymux is an example where |
|
we have to balance the CPU time spend on rendering the UI, and parsing |
|
process output.) |
|
However, we want to set a deadline value, for when the rendering should |
|
happen. (The UI should stay responsive). |
|
""" |
|
loop2 = loop or get_running_loop() |
|
|
|
|
|
if max_postpone_time is None: |
|
loop2.call_soon_threadsafe(func) |
|
return |
|
|
|
max_postpone_until = time.time() + max_postpone_time |
|
|
|
def schedule() -> None: |
|
|
|
|
|
|
|
|
|
if not getattr(loop2, "_ready", []): |
|
func() |
|
return |
|
|
|
|
|
if time.time() > max_postpone_until: |
|
func() |
|
return |
|
|
|
|
|
loop2.call_soon_threadsafe(schedule) |
|
|
|
loop2.call_soon_threadsafe(schedule) |
|
|
|
|
|
def get_traceback_from_context(context: dict[str, Any]) -> TracebackType | None: |
|
""" |
|
Get the traceback object from the context. |
|
""" |
|
exception = context.get("exception") |
|
if exception: |
|
if hasattr(exception, "__traceback__"): |
|
return cast(TracebackType, exception.__traceback__) |
|
else: |
|
|
|
|
|
|
|
return sys.exc_info()[2] |
|
|
|
return None |
|
|