Skip to content

Commit

Permalink
use message queue for abort_queues
Browse files Browse the repository at this point in the history
ensures queued messages are aborted,
rather than racing with coroutine processing,
which would complete too soon during asynchronous executions
  • Loading branch information
minrk committed Feb 3, 2022
1 parent ef84cff commit 23c3709
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
26 changes: 22 additions & 4 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ async def execute_request(self, stream, ident, parent):
self.log.debug("%s", reply_msg)

if not silent and reply_msg['content']['status'] == 'error' and stop_on_error:
await self._abort_queues()
self._abort_queues()

def do_execute(self, code, silent, store_history=True,
user_expressions=None, allow_stdin=False):
Expand Down Expand Up @@ -974,13 +974,31 @@ def _topic(self, topic):

_aborting = Bool(False)

async def _abort_queues(self):
self.shell_stream.flush()
def _abort_queues(self):
# while this flag is true,
# execute requests will be aborted
self._aborting = True
self.log.info("Aborting queue")

# flush streams, so all currently waiting messages
# are added to the queue
self.shell_stream.flush()

# Callback to signal that we are done aborting
def stop_aborting():
self.log.info("Finishing abort")
self._aborting = False
asyncio.get_event_loop().call_later(self.stop_on_error_timeout, stop_aborting)

# put the stop-aborting event on the message queue
# so that all messages already waiting in the queue are aborted
# before we reset the flag
schedule_stop_aborting = partial(self.schedule_dispatch, stop_aborting)

# if we have a delay, give messages this long to arrive on the queue
# before we stop aborting requests
asyncio.get_event_loop().call_later(
self.stop_on_error_timeout, schedule_stop_aborting
)

def _send_abort_reply(self, stream, msg, idents):
"""Send a reply to an aborted request"""
Expand Down
22 changes: 15 additions & 7 deletions ipykernel/tests/test_message_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,23 @@ def test_execute_stop_on_error():
"""execute request should not abort execution queue with stop_on_error False"""
flush_channels()

fail = '\n'.join([
# sleep to ensure subsequent message is waiting in the queue to be aborted
'import time',
'time.sleep(0.5)',
'raise ValueError',
])
fail = "\n".join(
[
# sleep to ensure subsequent message is waiting in the queue to be aborted
# async sleep to ensure coroutines are processing while this happens
"import asyncio",
"await asyncio.sleep(1)",
"raise ValueError()",
]
)
KC.execute(code=fail)
KC.execute(code='print("Hello")')
KC.get_shell_msg(timeout=TIMEOUT)
KC.execute(code='print("world")')
reply = KC.get_shell_msg(timeout=TIMEOUT)
print(reply)
reply = KC.get_shell_msg(timeout=TIMEOUT)
assert reply["content"]["status"] == "aborted"
# second message, too
reply = KC.get_shell_msg(timeout=TIMEOUT)
assert reply['content']['status'] == 'aborted'

Expand Down

0 comments on commit 23c3709

Please sign in to comment.