Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent drains #1915

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion aiohttp/http_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import collections
import socket
import zlib
from collections import deque
from urllib.parse import SplitResult

import yarl
Expand Down Expand Up @@ -39,6 +40,8 @@ def __init__(self, protocol, transport, loop):
self._waiters = []
self.available = True
self.transport = transport
self._is_drain = False
self._drain_waiters = deque()

def acquire(self, writer):
if self.available:
Expand Down Expand Up @@ -125,7 +128,18 @@ def drain(self):
yield from w.drain()
"""
if self._protocol.transport is not None:
yield from self._protocol._drain_helper()
if self._is_drain:
fut = asyncio.Future(loop=self._loop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._loop.create_future() I guess

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use same future for all callers?

self._drain_waiters.append(fut)
yield from fut
self._is_drain = True
try:
yield from self._protocol._drain_helper()
finally:
self._is_drain = False
if self._drain_waiters:
fut = self._drain_waiters.popleft()
fut.set_result(None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same "deadlock" issue (a,b) can happen here: if future on line 134 is cancelled then other task that
sets result for next waiter will do nothing and all other futures will be in _drain_waiters forever.

a: bugs.python.org/issue27585
b: python/asyncio#393 and propbably python/cpython#1031

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think cancellation tests should be added as well.



class PayloadWriter(AbstractPayloadWriter):
Expand Down
70 changes: 70 additions & 0 deletions tests/test_http_stream_writer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import socket
from unittest import mock

Expand Down Expand Up @@ -332,3 +333,72 @@ def test_replace_available(loop):
payload2 = stream.replace(payload, PayloadWriter)
assert payload2._transport is transport
assert payload2 not in stream._waiters


@asyncio.coroutine
def test_concurrent_drains(loop):
# high limit for write buffer is about 4 MB
PACKET = b'1234567890' * 1024 * 1024

fut0 = asyncio.Future(loop=loop)
fut1 = asyncio.Future(loop=loop)
fut2 = asyncio.Future(loop=loop)
fut3 = asyncio.Future(loop=loop)

class Proto(asyncio.streams.FlowControlMixin):
def connection_made(self, transport):
super().connection_made(transport)
self.transport = transport

@asyncio.coroutine
def read(reader, writer):
yield from fut1
ret = object()
res = b''
while ret:
ret = yield from reader.read()
res += ret
assert res == PACKET*3
fut2.set_result(None)

server = yield from asyncio.start_server(read, '127.0.0.1', 0, loop=loop)
port = server.sockets[0].getsockname()[1]

tr, pr = yield from loop.create_connection(lambda: Proto(loop=loop),
'127.0.0.1', port)

stream = StreamWriter(pr, tr, loop)
tr.set_write_buffer_limits(1, 1)

started = 0
finished = 0

@asyncio.coroutine
def write():
nonlocal started, finished
started += 1
if started == 3:
fut0.set_result(None)
tr.write(PACKET)
yield from stream.drain()
finished += 1
if finished == 3:
fut3.set_result(None)

tasks = [loop.create_task(write()) for i in range(3)]

yield from fut0
assert started == 3
assert finished == 0
fut1.set_result(None)
yield from fut3
tr.close()
yield from fut2
assert started == 3
assert finished == 3

tr.close()

yield from asyncio.gather(*tasks, loop=loop)
server.close()
yield from server.wait_closed()