Top-level cluster efficiency metric #7565
Comments
Thanks for writing this up, @crusaderky. This sounds exactly like a metric I've wanted to have for a long time. As you said, it's currently difficult enough to answer "how well is dask utilizing my resources". It's often impossible to answer, "why is dask not utilizing my resources well—how is it spending time when not running tasks?" One thing we've noticed from looking at metrics on Coiled is that the worker event loop is blocked a lot more often than you'd expect. It's not uncommon to see >~20% of workers not respond to a given

Implementation-wise, do you think we should do #7364 first, to make writing these metrics more convenient?
I'm skeptical of needing to do this. Maybe we'll find that it's necessary/advantageous, but I definitely think that the first implementation of the metric should be Prometheus-only, just from the workers, aggregated to worker level (not task level). If we find that the things you've mentioned are true (we're losing data from worker termination—but then the scheduler wouldn't get the data either; high cardinality—doesn't seem higher cardinality than all the other metrics we're exporting per-worker), then we can push it over the heartbeat, but starting without heartbeats seems much simpler.

If you send the data through the heartbeat, when a worker dies the scheduler won't get the data accumulated since the previous heartbeat.

I don't think the throwaway work is enough to justify a blocker, IMHO.

Of course—if you don't have prometheus, then a prometheus metric isn't very useful to you. I think this is the main argument for pushing it over the heartbeat: making the information available natively on the dashboard, or through a MemorySampler type thing. We'll probably want this eventually, but I'm saying we should be able to start and iterate just with Prometheus. Exposing it over the heartbeat, making a bokeh dashboard, making a client utility, etc. could be separate steps, no?

Yes indeed.
Executive summary
We want to answer the question: Where is Dask wasting time? How much time is not "good" CPU time?
This is typically the very first question when analysing performance, and it's a question we're not answering straightforwardly now.
This ticket suggests creating a new cumulative Prometheus scheduler metric, which is actually collected on the workers.
This would be a cumulative time whose total, by construction, increases by N seconds each second, where N is the number of cluster-wide threads, and which is broken down into the categories described under "Design of the metric" below.
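As a rough sketch of the shape this could take, assuming the `prometheus_client` library (already used optionally by distributed for its metrics endpoint) and purely hypothetical metric/label names:

```python
# Sketch only: metric and label names below are illustrative, not existing dask metrics.
from prometheus_client import Counter

# A single counter labelled by activity. Summed over all labels, it grows by
# N seconds every second, where N is the number of cluster-wide threads.
cluster_thread_seconds = Counter(
    "dask_cluster_thread_seconds",
    "Cumulative thread-seconds spent by workers, broken down by activity",
    ["activity"],
)

# Example: attribute one second of a 4-thread worker's time.
cluster_thread_seconds.labels(activity="executing-success").inc(2.7)
cluster_thread_seconds.labels(activity="waiting-for-dependencies").inc(1.0)
cluster_thread_seconds.labels(activity="idle").inc(0.3)
```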
The question
One of the first metrics that most users want to know about is likely this:
How does my end-to-end runtime compare to a perfect runtime?
Where a perfect runtime is the runtime you would get on the single-threaded dask scheduler, running on a host with enough RAM to fit the whole problem in memory and an infinitely fast scheduling algorithm, divided by the number of cores on the cluster.
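For example (numbers made up purely for illustration):

```python
# Hypothetical numbers: pure-compute time of each task, in seconds
task_runtimes = [2.0, 3.5, 1.5, 4.0, 3.0]
n_threads = 8  # total threads across the cluster

perfect_runtime = sum(task_runtimes) / n_threads  # 1.75 s
actual_runtime = 3.0                              # measured end-to-end wall clock
efficiency = perfect_runtime / actual_runtime     # ~0.58
print(f"cluster efficiency: {efficiency:.0%}")    # -> cluster efficiency: 58%
```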
After the above is answered, performance analysis forks into two questions:
2. Where my cluster is efficient, on which tasks does it spend most time? Can I change the algorithm to speed it up?
3. Where my cluster is inefficient, where does inefficiency come from?
I feel that, at the moment, question (2) above is the only one we're answering, through the metric of runtime per task prefix.
Questions (1) and (3) could both be answered by a single metric with an appropriate breakdown.
Design of the metric
I would like to add a new cumulative time metric on each worker, whose components by definition add up to the number of threads on the worker each second:
1. Successful execution = number of tasks in `executing` or `resumed(executing->fetch)` state with a final transition to `memory`, multiplied by the number of seconds they stayed in the state(s). For `long-running` tasks, add the time before the call to secede().
2. Failed or cancelled execution = number of tasks in `cancelled(executing)` state, plus tasks in `executing` or `resumed(executing->fetch)` state with a final transition to `erred`. For `long-running` tasks, add the time before the call to secede().
3. Paused = number of threads - (1) - (2) if paused else 0
4. Waiting for dependencies = min(number of threads - (1) - (2) - (3), number of tasks in `waiting` state)
5. Idle = number of threads - (1) - (2) - (3) - (4)
(5) is something that is between the client and the scheduler - further investigation is out of scope for this PR.
(4) can be investigated with #7217 (not in the same metric / plot).
(3) needs to be investigated with memory metrics - we've got plenty of those
(2) should hopefully be rare enough not to warrant further breakdown, but if needed it can follow the same treatment as (1).
(1) can be additively broken down, per task, as follows:
1a. Time between the moment when the task transitioned to executing and when execute() actually started. This can be caused by an unresponsive event loop.
1b. Time spent deserializing the run_spec
1c. Time spent unspilling dependencies
1d. Threadpool spinup and thread sync: between the call to `self.loop.run_in_executor` in the main thread and the start of the actual user code in the thread
1e. Time in thread, spent actually running the user code: `time.thread_time()`
1f. Time in thread, spent in I/O, GPU, GIL contention, or running more threads than there are CPUs available (in linux `top`: load average > 1): `time.perf_counter() - time.thread_time()`
1g. Time between the end of user code in thread and the return to the main thread in `Worker.execute()` - again, this can be high if the event loop is unresponsive
1h. Time spent spilling the output
All of the above can be individually metered. However, I suspect that the only material ones will be (1c), (1e), (1f), and (1h).
I think it's safe to clump together (1a), (1b), (1d), and (1g) in a single "other" timing.
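For illustration, a minimal sketch of how (1d), (1e), and (1f) could be metered around the executor call; this is a simplification, not the actual `Worker.execute` code, and (1g) is omitted because it is only observable from the event loop side:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_metered(func, *args):
    """Run func in a worker thread and split the elapsed wall time into
    threadpool spinup/sync (1d), on-CPU user code (1e), and off-CPU time (1f)."""

    def in_thread():
        start_wall = time.perf_counter()
        start_cpu = time.thread_time()
        result = func(*args)
        cpu = time.thread_time() - start_cpu              # (1e)
        off_cpu = time.perf_counter() - start_wall - cpu  # (1f): I/O, GIL, oversubscription
        return result, start_wall, cpu, off_cpu

    submitted = time.perf_counter()
    with ThreadPoolExecutor(max_workers=1) as pool:
        result, start_wall, cpu, off_cpu = pool.submit(in_thread).result()
    spinup = start_wall - submitted                       # (1d)
    return result, {"spinup": spinup, "cpu": cpu, "off-cpu": off_cpu}


# result, timings = run_metered(sum, range(10_000_000))
```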
Note that the above granular breakdown is for finished tasks. Because of the separation between successful and erred/cancelled tasks, and also because we can't acquire `time.thread_time()` until the user code is finished, currently running tasks would need a generic measure which is the total of (1) and (2) and which, unlike everything else, is not ever-increasing but goes up and down with time.

What do we do with the metric?
The above breakdown of (1) describes information that we can collect at task level on the workers. The disaggregated data would be quite overwhelming in cardinality.
I think it's best to aggregate it to a per-worker total and have that total sent to the scheduler through the heartbeat, where it is aggregated again and exported to Prometheus.
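A hedged sketch of the scheduler side, assuming the per-worker totals arrive in the heartbeat as a plain `{activity: seconds}` dict of increments (names and wiring are illustrative, not the existing heartbeat schema):

```python
from collections import defaultdict

from prometheus_client.core import CounterMetricFamily


class ClusterThreadSecondsCollector:
    """Aggregates per-worker increments and exposes a cluster-wide counter.
    Any object with a collect() method can be registered with prometheus_client."""

    def __init__(self):
        self.totals = defaultdict(float)

    def on_heartbeat(self, worker_totals: dict[str, float]) -> None:
        # worker_totals holds the increments accumulated since the previous heartbeat
        for activity, seconds in worker_totals.items():
            self.totals[activity] += seconds

    def collect(self):
        family = CounterMetricFamily(
            "dask_cluster_thread_seconds",
            "Cumulative thread-seconds by activity, cluster-wide",
            labels=["activity"],
        )
        for activity, seconds in self.totals.items():
            family.add_metric([activity], seconds)
        yield family


# Registration (sketch): prometheus_client.REGISTRY.register(ClusterThreadSecondsCollector())
```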
I'm typically wary of weighing down the heartbeat with new information, but I think there is good reason here:
Where do we go from here?
These are additional deliverables, beyond the core Prometheus metric described above:
`{op: compute-task, is_recompute: true}`. It would effectively be a point (6) above.

CC @fjetter @ntabris