
Thoughts about the current state of scheduling #9077

@fjetter

Context

I’ve spent hundreds if not thousands of hours on various systems in dask and wanted to write down a couple of thoughts about the current state of things. I’ll likely be less deeply involved with the project moving forward, and it would be a shame if all of this knowledge simply vanished. Nothing here is documented in enough detail to be directly actionable, but if somebody ever digs into these topics, I hope this helps them get up to speed a bit.

Occupancy

Performance

Occupancy is an estimate of how much CPU time is currently assigned to a worker. It is implemented by tracking the average runtime per task prefix, counting the number of assigned tasks per prefix, and computing occupancy on demand by multiplying those two numbers (summed over all prefixes).
The system currently used to compute this quantity was introduced to replace an "update occupancy every X seconds, but only if CPU utilization is low" approach, which updated all workers at once (and therefore ran in O(W * T_w) time, where W is the number of workers and T_w the number of tasks per worker).
Whether the current system performs well depends on how often occupancy is read versus modified. Computing and caching occupancy eagerly on every update would likely be a sensible improvement (let's see if I have time).
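
A minimal sketch of that per-prefix bookkeeping, under the assumption of a heavily simplified data model (class and attribute names below are illustrative, not the actual Scheduler/WorkerState structures):

```python
from collections import defaultdict


class WorkerOccupancyEstimate:
    """Illustrative per-worker occupancy bookkeeping, not the real WorkerState."""

    def __init__(self, prefix_durations):
        # prefix_durations: running average runtime per task prefix in seconds
        self.prefix_durations = prefix_durations
        # number of tasks per prefix currently assigned to this worker
        self.prefix_counts = defaultdict(int)

    def add_task(self, prefix):
        self.prefix_counts[prefix] += 1

    def remove_task(self, prefix):
        self.prefix_counts[prefix] -= 1

    def occupancy(self, default_duration=0.5):
        # computed on demand: sum over prefixes of count * average duration
        return sum(
            count * self.prefix_durations.get(prefix, default_duration)
            for prefix, count in self.prefix_counts.items()
        )
```

Caching the value eagerly would mean keeping a running total that add_task/remove_task adjust incrementally rather than summing over all prefixes on every read.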

Functionality

Occupancy is deeply integrated into scheduling via two components.

Its primary function is as a measure in Scheduler.worker_objective to find the best worker to assign a task to. The naive idea is to minimize "time to start", but occupancy is not the right measure for this because it does not account for priorities.
Since the scheduler assigns tasks greedily (ignoring queuing for a moment), a newly assigned task very often, but not always, beats almost all already assigned tasks in priority, so its actual time to start is typically much, much smaller than the CPU occupancy suggests.
Additionally, this worker objective uses CPU and network occupancy on equal footing. While network occupancy is also measured in seconds, it is time spent on a different channel (typically just waiting for data to arrive) and does not necessarily impact start time. Historically, experiments that accounted for network occupancy more accurately, i.e. only counting dependencies that actually have to be fetched and accounting for transfers already in flight, yielded worse scheduling behavior.

Empirically, I found that CPU occupancy often strongly overshadows the more relevant network occupancy, so tasks end up assigned to workers that then have to fetch many dependencies. This is often corrected by work stealing later on, which makes the impact difficult to estimate.

The occupancy-based worker_objective works relatively well for independent tasks. In this trivial case, scheduling breaks down to simply counting assigned tasks. This is also what queuing does, since there are no dependencies and the assigned prefixes are mostly identical.
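
To make the discussion above concrete, here is a heavily simplified sketch of an occupancy-based worker objective; the dataclasses and attribute names are stand-ins, and this is not the actual Scheduler.worker_objective implementation:

```python
from dataclasses import dataclass, field


@dataclass
class Dep:
    nbytes: int                                # size of the dependency
    who_has: set = field(default_factory=set)  # names of workers holding it


@dataclass
class Worker:
    name: str
    occupancy: float  # estimated CPU seconds of already assigned work


def worker_objective(deps, worker, bandwidth=100e6):
    # network term: bytes the worker would still have to fetch, converted to
    # seconds with an assumed flat bandwidth
    comm_bytes = sum(d.nbytes for d in deps if worker.name not in d.who_has)
    # CPU and network occupancy are added on equal footing; as argued above,
    # the CPU term often dominates even though the new task will usually cut
    # the line in front of most already assigned work
    return worker.occupancy + comm_bytes / bandwidth


# For independent tasks (deps == []) with identical prefixes this degenerates
# into comparing worker.occupancy, i.e. effectively counting assigned tasks.
a, b = Worker("a", occupancy=10.0), Worker("b", occupancy=2.0)
deps = [Dep(nbytes=500_000_000, who_has={"a"})]
print(min([a, b], key=lambda w: worker_objective(deps, w)).name)  # -> "b"
```

In this toy setup the 500 MB transfer to worker b is outweighed by worker a’s CPU backlog, which is the overshadowing effect described above.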

The other system backed by occupancy is the idle/saturated classification. I outline in #8889 that this is mostly obsolete and can, with a few tweaks, be removed entirely.

Conclusion

What should occupancy be replaced with? Probably just task counting. In most situations with a homogeneous dependency distribution, a good task distribution can be described as a homogeneous distribution of task counts per prefix: if every worker has about the same number of tasks per prefix, the tasks are well distributed. Differing task runtimes are accounted for by separating prefixes, which removes the update performance problem of tracking task durations.

To account for the dependencies of a task, I would probably suggest introducing a separate system, possibly based on AMM, that ensures dependencies are spread homogeneously. Task assignment would then only consider workers that hold the dependency (or have it in flight), ignore the actual transfer cost, and break ties by prefix counts.
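
A minimal sketch of what such an assignment could look like, assuming the dependency-spreading system already exists; everything below is hypothetical and not existing distributed API:

```python
def pick_worker(dep_holders, prefix, prefix_counts, all_workers):
    """Assign a task by prefix counting, restricted to workers holding its deps.

    dep_holders:   workers that have the task's dependencies in memory or in flight
    prefix_counts: {worker: {prefix: number of currently assigned tasks}}
    """
    # only consider workers with the dependency; fall back to everyone if nobody holds it
    candidates = dep_holders or all_workers
    # ignore the actual transfer cost and break ties by per-prefix task counts
    return min(candidates, key=lambda w: prefix_counts[w].get(prefix, 0))
```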

I could also see this dependency rebalancing working nicely with order assignment groups, cf. dask/dask#10562

Queuing

The effects of queuing are not very well understood, but we know that it works. The working theory during development was that scheduler-worker latency causes “root task overproduction”, i.e. the greedy scheduling of root tasks to workers keeps the threadpool going and reducer tasks are therefore scheduled too late.
This effect is certainly real. I once performed experiments that confirmed it, but it only accounted for about 10-20% of the effect, and it could be compensated for by adjusting for the latency.

However, a much more important effect queuing has is on ordering. Consider a simple example of a tree reduction where we have N input branches, each consisting of a few root tasks, before the first reductions start. In a concurrent execution environment, every branch should be considered of equal priority. However, dask performs a topological sort and assigns priorities according to the position in this sort (this is dask.order). In this tree reduction example, the reducer of the Nth computation branch gets a priority value somewhere around N * split_out + some constant for further reducers. This value is clearly much higher (i.e. scheduled later) than the priorities assigned to the root tasks of the first couple of branches, or even their deeper reduction nodes.
Dask assigns all tasks greedily, so without queuing we assign all N * M tasks (N branches of M tasks each) to the workers at once and end up concurrently computing objectively very high priority and very low priority tasks.
Without queuing, tasks are assigned greedily to all workers in a round-robin fashion, so a worker holds both very low and very high priority tasks and is unaware that later tasks will cut the line.
Some of those low priority tasks will eventually be computed before they should be, but their reduction will only follow much later since other tasks keep cutting the line. At first glance this may resemble a latency problem, but it is a prioritization problem.
If the prioritization/ordering is done in a way that reflects the parallel nature of execution, most of the queuing effects (plus the actual latency contribution) can be reproduced with greedy assignment. However, such an ordering for parallel execution is not trivial, and I couldn’t get a stable version up and running (mostly because this is all very entangled; see also work stealing, occupancy, etc.).
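
The effect is easy to see by running dask.order on a toy tree reduction; the graph shape below (N branches of M root tasks feeding one reducer each, plus a final combine) is only an illustration of the example above:

```python
from dask.order import order


def inc(x):
    return x + 1


N, M = 4, 3  # N branches, M root tasks per branch
dsk = {}
for b in range(N):
    roots = [f"root-{b}-{m}" for m in range(M)]
    for m, key in enumerate(roots):
        dsk[key] = (inc, m)
    dsk[f"reduce-{b}"] = (sum, roots)
dsk["combine"] = (sum, [f"reduce-{b}" for b in range(N)])

for key, prio in sorted(order(dsk).items(), key=lambda kv: kv[1]):
    print(prio, key)
# With this depth-first flavored ordering, "reduce-0" is prioritized ahead of the
# root tasks of later branches, so greedy assignment mixes objectively very high
# and very low priority tasks on the same workers.
```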

The conclusion would be to scrap dask.order and replace it with something simpler. I don’t have a good proposal yet. For DataFrames and Arrays this could likely lean on the expression/layer structure, since the topological sorting itself is already done naturally by the scheduler (it is not trivial, but this direction is worth exploring).

Work stealing

Work stealing is intended to rebalance a cluster with inhomogeneous occupancy, i.e. where some workers have much more work than others.
Naively, there are two use cases for this. The first is inhomogeneous workloads, e.g. due to data distribution or custom functions with varying runtimes.
The second is adaptive scaling, particularly the case where additional workers are added to the cluster.

In both situations, without rebalancing already assigned tasks (assuming the greedy assignment discussed above), workers are at risk of work starvation. Rebalancing assigned tasks is a natural fix to this problem.

However, in practice this rebalancing causes plenty of problems. Very naive reassignment of tasks can cause a lot of network load, and even when accounting for network load and dependencies it can break ordering in a way that contributes to memory pressure (see the paragraphs above). The current rebalancing, which accounts for both in-memory and in-flight dependencies, is rather expensive to compute, so work stealing can cause significant scheduler CPU load. On top of that, the heuristic that flags thieves and victims based on idleness, saturation and overall load is quite fragile, and we frequently observe tasks oscillating between workers.

I currently believe that a lasting, working version of this rebalancing should not run continuously in the background but only under specific circumstances, e.g. after the cluster has added more workers. I’m not convinced that dynamically rebalancing tasks due to data or runtime inhomogeneities is worth it; we don’t have any reliable benchmarks that ever showed benefits. Quite the contrary, many workloads perform better without stealing (see the reasons above). Simply disabling it is not possible either, since we observe quite severe issues where the ordinary scheduling heuristics can assign tasks to a single worker, e.g. in the presence of a shared dependency. Therefore, getting rid of work stealing likely requires rewiring the core scheduling heuristics.
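
As an illustration of the "rebalance only on specific events" direction, here is a purely hypothetical sketch of a one-shot pass that runs when workers join and hands queued (not yet running) work over to them; none of this is existing distributed API:

```python
def rebalance_on_scale_up(queued, new_workers):
    """Move queued tasks from loaded workers onto newly joined, empty workers.

    queued:      {worker: [queued tasks, lowest priority last]}
    new_workers: workers that just joined and hold no tasks yet
    """
    if not new_workers:
        return {}
    moves = {w: [] for w in new_workers}
    # target queue length if work were spread evenly over old and new workers
    target = sum(len(q) for q in queued.values()) // (len(queued) + len(new_workers))
    for tasks in queued.values():
        while len(tasks) > target and any(len(moves[w]) < target for w in new_workers):
            thief = min(new_workers, key=lambda w: len(moves[w]))
            moves[thief].append(tasks.pop())  # hand off the lowest-priority queued task
    return moves
```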

Related issues

https://github.com/coiled/dask-engineering/issues/169
#8177
#8283
