Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make concurrent connections respect limits #581

Merged
merged 2 commits into from
Nov 1, 2015
Merged

Conversation

jpwatts
Copy link
Contributor

@jpwatts jpwatts commented Oct 22, 2015

When I tried to use aiohttp.ClientSession to make many concurrent requests to the same server, I found that I ended up with more connections than I expected.

Using the current master branch, the script below makes 3 connections for 3 requests, despite setting a limit of 1 on the connector. The problem seems to be that connections aren't counted against the limit until after the underlying trasport has been created. This creates a race condition where connect yields from _create_connection before recording anywhere that one of the available connections has been consumed. Subsequent calls to connect that begin before the earlier _create_connnection call has returned are able to create an unlimited number of connections.

# This script should make only 1 connection, but it actually makes 3.

import asyncio
import logging

import aiohttp


logger = logging.getLogger(__name__)


class LoggingTCPConnector(aiohttp.TCPConnector):
    async def _create_connection(self, req):
        try:
            connection_id = self._connection_id
        except AttributeError:
            connection_id = 0
        self._connection_id = connection_id + 1
        logger.debug("CREATING CONNECTION %s", connection_id)
        transport, protocol = await super()._create_connection(req)
        transport.connection_id = connection_id
        logger.info("CREATED CONNECTION %s", connection_id)
        return transport, protocol

    def _release(self, key, req, transport, protocol, *, should_close=False):
        connection_id = transport.connection_id
        logger.debug("RELEASING CONNECTION %s", connection_id)
        super()._release(key, req, transport, protocol, should_close=False)
        logger.info("RELEASED CONNECTION %s", connection_id)


async def make_many_requests(url, num_connections, num_requests):
    http = aiohttp.ClientSession(
        connector=LoggingTCPConnector(limit=num_connections),
    )

    async def make_one_request(request_id):
        logger.debug("MAKING REQUEST %s", request_id)
        response = await http.request("GET", url)
        await response.release()
        logger.info("MADE REQUEST %s", request_id)

    with http:
        tasks = [
            asyncio.ensure_future(make_one_request(request_id))
            for request_id in range(num_requests)
        ]
        await asyncio.wait(tasks)


logging.basicConfig(level=logging.INFO)
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(
        make_many_requests("http://example.com/", 1, 3)
    )
except KeyboardInterrupt:
    loop.stop()
finally:
    loop.close()

This pull request includes a new test that fails on the current master branch, but passes with my changes to the connector. I'm also including the first version of the test that I wrote below, because I was surprised when it actually passed on current master. I want to highlight the use of a mock that returns a done future in place of a coroutine, in this case _create_connection. I copied this pattern from test_connect_with_limit and it broke my test because while a function that returns a done future can be yielded from as if it were a coroutine, it never actually yields control back to the event loop. The result in this case was that all tasks ran sequentially rather than concurrently as they would in real use. I replaced the mock with a real coroutine and the test failed as expected. I mention all this because I'm concerned about this pattern potentially masking other problems elsewhere in the tests.

# This test passes, but it shouldn't.

def test_connect_with_limit_concurrent(self):

    @asyncio.coroutine
    def go():
        tr, proto = unittest.mock.Mock(), unittest.mock.Mock()
        proto.is_connected.return_value = True

        class Req:
            host = 'host'
            port = 80
            ssl = False
            response = unittest.mock.Mock(_should_close=False)

        conn = aiohttp.BaseConnector(loop=self.loop, limit=1)
        conn._create_connection = unittest.mock.Mock()
        conn._create_connection.return_value = asyncio.Future(
            loop=self.loop)
        conn._create_connection.return_value.set_result((tr, proto))

        @asyncio.coroutine
        def f():
            connection = yield from conn.connect(Req())
            connection.release()

        tasks = [asyncio.async(f(), loop=self.loop) for i in range(10)]
        yield from asyncio.wait(tasks, loop=self.loop)
        self.assertEqual(1, conn._create_connection.call_count)

    self.loop.run_until_complete(go())

# The limit defines the maximum number of concurrent connections
# for a key. Waiters must be counted against the limit, even before
# the underlying connection is created.
available = limit - len(waiters)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect you should respect self._acquired too.

@jpwatts
Copy link
Contributor Author

jpwatts commented Oct 29, 2015

@asvetlov I updated the check for available connections based on your feedback.

My original test didn't catch the problem because I was only checking the case where all connections were requested up front. I've now updated it to make more than limit connections up front and then also follow up with more connections after the initial ones are released.

Sorry it took me a while to get back to this. I had prepared the changes, but then our new baby decided it was time to arrive. I got a little distracted for a few days. :-)

@asvetlov
Copy link
Member

asvetlov commented Nov 1, 2015

I wish happy long life for your baby.

available = limit - len(waiters) - len(self._acquired[key])

# Don't wait if there are connections available.
if available > 0:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make short-circuit here?
If no available connections then create future and wait for it, pass the next lines otherwise.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nevermind.

@asvetlov asvetlov merged commit 2c4b1f0 into aio-libs:master Nov 1, 2015
@asvetlov
Copy link
Member

asvetlov commented Nov 1, 2015

Thanks a lot!

@lock
Copy link

lock bot commented Oct 29, 2019

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a new issue for
related bugs.

If you feel like there's important points made in this discussion,
please include those exceprts into that new issue.

@lock lock bot added the outdated label Oct 29, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 29, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants