diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml
index 1e6daf54253..8e159616800 100644
--- a/distributed/distributed-schema.yaml
+++ b/distributed/distributed-schema.yaml
@@ -339,6 +339,21 @@ properties:
             To avoid this, Dask usually used a file-based lock.
             However, on some systems file-based locks don't work.
             This is particularly common on HPC NFS systems, where users may want to set this to false.
+          transfer:
+            type: object
+            description: |
+              Configuration settings for data transfer between workers
+            properties:
+              message-bytes-limit:
+                type:
+                - string
+                - integer
+                description: |
+                  The maximum size of a message sent between workers
+
+                  Tasks are gathered in batches, and if the first task in a batch
+                  is larger than this value, it is still gathered to ensure
+                  progress. Hence, this limit is not absolute.
       connections:
         type: object
         description: |
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index d3a93a7a651..0653af7f883 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -78,6 +78,8 @@ distributed:
     blocked-handlers: []
     multiprocessing-method: spawn
     use-file-locking: True
+    transfer:
+      message-bytes-limit: 50MB
     connections:            # Maximum concurrent connections for data
       outgoing: 50          # This helps to control network saturation
       incoming: 10
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 24204481f02..5e71c499000 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -673,8 +673,8 @@ async def test_clean(c, s, a, b):
 @gen_cluster(client=True)
 async def test_message_breakup(c, s, a, b):
     n = 100_000
-    a.state.transfer_message_target_bytes = 10 * n
-    b.state.transfer_message_target_bytes = 10 * n
+    a.state.transfer_message_bytes_limit = 10 * n
+    b.state.transfer_message_bytes_limit = 10 * n
     xs = [
         c.submit(mul, b"%d" % i, n, key=f"x{i}", workers=[a.address]) for i in range(30)
     ]
@@ -809,10 +809,10 @@ async def test_multiple_transfers(c, s, w1, w2, w3):
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
 async def test_share_communication(c, s, w1, w2, w3):
     x = c.submit(
-        mul, b"1", int(w3.transfer_message_target_bytes + 1), workers=w1.address
+        mul, b"1", int(w3.transfer_message_bytes_limit + 1), workers=w1.address
     )
     y = c.submit(
-        mul, b"2", int(w3.transfer_message_target_bytes + 1), workers=w2.address
+        mul, b"2", int(w3.transfer_message_bytes_limit + 1), workers=w2.address
     )
     await wait([x, y])
     await c._replicate([x, y], workers=[w1.address, w2.address])
@@ -826,8 +826,8 @@ async def test_share_communication(c, s, w1, w2, w3):
 @pytest.mark.xfail(reason="very high flakiness")
 @gen_cluster(client=True)
 async def test_dont_overlap_communications_to_same_worker(c, s, a, b):
-    x = c.submit(mul, b"1", int(b.transfer_message_target_bytes + 1), workers=a.address)
-    y = c.submit(mul, b"2", int(b.transfer_message_target_bytes + 1), workers=a.address)
+    x = c.submit(mul, b"1", int(b.transfer_message_bytes_limit + 1), workers=a.address)
+    y = c.submit(mul, b"2", int(b.transfer_message_bytes_limit + 1), workers=a.address)
     await wait([x, y])
     z = c.submit(add, x, y, workers=b.address)
     await wait(z)
@@ -3001,9 +3001,9 @@ async def test_acquire_replicas_with_no_priority(c, s, a, b):
 @gen_cluster(client=True, nthreads=[("", 1)])
 async def test_acquire_replicas_large_data(c, s, a):
     """When acquire-replicas is used to acquire multiple sizeable tasks, it respects
-    transfer_message_target_bytes and acquires them over multiple iterations.
+    transfer_message_bytes_limit and acquires them over multiple iterations.
""" - size = a.state.transfer_message_target_bytes // 5 - 10_000 + size = a.state.transfer_message_bytes_limit // 5 - 10_000 class C: def __sizeof__(self): diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 2f1861252c4..eaec94725c7 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -1013,13 +1013,13 @@ async def test_deprecated_worker_attributes(s, a, b): @pytest.mark.parametrize( "nbytes,n_in_flight", [ - # Note: transfer_message_target_bytes = 50e6 bytes (int(10e6), 3), (int(20e6), 2), (int(30e6), 1), ], ) def test_aggregate_gather_deps(ws, nbytes, n_in_flight): + ws.transfer_message_bytes_limit = int(50e6) ws2 = "127.0.0.1:2" instructions = ws.handle_stimulus( AcquireReplicasEvent( @@ -1066,7 +1066,7 @@ def test_gather_priority(ws): }, # Substantial nbytes prevents transfer_incoming_count_limit to be # overridden by transfer_incoming_bytes_throttle_threshold, - # but it's less than transfer_message_target_bytes + # but it's less than transfer_message_bytes_limit nbytes={f"x{i}": 4 * 2**20 for i in range(1, 9)}, stimulus_id="compute1", ), diff --git a/distributed/worker.py b/distributed/worker.py index 8663705941b..29ef7939f6f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -528,7 +528,9 @@ def __init__( self.transfer_outgoing_count_limit = dask.config.get( "distributed.worker.connections.incoming" ) - + transfer_message_bytes_limit = parse_bytes( + dask.config.get("distributed.worker.transfer.message-bytes-limit") + ) self.threads = {} self.active_threads_lock = threading.Lock() @@ -767,6 +769,7 @@ def __init__( validate=validate, transition_counter_max=transition_counter_max, transfer_incoming_bytes_limit=transfer_incoming_bytes_limit, + transfer_message_bytes_limit=transfer_message_bytes_limit, ) BaseWorker.__init__(self, state) @@ -889,7 +892,7 @@ def data(self) -> MutableMapping[str, Any]: ready = DeprecatedWorkerStateAttribute() tasks = DeprecatedWorkerStateAttribute() target_message_size = DeprecatedWorkerStateAttribute( - target="transfer_message_target_bytes" + target="transfer_message_bytes_limit" ) total_out_connections = DeprecatedWorkerStateAttribute( target="transfer_incoming_count_limit" diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index cffe77dcb7a..30b39f8d051 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -4,6 +4,7 @@ import asyncio import heapq import logging +import math import operator import random import sys @@ -1129,7 +1130,7 @@ class WorkerState: #: :meth:`BaseWorker.gather_dep`. Multiple small tasks that can be fetched from the #: same worker will be clustered in a single instruction as long as their combined #: size doesn't exceed this value. - transfer_message_target_bytes: int + transfer_message_bytes_limit: float #: All and only tasks with ``TaskState.state == 'missing'``. 
     missing_dep_flight: set[TaskState]
@@ -1254,6 +1255,7 @@ def __init__(
         validate: bool = True,
         transition_counter_max: int | Literal[False] = False,
         transfer_incoming_bytes_limit: int | None = None,
+        transfer_message_bytes_limit: float = math.inf,
     ):
         self.nthreads = nthreads
 
@@ -1292,7 +1294,7 @@ def __init__(
         self.in_flight_tasks = set()
         self.executed_count = 0
         self.long_running = set()
-        self.transfer_message_target_bytes = int(50e6)  # 50 MB
+        self.transfer_message_bytes_limit = transfer_message_bytes_limit
         self.log = deque(maxlen=100_000)
         self.stimulus_log = deque(maxlen=10_000)
         self.transition_counter = 0
@@ -1635,7 +1637,7 @@ def _select_keys_for_gather(
         """Helper of _ensure_communicating.
 
         Fetch all tasks that are replicated on the target worker within a single
-        message, up to transfer_message_target_bytes or until we reach the limit
+        message, up to transfer_message_bytes_limit or until we reach the limit
         for the size of incoming data transfers.
         """
         to_gather: list[TaskState] = []
@@ -1644,10 +1646,10 @@ def _select_keys_for_gather(
         if self.transfer_incoming_bytes_limit is not None:
             bytes_left_to_fetch = min(
                 self.transfer_incoming_bytes_limit - self.transfer_incoming_bytes,
-                self.transfer_message_target_bytes,
+                self.transfer_message_bytes_limit,
             )
         else:
-            bytes_left_to_fetch = self.transfer_message_target_bytes
+            bytes_left_to_fetch = self.transfer_message_bytes_limit
 
         while available:
             ts = available.peek()
diff --git a/docs/source/worker-state.rst b/docs/source/worker-state.rst
index a9dce7c26bf..cfb6a4d36fd 100644
--- a/docs/source/worker-state.rst
+++ b/docs/source/worker-state.rst
@@ -98,9 +98,10 @@ These :class:`TaskState` objects have their state set to ``fetch``, are put in t
 network. For each dependency we select a worker at random that has that data and collect
 the dependency from that worker. To improve bandwidth, we opportunistically gather other
 dependencies of other tasks that are known to be on that worker, up to a maximum of 50MB
-of data (:attr:`~WorkerState.transfer_message_target_bytes`) - too little data and
-bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of 50
-connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
+of data (:attr:`~WorkerState.transfer_message_bytes_limit`, which is acquired from the
+configuration key ``distributed.worker.transfer.message-bytes-limit``) - too little data
+and bandwidth suffers, too much data and responsiveness suffers. We use a fixed number of
+50 connections (:attr:`~WorkerState.transfer_incoming_count_limit`, which is in turn
 acquired from the configuration key ``distributed.worker.connections.outgoing``) so as
 to avoid overly-fragmenting our network bandwidth.
 
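
For reviewers who want to try the change: the new key is set like any other Dask config value. A minimal sketch, assuming only what this patch ships (the key name and the 50MB default come from the distributed.yaml hunk; `parse_bytes` accepts both the string and integer forms the schema allows):

```python
import dask
from dask.utils import parse_bytes

# Raise the per-message transfer limit from the default 50MB to 100MB.
# The schema accepts either a human-readable string or a raw byte count.
dask.config.set({"distributed.worker.transfer.message-bytes-limit": "100MB"})

# Workers resolve the value exactly as in the worker.py hunk above:
limit = parse_bytes(dask.config.get("distributed.worker.transfer.message-bytes-limit"))
assert limit == 100_000_000
```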
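To make the renamed semantics concrete, here is a hypothetical standalone sketch of the batching rule `_select_keys_for_gather` applies (the name `batch_keys_for_gather` and the `(key, nbytes)` input format are illustrative, not part of the patch): tasks replicated on one worker are greedily packed into a single message, and the first task is always taken so that a dependency larger than the limit can still make progress.

```python
from __future__ import annotations

import math


def batch_keys_for_gather(
    tasks: list[tuple[str, int]],
    message_bytes_limit: float = math.inf,
    incoming_bytes_budget: float | None = None,
) -> list[str]:
    """Illustrative sketch: greedily pack tasks available on one worker
    into a single gather message, mirroring _select_keys_for_gather."""
    if incoming_bytes_budget is not None:
        bytes_left = min(incoming_bytes_budget, message_bytes_limit)
    else:
        bytes_left = message_bytes_limit

    batch: list[str] = []
    for key, nbytes in tasks:
        # The first task is always taken, even when it alone exceeds the
        # limit; otherwise an oversized dependency could never be fetched.
        if batch and nbytes > bytes_left:
            break
        batch.append(key)
        bytes_left -= nbytes
    return batch


# With the default 50MB limit, two 30MB tasks are split across two messages:
assert batch_keys_for_gather([("x", 30_000_000), ("y", 30_000_000)], 50e6) == ["x"]
```

This also explains why `transfer_message_bytes_limit` is annotated as `float`: the unconfigured constructor default is `math.inf`, while values coming from the config are parsed to an `int` by `parse_bytes`.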