| from __future__ import annotations |
|
|
| import math |
| from typing import TypeVar |
| from warnings import warn |
|
|
| from ..streams.memory import ( |
| MemoryObjectReceiveStream, |
| MemoryObjectSendStream, |
| _MemoryObjectStreamState, |
| ) |
|
|
| T_Item = TypeVar("T_Item") |
|
|
|
|
| class create_memory_object_stream( |
| tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]], |
| ): |
| """ |
| Create a memory object stream. |
| |
| The stream's item type can be annotated like |
| :func:`create_memory_object_stream[T_Item]`. |
| |
| :param max_buffer_size: number of items held in the buffer until ``send()`` starts |
| blocking |
| :param item_type: old way of marking the streams with the right generic type for |
| static typing (does nothing on AnyIO 4) |
| |
| .. deprecated:: 4.0 |
| Use ``create_memory_object_stream[YourItemType](...)`` instead. |
| :return: a tuple of (send stream, receive stream) |
| |
| """ |
|
|
| def __new__( |
| cls, max_buffer_size: float = 0, item_type: object = None |
| ) -> tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]]: |
| if max_buffer_size != math.inf and not isinstance(max_buffer_size, int): |
| raise ValueError("max_buffer_size must be either an integer or math.inf") |
| if max_buffer_size < 0: |
| raise ValueError("max_buffer_size cannot be negative") |
| if item_type is not None: |
| warn( |
| "The item_type argument has been deprecated in AnyIO 4.0. " |
| "Use create_memory_object_stream[YourItemType](...) instead.", |
| DeprecationWarning, |
| stacklevel=2, |
| ) |
|
|
| state = _MemoryObjectStreamState[T_Item](max_buffer_size) |
| return (MemoryObjectSendStream(state), MemoryObjectReceiveStream(state)) |
|
|