Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket reading message loop raises low-level CanceledError when connection is closed unexpectedly #2061

Open
kr41 opened this issue Jul 6, 2017 · 4 comments

Comments

@kr41
Copy link

kr41 commented Jul 6, 2017

Actual behaviour

Reading message loop async for msg in ws: raises low-level concurrent.futures._base.CancelledError when connection is closed unexpectedly.

Expected behaviour

Expected to get message with type aiohtto.http_websocket.WSMsgType.ERROR, or silently stop the loop, or at least aiohtto.http_websocket.WebSocketError.

Steps to reproduce

Run the following two scripts server.py and client.py, then stop client.py by Ctrl+C.

server.py

import logging

from aiohttp import web


logger = logging.getLogger(__name__)


async def index(request):
    ws = web.WebSocketResponse()
    request.app['websockets'].add(ws)

    try:
        await ws.prepare(request)
        logger.debug('Connected')
        async for msg in ws:
            logger.info('Received: %r', msg.data)
    except Exception:
        logger.exception('Error')
    logger.debug('Disconnected')

    request.app['websockets'].discard(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close()
    app['websockets'].clear()


def main():
    logging.basicConfig(level=logging.DEBUG)

    app = web.Application()
    app['websockets'] = set()
    app.router.add_get('/', index)
    app.on_shutdown.append(on_shutdown)

    web.run_app(app, host='127.0.0.1', port=9000)


if __name__ == '__main__':
    main()

client.py

import asyncio

import aiohttp


async def communicate(loop):
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.ws_connect('http://127.0.0.1:9000') as ws:
            while True:
                await ws.send_str('Hello')
                await asyncio.sleep(1, loop=loop)


def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(communicate(loop))


if __name__ == '__main__':
    main()

Log output of server.py

$ python server.py 
DEBUG:asyncio:Using selector: EpollSelector
======== Running on http://127.0.0.1:9000 ========
(Press CTRL+C to quit)
DEBUG:__main__:Connected
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
INFO:__main__:Received: 'Hello'
ERROR:__main__:Error
Traceback (most recent call last):
  File "server.py", line 16, in index
    async for msg in ws:
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 343, in __anext__
    msg = yield from self.receive()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/web_ws.py", line 273, in receive
    msg = yield from self._reader.read()
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 627, in read
    return (yield from super().read())
  File "/home/vagrant/project/workspace/pyenv_dev/lib64/python3.5/site-packages/aiohttp/streams.py", line 509, in read
    yield from self._waiter
  File "/usr/lib64/python3.5/asyncio/futures.py", line 380, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib64/python3.5/asyncio/tasks.py", line 304, in _wakeup
    future.result()
  File "/usr/lib64/python3.5/asyncio/futures.py", line 285, in result
    raise CancelledError
concurrent.futures._base.CancelledError
DEBUG:__main__:Disconnected
INFO:aiohttp.access:- - - [06/Jul/2017:11:41:25 +0000] "GET / HTTP/1.1" 101 0 "-" "Python/3.5 aiohttp/2.2.3"

Your environment

OS: CentOS Linux 7
Linux kernel: 3.10.0-514.16.1.el7.x86_64
Python: 3.5.3
aiohttp: 2.2.3

@asvetlov
Copy link
Member

Technically aiohttp creates a task per client request.
On client disconnection the system stops the task ASAP.
The only way to do it is task cancelling (let's assume web handler is waiting response from DB or other service, we want to cancel it too without waiting for explicit operation over connection to websocket client).

Task.cancel() is done by sending asyncio.CancelledError exception, the exception class is derived from standard Exception. This is asyncio behavior, nothing specific to aiohttp itself.

The only thing I could suggest is catching CancelledError in your handler explicitly:

try:
    ...
except asyncio.CancelledError:
    pass
except Exception as exc:
    log(exc)

Or you could just don't catch so broad type like Exception.

@fafhrd91
Copy link
Member

I see two options:

  • do not change anything, CancelledError is normal in async world.
  • catch CancelledError and return closed message. I think, this is better solution for webocket handler.

@s-kostyuk
Copy link

Maybe it's better to introduce a separate ConnectionClosed exception, in the same way as was done in websockets library?

@twisteroidambassador
Copy link

Technically aiohttp creates a task per client request.
On client disconnection the system stops the task ASAP.
The only way to do it is task cancelling (let's assume web handler is waiting response from DB or other service, we want to cancel it too without waiting for explicit operation over connection to websocket client).

I think I have just been bitten by this behavior. I had some code like this:

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())
        
        async for msg in ws:
            # do stuff
        
    await ws.close()
    
    return ws

After putting it in production I found that the exiting part of acquire_resource_3() would be silently skipped. More logging revealed that a CancelledError was being raised inside acquire_resource_3. Here's what I think happened:

  • Client closes the WebSocket
  • async for msg in ws loop exits, the AsyncExitStack starts to unwind, the exiting part of acquire_resource_3 starts to execute, hits an await
  • aiohttp cancels the handler task
  • CancelledError is raised inside handler at the current await, which is inside acquire_resource_3, therefore the remaining part of acquire_resource_3 is skipped
  • The exiting part of acquire_resource_2 and acquire_resource_1 still executes normally, since from their perspective they are simply exiting an async context on an exception

This is a really weird problem, particularly because how it breaks the expectation that the exiting part of a context manager will always run. I had to basically shield all the async contexts from cancellation, like this:

async def handler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    
    await asyncio.shield(asyncio.ensure_future(actually_do_stuff(ws)))
    
    return ws

async def actually_do_stuff(ws):
    async with contextlib.AsyncExitStack() as stack:
        # acquire_resource_X are async context managers
        await stack.enter_async_context(acquire_resource_1())
        await stack.enter_async_context(acquire_resource_2())
        await stack.enter_async_context(acquire_resource_3())
        
        async for msg in ws:
            # do stuff
        
    await ws.close()

Is there a better way to do this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants