Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket connection is closing. #1768

Open
Earthson opened this issue Mar 29, 2017 · 16 comments
Open

websocket connection is closing. #1768

Earthson opened this issue Mar 29, 2017 · 16 comments

Comments

@Earthson
Copy link

Earthson commented Mar 29, 2017

websocket connection is closing. but no Exception raise

I can't know whether a connection is available?

websocket connection is closing.
socket.send() raised exception.
async def worker(url, msg_queue, loop, on_message=None):
    async with aiohttp.ClientSession(loop=loop) as session,\
               session.ws_connect(url) as ws:
        close_flag = False
        async def bg_sending():
            nonlocal close_flag
            try:
                while True:
                    if close_flag is True:
                        break
                    msg = await msg_queue.get()
                    if isinstance(msg, dict):
                        await ws.send_json(msg)
                    elif msg == "close":
                        print("WS: GOT SELF CLOSE")
                        await ws.close()
                        close_flag = True
                        break
            except Exception as e:
                close_flag = True
                raise e
        async def bg_receive():
            nonlocal close_flag
            try:
                async for msg in ws:
                    if close_flag is True:
                        break
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        if msg.data == 'close':
                            await ws.close()
                            close_flag = True
                            print("WS: ACCEPT CLOSE")
                            _thread.interrupt_main() #kill is not work here
                            break
                        else:
                            if on_message is not None:
                                res = on_message(msg.data)
                                if res is not None:
                                    ws.send_str(res)
                    elif msg.type == aiohttp.WSMsgType.CLOSED:
                        print("WS: CONNECTION IS CLOSED")
                        close_flag = True
                        break
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        print(f"WS: ERROR:{msg}")
                        close_flag = True
                        raise Exception(str(msg))
            except Exception as e:
                close_flag = True
                raise e
        bg_send_task = asyncio.ensure_future(bg_sending(), loop=loop)
        bg_receive_task = asyncio.ensure_future(bg_receive(), loop=loop)
        while close_flag is False:
            await asyncio.sleep(1)
        if not bg_send_task.done():
            bg_send_task.cancel()
        if not bg_receive_task.done():
            bg_receive_task.cancel()
        if not bg_receive_task.cancelled() and bg_receive_task.exception() is not None:
            print("WS: RECEIVE EXCEPTION")
            raise bg_receive_task.exception()
        if not bg_send_task.cancelled() and bg_send_task.exception() is not None:
            print("WS: SEND EXCEPTION")
            raise bg_send_task.exception()

Expected behaviour

If there is a error, an Exception should be raise.

Actual behaviour

It is a warning? not Exception raised

Steps to reproduce

wait a long time, and websocket send_json failed with this warning

Your environment

Ubuntu 16.04
Anaconda 4.3.1
Python 3.6.1
aiohttp 2.0.4

@samuelcolvin
Copy link
Member

Without looking at the example in detail I suspect you're getting CancelledError which aiohttp is then catching silently.

What's weird and more problematic for me is that (perhaps because it's CancelledError or perhaps because of the way it's raised) any awaited coroutine after it's raised gets cancelled too. This is causing me to leak connections from the asyncpg connection pool, see MagicStack/asyncpg#97.

If you put

try:
    ...
except Exception as e:
    print(e.__class__.__name__, e)
    raise

Around your code you'll see the error.

@fafhrd91
Copy link
Member

maybe we beed to add signal for disconnection or something

@samuelcolvin
Copy link
Member

That would be useful, this confused me for a while and has required a fix in asyncpg.

@fafhrd91
Copy link
Member

fafhrd91 commented Apr 8, 2017

I re-read issue, i need more information. what do you get? warning? error?

@Earthson
Copy link
Author

I can not repreduce this bug now, I've made some change, add ws.closed test

@Earthson
Copy link
Author

@fafhrd91 Now I just test weather the connection is closed silently, I raised my own Exception in this case.

The connection is always exit with error code 258, I've found nothing about it.

@dovahcrow
Copy link

I got this issue as same. Basically when the remote connection is closing, the async for msg in ws will end silently as if you have exhausted the iterable ws. Comparing to another python websocket library "websockets", the async for msg in ws in websockets will raise an ConnectionClosed error if the remote is closing.

@o3o3o
Copy link
Contributor

o3o3o commented Mar 18, 2019

In my case, most causes is closing action (cancel request) by client, raised asyncio.CancelledError Exception. You can catch exception like this:

try:
        async for msgobj in ws:
                  ....
except asyncio.CancelledError:
       print(f"the websocket({ws}) cancelled")
finally:
        await ws.close()

@dovahcrow
Copy link

dovahcrow commented Mar 18, 2019 via email

@dovahcrow
Copy link

dovahcrow commented Mar 18, 2019

I have the code like this:

class A:
    async def _start(self):
        async with self.session.ws_connect("...") as ws:
            async for msg in ws:
                  pass

while True:
        try:
            await self._start()
            logger.warning("Loop exit abnormally")
        except CancelledError:
            logger.warning(f"WS exiting because of cancellation")
            break
        except Exception as e:
            logger.critical(f"WS unrecoverable exception: {type(e)}: {e}")
            traceback.print_exception(*sys.exc_info())
            break

And in log it always complains "Loop exit abnormally"

@o3o3o
Copy link
Contributor

o3o3o commented Mar 18, 2019

Maybe you can try to check more type of wsobj, such as:

 try:
        async for msgobj in ws:
            if msgobj.type == aiohttp.WSMsgType.TEXT:
                if msgobj.data:
                    await conn.handle_msg_here(..)
            elif msgobj.type == aiohttp.WSMsgType.PING:
                ws.pong()
            elif msgobj.type == aiohttp.WSMsgType.CLOSED:
                logging.warn(f"ws: Closed the websocket {ws} ")
                break
            elif msgobj.type == aiohttp.WSMsgType.ERROR:
                break
except asyncio.CancelledError:
       print(f"the websocket({ws}) cancelled")
finally:
        await ws.close()

@dovahcrow
Copy link

Maybe you can try to check more type of wsobj, such as:

 try:
        async for msgobj in ws:
            if msgobj.type == aiohttp.WSMsgType.TEXT:
                if msgobj.data:
                    await conn.handle_msg_here(..)
            elif msgobj.type == aiohttp.WSMsgType.PING:
                ws.pong()
            elif msgobj.type == aiohttp.WSMsgType.CLOSED:
                logging.warn(f"ws: Closed the websocket {ws} ")
                break
            elif msgobj.type == aiohttp.WSMsgType.ERROR:
                break
except asyncio.CancelledError:
       print(f"the websocket({ws}) cancelled")
finally:
        await ws.close()

Interestingly, I have something like

async for msg in ws:
    if msg.type != WSMsgType.TEXT:
        logger.warning(f"Non text ws message: {msg.data}")
        continue

But I never see anything related in the log which suggests the for loop exits silently.

@dovahcrow
Copy link

By looking into the source code, I found this in aiohttp

class ClientWebSocketResponse:
    ...
    async def __anext__(self) -> WSMessage:
            msg = await self.receive()
            if msg.type in (WSMsgType.CLOSE,
                            WSMsgType.CLOSING,
                            WSMsgType.CLOSED):
                raise StopAsyncIteration  # NOQA
            return msg

https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_ws.py#L295

I think this perfectly solved this problem: aiohttp WON'T inform you about the websocket closing if you are using async for.

@nevelis
Copy link

nevelis commented Jul 19, 2019

The problem for me (and probably in this case) is that you have another thread (bg_sending) that is trying to send over the websocket after the bg_receive function.

The websocket connection is closing. error is actually coming from https://github.com/aio-libs/aiohttp/blob/master/aiohttp/http_websocket.py#L555 - this is happening from bg_sending, which explains why you aren't seeing the exception from within the async for in bg_receive.

It does inform you of the websocket connection closing by virtue of exiting the loop.

@nevelis
Copy link

nevelis commented Jul 19, 2019

I know this is years late and I doubt it's hopeful to the OP but it might be useful to the debugging efforts of others as the other comments on this thread have been for me! :)

Your bg_sending thread MIGHT be throwing the exception on send(), but you're only logging it if not bg_send_task.cancelled(), AND bg_send_task.exception() is not None.

The exception might not be None, however just a few lines before you call bg_send_task.cancel() and cancel it. So this condition will never be true and nothing will be logged.

@louisabraham
Copy link

I think the exception to catch is concurrent.futures.CancelledError, which is different from asyncio.CancelledError.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

8 participants