bpo-32251: Implement asyncio.BufferedProtocol. #4755

Merged
merged 14 commits on Jan 28, 2018
75 changes: 71 additions & 4 deletions Doc/library/asyncio-protocol.rst
@@ -333,6 +333,16 @@ Protocol classes
The base class for implementing streaming protocols (for use with
e.g. TCP and SSL transports).

.. class:: BufferedProtocol

A base class for implementing streaming protocols with manual
control of the receive buffer.

.. versionadded:: 3.7
**Important:** this has been added to asyncio in Python 3.7
*on a provisional basis*! Treat it as an experimental API that
might be changed or removed in Python 3.8.

.. class:: DatagramProtocol

The base class for implementing datagram protocols (for use with
@@ -428,10 +438,67 @@ and, if called, :meth:`data_received` won't be called after it.

State machine:

start -> :meth:`~BaseProtocol.connection_made`
[-> :meth:`~Protocol.data_received` \*]
[-> :meth:`~Protocol.eof_received` ?]
-> :meth:`~BaseProtocol.connection_lost` -> end
.. code-block:: none

start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
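
An illustrative sketch, not part of this change: a minimal
:class:`Protocol` that follows this state machine (the class name is
made up for the example):

.. code-block:: python

    import asyncio

    class EchoProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            # Start of the connection: keep a reference to the transport.
            self.transport = transport

        def data_received(self, data):
            # May be called any number of times while the connection is open.
            self.transport.write(data)

        def eof_received(self):
            # Returning a false value lets the transport close itself.
            return False

        def connection_lost(self, exc):
            # End of the connection.
            self.transport = None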


Streaming protocols with manual receive buffer control
------------------------------------------------------

.. versionadded:: 3.7
**Important:** :class:`BufferedProtocol` has been added to
asyncio in Python 3.7 *on a provisional basis*! Treat it as an
experimental API that might be changed or removed in Python 3.8.


Event methods, such as :meth:`AbstractEventLoop.create_server` and
:meth:`AbstractEventLoop.create_connection`, accept factories that
return protocols that implement this interface.

The idea of BufferedProtocol is that it allows the protocol to manually
allocate and control the receive buffer. Event loops can then use the
buffer provided by the protocol to avoid unnecessary data copies. This
can result in a noticeable performance improvement for protocols that
receive large amounts of data. Sophisticated protocols can allocate the
buffer only once at creation time, as shown in the sketch after the
state machine below.

The following callbacks are called on :class:`BufferedProtocol`
instances:

.. method:: BufferedProtocol.get_buffer()

Called to allocate a new receive buffer. Must return an object
that implements the :ref:`buffer protocol <bufferobjects>`.

.. method:: BufferedProtocol.buffer_updated(nbytes)

Called when the buffer was updated with the received data.

*nbytes* is the total number of bytes that were written to the buffer.

.. method:: BufferedProtocol.eof_received()

See the documentation of the :meth:`Protocol.eof_received` method.


:meth:`get_buffer` can be called an arbitrary number of times during
a connection. However, :meth:`eof_received` is called at most once
and, if called, :meth:`get_buffer` and :meth:`buffer_updated` won't be
called after it.

State machine:

.. code-block:: none

start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
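
As an illustrative sketch only (not part of this change), a protocol
can allocate its receive buffer once and let the event loop write into
it directly; the class name and buffer size below are made up for the
example:

.. code-block:: python

    import asyncio

    class UpfrontBufferProtocol(asyncio.BufferedProtocol):
        def __init__(self):
            # Allocated once at creation time; reused for every read.
            self._buffer = bytearray(65536)

        def connection_made(self, transport):
            self.transport = transport

        def get_buffer(self):
            # The event loop will receive data directly into this buffer.
            return self._buffer

        def buffer_updated(self, nbytes):
            # The first *nbytes* bytes of the buffer now hold new data.
            data = bytes(self._buffer[:nbytes])
            print('received', len(data), 'bytes')

        def eof_received(self):
            # Returning a false value lets the transport close itself.
            return False

        def connection_lost(self, exc):
            self.transport = None

A factory returning such a protocol can then be passed to
:meth:`AbstractEventLoop.create_server` or
:meth:`AbstractEventLoop.create_connection` in the same way as a
regular :class:`Protocol` factory.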


Datagram protocols
137 changes: 117 additions & 20 deletions Lib/asyncio/proactor_events.py
@@ -12,6 +12,7 @@
from . import base_events
from . import constants
from . import futures
from . import protocols
from . import sslproto
from . import transports
from .log import logger
@@ -91,17 +92,19 @@ def __del__(self):
self.close()

def _fatal_error(self, exc, message='Fatal error on pipe transport'):
if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
self._force_close(exc)
try:
Contributor:
Looks like this is not related to the PR subject, but I love the change.

Member (author):
I think the proactor code hasn't received any love recently, so there are many things we can improve here. I'm just fixing things I see that are slightly broken.

if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
if self._loop.get_debug():
logger.debug("%r: %s", self, message, exc_info=True)
else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
'transport': self,
'protocol': self._protocol,
})
finally:
self._force_close(exc)

def _force_close(self, exc):
if self._closing:
@@ -150,6 +153,12 @@ def __init__(self, loop, sock, protocol, waiter=None,
extra=None, server=None):
super().__init__(loop, sock, protocol, waiter, extra, server)
self._paused = False

if protocols._is_buffered_protocol(protocol):
self._loop_reading = self._loop_reading__get_buffer
else:
self._loop_reading = self._loop_reading__data_received

self._loop.call_soon(self._loop_reading)

def is_reading(self):
@@ -159,6 +168,11 @@ def pause_reading(self):
if self._closing or self._paused:
return
self._paused = True

if self._read_fut is not None and not self._read_fut.done():
Contributor:
Proactor tricks again? I'm not sure that cancelling is what we should do here. Maybe self._read_fut should be set on the next resume_reading call instead? Not sure, though.

Member (author):
No, I think this is just a bug. The problem is that when you call pause_reading, the current code doesn't actually pause it; it just sets a _paused flag but leaves the socket in reading mode. Do you agree, or am I missing something here?

Contributor:
Looks like you are right.

self._read_fut.cancel()
self._read_fut = None

if self._loop.get_debug():
logger.debug("%r pauses reading", self)

@@ -170,11 +184,25 @@ def resume_reading(self):
if self._loop.get_debug():
logger.debug("%r resumes reading", self)

def _loop_reading(self, fut=None):
def _loop_reading__on_eof(self):
Contributor:
Is the double underscore in the name intentional? We never use this code style in asyncio.

Member (author):
It's just a bit more readable this way, since it's one "family" of functions (all three of them are combined into one "mega" function). I can replace it with a single underscore, but I think it's OK this way too. Up to you.

Contributor:
OK, agree.

if self._loop.get_debug():
logger.debug("%r received EOF", self)

try:
keep_open = self._protocol.eof_received()
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.eof_received() call failed.')
return

if not keep_open:
self.close()

def _loop_reading__data_received(self, fut=None):
if self._paused:
return
data = None

data = None
try:
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
@@ -197,7 +225,7 @@ def _loop_reading(self, fut=None):
return

# reschedule a new read
self._read_fut = self._loop._proactor.recv(self._sock, 4096)
self._read_fut = self._loop._proactor.recv(self._sock, 32768)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
Expand All @@ -216,12 +244,81 @@ def _loop_reading(self, fut=None):
finally:
if data:
self._protocol.data_received(data)
elif data is not None:
if self._loop.get_debug():
logger.debug("%r received EOF", self)
keep_open = self._protocol.eof_received()
if not keep_open:
self.close()
elif data == b'':
self._loop_reading__on_eof()

def _loop_reading__get_buffer(self, fut=None):
if self._paused:
return

nbytes = None
if fut is not None:
assert self._read_fut is fut or (self._read_fut is None and
self._closing)
self._read_fut = None
try:
if fut.done():
nbytes = fut.result()
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(
exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise

if nbytes is not None:
if nbytes == 0:
# we got end-of-file so no need to reschedule a new read
self._loop_reading__on_eof()
else:
try:
self._protocol.buffer_updated(nbytes)
except Exception as exc:
self._fatal_error(
exc,
'Fatal error: '
'protocol.buffer_updated() call failed.')
return

if self._closing or nbytes == 0:
# since close() has been called we ignore any read data
return

try:
buf = self._protocol.get_buffer()
except Exception as exc:
self._fatal_error(
exc, 'Fatal error: protocol.get_buffer() call failed.')
return

try:
# schedule a new read
self._read_fut = self._loop._proactor.recv_into(self._sock, buf)
self._read_fut.add_done_callback(self._loop_reading)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
elif self._loop.get_debug():
logger.debug("Read error on pipe transport while closing",
exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
self._fatal_error(exc, 'Fatal read error on pipe transport')
except futures.CancelledError:
if not self._closing:
raise


class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
57 changes: 56 additions & 1 deletion Lib/asyncio/protocols.py
@@ -2,7 +2,7 @@

__all__ = (
'BaseProtocol', 'Protocol', 'DatagramProtocol',
'SubprocessProtocol',
'SubprocessProtocol', 'BufferedProtocol',
)


@@ -102,6 +102,57 @@ def eof_received(self):
"""


class BufferedProtocol(BaseProtocol):
"""Interface for stream protocol with manual buffer control.

Important: this has been added to asyncio in Python 3.7
*on a provisional basis*! Treat it as an experimental API that
might be changed or removed in Python 3.8.

Event methods, such as `create_server` and `create_connection`,
accept factories that return protocols that implement this interface.

The idea of BufferedProtocol is that it allows the protocol to manually
allocate and control the receive buffer. Event loops can then use the
buffer provided by the protocol to avoid unnecessary data copies. This
can result in a noticeable performance improvement for protocols that
receive large amounts of data. Sophisticated protocols can allocate
the buffer only once at creation time.

State machine of calls:

start -> CM [-> GB [-> BU?]]* [-> ER?] -> CL -> end

* CM: connection_made()
* GB: get_buffer()
* BU: buffer_updated()
* ER: eof_received()
* CL: connection_lost()
"""

def get_buffer(self):
"""Called to allocate a new receive buffer.

Must return an object that implements the
:ref:`buffer protocol <bufferobjects>`.
"""

def buffer_updated(self, nbytes):
"""Called when the buffer was updated with the received data.

*nbytes* is the total number of bytes that were written to
the buffer.
"""

def eof_received(self):
"""Called when the other end calls write_eof() or equivalent.

If this returns a false value (including None), the transport
will close itself. If it returns a true value, closing the
transport is up to the protocol.
"""


class DatagramProtocol(BaseProtocol):
"""Interface for datagram protocol."""

@@ -134,3 +185,7 @@ def pipe_connection_lost(self, fd, exc):

def process_exited(self):
"""Called when subprocess has exited."""


def _is_buffered_protocol(proto):
return hasattr(proto, 'get_buffer') and not hasattr(proto, 'data_received')
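
For context, a quick illustration of how this duck-typing check behaves; the classes below are hypothetical, made up for the example, and the helper itself is private to this PR:

    import asyncio
    from asyncio.protocols import _is_buffered_protocol

    class Buffered(asyncio.BufferedProtocol):
        def get_buffer(self):
            return bytearray(4096)
        def buffer_updated(self, nbytes):
            pass

    class Streaming(asyncio.Protocol):
        def data_received(self, data):
            pass

    # Defines get_buffer() and has no data_received(): treated as buffered.
    assert _is_buffered_protocol(Buffered())
    # Inherits data_received() from asyncio.Protocol: not buffered.
    assert not _is_buffered_protocol(Streaming())

This is the same check the proactor transport above uses to choose between the data_received and get_buffer read paths.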