-
-
Notifications
You must be signed in to change notification settings - Fork 717
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expose message-bytes-limit
in config
#7074
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -339,6 +339,17 @@ properties: | |
To avoid this, Dask usually used a file-based lock. | ||
However, on some systems file-based locks don't work. | ||
This is particularly common on HPC NFS systems, where users may want to set this to false. | ||
transfer: | ||
type: object | ||
description: | | ||
Configuration setting for data transfer between workers | ||
properties: | ||
message-bytes-limit: | ||
type: | ||
- string | ||
- integer | ||
description: | | ||
The maximum size of a message sent between workers | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This description is not correct, no? Rather, IIUC, this is the maximum amount of "extra" data we'll ask for from a worker once we've picked a worker to transfer a task from. So an individual message may be (much) larger than There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does not explain the fine print, that's a fair point. I'll file a follow-up PR that highlights that this is not an absolute maximum. |
||
connections: | ||
type: object | ||
description: | | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
import asyncio | ||
import heapq | ||
import logging | ||
import math | ||
import operator | ||
import random | ||
import sys | ||
|
@@ -1129,7 +1130,7 @@ class WorkerState: | |
#: :meth:`BaseWorker.gather_dep`. Multiple small tasks that can be fetched from the | ||
#: same worker will be clustered in a single instruction as long as their combined | ||
#: size doesn't exceed this value. | ||
transfer_message_target_bytes: int | ||
transfer_message_bytes_limit: float | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Driveby: Renamed to match other attribute names (e.g., |
||
|
||
#: All and only tasks with ``TaskState.state == 'missing'``. | ||
missing_dep_flight: set[TaskState] | ||
|
@@ -1254,6 +1255,7 @@ def __init__( | |
validate: bool = True, | ||
transition_counter_max: int | Literal[False] = False, | ||
transfer_incoming_bytes_limit: int | None = None, | ||
transfer_message_bytes_limit: float = math.inf, | ||
): | ||
self.nthreads = nthreads | ||
|
||
|
@@ -1292,7 +1294,7 @@ def __init__( | |
self.in_flight_tasks = set() | ||
self.executed_count = 0 | ||
self.long_running = set() | ||
self.transfer_message_target_bytes = int(50e6) # 50 MB | ||
self.transfer_message_bytes_limit = transfer_message_bytes_limit | ||
self.log = deque(maxlen=100_000) | ||
self.stimulus_log = deque(maxlen=10_000) | ||
self.transition_counter = 0 | ||
|
@@ -1635,7 +1637,7 @@ def _select_keys_for_gather( | |
"""Helper of _ensure_communicating. | ||
|
||
Fetch all tasks that are replicated on the target worker within a single | ||
message, up to transfer_message_target_bytes or until we reach the limit | ||
message, up to transfer_message_bytes_limit or until we reach the limit | ||
for the size of incoming data transfers. | ||
""" | ||
to_gather: list[TaskState] = [] | ||
|
@@ -1644,10 +1646,10 @@ def _select_keys_for_gather( | |
if self.transfer_incoming_bytes_limit is not None: | ||
bytes_left_to_fetch = min( | ||
self.transfer_incoming_bytes_limit - self.transfer_incoming_bytes, | ||
self.transfer_message_target_bytes, | ||
self.transfer_message_bytes_limit, | ||
) | ||
else: | ||
bytes_left_to_fetch = self.transfer_message_target_bytes | ||
bytes_left_to_fetch = self.transfer_message_bytes_limit | ||
|
||
while available: | ||
ts = available.peek() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
XREF: #6977
I went with singular
transfer
to be coherent with variable naming.