Skip to content

Commit 2b8e22c

Browse files
committed
Nudge kernel with info requests
1 parent 0c83c9d commit 2b8e22c

File tree

1 file changed

+71
-3
lines changed

1 file changed

+71
-3
lines changed

notebook/services/kernels/handlers.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,71 @@ def __repr__(self):
121121
return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
122122

123123
def create_stream(self):
124+
self.log.debug("Create stream")
124125
km = self.kernel_manager
125126
identity = self.session.bsession
126127
for channel in ('shell', 'control', 'iopub', 'stdin'):
127128
meth = getattr(km, 'connect_' + channel)
128129
self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
129130
stream.channel = channel
131+
132+
shell_channel = self.channels['shell']
133+
iopub_channel = self.channels['iopub']
134+
135+
future = Future()
136+
info_future = Future()
137+
iopub_future = Future()
138+
139+
def finish():
140+
"""Common cleanup"""
141+
loop.remove_timeout(timeout)
142+
loop.remove_timeout(nudge_handle)
143+
iopub_channel.stop_on_recv()
144+
shell_channel.stop_on_recv()
145+
146+
def on_shell_reply(msg):
147+
if not info_future.done():
148+
self.log.debug("Nudge: shell info reply received: %s", self.kernel_id)
149+
shell_channel.stop_on_recv()
150+
self.log.debug("Nudge: resolving shell future")
151+
info_future.set_result(msg)
152+
if iopub_future.done():
153+
finish()
154+
self.log.debug("Nudge: resolving main future in shell handler")
155+
future.set_result(info_future.result())
156+
157+
def on_iopub(msg):
158+
if not iopub_future.done():
159+
self.log.debug("Nudge: first IOPub received: %s", self.kernel_id)
160+
iopub_channel.stop_on_recv()
161+
self.log.debug("Nudge: resolving iopub future")
162+
iopub_future.set_result(None)
163+
if info_future.done():
164+
finish()
165+
self.log.debug("Nudge: resolving main future in iopub handler")
166+
future.set_result(info_future.result())
167+
168+
def on_timeout():
169+
self.log.warning("Nudge: Timeout waiting for kernel_info_reply: %s", self.kernel_id)
170+
finish()
171+
if not future.done():
172+
future.set_exception(TimeoutError("Timeout waiting for nudge"))
173+
174+
iopub_channel.on_recv(on_iopub)
175+
shell_channel.on_recv(on_shell_reply)
176+
loop = IOLoop.current()
177+
178+
# Nudge the kernel with kernel info requests until we get an IOPub message
179+
def nudge():
180+
self.log.debug("Nudge")
181+
if not future.done():
182+
self.log.debug("nudging")
183+
self.session.send(shell_channel, "kernel_info_request")
184+
nudge_handle = loop.call_later(0.5, nudge)
185+
nudge_handle = loop.call_later(0, nudge)
186+
187+
timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
188+
return future
130189

131190
def request_kernel_info(self):
132191
"""send a request for kernel_info"""
@@ -253,7 +312,9 @@ def _register_session(self):
253312
yield stale_handler.close()
254313
self._open_sessions[self.session_key] = self
255314

315+
@gen.coroutine
256316
def open(self, kernel_id):
317+
self.log.debug("======================================== OPEN")
257318
super().open()
258319
km = self.kernel_manager
259320
km.notify_connect(kernel_id)
@@ -269,9 +330,11 @@ def open(self, kernel_id):
269330
for channel, msg_list in replay_buffer:
270331
stream = self.channels[channel]
271332
self._on_zmq_reply(stream, msg_list)
333+
connected = Future()
334+
connected.set_result(None)
272335
else:
273336
try:
274-
self.create_stream()
337+
connected = self.create_stream()
275338
except web.HTTPError as e:
276339
self.log.error("Error opening stream: %s", e)
277340
# WebSockets don't response to traditional error codes so we
@@ -285,8 +348,13 @@ def open(self, kernel_id):
285348
km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
286349
km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
287350

288-
for channel, stream in self.channels.items():
289-
stream.on_recv_stream(self._on_zmq_reply)
351+
def subscribe(value):
352+
for channel, stream in self.channels.items():
353+
stream.on_recv_stream(self._on_zmq_reply)
354+
355+
connected.add_done_callback(subscribe)
356+
357+
return connected
290358

291359
def on_message(self, msg):
292360
if not self.channels:

0 commit comments

Comments
 (0)