Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make concurrent connections respect limits #581

Merged
merged 2 commits into from
Nov 1, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,12 +271,24 @@ def connect(self, req):
"""Get from pool or create new connection."""
key = (req.host, req.port, req.ssl)

# use short-circuit
if self._limit is not None:
while len(self._acquired[key]) >= self._limit:
fut = asyncio.Future(loop=self._loop)
self._waiters[key].append(fut)
yield from fut
limit = self._limit
if limit is not None:
fut = asyncio.Future(loop=self._loop)
waiters = self._waiters[key]

# The limit defines the maximum number of concurrent connections
# for a key. Waiters must be counted against the limit, even before
# the underlying connection is created.
available = limit - len(waiters) - len(self._acquired[key])

# Don't wait if there are connections available.
if available > 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make short-circuit here?
If no available connections then create future and wait for it, pass the next lines otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind.

fut.set_result(None)

# This connection will now count towards the limit.
waiters.append(fut)

yield from fut

transport, proto = self._get(key)
if transport is None:
Expand Down
69 changes: 69 additions & 0 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,75 @@ class Req:

self.loop.run_until_complete(go())

def test_connect_with_limit_concurrent(self):

@asyncio.coroutine
def go():
proto = unittest.mock.Mock()
proto.is_connected.return_value = True

class Req:
host = 'host'
port = 80
ssl = False
response = unittest.mock.Mock(_should_close=False)

max_connections = 2
num_connections = 0

conn = aiohttp.BaseConnector(limit=max_connections, loop=self.loop)

# Use a real coroutine for _create_connection; a mock would mask
# problems that only happen when the method yields.

@asyncio.coroutine
def create_connection(req):
nonlocal num_connections
num_connections += 1
yield from asyncio.sleep(0, loop=self.loop)

# Make a new transport mock each time because acquired
# transports are stored in a set. Reusing the same object
# messes with the count.
tr = unittest.mock.Mock()

return tr, proto

conn._create_connection = create_connection

# Simulate something like a crawler. It opens a connection, does
# something with it, closes it, then creates tasks that make more
# connections and waits for them to finish. The crawler is started
# with multiple concurrent requests and stops when it hits a
# predefined maximum number of requests.

max_requests = 10
num_requests = 0
start_requests = max_connections + 1

@asyncio.coroutine
def f(start=True):
nonlocal num_requests
if num_requests == max_requests:
return
num_requests += 1
if not start:
connection = yield from conn.connect(Req())
yield from asyncio.sleep(0, loop=self.loop)
connection.release()
tasks = [
asyncio.async(f(start=False), loop=self.loop)
for i in range(start_requests)
]
yield from asyncio.wait(tasks, loop=self.loop)

yield from f()
conn.close()

self.assertEqual(max_connections, num_connections)

self.loop.run_until_complete(go())

def test_close_with_acquired_connection(self):

@asyncio.coroutine
Expand Down