|
11 | 11 | from datetime import datetime, timedelta
|
12 | 12 | from functools import partial
|
13 | 13 | import os
|
| 14 | +import time |
14 | 15 |
|
15 | 16 | from tornado import web
|
16 | 17 | from tornado.concurrent import Future
|
@@ -320,37 +321,62 @@ async def restart_kernel(self, kernel_id):
|
320 | 321 | await ensure_async(self.pinned_superclass.restart_kernel(self, kernel_id))
|
321 | 322 | kernel = self.get_kernel(kernel_id)
|
322 | 323 | # return a Future that will resolve when the kernel has successfully restarted
|
323 |
| - channel = kernel.connect_shell() |
| 324 | + shell_channel = kernel.connect_shell() |
| 325 | + iopub_channel = kernel.connect_iopub() |
324 | 326 | future = Future()
|
| 327 | + info_future = Future() |
| 328 | + iopub_future = Future() |
325 | 329 |
|
326 | 330 | def finish():
|
327 | 331 | """Common cleanup when restart finishes/fails for any reason."""
|
328 |
| - if not channel.closed(): |
329 |
| - channel.close() |
330 | 332 | loop.remove_timeout(timeout)
|
331 | 333 | kernel.remove_restart_callback(on_restart_failed, 'dead')
|
332 | 334 |
|
333 |
| - def on_reply(msg): |
| 335 | + def on_shell_reply(msg): |
334 | 336 | self.log.debug("Kernel info reply received: %s", kernel_id)
|
335 |
| - finish() |
336 |
| - if not future.done(): |
337 |
| - future.set_result(msg) |
| 337 | + shell_channel.close() |
| 338 | + if not info_future.done(): |
| 339 | + info_future.set_result(msg) |
| 340 | + if iopub_future.done(): |
| 341 | + finish() |
| 342 | + future.set_result(info_future.result()) |
| 343 | + |
| 344 | + def on_iopub(msg): |
| 345 | + self.log.debug("first IOPub received: %s", kernel_id) |
| 346 | + iopub_channel.close() |
| 347 | + if not iopub_future.done(): |
| 348 | + iopub_future.set_result(None) |
| 349 | + if info_future.done(): |
| 350 | + finish() |
| 351 | + future.set_result(info_future.result()) |
338 | 352 |
|
339 | 353 | def on_timeout():
|
340 | 354 | self.log.warning("Timeout waiting for kernel_info_reply: %s", kernel_id)
|
| 355 | + if not shell_channel.closed(): |
| 356 | + shell_channel.close() |
| 357 | + if not iopub_channel.closed(): |
| 358 | + iopub_channel.close() |
341 | 359 | finish()
|
342 | 360 | if not future.done():
|
343 | 361 | future.set_exception(TimeoutError("Timeout waiting for restart"))
|
344 | 362 |
|
345 | 363 | def on_restart_failed():
|
346 | 364 | self.log.warning("Restarting kernel failed: %s", kernel_id)
|
| 365 | + if not shell_channel.closed(): |
| 366 | + shell_channel.close() |
| 367 | + if not iopub_channel.closed(): |
| 368 | + iopub_channel.close() |
347 | 369 | finish()
|
348 | 370 | if not future.done():
|
349 | 371 | future.set_exception(RuntimeError("Restart failed"))
|
350 | 372 |
|
351 | 373 | kernel.add_restart_callback(on_restart_failed, 'dead')
|
352 |
| - kernel.session.send(channel, "kernel_info_request") |
353 |
| - channel.on_recv(on_reply) |
| 374 | + iopub_channel.on_recv(on_iopub) |
| 375 | + shell_channel.on_recv(on_shell_reply) |
| 376 | + while not future.done(): |
| 377 | + time.sleep(0.2) |
| 378 | + # Nudge the kernel with kernel info requests until we get an IOPub message |
| 379 | + kernel.session.send(shell_channel, "kernel_info_request") |
354 | 380 | loop = IOLoop.current()
|
355 | 381 | timeout = loop.add_timeout(loop.time() + self.kernel_info_timeout, on_timeout)
|
356 | 382 | return future
|
|
0 commit comments