Skip to content

Commit db498e0

Browse files
sonthonaxrkdavidbrochart
authored andcommitted
Issue #12786: Create hook for dispatching messages out of order
1 parent 22bd412 commit db498e0

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

ipykernel/inprocess/client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,8 @@ def _dispatch_to_kernel(self, msg):
173173
raise RuntimeError('Cannot send request. No kernel exists.')
174174

175175
stream = kernel.shell_stream
176-
self.session.send(stream, msg)
177-
msg_parts = stream.recv_multipart()
178176
loop = asyncio.get_event_loop()
179-
loop.run_until_complete(kernel.dispatch_shell(msg_parts))
177+
loop.run_until_complete(kernel.dispatch_shell(msg))
180178
idents, reply_msg = self.session.recv(stream, copy=False)
181179
self.shell_channel.call_handlers_later(reply_msg)
182180

ipykernel/kernelbase.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -302,18 +302,14 @@ def should_handle(self, stream, msg, idents):
302302
return False
303303
return True
304304

305-
async def dispatch_shell(self, msg):
305+
async def dispatch_shell(self, msg, idents=None):
306306
"""dispatch shell requests"""
307307

308308
# flush control queue before handling shell requests
309309
await self._flush_control_queue()
310310

311-
idents, msg = self.session.feed_identities(msg, copy=False)
312-
try:
313-
msg = self.session.deserialize(msg, content=True, copy=False)
314-
except Exception:
315-
self.log.error("Invalid Message", exc_info=True)
316-
return
311+
if idents is None:
312+
idents = []
317313

318314
# Set the parent message for side effects.
319315
self.set_parent(idents, msg, channel='shell')
@@ -467,15 +463,38 @@ async def dispatch_queue(self):
467463
def _message_counter_default(self):
468464
return itertools.count()
469465

470-
def schedule_dispatch(self, dispatch, *args):
466+
def should_dispatch_immediately(self, msg):
467+
"""
468+
This provides a hook for dispatching incoming messages
469+
from the frontend immediately, and out of order.
470+
471+
It could be used to allow asynchronous messages from
472+
GUIs to be processed.
473+
"""
474+
return False
475+
476+
def schedule_dispatch(self, msg, dispatch):
471477
"""schedule a message for dispatch"""
478+
479+
idents, msg = self.session.feed_identities(msg, copy=False)
480+
try:
481+
msg = self.session.deserialize(msg, content=True, copy=False)
482+
except:
483+
self.log.error("Invalid shell message", exc_info=True)
484+
return
485+
486+
new_args = (msg, idents)
487+
488+
if self.should_dispatch_immediately(msg):
489+
return self.io_loop.add_callback(dispatch, *new_args)
490+
472491
idx = next(self._message_counter)
473492

474493
self.msg_queue.put_nowait(
475494
(
476495
idx,
477496
dispatch,
478-
args,
497+
new_args,
479498
)
480499
)
481500
# ensure the eventloop wakes up
@@ -499,7 +518,7 @@ def start(self):
499518
self.shell_stream.on_recv(
500519
partial(
501520
self.schedule_dispatch,
502-
self.dispatch_shell,
521+
dispatch=self.dispatch_shell,
503522
),
504523
copy=False,
505524
)

0 commit comments

Comments
 (0)