From 3455c29d8787425f58f30ff500cdca33fd11d5fc Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 6 Jun 2023 11:41:52 -0500 Subject: [PATCH 1/3] [pre-commit.ci] pre-commit autoupdate (#1120) Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Steven Silvester --- .pre-commit-config.yaml | 4 ++-- ipykernel/eventloops.py | 5 ++--- pyproject.toml | 2 +- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 58fac7cf8..82b55d101 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/python-jsonschema/check-jsonschema - rev: 0.22.0 + rev: 0.23.1 hooks: - id: check-github-workflows @@ -36,7 +36,7 @@ repos: - id: black - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.0.263 + rev: v0.0.270 hooks: - id: ruff args: ["--fix"] diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index 412a246fe..8bf0996d7 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -419,10 +419,9 @@ def loop_asyncio_exit(kernel): loop = asyncio.get_event_loop() - @asyncio.coroutine - def close_loop(): + async def close_loop(): if hasattr(loop, "shutdown_asyncgens"): - yield from loop.shutdown_asyncgens() + yield loop.shutdown_asyncgens() loop._should_close = True # type:ignore[attr-defined] loop.stop() diff --git a/pyproject.toml b/pyproject.toml index 3494fefae..8ba499ca2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -117,7 +117,7 @@ dependencies = ["mypy>=0.990"] test = "mypy --install-types --non-interactive {args:.}" [tool.hatch.envs.lint] -dependencies = ["black==23.3.0", "mdformat>0.7", "ruff==0.0.263"] +dependencies = ["black==23.3.0", "mdformat>0.7", "ruff==0.0.270"] detached = true [tool.hatch.envs.lint.scripts] style = [ From 1c7f626f801168293115c7343b61a21b5fa8f9e7 Mon Sep 17 00:00:00 2001 From: Charles Cooper Date: Thu, 8 Jun 2023 08:42:45 -0700 Subject: [PATCH 2/3] fix: protect stdout/stderr restoration in `InProcessKernel._redirected_io` (#1122) --- ipykernel/inprocess/ipkernel.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ipykernel/inprocess/ipkernel.py b/ipykernel/inprocess/ipkernel.py index df34303b4..3f7fcce9e 100644 --- a/ipykernel/inprocess/ipkernel.py +++ b/ipykernel/inprocess/ipkernel.py @@ -121,9 +121,11 @@ def _input_request(self, prompt, ident, parent, password=False): def _redirected_io(self): """Temporarily redirect IO to the kernel.""" sys_stdout, sys_stderr = sys.stdout, sys.stderr - sys.stdout, sys.stderr = self.stdout, self.stderr - yield - sys.stdout, sys.stderr = sys_stdout, sys_stderr + try: + sys.stdout, sys.stderr = self.stdout, self.stderr + yield + finally: + sys.stdout, sys.stderr = sys_stdout, sys_stderr # ------ Trait change handlers -------------------------------------------- From 112ca66da0ee8156b983094b2c8e2926ed63cfcb Mon Sep 17 00:00:00 2001 From: Min RK Date: Mon, 12 Jun 2023 22:13:12 +0200 Subject: [PATCH 3/3] Avoid ResourceWarning on implicitly closed event pipe sockets (#1125) Co-authored-by: Steven Silvester Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- ipykernel/iostream.py | 50 ++++++++++++++++++++++++++++++++------ ipykernel/tests/test_io.py | 35 ++++++++++++++++++++++++++ 2 files changed, 78 insertions(+), 7 deletions(-) diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 8b5e47b30..a1e138452 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import io import os @@ -14,8 +15,7 @@ from collections import deque from io import StringIO, TextIOBase from threading import local -from typing import Any, Callable, Deque, Optional -from weakref import WeakSet +from typing import Any, Callable, Deque, Dict, Optional import zmq from jupyter_client.session import extract_header @@ -63,7 +63,10 @@ def __init__(self, socket, pipe=False): self._setup_pipe_in() self._local = threading.local() self._events: Deque[Callable[..., Any]] = deque() - self._event_pipes: WeakSet[Any] = WeakSet() + self._event_pipes: Dict[threading.Thread, Any] = {} + self._event_pipe_gc_lock: threading.Lock = threading.Lock() + self._event_pipe_gc_seconds: float = 10 + self._event_pipe_gc_task: Optional[asyncio.Task] = None self._setup_event_pipe() self.thread = threading.Thread(target=self._thread_main, name="IOPub") self.thread.daemon = True @@ -73,7 +76,18 @@ def __init__(self, socket, pipe=False): def _thread_main(self): """The inner loop that's actually run in a thread""" + + def _start_event_gc(): + self._event_pipe_gc_task = asyncio.ensure_future(self._run_event_pipe_gc()) + + self.io_loop.run_sync(_start_event_gc) self.io_loop.start() + if self._event_pipe_gc_task is not None: + # cancel gc task to avoid pending task warnings + async def _cancel(): + self._event_pipe_gc_task.cancel() # type:ignore + + self.io_loop.run_sync(_cancel) self.io_loop.close(all_fds=True) def _setup_event_pipe(self): @@ -88,6 +102,26 @@ def _setup_event_pipe(self): self._event_puller = ZMQStream(pipe_in, self.io_loop) self._event_puller.on_recv(self._handle_event) + async def _run_event_pipe_gc(self): + """Task to run event pipe gc continuously""" + while True: + await asyncio.sleep(self._event_pipe_gc_seconds) + try: + await self._event_pipe_gc() + except Exception as e: + print(f"Exception in IOPubThread._event_pipe_gc: {e}", file=sys.__stderr__) + + async def _event_pipe_gc(self): + """run a single garbage collection on event pipes""" + if not self._event_pipes: + # don't acquire the lock if there's nothing to do + return + with self._event_pipe_gc_lock: + for thread, socket in list(self._event_pipes.items()): + if not thread.is_alive(): + socket.close() + del self._event_pipes[thread] + @property def _event_pipe(self): """thread-local event pipe for signaling events that should be processed in the thread""" @@ -100,9 +134,11 @@ def _event_pipe(self): event_pipe.linger = 0 event_pipe.connect(self._event_interface) self._local.event_pipe = event_pipe - # WeakSet so that event pipes will be closed by garbage collection - # when their threads are terminated - self._event_pipes.add(event_pipe) + # associate event pipes to their threads + # so they can be closed explicitly + # implicit close on __del__ throws a ResourceWarning + with self._event_pipe_gc_lock: + self._event_pipes[threading.current_thread()] = event_pipe return event_pipe def _handle_event(self, msg): @@ -188,7 +224,7 @@ def stop(self): # close *all* event pipes, created in any thread # event pipes can only be used from other threads while self.thread.is_alive() # so after thread.join, this should be safe - for event_pipe in self._event_pipes: + for _thread, event_pipe in self._event_pipes.items(): event_pipe.close() def close(self): diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 6a9f65170..404657cbb 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -4,8 +4,10 @@ import os import subprocess import sys +import threading import time import warnings +from concurrent.futures import Future, ThreadPoolExecutor from unittest import mock import pytest @@ -114,6 +116,39 @@ def test_outstream(iopub_thread): assert stream.writable() +async def test_event_pipe_gc(iopub_thread): + session = Session(key=b'abc') + stream = OutStream( + session, + iopub_thread, + "stdout", + isatty=True, + watchfd=False, + ) + save_stdout = sys.stdout + assert iopub_thread._event_pipes == {} + with stream, mock.patch.object(sys, "stdout", stream), ThreadPoolExecutor(1) as pool: + pool.submit(print, "x").result() + pool_thread = pool.submit(threading.current_thread).result() + assert list(iopub_thread._event_pipes) == [pool_thread] + + # run gc once in the iopub thread + f: Future = Future() + + async def run_gc(): + try: + await iopub_thread._event_pipe_gc() + except Exception as e: + f.set_exception(e) + else: + f.set_result(None) + + iopub_thread.io_loop.add_callback(run_gc) + # wait for call to finish in iopub thread + f.result() + assert iopub_thread._event_pipes == {} + + def subprocess_test_echo_watch(): # handshake Pub subscription session = Session(key=b'abc')