Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry kernel_info_requests in wait_for_ready #592

Merged
merged 1 commit into from
Dec 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions jupyter_client/asynchronous/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,21 @@ async def wait_for_ready(self, timeout=None):

# Wait for kernel info reply on shell channel
while True:
self.kernel_info()
try:
msg = await self.shell_channel.get_msg(timeout=1)
except Empty:
pass
else:
if msg['msg_type'] == 'kernel_info_reply':
self._handle_kernel_info_reply(msg)
break
# Checking that IOPub is connected. If it is not connected, start over.
try:
await self.iopub_channel.get_msg(timeout=0.2)
except Empty:
pass
else:
self._handle_kernel_info_reply(msg)
break

if not await self.is_alive():
raise RuntimeError('Kernel died before replying to kernel_info')
Expand Down
11 changes: 9 additions & 2 deletions jupyter_client/blocking/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,21 @@ def wait_for_ready(self, timeout=None):

# Wait for kernel info reply on shell channel
while True:
self.kernel_info()
try:
msg = self.shell_channel.get_msg(block=True, timeout=1)
except Empty:
pass
else:
if msg['msg_type'] == 'kernel_info_reply':
self._handle_kernel_info_reply(msg)
break
# Checking that IOPub is connected. If it is not connected, start over.
try:
self.iopub_channel.get_msg(block=True, timeout=0.2)
except Empty:
pass
else:
self._handle_kernel_info_reply(msg)
break

if not self.is_alive():
raise RuntimeError('Kernel died before replying to kernel_info')
Expand Down
5 changes: 2 additions & 3 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,10 @@ def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, control=Tr
:meth:`start_kernel`. If the channels have been stopped and you
call this, :class:`RuntimeError` will be raised.
"""
if shell:
self.shell_channel.start()
self.kernel_info()
if iopub:
self.iopub_channel.start()
if shell:
self.shell_channel.start()
if stdin:
self.stdin_channel.start()
self.allow_stdin = True
Expand Down
7 changes: 0 additions & 7 deletions jupyter_client/tests/signalkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ def do_execute(self, code, silent, store_history=True, user_expressions=None,
reply['traceback'] = ['no such command: %s' % code]
return reply

def kernel_info_request(self, *args, **kwargs):
"""Add delay to kernel_info_request

triggers slow-response code in KernelClient.wait_for_ready
"""
return super().kernel_info_request(*args, **kwargs)


class SignalTestApp(IPKernelApp):
kernel_class = SignalTestKernel
Expand Down
35 changes: 25 additions & 10 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ def test_signal_kernel_subprocesses(self, install_kernel, start_kernel):
km, kc = start_kernel

def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand Down Expand Up @@ -172,8 +175,11 @@ def test_start_new_kernel(self, install_kernel, start_kernel):

def _env_test_body(self, kc):
def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand Down Expand Up @@ -274,8 +280,11 @@ def _run_signaltest_lifecycle(self, config=None):
kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)

def execute(cmd):
kc.execute(cmd)
reply = kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand Down Expand Up @@ -344,8 +353,11 @@ async def test_signal_kernel_subprocesses(self, install_kernel, start_async_kern
km, kc = start_async_kernel

async def execute(cmd):
kc.execute(cmd)
reply = await kc.get_shell_msg(TIMEOUT)
request_id = kc.execute(cmd)
while True:
reply = await kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
return content
Expand All @@ -360,10 +372,13 @@ async def execute(cmd):
assert reply['user_expressions']['poll'] == [None] * N

# start a job on the kernel to be interrupted
kc.execute('sleep')
request_id = kc.execute('sleep')
await asyncio.sleep(1) # ensure sleep message has been handled before we interrupt
await km.interrupt_kernel()
reply = await kc.get_shell_msg(TIMEOUT)
while True:
reply = await kc.get_shell_msg(TIMEOUT)
if reply['parent_header']['msg_id'] == request_id:
break
content = reply['content']
assert content['status'] == 'ok'
assert content['user_expressions']['interrupted'] is True
Expand Down