Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Run exec_request inside a task #1384

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
fb7b960
feat: enhance kernel message handling with memory streams to run exec…
fleming79 Mar 29, 2025
17ebe6b
Allow infinite buffer size for memory object streams in kernel
fleming79 Mar 29, 2025
5f71eaa
fix: handle kernel abort requests gracefully in message processing
fleming79 Mar 30, 2025
62300ed
Simplify abort handling and make send_stream a required parameter for…
fleming79 Mar 30, 2025
1828a88
Fix publish status for execute_request
fleming79 Mar 30, 2025
211094b
Ensure one send_stream per socket/thread
fleming79 Mar 31, 2025
f13959b
Disable timeouts for debugging tests with debugpy
fleming79 Mar 31, 2025
bee82d1
Minor tweaks to tests to make them work with kernel changes.
fleming79 Mar 31, 2025
aaa29fe
Add anyio Event for _main_shell_ready to aid with start and test reli…
fleming79 Mar 31, 2025
5bc1985
Update timing assertions in concurrent test to improve reliability
fleming79 Mar 31, 2025
533ca75
Restore test timeouts to original values
fleming79 Mar 31, 2025
f0e5116
Add small delay in process_control_message to aid with test_sequenti…
fleming79 Mar 31, 2025
8b4b086
Remove unused _eventloop_set
fleming79 Apr 1, 2025
fa09c5f
Rename _main_shell_ready to _main_subshell_ready
fleming79 Apr 1, 2025
6a68f02
Try to make test_run_concurrently_timing more reliable
fleming79 Apr 1, 2025
104ef76
Improve test_tk_loop to run localy.
fleming79 Apr 1, 2025
7b5d3e6
Add asyncio_event_loop (currently only set for asyncio backend) to th…
fleming79 Apr 1, 2025
5cb0f60
Merge branch 'ipython:main' into run-execute-request-in-task
fleming79 Apr 1, 2025
ee8b768
Fix typos in CHANGELOG.md
fleming79 Apr 1, 2025
555f4b2
Close receive_stream after task group has exited.
fleming79 Apr 2, 2025
2b3d6ec
Pass parent directly to publish_status calls to avoid changing parent…
fleming79 Apr 2, 2025
4586e34
Pass subshell_id to _execute_request_handler
fleming79 Apr 2, 2025
62d46b4
Provide a TaskGroup in the kernel with a shielded CancelScope
fleming79 Apr 2, 2025
8fc455c
Add BlockingPortal and enhance task management in Kernel class
fleming79 Apr 4, 2025
02c59e8
Catch errors on main subshell stop and remove redundant cancel scope.
fleming79 Apr 4, 2025
da2d27f
Add optional name parameter to start_soon for improved task identific…
fleming79 Apr 4, 2025
7a85d84
Refactor _execute_request_loop to ensure status is published in final…
fleming79 Apr 5, 2025
19ee3e2
Remove deprecated metadata handling and unused log module
fleming79 Apr 5, 2025
5798b4c
Only set parent and ident from _execute_request_loop.
fleming79 Apr 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
- Fix expected text depending on IPython version. [#1354](https://github.com/ipython/ipykernel/pull/1354) ([@Carreau](https://github.com/Carreau))
- Another try at tracking down ResourceWarning with tracemalloc. [#1353](https://github.com/ipython/ipykernel/pull/1353) ([@Carreau](https://github.com/Carreau))
- Remove deprecated modules since 4.3 (2016). [#1352](https://github.com/ipython/ipykernel/pull/1352) ([@Carreau](https://github.com/Carreau))
- Try to reenable tests from downstream ipywidgets [#1350](https://github.com/ipython/ipykernel/pull/1350) ([@Carreau](https://github.com/Carreau))
- Try to re-enable tests from downstream ipywidgets [#1350](https://github.com/ipython/ipykernel/pull/1350) ([@Carreau](https://github.com/Carreau))
- Disable 3 failing downstream tests, but keep testing the rest. [#1349](https://github.com/ipython/ipykernel/pull/1349) ([@Carreau](https://github.com/Carreau))
- Licence :: * trove classifers are deprecated [#1348](https://github.com/ipython/ipykernel/pull/1348) ([@Carreau](https://github.com/Carreau))
- Licence :: * trove classifiers are deprecated [#1348](https://github.com/ipython/ipykernel/pull/1348) ([@Carreau](https://github.com/Carreau))
- Pin sphinx to resolve docs build failures [#1347](https://github.com/ipython/ipykernel/pull/1347) ([@krassowski](https://github.com/krassowski))
- Make our own mock kernel methods async [#1346](https://github.com/ipython/ipykernel/pull/1346) ([@Carreau](https://github.com/Carreau))
- Try to debug non-closed iopub socket [#1345](https://github.com/ipython/ipykernel/pull/1345) ([@Carreau](https://github.com/Carreau))
- Email is @python.org since 2018 [#1343](https://github.com/ipython/ipykernel/pull/1343) ([@Carreau](https://github.com/Carreau))
- Remove unused ignores lints. [#1342](https://github.com/ipython/ipykernel/pull/1342) ([@Carreau](https://github.com/Carreau))
- Enable ruff G002 and fix 6 occurences [#1341](https://github.com/ipython/ipykernel/pull/1341) ([@Carreau](https://github.com/Carreau))
- Enable ruff G002 and fix 6 occurrences [#1341](https://github.com/ipython/ipykernel/pull/1341) ([@Carreau](https://github.com/Carreau))
- Check ignores warnings are still relevant. [#1340](https://github.com/ipython/ipykernel/pull/1340) ([@Carreau](https://github.com/Carreau))
- Move mypy disablinging error codes on a per-file basis [#1338](https://github.com/ipython/ipykernel/pull/1338) ([@Carreau](https://github.com/Carreau))
- try to fix spyder kernel install [#1337](https://github.com/ipython/ipykernel/pull/1337) ([@Carreau](https://github.com/Carreau))
Expand All @@ -46,7 +46,7 @@
- Bump mypy [#1333](https://github.com/ipython/ipykernel/pull/1333) ([@Carreau](https://github.com/Carreau))
- Remove dead code. [#1332](https://github.com/ipython/ipykernel/pull/1332) ([@Carreau](https://github.com/Carreau))
- Ignore or fix most of the remaining ruff 0.9.6 errors [#1331](https://github.com/ipython/ipykernel/pull/1331) ([@Carreau](https://github.com/Carreau))
- minor code reformating valid ruff 0.9.6 [#1330](https://github.com/ipython/ipykernel/pull/1330) ([@Carreau](https://github.com/Carreau))
- minor code reformatting valid ruff 0.9.6 [#1330](https://github.com/ipython/ipykernel/pull/1330) ([@Carreau](https://github.com/Carreau))
- Some formatting changes to prepare bumping ruff pre-commit. [#1329](https://github.com/ipython/ipykernel/pull/1329) ([@Carreau](https://github.com/Carreau))
- Manually update Codespell and fix new errors. [#1328](https://github.com/ipython/ipykernel/pull/1328) ([@Carreau](https://github.com/Carreau))
- Manually update mdformat pre-commit and run it. [#1327](https://github.com/ipython/ipykernel/pull/1327) ([@Carreau](https://github.com/Carreau))
Expand Down
2 changes: 1 addition & 1 deletion ipykernel/comm/comm.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def publish_msg(self, msg_type, data=None, metadata=None, buffers=None, **keys):
msg_type,
content,
metadata=json_clean(metadata),
parent=self.kernel.get_parent(),
parent=self.kernel.parent_msg,
ident=self.topic,
buffers=buffers,
)
Expand Down
67 changes: 2 additions & 65 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import os
import platform
import sys
from functools import partial

import zmq
from packaging.version import Version as V
Expand Down Expand Up @@ -400,63 +399,11 @@ def loop_cocoa_exit(kernel):

@register_integration("asyncio")
def loop_asyncio(kernel):
"""Start a kernel with asyncio event loop support."""
import asyncio

loop = asyncio.get_event_loop()
# loop is already running (e.g. tornado 5), nothing left to do
if loop.is_running():
return

if loop.is_closed():
# main loop is closed, create a new one
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop._should_close = False # type:ignore[attr-defined]
"""Verify the asyncio event loop is supported."""

# pause eventloop when there's an event on a zmq socket
def process_stream_events(socket):
"""fall back to main loop when there's a socket event"""
loop.stop()

notifier = partial(process_stream_events, kernel.shell_socket)
loop.add_reader(kernel.shell_socket.getsockopt(zmq.FD), notifier)
loop.call_soon(notifier)

while True:
error = None
try:
loop.run_forever()
except KeyboardInterrupt:
continue
except Exception as e:
error = e
if loop._should_close: # type:ignore[attr-defined]
loop.close()
if error is not None:
raise error
break


@loop_asyncio.exit
def loop_asyncio_exit(kernel):
"""Exit hook for asyncio"""
import asyncio

loop = asyncio.get_event_loop()

async def close_loop():
if hasattr(loop, "shutdown_asyncgens"):
yield loop.shutdown_asyncgens()
loop._should_close = True # type:ignore[attr-defined]
loop.stop()

if loop.is_running():
close_loop()

elif not loop.is_closed():
loop.run_until_complete(close_loop) # type:ignore[arg-type]
loop.close()
asyncio.get_running_loop()


def set_qt_api_env_from_gui(gui):
Expand Down Expand Up @@ -602,13 +549,3 @@ def enable_gui(gui, kernel=None):
msg = "Cannot activate multiple GUI eventloops" # type:ignore[unreachable]
raise RuntimeError(msg)
kernel.eventloop = loop
# We set `eventloop`; the function the user chose is executed in `Kernel.enter_eventloop`, thus
# any exceptions raised during the event loop will not be shown in the client.

# If running in async loop then set anyio event to trigger starting the eventloop.
# If not running in async loop do nothing as this will be handled in IPKernelApp.main().
try:
kernel._eventloop_set.set()
except RuntimeError:
# Expecting sniffio.AsyncLibraryNotFoundError but don't want to import sniffio just for that
pass
6 changes: 3 additions & 3 deletions ipykernel/inprocess/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def stop(self):
def _abort_queues(self):
"""The in-process kernel doesn't abort requests."""

def _input_request(self, prompt, ident, parent, password=False):
def _input_request(self, prompt, *, password=False):
# Flush output before making the request.
self.raw_input_str = None
if sys.stdout is not None:
Expand All @@ -106,10 +106,10 @@ def _input_request(self, prompt, ident, parent, password=False):
# Send the input request.
content = json_clean(dict(prompt=prompt, password=password))
assert self.session is not None
msg = self.session.msg("input_request", content, parent)
msg = self.session.msg("input_request", content, self.parent_msg)
for frontend in self.frontends:
assert frontend is not None
if frontend.session.session == parent["header"]["session"]:
if frontend.session.session == self.parent_msg["header"]["session"]:
frontend.stdin_channel.call_handlers(msg)
break
else:
Expand Down
108 changes: 3 additions & 105 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,43 +286,11 @@ def stop(self):
if self.debugpy_socket is not None:
self.debugpy_stop.set()

def set_parent(self, ident, parent, channel="shell"):
"""Overridden from parent to tell the display hook and output streams
about the parent message.
"""
super().set_parent(ident, parent, channel)
if channel == "shell" and self.shell:
def _set_parent_ident(self, parent, ident):
super()._set_parent_ident(parent, ident)
if self.shell:
self.shell.set_parent(parent)

def init_metadata(self, parent):
"""Initialize metadata.

Run at the beginning of each execution request.
"""
md = super().init_metadata(parent)
# FIXME: remove deprecated ipyparallel-specific code
# This is required for ipyparallel < 5.0
md.update(
{
"dependencies_met": True,
"engine": self.ident,
}
)
return md

def finish_metadata(self, parent, metadata, reply_content):
"""Finish populating metadata.

Run after completing an execution request.
"""
# FIXME: remove deprecated ipyparallel-specific code
# This is required by ipyparallel < 5.0
metadata["status"] = reply_content["status"]
if reply_content["status"] == "error" and reply_content["ename"] == "UnmetDependency":
metadata["dependencies_met"] = False

return metadata

def _forward_input(self, allow_stdin=False):
"""Forward raw_input and getpass to the current frontend.

Expand Down Expand Up @@ -478,10 +446,6 @@ async def run(execution: Execution) -> None:
}
)

# FIXME: deprecated piece for ipyparallel (remove in 5.0):
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method="execute")
reply_content["engine_info"] = e_info

# Return the execution counter so clients can display prompts
reply_content["execution_count"] = shell.execution_count - 1

Expand Down Expand Up @@ -662,72 +626,6 @@ def do_is_complete(self, code):
r["indent"] = " " * indent_spaces
return r

def do_apply(self, content, bufs, msg_id, reply_metadata):
"""Handle an apply request."""
try:
from ipyparallel.serialize import serialize_object, unpack_apply_message
except ImportError:
from .serialize import serialize_object, unpack_apply_message

shell = self.shell
assert shell is not None
try:
working = shell.user_ns

prefix = "_" + str(msg_id).replace("-", "") + "_"
f, args, kwargs = unpack_apply_message(bufs, working, copy=False)

fname = getattr(f, "__name__", "f")

fname = prefix + "f"
argname = prefix + "args"
kwargname = prefix + "kwargs"
resultname = prefix + "result"

ns = {fname: f, argname: args, kwargname: kwargs, resultname: None}
# print ns
working.update(ns)
code = f"{resultname} = {fname}(*{argname},**{kwargname})"
try:
exec(code, shell.user_global_ns, shell.user_ns)
result = working.get(resultname)
finally:
for key in ns:
working.pop(key)

assert self.session is not None
result_buf = serialize_object(
result,
buffer_threshold=self.session.buffer_threshold,
item_threshold=self.session.item_threshold,
)

except BaseException as e:
# invoke IPython traceback formatting
shell.showtraceback()
reply_content = {
"traceback": shell._last_traceback or [],
"ename": str(type(e).__name__),
"evalue": str(e),
}
# FIXME: deprecated piece for ipyparallel (remove in 5.0):
e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method="apply")
reply_content["engine_info"] = e_info

self.send_response(
self.iopub_socket,
"error",
reply_content,
ident=self._topic("error"),
)
self.log.info("Exception in apply request:\n%s", "\n".join(reply_content["traceback"]))
result_buf = []
reply_content["status"] = "error"
else:
reply_content = {"status": "ok"}

return reply_content, result_buf

def do_clear(self):
"""Clear the kernel."""
if self.shell:
Expand Down
10 changes: 1 addition & 9 deletions ipykernel/kernelapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import zmq
import zmq_anyio
from anyio import create_task_group, run, to_thread
from anyio import create_task_group, run
from IPython.core.application import ( # type:ignore[attr-defined]
BaseIPythonApplication,
base_aliases,
Expand Down Expand Up @@ -764,18 +764,10 @@ def start(self) -> None:
backend = "trio" if self.trio_loop else "asyncio"
run(partial(self._start, backend), backend=backend)

async def _wait_to_enter_eventloop(self) -> None:
await to_thread.run_sync(self.kernel._eventloop_set.wait)
await self.kernel.enter_eventloop()

async def main(self) -> None:
async with create_task_group() as tg:
tg.start_soon(self._wait_to_enter_eventloop)
tg.start_soon(self.kernel.start)

if self.kernel.eventloop:
self.kernel._eventloop_set.set()

def stop(self) -> None:
"""Stop the kernel, thread-safe."""
self.kernel.stop()
Expand Down
Loading
Loading