Error while closing a websocket #922
Comments
It is forbidden in aiohttp (and in general too) to tamper with another task's websocket while it is being read. What happens (in your example) is:
The best way to kill a task is to cancel it:

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()
    this_task = asyncio.Task.current_task(loop=request.app.loop)
    this_task_id = id(this_task)
    # if this is the first ws, I should be killed when other clients disconnect
    if 'other_ws' not in request.app:
        request.app['other_ws'] = this_task
    await ws.prepare(request)
    try:
        async for msg in ws:
            if msg.tp == aiohttp.MsgType.text:
                ws.send_str(msg.data)
            else:
                raise NotImplementedError()
    except asyncio.CancelledError:
        print('{} cancelled'.format(this_task_id))
    # if there is another ws, kill it.
    print('{} client disconnected'.format(this_task_id))
    if 'other_ws' in request.app and request.app['other_ws'] != this_task:
        print('{} killing other_ws'.format(this_task_id))
        request.app['other_ws'].cancel()
        del request.app['other_ws']
    print('{} before close'.format(this_task_id))
    try:
        await ws.close()
    except asyncio.CancelledError:
        print('{} cancelled error on close'.format(this_task_id))
    else:
        print('{} after close'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    aiohttp.web.run_app(app, host='localhost', port=8080)


if __name__ == '__main__':
    main()
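The cancellation pattern in the example above can be tried out without aiohttp. What follows is a minimal sketch using only asyncio (Python 3.7+ is assumed); `reader`, `demo`, and the queue standing in for the websocket are illustrative names, not aiohttp APIs. It is meant to show that `task.cancel()` raises `CancelledError` inside the cancelled task at the `await` it is currently blocked on, which is where that task can do its own cleanup.

```python
import asyncio


async def reader(inbox, log):
    """Stand-in for a websocket handler: reads until it is cancelled."""
    try:
        while True:
            msg = await inbox.get()   # cancellation is delivered at this await
            log.append("got " + msg)
    except asyncio.CancelledError:
        # The handler owns its connection, so it performs its own cleanup.
        # Swallowing the error mirrors the thread's example; re-raising is
        # usually the safer default.
        log.append("cancelled, closing own connection")
    return "closed"


async def demo():
    inbox, log = asyncio.Queue(), []
    task = asyncio.ensure_future(reader(inbox, log))
    await inbox.put("hello")
    await asyncio.sleep(0)   # let the reader consume "hello" and block again
    await asyncio.sleep(0)
    task.cancel()            # request cancellation from a different task
    result = await task      # the reader caught the error and returned
    return result, log
```

Calling `asyncio.run(demo())` returns the reader's return value together with the log of what it did before and after being cancelled.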
So basically this should be closed as invalid IMO.
@mpaolini thank you so much for the investigation.
@asvetlov reviewing this issue once again: the only way we now have to programmatically close a websocket from the server side is to call
@mpaolini are you asking about termination from another task?
@asvetlov yes exactly, termination from another task is the issue. I don't know if we really need a new method or if
You need to know the task instance in order to cancel it.
Thinking more about it, all this added complexity is just because we have the "only reading from handler task is allowed" thing. If that wasn't the case, I could write something like:

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo(ws):
    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:
            ws.send_str(msg.data)
        else:
            raise NotImplementedError()


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()
    this_task = asyncio.Task.current_task(loop=request.app.loop)
    this_task_id = id(this_task)
    await ws.prepare(request)
    # Wrap both coroutines in tasks: asyncio.wait() returns tasks in `done`,
    # so the membership tests and .cancel() below need task objects.
    echo_coro = asyncio.ensure_future(echo(ws), loop=request.app.loop)
    wait_kill_coro = asyncio.ensure_future(
        request.app['kill'].wait(), loop=request.app.loop
    )
    done, pending = await asyncio.wait(
        [
            echo_coro,
            wait_kill_coro
        ],
        return_when=asyncio.FIRST_COMPLETED
    )
    if echo_coro in done:
        print('{} client disconnected'.format(this_task_id))
        print('{} killing other_ws'.format(this_task_id))
        request.app['kill'].set()
    else:
        assert wait_kill_coro in done
        print('{} received kill signal'.format(this_task_id))
        echo_coro.cancel()
    try:
        await ws.close()
    except asyncio.CancelledError:
        print('{} cancelled error on close'.format(this_task_id))
    else:
        print('{} after close'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    app['kill'] = asyncio.Event()
    aiohttp.web.run_app(app, host='localhost', port=8080)


if __name__ == '__main__':
    main()

this was my first take at solving this issue but I quickly realized that
Is it really that hard to handle reading in a task that is separate from the handler?
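The event-based idea in the comment above can also be exercised in isolation. The sketch below is an assumption-laden toy, not aiohttp code: queues stand in for the two websockets, `None` stands in for a client disconnect, and `echo`, `handler`, and `demo` are illustrative names. It waits on a read loop and a shared `asyncio.Event` with `asyncio.wait(..., return_when=FIRST_COMPLETED)`, using tasks rather than bare coroutines so that the `done` set can be compared against them.

```python
import asyncio


async def echo(inbox, outbox):
    """Stand-in for the read loop: echoes until it receives None."""
    while True:
        msg = await inbox.get()
        if msg is None:          # None plays the role of a client disconnect
            return
        outbox.append(msg)


async def handler(inbox, outbox, kill):
    # Both awaitables are wrapped in tasks so that membership tests on
    # `done` and calls to .cancel() operate on the same objects.
    echo_task = asyncio.ensure_future(echo(inbox, outbox))
    kill_task = asyncio.ensure_future(kill.wait())
    done, pending = await asyncio.wait(
        [echo_task, kill_task], return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:         # stop whichever side did not finish
        task.cancel()
    if echo_task in done:
        kill.set()               # tell the other handlers to stop
        return "client disconnected"
    return "received kill signal"


async def demo():
    kill = asyncio.Event()
    inbox_a, inbox_b = asyncio.Queue(), asyncio.Queue()
    out_a, out_b = [], []
    first = asyncio.ensure_future(handler(inbox_a, out_a, kill))
    second = asyncio.ensure_future(handler(inbox_b, out_b, kill))
    await inbox_b.put("hi")
    await inbox_b.put(None)      # the second client disconnects
    return await asyncio.gather(first, second), out_b
```

In this toy the read loop runs in its own task, which is exactly what the "only the handler task may read" rule forbids for a real `WebSocketResponse` in the aiohttp version discussed here.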
Preventing parallel reads solves a philosophical problem: which messages will be consumed by which parallel task? Random ones!!!
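The point about parallel readers can be illustrated without aiohttp. In this sketch an `asyncio.Queue` stands in for the websocket's incoming message stream, and `consumer` and `demo` are illustrative names. Each message is handed to exactly one of the two readers, and which reader receives which message depends only on scheduling, so neither reader sees the full conversation.

```python
import asyncio


async def consumer(name, queue, seen):
    """Read from the shared queue until a None sentinel arrives."""
    while True:
        msg = await queue.get()
        if msg is None:
            return
        seen[name].append(msg)
        await asyncio.sleep(0)   # yield so the other reader gets a turn


async def demo():
    queue = asyncio.Queue()
    seen = {"a": [], "b": []}
    for i in range(6):
        queue.put_nowait(i)
    queue.put_nowait(None)       # one sentinel per consumer
    queue.put_nowait(None)
    await asyncio.gather(
        consumer("a", queue, seen), consumer("b", queue, seen)
    )
    return seen
```

After `asyncio.run(demo())`, the two lists in `seen` are disjoint and only their union contains every message.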
got your point, makes perfect sense. let's do the
I would do it this way:

@asyncio.coroutine
def terminate(self):
    self._terminating = True
    self._cancel_reader_task()

and then in the

try:
    msg = yield from self._reader.read()
except (asyncio.CancelledError, asyncio.TimeoutError):
    if self._terminating:
        return Message(MsgType.close, None, None)
    raise
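The flag-plus-cancellation idea proposed above can be sketched as a self-contained toy. `ToySocket` below is a hypothetical stand-in written for this illustration (Python 3.7+ assumed); it is not an aiohttp class and does not reflect how any merged implementation works. It shows the intended behaviour: a cancellation caused by `terminate()` is reported to the reader as a close message, while any other cancellation still propagates.

```python
import asyncio


class ToySocket:
    """Illustrative stand-in for a websocket; not an aiohttp class."""

    def __init__(self):
        self._inbox = asyncio.Queue()
        self._terminating = False
        self._reader_task = None

    async def receive(self):
        # Remember which task is reading so terminate() can cancel it.
        self._reader_task = asyncio.current_task()
        try:
            return await self._inbox.get()
        except asyncio.CancelledError:
            if self._terminating:
                return "close"   # report termination as a close message
            raise                # an ordinary cancellation propagates
        finally:
            self._reader_task = None

    def terminate(self):
        self._terminating = True
        if self._reader_task is not None:
            self._reader_task.cancel()


async def demo():
    sock = ToySocket()
    reader = asyncio.ensure_future(sock.receive())
    await asyncio.sleep(0)       # let the reader block on the empty inbox
    sock.terminate()             # called from a different task
    return await reader
```

The design choice here is that the caller of `terminate()` never touches the reader's pending read directly; it only sets a flag and cancels the task, and the reading task decides how to interpret that cancellation.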
With the #1004 PR, you can now do:

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()
    request.app['websockets'].add(ws)
    this_task_id = id(asyncio.Task.current_task(loop=request.app.loop))
    await ws.prepare(request)
    async for msg in ws:
        if msg.tp == aiohttp.MsgType.text:
            ws.send_str(msg.data)
        else:
            raise NotImplementedError()
    if ws.terminating:
        print('{} ws externally terminated'.format(this_task_id))
    else:
        print('{} client disconnected'.format(this_task_id))
        while request.app['websockets']:
            other_ws = request.app['websockets'].pop()
            if other_ws is not ws:
                print('{} killing other ws'.format(this_task_id))
                other_ws.terminate()
    print('{} before close'.format(this_task_id))
    await ws.close()
    print('{} after close'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    app['websockets'] = set()
    aiohttp.web.run_app(app, host='localhost', port=8080)


if __name__ == '__main__':
    main()
@asvetlov this one we can close (again) I think
This is the updated version for this

# aiohttp_issue.py
import asyncio
import aiohttp.web


async def echo_handler(request):
    ws = aiohttp.web.WebSocketResponse()
    this_task = asyncio.Task.current_task(loop=request.app.loop)
    this_task_id = id(this_task)
    # if this is the first ws, I should be killed when other clients disconnect
    if 'other_ws' not in request.app:
        request.app['other_ws'] = this_task
    await ws.prepare(request)
    try:
        async for msg in ws:
            if msg.tp == aiohttp.MsgType.text:
                ws.send_str(msg.data)
            else:
                raise NotImplementedError()
    except asyncio.CancelledError:
        print('{} cancelled'.format(this_task_id))
    else:
        print('{} client disconnected'.format(this_task_id))
        if 'other_ws' in request.app and request.app['other_ws'] != this_task:
            print('{} killing other_ws'.format(this_task_id))
            request.app['other_ws'].cancel()
            del request.app['other_ws']
    if not ws.closed:
        print('{} before close'.format(this_task_id))
        try:
            await ws.close()
        except asyncio.CancelledError:
            print('{} cancelled error on close'.format(this_task_id))
        else:
            print('{} after close'.format(this_task_id))
    else:
        print('{} already closed'.format(this_task_id))
    print('{} this line *is* going to be printed twice'.format(this_task_id))
    return ws


def main():
    loop = asyncio.get_event_loop()
    loop.set_debug(True)
    app = aiohttp.web.Application(loop=loop)
    app.router.add_route('GET', '/echo', echo_handler)
    aiohttp.web.run_app(app, host='localhost', port=8080)


if __name__ == '__main__':
    main()
Long story short
I am trying to implement an endpoint that forwards websocket messages between two clients. When one client closes the connection, the aiohttp server must also close the websocket with the second client.
The problem is that my program never gets through the
await other_ws.close()
step.
Expected behaviour
Actual behaviour
Furthermore, I get the following error:
Steps to reproduce
Run the following file:
Now launch two websocket clients on that endpoint, and close the connection on the second one (CTRL+D). You can use a CLI client like https://github.com/yhat/ws.
The python server should show the following error:
Your environment
Mac OS 10.11, python 3.5.1, aiohttp 0.21.6