Skip to content

Commit

Permalink
Replace all occurrences of sys.is_finalizing (#8449)
Browse files Browse the repository at this point in the history
Co-authored-by: Hendrik Makait <hendrik@makait.com>
  • Loading branch information
fjetter and hendrikmakait authored Jan 15, 2024
1 parent 78ae97d commit ba34f53
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 13 deletions.
2 changes: 1 addition & 1 deletion distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def _():
Note
----
This function must be registered with atexit *after* any class that invokes
``dstributed.utils.is_python_shutting_down`` has been defined. This way it
``distributed.utils.is_python_shutting_down`` has been defined. This way it
will be called before the ``__del__`` method of those classes.
See Also
Expand Down
4 changes: 2 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ def __del__(self):
except AttributeError:
# Occasionally we see this error when shutting down the client
# https://github.com/dask/distributed/issues/4305
if not sys.is_finalizing():
if not is_python_shutting_down():
raise
except RuntimeError: # closed event loop
pass
Expand Down Expand Up @@ -1786,7 +1786,7 @@ async def _():

assert self.status == "closed"

if not sys.is_finalizing():
if not is_python_shutting_down():
self._loop_runner.stop()

async def _shutdown(self):
Expand Down
8 changes: 6 additions & 2 deletions distributed/comm/inproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from distributed.comm.core import BaseListener, Comm, CommClosedError, Connector
from distributed.comm.registry import Backend, backends
from distributed.protocol import nested_deserialize
from distributed.utils import get_ip
from distributed.utils import get_ip, is_python_shutting_down

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -186,7 +186,11 @@ def __init__( # type: ignore[no-untyped-def]
def _get_finalizer(self):
r = repr(self)

def finalize(write_q=self._write_q, write_loop=self._write_loop, r=r):
def finalize(
read_q=self._read_q, write_q=self._write_q, write_loop=self._write_loop, r=r
):
if read_q.peek(None) is _EOF or is_python_shutting_down():
return
logger.warning(f"Closing dangling queue in {r}")
write_loop.add_callback(write_q.put_nowait, _EOF)

Expand Down
6 changes: 2 additions & 4 deletions distributed/comm/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,7 @@ async def read(self, deserializers=None):
except StreamClosedError as e:
self.stream = None
self._closed = True
if not sys.is_finalizing():
convert_stream_closed_error(self, e)
convert_stream_closed_error(self, e)
except BaseException:
# Some OSError, CancelledError or another "low-level" exception.
# We do not really know what was already read from the underlying
Expand Down Expand Up @@ -305,8 +304,7 @@ async def write(self, msg, serializers=None, on_error="message"):
except StreamClosedError as e:
self.stream = None
self._closed = True
if not sys.is_finalizing():
convert_stream_closed_error(self, e)
convert_stream_closed_error(self, e)
except BaseException:
# Some OSError or a another "low-level" exception. We do not really know
# what was already written to the underlying socket, so it is not even safe
Expand Down
5 changes: 3 additions & 2 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
get_traceback,
has_keyword,
import_file,
is_python_shutting_down,
iscoroutinefunction,
offload,
recursive_to_dict,
Expand Down Expand Up @@ -899,7 +900,7 @@ async def _handle_comm(self, comm: Comm) -> None:
msg = await comm.read()
logger.debug("Message from %r: %s", address, msg)
except OSError as e:
if not sys.is_finalizing():
if not is_python_shutting_down():
logger.debug(
"Lost connection to %r while reading message: %s."
" Last operation: %s",
Expand Down Expand Up @@ -1003,7 +1004,7 @@ async def _handle_comm(self, comm: Comm) -> None:

finally:
del self._comms[comm]
if not sys.is_finalizing() and not comm.closed():
if not is_python_shutting_down() and not comm.closed():
try:
comm.abort()
except Exception as e:
Expand Down
4 changes: 2 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import os
import pickle
import random
import sys
import textwrap
import uuid
import warnings
Expand Down Expand Up @@ -119,6 +118,7 @@
TimeoutError,
format_dashboard_link,
get_fileno_limit,
is_python_shutting_down,
key_split_group,
log_errors,
offload,
Expand Down Expand Up @@ -5607,7 +5607,7 @@ async def add_client(
if not comm.closed():
self.client_comms[client].send({"op": "stream-closed"})
try:
if not sys.is_finalizing():
if not is_python_shutting_down():
await self.client_comms[client].close()
del self.client_comms[client]
if self.status == Status.running:
Expand Down

0 comments on commit ba34f53

Please sign in to comment.