Spaces:
Build error
Build error
"""Abstract Transport class.""" | |
__all__ = ( | |
'BaseTransport', 'ReadTransport', 'WriteTransport', | |
'Transport', 'DatagramTransport', 'SubprocessTransport', | |
) | |
class BaseTransport: | |
"""Base class for transports.""" | |
__slots__ = ('_extra',) | |
def __init__(self, extra=None): | |
if extra is None: | |
extra = {} | |
self._extra = extra | |
def get_extra_info(self, name, default=None): | |
"""Get optional transport information.""" | |
return self._extra.get(name, default) | |
def is_closing(self): | |
"""Return True if the transport is closing or closed.""" | |
raise NotImplementedError | |
def close(self): | |
"""Close the transport. | |
Buffered data will be flushed asynchronously. No more data | |
will be received. After all buffered data is flushed, the | |
protocol's connection_lost() method will (eventually) be | |
called with None as its argument. | |
""" | |
raise NotImplementedError | |
def set_protocol(self, protocol): | |
"""Set a new protocol.""" | |
raise NotImplementedError | |
def get_protocol(self): | |
"""Return the current protocol.""" | |
raise NotImplementedError | |
class ReadTransport(BaseTransport): | |
"""Interface for read-only transports.""" | |
__slots__ = () | |
def is_reading(self): | |
"""Return True if the transport is receiving.""" | |
raise NotImplementedError | |
def pause_reading(self): | |
"""Pause the receiving end. | |
No data will be passed to the protocol's data_received() | |
method until resume_reading() is called. | |
""" | |
raise NotImplementedError | |
def resume_reading(self): | |
"""Resume the receiving end. | |
Data received will once again be passed to the protocol's | |
data_received() method. | |
""" | |
raise NotImplementedError | |
class WriteTransport(BaseTransport): | |
"""Interface for write-only transports.""" | |
__slots__ = () | |
def set_write_buffer_limits(self, high=None, low=None): | |
"""Set the high- and low-water limits for write flow control. | |
These two values control when to call the protocol's | |
pause_writing() and resume_writing() methods. If specified, | |
the low-water limit must be less than or equal to the | |
high-water limit. Neither value can be negative. | |
The defaults are implementation-specific. If only the | |
high-water limit is given, the low-water limit defaults to an | |
implementation-specific value less than or equal to the | |
high-water limit. Setting high to zero forces low to zero as | |
well, and causes pause_writing() to be called whenever the | |
buffer becomes non-empty. Setting low to zero causes | |
resume_writing() to be called only once the buffer is empty. | |
Use of zero for either limit is generally sub-optimal as it | |
reduces opportunities for doing I/O and computation | |
concurrently. | |
""" | |
raise NotImplementedError | |
def get_write_buffer_size(self): | |
"""Return the current size of the write buffer.""" | |
raise NotImplementedError | |
def get_write_buffer_limits(self): | |
"""Get the high and low watermarks for write flow control. | |
Return a tuple (low, high) where low and high are | |
positive number of bytes.""" | |
raise NotImplementedError | |
def write(self, data): | |
"""Write some data bytes to the transport. | |
This does not block; it buffers the data and arranges for it | |
to be sent out asynchronously. | |
""" | |
raise NotImplementedError | |
def writelines(self, list_of_data): | |
"""Write a list (or any iterable) of data bytes to the transport. | |
The default implementation concatenates the arguments and | |
calls write() on the result. | |
""" | |
data = b''.join(list_of_data) | |
self.write(data) | |
def write_eof(self): | |
"""Close the write end after flushing buffered data. | |
(This is like typing ^D into a UNIX program reading from stdin.) | |
Data may still be received. | |
""" | |
raise NotImplementedError | |
def can_write_eof(self): | |
"""Return True if this transport supports write_eof(), False if not.""" | |
raise NotImplementedError | |
def abort(self): | |
"""Close the transport immediately. | |
Buffered data will be lost. No more data will be received. | |
The protocol's connection_lost() method will (eventually) be | |
called with None as its argument. | |
""" | |
raise NotImplementedError | |
class Transport(ReadTransport, WriteTransport): | |
"""Interface representing a bidirectional transport. | |
There may be several implementations, but typically, the user does | |
not implement new transports; rather, the platform provides some | |
useful transports that are implemented using the platform's best | |
practices. | |
The user never instantiates a transport directly; they call a | |
utility function, passing it a protocol factory and other | |
information necessary to create the transport and protocol. (E.g. | |
EventLoop.create_connection() or EventLoop.create_server().) | |
The utility function will asynchronously create a transport and a | |
protocol and hook them up by calling the protocol's | |
connection_made() method, passing it the transport. | |
The implementation here raises NotImplemented for every method | |
except writelines(), which calls write() in a loop. | |
""" | |
__slots__ = () | |
class DatagramTransport(BaseTransport): | |
"""Interface for datagram (UDP) transports.""" | |
__slots__ = () | |
def sendto(self, data, addr=None): | |
"""Send data to the transport. | |
This does not block; it buffers the data and arranges for it | |
to be sent out asynchronously. | |
addr is target socket address. | |
If addr is None use target address pointed on transport creation. | |
""" | |
raise NotImplementedError | |
def abort(self): | |
"""Close the transport immediately. | |
Buffered data will be lost. No more data will be received. | |
The protocol's connection_lost() method will (eventually) be | |
called with None as its argument. | |
""" | |
raise NotImplementedError | |
class SubprocessTransport(BaseTransport): | |
__slots__ = () | |
def get_pid(self): | |
"""Get subprocess id.""" | |
raise NotImplementedError | |
def get_returncode(self): | |
"""Get subprocess returncode. | |
See also | |
http://docs.python.org/3/library/subprocess#subprocess.Popen.returncode | |
""" | |
raise NotImplementedError | |
def get_pipe_transport(self, fd): | |
"""Get transport for pipe with number fd.""" | |
raise NotImplementedError | |
def send_signal(self, signal): | |
"""Send signal to subprocess. | |
See also: | |
docs.python.org/3/library/subprocess#subprocess.Popen.send_signal | |
""" | |
raise NotImplementedError | |
def terminate(self): | |
"""Stop the subprocess. | |
Alias for close() method. | |
On Posix OSs the method sends SIGTERM to the subprocess. | |
On Windows the Win32 API function TerminateProcess() | |
is called to stop the subprocess. | |
See also: | |
http://docs.python.org/3/library/subprocess#subprocess.Popen.terminate | |
""" | |
raise NotImplementedError | |
def kill(self): | |
"""Kill the subprocess. | |
On Posix OSs the function sends SIGKILL to the subprocess. | |
On Windows kill() is an alias for terminate(). | |
See also: | |
http://docs.python.org/3/library/subprocess#subprocess.Popen.kill | |
""" | |
raise NotImplementedError | |
class _FlowControlMixin(Transport): | |
"""All the logic for (write) flow control in a mix-in base class. | |
The subclass must implement get_write_buffer_size(). It must call | |
_maybe_pause_protocol() whenever the write buffer size increases, | |
and _maybe_resume_protocol() whenever it decreases. It may also | |
override set_write_buffer_limits() (e.g. to specify different | |
defaults). | |
The subclass constructor must call super().__init__(extra). This | |
will call set_write_buffer_limits(). | |
The user may call set_write_buffer_limits() and | |
get_write_buffer_size(), and their protocol's pause_writing() and | |
resume_writing() may be called. | |
""" | |
__slots__ = ('_loop', '_protocol_paused', '_high_water', '_low_water') | |
def __init__(self, extra=None, loop=None): | |
super().__init__(extra) | |
assert loop is not None | |
self._loop = loop | |
self._protocol_paused = False | |
self._set_write_buffer_limits() | |
def _maybe_pause_protocol(self): | |
size = self.get_write_buffer_size() | |
if size <= self._high_water: | |
return | |
if not self._protocol_paused: | |
self._protocol_paused = True | |
try: | |
self._protocol.pause_writing() | |
except (SystemExit, KeyboardInterrupt): | |
raise | |
except BaseException as exc: | |
self._loop.call_exception_handler({ | |
'message': 'protocol.pause_writing() failed', | |
'exception': exc, | |
'transport': self, | |
'protocol': self._protocol, | |
}) | |
def _maybe_resume_protocol(self): | |
if (self._protocol_paused and | |
self.get_write_buffer_size() <= self._low_water): | |
self._protocol_paused = False | |
try: | |
self._protocol.resume_writing() | |
except (SystemExit, KeyboardInterrupt): | |
raise | |
except BaseException as exc: | |
self._loop.call_exception_handler({ | |
'message': 'protocol.resume_writing() failed', | |
'exception': exc, | |
'transport': self, | |
'protocol': self._protocol, | |
}) | |
def get_write_buffer_limits(self): | |
return (self._low_water, self._high_water) | |
def _set_write_buffer_limits(self, high=None, low=None): | |
if high is None: | |
if low is None: | |
high = 64 * 1024 | |
else: | |
high = 4 * low | |
if low is None: | |
low = high // 4 | |
if not high >= low >= 0: | |
raise ValueError( | |
f'high ({high!r}) must be >= low ({low!r}) must be >= 0') | |
self._high_water = high | |
self._low_water = low | |
def set_write_buffer_limits(self, high=None, low=None): | |
self._set_write_buffer_limits(high=high, low=low) | |
self._maybe_pause_protocol() | |
def get_write_buffer_size(self): | |
raise NotImplementedError | |