deprecate the io_loop and loop kwarg to Server, Worker, and Nanny #6473

Merged
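In short: with this change, `Server`, `Worker`, and `Nanny` always bind to the event loop that is current when they are constructed. The `io_loop`/`loop` keyword is still accepted but ignored, and passing it emits a `DeprecationWarning`. A minimal sketch of the user-facing effect, assuming a `distributed` build that includes this change (the `Scheduler(port=0)` setup and the warning capture are illustrative, not part of the diff):

```python
# Sketch only: shows the deprecation behaviour introduced below.
import asyncio
import warnings

from tornado.ioloop import IOLoop
from distributed import Scheduler, Worker


async def main():
    async with Scheduler(port=0) as s:
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            # `loop=` is ignored; the worker binds to the currently running loop
            async with Worker(s.address, loop=IOLoop.current()) as w:
                assert w.loop is w.io_loop is IOLoop.current()
        assert any(issubclass(rec.category, DeprecationWarning) for rec in caught)


asyncio.run(main())
```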
10 changes: 8 additions & 2 deletions distributed/core.py
@@ -162,6 +162,13 @@ def __init__(
timeout=None,
io_loop=None,
):
+ if io_loop is not None:
+     warnings.warn(
+         "The io_loop kwarg to Server is ignored and will be deprecated",
+         DeprecationWarning,
+         stacklevel=2,
+     )
+
self._status = Status.init
self.handlers = {
"identity": self.identity,
@@ -191,8 +198,7 @@ def __init__(
self._event_finished = asyncio.Event()

self.listeners = []
- self.io_loop = io_loop or IOLoop.current()
- self.loop = self.io_loop
+ self.io_loop = self.loop = IOLoop.current()

if not hasattr(self.io_loop, "profile"):
ref = weakref.ref(self.io_loop)
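Context for the `IOLoop.current()` calls above (standard Tornado behaviour on modern versions, not something introduced by this PR): when called inside a running asyncio loop, `IOLoop.current()` returns a Tornado wrapper around that loop, so binding at construction time ties the `Server` to whichever loop is active at that moment. A small illustrative check:

```python
# Illustration only; relies on Tornado 5+'s asyncio-backed IOLoop.
import asyncio

from tornado.ioloop import IOLoop


async def show():
    loop = IOLoop.current()
    # The IOLoop is a thin wrapper around the asyncio loop that is running now
    assert loop.asyncio_loop is asyncio.get_running_loop()


asyncio.run(show())
```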
6 changes: 3 additions & 3 deletions distributed/diagnostics/tests/test_worker_plugin.py
@@ -42,7 +42,7 @@ def transition(self, key, start, finish, **kwargs):
async def test_create_with_client(c, s):
await c.register_worker_plugin(MyPlugin(123))

- worker = await Worker(s.address, loop=s.loop)
+ worker = await Worker(s.address)
assert worker._my_plugin_status == "setup"
assert worker._my_plugin_data == 123

@@ -55,7 +55,7 @@ async def test_remove_with_client(c, s):
await c.register_worker_plugin(MyPlugin(123), name="foo")
await c.register_worker_plugin(MyPlugin(546), name="bar")

- worker = await Worker(s.address, loop=s.loop)
+ worker = await Worker(s.address)
# remove the 'foo' plugin
await c.unregister_worker_plugin("foo")
assert worker._my_plugin_status == "teardown"
@@ -79,7 +79,7 @@ async def test_remove_with_client(c, s):
async def test_remove_with_client_raises(c, s):
await c.register_worker_plugin(MyPlugin(123), name="foo")

- worker = await Worker(s.address, loop=s.loop)
+ worker = await Worker(s.address)
with pytest.raises(ValueError, match="bar"):
await c.unregister_worker_plugin("bar")

14 changes: 10 additions & 4 deletions distributed/nanny.py
@@ -134,8 +134,16 @@ def __init__(
config=None,
**worker_kwargs,
):
+ if loop is not None:
+     warnings.warn(
+         "the `loop` kwarg to `Nanny` is ignored, and will be removed in a future release. "
+         "The Nanny always binds to the current loop.",
+         DeprecationWarning,
+         stacklevel=2,
+     )
+
self._setup_logging(logger)
- self.loop = loop or IOLoop.current()
+ self.loop = self.io_loop = IOLoop.current()

if isinstance(security, dict):
security = Security(**security)
@@ -246,9 +254,7 @@ def __init__(

self.plugins: dict[str, NannyPlugin] = {}

- super().__init__(
-     handlers=handlers, io_loop=self.loop, connection_args=self.connection_args
- )
+ super().__init__(handlers=handlers, connection_args=self.connection_args)

self.scheduler = self.rpc(self.scheduler_addr)
self.memory_manager = NannyMemoryManager(self, memory_limit=memory_limit)
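With the change above, `Nanny` no longer takes a meaningful `loop` argument; it simply binds to the loop it is constructed on. A hedged sketch of typical usage after this PR (`memory_limit=0` mirrors the tests below and is not required):

```python
# Sketch only: start a Nanny without a loop kwarg inside a running event loop.
import asyncio

from distributed import Nanny, Scheduler


async def main():
    async with Scheduler(port=0) as s:
        async with Nanny(s.address, memory_limit=0) as n:  # no loop= needed
            # loop and io_loop now name the same object, bound at construction
            assert n.loop is n.io_loop


asyncio.run(main())
```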
3 changes: 1 addition & 2 deletions distributed/scheduler.py
@@ -2889,7 +2889,7 @@ def __init__(
stacklevel=2,
)

- self.loop = IOLoop.current()
+ self.loop = self.io_loop = IOLoop.current()
self._setup_logging(logger)

# Attributes
@@ -3124,7 +3124,6 @@ def __init__(
self,
handlers=self.handlers,
stream_handlers=merge(worker_handlers, client_handlers),
- io_loop=self.loop,
connection_limit=connection_limit,
deserialize=False,
connection_args=self.connection_args,
2 changes: 1 addition & 1 deletion distributed/tests/test_client.py
@@ -2980,7 +2980,7 @@ async def test_unrunnable_task_runs(c, s, a, b):
assert s.tasks[x.key] in s.unrunnable
assert s.get_task_status(keys=[x.key]) == {x.key: "no-worker"}

- w = await Worker(s.address, loop=s.loop)
+ w = await Worker(s.address)

while x.status != "finished":
await asyncio.sleep(0.01)
12 changes: 6 additions & 6 deletions distributed/tests/test_nanny.py
@@ -275,8 +275,8 @@ async def test_wait_for_scheduler():

@gen_cluster(nthreads=[], client=True)
async def test_environment_variable(c, s):
- a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"})
- b = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "456"})
+ a = Nanny(s.address, memory_limit=0, env={"FOO": "123"})
+ b = Nanny(s.address, memory_limit=0, env={"FOO": "456"})
await asyncio.gather(a, b)
results = await c.run(lambda: os.environ["FOO"])
assert results == {a.worker_address: "123", b.worker_address: "456"}
@@ -288,18 +288,18 @@ async def test_environment_variable_by_config(c, s, monkeypatch):

with dask.config.set({"distributed.nanny.environ": "456"}):
with pytest.raises(TypeError, match="configuration must be of type dict"):
- Nanny(s.address, loop=s.loop, memory_limit=0)
+ Nanny(s.address, memory_limit=0)

with dask.config.set({"distributed.nanny.environ": {"FOO": "456"}}):

# precedence
# kwargs > env var > config

with mock.patch.dict(os.environ, {"FOO": "BAR"}, clear=True):
- a = Nanny(s.address, loop=s.loop, memory_limit=0, env={"FOO": "123"})
- x = Nanny(s.address, loop=s.loop, memory_limit=0)
+ a = Nanny(s.address, memory_limit=0, env={"FOO": "123"})
+ x = Nanny(s.address, memory_limit=0)

- b = Nanny(s.address, loop=s.loop, memory_limit=0)
+ b = Nanny(s.address, memory_limit=0)

await asyncio.gather(a, b, x)
results = await c.run(lambda: os.environ["FOO"])
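The `# kwargs > env var > config` comment above states the intended precedence for worker environment variables. A hypothetical dict-merge illustration of that ordering (not the Nanny's actual implementation; the variable names are made up):

```python
# Hypothetical illustration of the precedence the test above relies on.
config_env = {"FOO": "456"}    # dask config: distributed.nanny.environ
process_env = {"FOO": "BAR"}   # inherited from os.environ
kwarg_env = {"FOO": "123"}     # env= passed directly to Nanny

# Later sources win: config < process environment < explicit kwargs.
merged = {**config_env, **process_env, **kwarg_env}
assert merged["FOO"] == "123"
```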
4 changes: 2 additions & 2 deletions distributed/tests/test_scheduler.py
@@ -952,7 +952,7 @@ async def test_file_descriptors(c, s):
num_fds_1 = proc.num_fds()

N = 20
- nannies = await asyncio.gather(*(Nanny(s.address, loop=s.loop) for _ in range(N)))
+ nannies = await asyncio.gather(*(Nanny(s.address) for _ in range(N)))

while len(s.workers) < N:
await asyncio.sleep(0.1)
@@ -2220,7 +2220,7 @@ async def test_worker_name_collision(s, a):
with raises_with_cause(
RuntimeError, None, ValueError, f"name taken, {a.name!r}"
):
- await Worker(s.address, name=a.name, loop=s.loop, host="127.0.0.1")
+ await Worker(s.address, name=a.name, host="127.0.0.1")

s.validate_state()
assert set(s.workers) == {a.address}
14 changes: 3 additions & 11 deletions distributed/tests/test_steal.py
@@ -324,7 +324,7 @@ async def test_new_worker_steals(c, s, a):
while len(a.tasks) < 10:
await asyncio.sleep(0.01)

- b = await Worker(s.address, loop=s.loop, nthreads=1, memory_limit=MEMORY_LIMIT)
+ b = await Worker(s.address, nthreads=1, memory_limit=MEMORY_LIMIT)

result = await total
assert result == sum(map(inc, range(100)))
@@ -478,7 +478,7 @@ async def test_steal_resource_restrictions(c, s, a):
await asyncio.sleep(0.01)
assert len(a.tasks) == 101

- b = await Worker(s.address, loop=s.loop, nthreads=1, resources={"A": 4})
+ b = await Worker(s.address, nthreads=1, resources={"A": 4})

while not b.tasks or len(a.tasks) == 101:
await asyncio.sleep(0.01)
@@ -500,15 +500,7 @@ async def test_steal_resource_restrictions_asym_diff(c, s, a):
await asyncio.sleep(0.01)
assert len(a.tasks) == 101

- b = await Worker(
-     s.address,
-     loop=s.loop,
-     nthreads=1,
-     resources={
-         "A": 4,
-         "B": 5,
-     },
- )
+ b = await Worker(s.address, nthreads=1, resources={"A": 4, "B": 5})

while not b.tasks or len(a.tasks) == 101:
await asyncio.sleep(0.01)
31 changes: 23 additions & 8 deletions distributed/tests/test_worker.py
@@ -19,6 +19,7 @@
import psutil
import pytest
from tlz import first, pluck, sliding_window
+ from tornado.ioloop import IOLoop

import dask
from dask import delayed
@@ -38,7 +39,7 @@
wait,
)
from distributed.comm.registry import backends
- from distributed.compatibility import LINUX, WINDOWS
+ from distributed.compatibility import LINUX, WINDOWS, to_thread
from distributed.core import CommClosedError, Status, rpc
from distributed.diagnostics import nvml
from distributed.diagnostics.plugin import PipInstall
@@ -524,8 +525,22 @@ async def test_gather_missing_workers_replicated(c, s, a, b, missing_first):

@gen_cluster(nthreads=[])
async def test_io_loop(s):
- async with Worker(s.address, loop=s.loop) as w:
-     assert w.io_loop is s.loop
+ async with Worker(s.address) as w:
+     assert w.io_loop is w.loop is s.loop
+
+
+ @gen_cluster(nthreads=[])
+ async def test_io_loop_alternate_loop(s, loop):
+     async def main():
+         with pytest.warns(
+             DeprecationWarning,
+             match=r"The `loop` argument to `Worker` is ignored, and will be "
+             r"removed in a future release. The Worker always binds to the current loop",
+         ):
+             async with Worker(s.address, loop=loop) as w:
+                 assert w.io_loop is w.loop is IOLoop.current()
+
+     await to_thread(asyncio.run, main())


@gen_cluster(client=True)
@@ -1004,7 +1019,7 @@ async def test_worker_fds(s):
proc = psutil.Process()
before = psutil.Process().num_fds()

- async with Worker(s.address, loop=s.loop):
+ async with Worker(s.address):
assert proc.num_fds() > before

while proc.num_fds() > before:
@@ -1182,7 +1197,7 @@ def func(dask_scheduler):
@gen_cluster(nthreads=[], client=True)
async def test_scheduler_address_config(c, s):
with dask.config.set({"scheduler-address": s.address}):
- worker = await Worker(loop=s.loop)
+ worker = await Worker()
assert worker.scheduler.address == s.address
await worker.close()

@@ -1276,7 +1291,7 @@ def test_startup2():
assert list(result.values()) == [False] * 2

# Start a worker and check that startup is not run
- worker = await Worker(s.address, loop=s.loop)
+ worker = await Worker(s.address)
result = await c.run(test_import, workers=[worker.address])
assert list(result.values()) == [False]
await worker.close()
@@ -1290,7 +1305,7 @@ def test_startup2():
assert list(result.values()) == [True] * 2

# Start a worker and check it is ran on it
- worker = await Worker(s.address, loop=s.loop)
+ worker = await Worker(s.address)
result = await c.run(test_import, workers=[worker.address])
assert list(result.values()) == [True]
await worker.close()
@@ -1304,7 +1319,7 @@ def test_startup2():
assert list(result.values()) == [True] * 2

# Start a worker and check it is ran on it
- worker = await Worker(s.address, loop=s.loop)
+ worker = await Worker(s.address)
result = await c.run(test_import, workers=[worker.address])
assert list(result.values()) == [True]
result = await c.run(test_startup2, workers=[worker.address])
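The new `test_io_loop_alternate_loop` shown earlier exercises the deprecation path by handing `Worker` a loop other than the one it runs on: `to_thread(asyncio.run, main())` starts a fresh event loop in a worker thread, so the `loop` fixture passed in is genuinely different from the current loop inside `main()`. A stripped-down sketch of that pattern using the standard library's `asyncio.to_thread` (the test itself uses `distributed.compatibility.to_thread` in the same role):

```python
# Sketch of the "run a coroutine on a different loop" pattern used in the test.
import asyncio


async def outer():
    outer_loop = asyncio.get_running_loop()

    async def inner():
        # Executed by asyncio.run() in a worker thread, i.e. on a new loop.
        assert asyncio.get_running_loop() is not outer_loop

    # Requires Python 3.9+ for asyncio.to_thread.
    await asyncio.to_thread(asyncio.run, inner())


asyncio.run(outer())
```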
3 changes: 0 additions & 3 deletions distributed/utils_test.py
@@ -885,14 +885,12 @@ def test_func(*args, **kwargs):
async def start_cluster(
nthreads: list[tuple[str, int] | tuple[str, int, dict]],
scheduler_addr: str,
- loop: IOLoop | None = None,
security: Security | dict[str, Any] | None = None,
Worker: type[ServerNode] = Worker,
scheduler_kwargs: dict[str, Any] = {},
worker_kwargs: dict[str, Any] = {},
) -> tuple[Scheduler, list[ServerNode]]:
s = await Scheduler(
- loop=loop,
validate=True,
security=security,
port=0,
@@ -906,7 +904,6 @@ async def start_cluster(
nthreads=ncore[1],
name=i,
security=security,
- loop=loop,
validate=True,
host=ncore[0],
**(
22 changes: 14 additions & 8 deletions distributed/worker.py
@@ -351,7 +351,6 @@ class Worker(ServerNode):
data: MutableMapping, type, None
The object to use for storage, builds a disk-backed LRU dict by default
nthreads: int, optional
- loop: tornado.ioloop.IOLoop
local_directory: str, optional
Directory where we place local resources
name: str, optional
@@ -514,7 +513,7 @@ def __init__(
*,
scheduler_file: str | None = None,
nthreads: int | None = None,
- loop: IOLoop | None = None,
+ loop: IOLoop | None = None,  # Deprecated
local_dir: None = None, # Deprecated, use local_directory instead
local_directory: str | None = None,
services: dict | None = None,
@@ -579,6 +578,13 @@ def __init__(
DeprecationWarning,
stacklevel=2,
)
+ if loop is not None:
+     warnings.warn(
+         "The `loop` argument to `Worker` is ignored, and will be removed in a future release. "
+         "The Worker always binds to the current loop",
+         DeprecationWarning,
+         stacklevel=2,
+     )
self.tasks = {}
self.waiting_for_data_count = 0
self.has_what = defaultdict(set)
@@ -729,7 +735,7 @@ def __init__(
self.connection_args = self.security.get_connection_args("worker")

self.actors = {}
- self.loop = loop or IOLoop.current()
+ self.loop = self.io_loop = IOLoop.current()

# Common executors always available
self.executors = {
@@ -819,7 +825,6 @@ def __init__(
super().__init__(
handlers=handlers,
stream_handlers=stream_handlers,
- io_loop=self.loop,
connection_args=self.connection_args,
**kwargs,
)
@@ -878,7 +883,7 @@ def __init__(

if lifetime is None:
lifetime = dask.config.get("distributed.worker.lifetime.duration")
- self.lifetime = parse_timedelta(lifetime)
+ lifetime = parse_timedelta(lifetime)

if lifetime_stagger is None:
lifetime_stagger = dask.config.get("distributed.worker.lifetime.stagger")
@@ -888,9 +893,10 @@ def __init__(
lifetime_restart = dask.config.get("distributed.worker.lifetime.restart")
self.lifetime_restart = lifetime_restart

- if self.lifetime:
-     self.lifetime += (random.random() * 2 - 1) * lifetime_stagger
-     self.io_loop.call_later(self.lifetime, self.close_gracefully)
+ if lifetime:
+     lifetime += (random.random() * 2 - 1) * lifetime_stagger
+     self.io_loop.call_later(lifetime, self.close_gracefully)
+ self.lifetime = lifetime

self._async_instructions = set()

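The last hunk above reworks the lifetime handling so the staggered value is computed in a local variable and only assigned to `self.lifetime` at the end. The stagger arithmetic, with illustrative numbers (the real values come from the `distributed.worker.lifetime.*` config keys referenced above):

```python
# Illustration of the lifetime stagger; the numbers here are made up.
import random

lifetime = 3600.0          # e.g. parse_timedelta("1 hour")
lifetime_stagger = 600.0   # e.g. parse_timedelta("10 minutes")

if lifetime:
    # Uniform offset in [-stagger, +stagger) so workers do not all retire at once.
    lifetime += (random.random() * 2 - 1) * lifetime_stagger

assert 3000.0 <= lifetime < 4200.0
```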