Skip to content

bpo-36802: Drop awrite()/aclose(), support await write() and await close() instead #13099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 9, 2019
Merged
90 changes: 59 additions & 31 deletions Doc/library/asyncio-stream.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ streams::
'127.0.0.1', 8888)

print(f'Send: {message!r}')
await writer.awrite(message.encode())
await writer.write(message.encode())

data = await reader.read(100)
print(f'Received: {data.decode()!r}')

print('Close the connection')
await writer.aclose()
await writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

Expand Down Expand Up @@ -226,23 +226,70 @@ StreamWriter
directly; use :func:`open_connection` and :func:`start_server`
instead.

.. coroutinemethod:: awrite(data)
.. method:: write(data)

The method attempts to write the *data* to the underlying socket immediately.
If that fails, the data is queued in an internal write buffer until it can be
sent.

Starting with Python 3.8, it is possible to directly await on the `write()`
method::

await stream.write(data)

The ``await`` pauses the current coroutine until the data is written to the
socket.

Below is an equivalent code that works with Python <= 3.7::

stream.write(data)
await stream.drain()

.. versionchanged:: 3.8
Support ``await stream.write(...)`` syntax.

.. method:: writelines(data)

The method writes a list (or any iterable) of bytes to the underlying socket
immediately.
If that fails, the data is queued in an internal write buffer until it can be
sent.

Starting with Python 3.8, it is possible to directly await on the `write()`
method::

await stream.writelines(lines)

The ``await`` pauses the current coroutine until the data is written to the
socket.

Below is an equivalent code that works with Python <= 3.7::

Write *data* to the stream.
stream.writelines(lines)
await stream.drain()

The method respects flow control, execution is paused if the write
buffer reaches the high watermark.
.. versionchanged:: 3.8
Support ``await stream.writelines()`` syntax.

.. versionadded:: 3.8
.. method:: close()

The method closes the stream and the underlying socket.

Starting with Python 3.8, it is possible to directly await on the `close()`
method::

await stream.close()

.. coroutinemethod:: aclose()
The ``await`` pauses the current coroutine until the stream and the underlying
socket are closed (and SSL shutdown is performed for a secure connection).

Close the stream.
Below is an equivalent code that works with Python <= 3.7::

Wait until all closing actions are complete, e.g. SSL shutdown for
secure sockets.
stream.close()
await stream.wait_closed()

.. versionadded:: 3.8
.. versionchanged:: 3.8
Support ``await stream.close()`` syntax.

.. method:: can_write_eof()

Expand All @@ -263,21 +310,6 @@ StreamWriter
Access optional transport information; see
:meth:`BaseTransport.get_extra_info` for details.

.. method:: write(data)

Write *data* to the stream.

This method is not subject to flow control. Calls to ``write()`` should
be followed by :meth:`drain`. The :meth:`awrite` method is a
recommended alternative the applies flow control automatically.

.. method:: writelines(data)

Write a list (or any iterable) of bytes to the stream.

This method is not subject to flow control. Calls to ``writelines()``
should be followed by :meth:`drain`.

.. coroutinemethod:: drain()

Wait until it is appropriate to resume writing to the stream.
Expand All @@ -293,10 +325,6 @@ StreamWriter
be resumed. When there is nothing to wait for, the :meth:`drain`
returns immediately.

.. method:: close()

Close the stream.

.. method:: is_closing()

Return ``True`` if the stream is closed or in the process of
Expand Down
35 changes: 27 additions & 8 deletions Lib/asyncio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ def __init__(self, transport, protocol, reader, loop,
assert reader is None or isinstance(reader, StreamReader)
self._reader = reader
self._loop = loop
self._complete_fut = self._loop.create_future()
self._complete_fut.set_result(None)

def __repr__(self):
info = [self.__class__.__name__, f'transport={self._transport!r}']
Expand All @@ -365,9 +367,33 @@ def transport(self):

def write(self, data):
self._transport.write(data)
return self._fast_drain()

def writelines(self, data):
self._transport.writelines(data)
return self._fast_drain()

def _fast_drain(self):
# The helper tries to use fast-path to return already existing complete future
# object if underlying transport is not paused and actual waiting for writing
# resume is not needed
if self._reader is not None:
# this branch will be simplified after merging reader with writer
exc = self._reader.exception()
if exc is not None:
fut = self._loop.create_future()
fut.set_exception(exc)
return fut
if not self._transport.is_closing():
if self._protocol._connection_lost:
fut = self._loop.create_future()
fut.set_exception(ConnectionResetError('Connection lost'))
return fut
if not self._protocol._paused:
# fast path, the stream is not paused
# no need to wait for resume signal
return self._complete_fut
return self._loop.create_task(self.drain())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to avoid creating task in majority of the cases, e.g.:

In the stream __init__:

   self._nop = self.loop.create_future()
   self._nop.set_result(None)

and then in _fast_drain:

   if not self._paused:
        return self._nop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I do almost the same a few lines above.
There is no writer._paused attribute but writer._protocol._paused one (which has undefined state if the protocol is closing).
That's why I have slightly more comlex logic that you mentioned in a comment but the functionality is the same.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see, never mind. Could you please add a comment before return self._complete_fut that it's a "fast path"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


def write_eof(self):
return self._transport.write_eof()
Expand All @@ -377,6 +403,7 @@ def can_write_eof(self):

def close(self):
self._transport.close()
return self._protocol._get_close_waiter(self)

def is_closing(self):
return self._transport.is_closing()
Expand Down Expand Up @@ -408,14 +435,6 @@ async def drain(self):
raise ConnectionResetError('Connection lost')
await self._protocol._drain_helper()

async def aclose(self):
self.close()
await self.wait_closed()

async def awrite(self, data):
self.write(data)
await self.drain()


class StreamReader:

Expand Down
42 changes: 30 additions & 12 deletions Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,24 +1035,42 @@ def test_del_stream_before_connection_made(self):
messages[0]['message'])

def test_async_writer_api(self):
async def inner(httpd):
rd, wr = await asyncio.open_connection(*httpd.address)

await wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
data = await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
await wr.close()

messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

with test_utils.run_test_server() as httpd:
rd, wr = self.loop.run_until_complete(
asyncio.open_connection(*httpd.address,
loop=self.loop))
self.loop.run_until_complete(inner(httpd))

f = wr.awrite(b'GET / HTTP/1.0\r\n\r\n')
self.loop.run_until_complete(f)
f = rd.readline()
data = self.loop.run_until_complete(f)
self.assertEqual(messages, [])

def test_async_writer_api(self):
async def inner(httpd):
rd, wr = await asyncio.open_connection(*httpd.address)

await wr.write(b'GET / HTTP/1.0\r\n\r\n')
data = await rd.readline()
self.assertEqual(data, b'HTTP/1.0 200 OK\r\n')
f = rd.read()
data = self.loop.run_until_complete(f)
data = await rd.read()
self.assertTrue(data.endswith(b'\r\n\r\nTest message'))
f = wr.aclose()
self.loop.run_until_complete(f)
wr.close()
with self.assertRaises(ConnectionResetError):
await wr.write(b'data')

messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

with test_utils.run_test_server() as httpd:
self.loop.run_until_complete(inner(httpd))

self.assertEqual(messages, [])

Expand All @@ -1066,7 +1084,7 @@ def test_eof_feed_when_closing_writer(self):
asyncio.open_connection(*httpd.address,
loop=self.loop))

f = wr.aclose()
f = wr.close()
self.loop.run_until_complete(f)
assert rd.at_eof()
f = rd.read()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Provide both sync and async calls for StreamWriter.write() and
StreamWriter.close()