Description
I feel like this has been discussed tangentially in other places, but I couldn't find an issue just for this. The scheduler generally seems a little too eager to schedule tasks in a way that requires data transfer, when transfer could be avoided (#5253 aims to address another aspect of this).
In our `worker_objective` function, used to estimate how long it will take before a given task can start running on a given worker, basically all we consider is `n_bytes_to_transfer / self.bandwidth`. But in reality a lot more than that has to happen.
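For context, the estimate is roughly equivalent to the following (a simplified paraphrase with abbreviated attribute names, not the actual implementation):

```python
# Simplified paraphrase of the scheduler's per-worker estimate (not the exact
# code): time until `ts` could start on worker `ws` is the worker's current
# backlog plus the bytes it still needs to fetch, divided by measured bandwidth.
def worker_objective_sketch(scheduler, ts, ws):
    comm_bytes = sum(
        dep.nbytes for dep in ts.dependencies if ws not in dep.who_has
    )
    stack_time = ws.occupancy / ws.nthreads          # work already queued on the worker
    start_time = stack_time + comm_bytes / scheduler.bandwidth
    return (start_time, ws.nbytes)                   # tie-break on bytes stored
```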
Here are some things we're not accounting for (roughly illustrated in the sketch after this list):
- Fixed-cost network latency between workers, both receiver->sender to request the data, and sender->receiver to send it
- Serialization time on the sending worker
- Deserialization time on the receiving worker
- Possible disk-write (plus serialization) time on the receiving worker, if it has to spill some other key to disk to make room
- Possible disk-read (plus deserialization) time on the sending worker, if the requested value is spilled
- The worker has a queue of keys to fetch; there may already be keys ahead of this one waiting to be fetched (xref "Should `Worker.data_needed` be priority-ordered?" #5323)
- The fetch itself is enqueued onto the worker's event loop, and is async. We've recently learned that the event loop can spend a surprising amount of time waiting for the GIL if user tasks also hold the GIL (Testing network performance #5258 (comment)). So worst-case there could be multiple (O(~10?)) 5ms pauses between the worker intending to fetch and a message even getting sent (let alone received) over the network
- Transferring might slow down other tasks currently running on the sender/receiver (GIL, CPU to (de)serialize), plus may require nontrivial memory, potentially pausing or even killing one or both workers (not time-estimate related, but worth considering someday)
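Purely as an illustration of how those terms would stack up for a single fetch, the wall-clock time looks more like this (every rate and delay below is a hypothetical input, nothing the scheduler tracks today):

```python
# Illustration only: the real cost of fetching one key, with hypothetical
# per-term estimates. None of these terms appear in worker_objective today.
def realistic_transfer_time(
    nbytes,
    bandwidth,
    latency,               # one-way network latency
    serialize_rate,        # sender: bytes/s to serialize
    deserialize_rate,      # receiver: bytes/s to deserialize
    spill_time=0.0,        # receiver may spill another key to disk to make room
    unspill_time=0.0,      # sender may need to read this key back from disk
    queue_delay=0.0,       # keys already ahead of this one in Worker.data_needed
    event_loop_delay=0.0,  # GIL-induced pauses before the request even goes out
):
    return (
        queue_delay
        + event_loop_delay
        + 2 * latency                  # request, then response
        + unspill_time
        + nbytes / serialize_rate
        + nbytes / bandwidth
        + nbytes / deserialize_rate
        + spill_time
    )
```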
Now I'm not at all saying we should try to actually measure these things. And when the data to transfer is big, what we have is pretty accurate.
But we should probably add some fixed-cost penalty for any transfer (covering network latency, GIL-pausing, etc.). Ideally at least latency can be measured (is it already on the scheduler?).
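A minimal sketch of what that could look like, assuming a made-up `TRANSFER_PENALTY` constant and the same simplified names as above (ideally the penalty would be derived from measured latency rather than hard-coded):

```python
# Hypothetical tweak: charge a fixed penalty for any transfer at all, on top
# of the bandwidth-based term. TRANSFER_PENALTY is a placeholder constant.
TRANSFER_PENALTY = 0.01  # seconds; covers round-trip latency, GIL pauses, etc.

def estimated_start_time(scheduler, ts, ws):
    comm_bytes = sum(
        dep.nbytes for dep in ts.dependencies if ws not in dep.who_has
    )
    stack_time = ws.occupancy / ws.nthreads
    penalty = TRANSFER_PENALTY if comm_bytes else 0.0
    return stack_time + comm_bytes / scheduler.bandwidth + penalty
```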
We should also just make `decide_worker` less inclined to cause data transfer. Maybe, if one worker already holds all the dependencies for a task, only pick a different worker if (rough sketch after this list):
- we estimate a different worker could start the task some factor sooner (1.2x? 2x?)
- the estimated delay on the no-transfer worker is non-trivial (> 100ms?)
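A rough sketch of that rule, reading the two bullets as conditions that both have to hold; the factor, the threshold, and all names here are placeholders, not a proposed API:

```python
# Placeholder logic for the proposed decide_worker tweak. `no_transfer_start`
# is the estimated start time on the worker that already holds every
# dependency; `best_other_start` is the estimate for the scheduler's current
# best alternative. Both thresholds are open questions.
SPEEDUP_FACTOR = 1.2   # 1.2x? 2x?
MIN_DELAY = 0.100      # 100ms?

def prefer_no_transfer(no_transfer_start, best_other_start):
    """Return True if we should stick with the worker that needs no transfer."""
    worthwhile_speedup = no_transfer_start > best_other_start * SPEEDUP_FACTOR
    delay_is_nontrivial = no_transfer_start > MIN_DELAY
    # Only move the task (and pay for the transfer) when both conditions hold.
    return not (worthwhile_speedup and delay_is_nontrivial)
```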
Test to look at estimated vs actual transfer times
```python
from dask.utils import format_bytes, format_time
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_transfer_cost(client, s, a, b):
    nbytes = 1024
    print(f"Transferring {format_bytes(nbytes)}")

    # Create the data on worker `a`
    x = client.submit(lambda: "x" * nbytes, workers=a.address, key="x")
    await x
    bandwidth = s.bandwidth

    # Force a transfer by running a dependent task on worker `b`
    y = client.submit(lambda x: None, x, workers=b.address, key="y")
    await y

    data_creation_duration = s.get_task_duration(s.tasks["x"])
    print(f"Data creation: {format_time(data_creation_duration)}")

    actual_nbytes = s.tasks["x"].nbytes
    estimated_xfer_duration = actual_nbytes / bandwidth
    print(f"Estimated transfer time: {format_time(estimated_xfer_duration)}")

    xfer_duration = b.incoming_transfer_log[0]["duration"]
    print(f"Actual transfer time: {format_time(xfer_duration)}")

    avg_latency = (a.latency + b.latency) / 2
    print(f"Worker latency: {format_time(avg_latency)}")

    print(
        f"Transfer estimate is {data_creation_duration / estimated_xfer_duration:.1f}x faster than data creation, "
        f"{avg_latency / estimated_xfer_duration:.1f}x faster than worker latency, "
        f"{xfer_duration / estimated_xfer_duration:.1f}x faster than reality"
    )

    assert 0.1 < xfer_duration / estimated_xfer_duration < 10
```
```
Transferring 1.00 kiB
Data creation: 44.11 us
Estimated transfer time: 10.73 us
Actual transfer time: 9.37 ms
Worker latency: 2.07 ms
Transfer estimate is 4.1x faster than data creation, 193.2x faster than worker latency, 873.0x faster than reality
```

```
Transferring 1.00 MiB
Data creation: 387.91 us
Estimated transfer time: 10.49 ms
Actual transfer time: 10.75 ms
Worker latency: 2.13 ms
Transfer estimate is 0.0x faster than data creation, 0.2x faster than worker latency, 1.0x faster than reality
```