Skip to content

Commit a4cda09

Browse files
author
Rollo Konig Brock
committed
Issue #12786: Create hook for dispatching messages out of order
1 parent 02a25a3 commit a4cda09

File tree

2 files changed

+43
-27
lines changed

2 files changed

+43
-27
lines changed

ipykernel/inprocess/client.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -172,10 +172,7 @@ def _dispatch_to_kernel(self, msg):
172172
raise RuntimeError('Cannot send request. No kernel exists.')
173173

174174
stream = DummySocket()
175-
self.session.send(stream, msg)
176-
msg_parts = stream.recv_multipart()
177-
kernel.dispatch_shell(stream, msg_parts)
178-
175+
kernel.dispatch_shell(msg, stream=stream)
179176
idents, reply_msg = self.session.recv(stream, copy=False)
180177
self.shell_channel.call_handlers_later(reply_msg)
181178

ipykernel/kernelbase.py

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@
3838

3939
from ._version import kernel_protocol_version
4040

41-
CONTROL_PRIORITY = 1
42-
SHELL_PRIORITY = 10
41+
CONTROL_PRIORITY = (1, 'control')
42+
SHELL_PRIORITY = (10, 'shell')
4343

4444

4545
class Kernel(SingletonConfigurable):
@@ -167,14 +167,10 @@ def __init__(self, **kwargs):
167167
self.control_handlers[msg_type] = getattr(self, msg_type)
168168

169169
@gen.coroutine
170-
def dispatch_control(self, msg):
170+
def dispatch_control(self, msg, idents=None, stream=None):
171171
"""dispatch control requests"""
172-
idents, msg = self.session.feed_identities(msg, copy=False)
173-
try:
174-
msg = self.session.deserialize(msg, content=True, copy=False)
175-
except:
176-
self.log.error("Invalid Control Message", exc_info=True)
177-
return
172+
if idents is None:
173+
idents = []
178174

179175
self.log.debug("Control received: %s", msg)
180176

@@ -215,14 +211,10 @@ def should_handle(self, stream, msg, idents):
215211
return True
216212

217213
@gen.coroutine
218-
def dispatch_shell(self, stream, msg):
214+
def dispatch_shell(self, msg, idents=None, stream=None):
219215
"""dispatch shell requests"""
220-
idents, msg = self.session.feed_identities(msg, copy=False)
221-
try:
222-
msg = self.session.deserialize(msg, content=True, copy=False)
223-
except:
224-
self.log.error("Invalid Message", exc_info=True)
225-
return
216+
if idents is None:
217+
idents = []
226218

227219
# Set the parent message for side effects.
228220
self.set_parent(idents, msg)
@@ -385,16 +377,43 @@ def dispatch_queue(self):
385377
def _message_counter_default(self):
386378
return itertools.count()
387379

388-
def schedule_dispatch(self, priority, dispatch, *args):
380+
def should_dispatch_immediately(
381+
self, msg, idents, stream, priority, dispatch
382+
):
383+
"""
384+
This provides a hook for dispatching incoming messages
385+
from the frontend immediately, and out of order.
386+
387+
It could be used to allow asynchronous messages from
388+
GUIs to be processed.
389+
"""
390+
return False
391+
392+
def schedule_dispatch(self, msg, priority, dispatch, stream=None):
389393
"""schedule a message for dispatch"""
394+
395+
idents, msg = self.session.feed_identities(msg, copy=False)
396+
try:
397+
msg = self.session.deserialize(msg, content=True, copy=False)
398+
except:
399+
self.log.error("Invalid %s Message", priority[1], exc_info=True)
400+
return
401+
402+
new_args = (msg, idents, stream)
403+
404+
if self.should_dispatch_immediately(
405+
msg, idents, stream, priority, dispatch, stream
406+
):
407+
return self.io_loop.add_callback(dispatch, *new_args)
408+
390409
idx = next(self._message_counter)
391410

392411
self.msg_queue.put_nowait(
393412
(
394413
priority,
395414
idx,
396415
dispatch,
397-
args,
416+
new_args,
398417
)
399418
)
400419
# ensure the eventloop wakes up
@@ -411,8 +430,8 @@ def start(self):
411430
self.control_stream.on_recv(
412431
partial(
413432
self.schedule_dispatch,
414-
CONTROL_PRIORITY,
415-
self.dispatch_control,
433+
priority=CONTROL_PRIORITY,
434+
dispatch=self.dispatch_control,
416435
),
417436
copy=False,
418437
)
@@ -423,9 +442,9 @@ def start(self):
423442
s.on_recv(
424443
partial(
425444
self.schedule_dispatch,
426-
SHELL_PRIORITY,
427-
self.dispatch_shell,
428-
s,
445+
priority=SHELL_PRIORITY,
446+
dispatch=self.dispatch_shell,
447+
stream=s,
429448
),
430449
copy=False,
431450
)

0 commit comments

Comments
 (0)