Top-level cluster efficiency metric #7565

Open · crusaderky opened this issue Feb 21, 2023 · 6 comments · Fixed by #7586
@crusaderky (Collaborator) commented Feb 21, 2023:

Executive summary

We want to answer the questions: where is Dask wasting time? How much time is not "good" CPU time?
This is typically the very first question when analysing performance, and it's one we don't answer straightforwardly today.

This ticket suggests creating a new cumulative Prometheus metric, exported by the scheduler but actually collected on the workers.
It would be a cumulative time whose total, by construction, increases by N seconds each second, where N is the number of cluster-wide threads. It would be broken down as follows (a sketch of how it could be exported follows the list):

  • Time spent with idle threads
    • Due to pause
    • Due to imperfectly pipelined data gathering from other workers
    • Due to not receiving any compute-task operation from the scheduler
  • Time spent computing currently running tasks (not cumulative)
  • Time spent computing terminated tasks
    • Time spent computing failed tasks
    • Time spent computing cancelled tasks
    • Time spent re-computing tasks due to worker death
    • Time spent computing tasks successfully for the first time
      • CPU time in thread, a.k.a. the "good" time
      • non-CPU time in thread: I/O, GPU, GIL contention, load exceeding the host CPUs
      • Spill/unspill
      • Other
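A minimal sketch of what the exported metric could look like, using `prometheus_client` directly; the metric name and the activity label values below are illustrative only, not the actual exporter wiring in distributed:

```python
from prometheus_client import Counter

# Hypothetical name and labels, for illustration only. One cumulative counter,
# labelled by activity; the labelled series together grow by
# (number of cluster-wide threads) seconds each second.
thread_seconds = Counter(
    "dask_scheduler_thread_seconds",
    "Cumulative time spent by worker threads, broken down by activity",
    ["activity"],
)

# The activities would mirror the breakdown above, e.g.:
for activity in (
    "idle-paused",
    "idle-waiting-for-data",
    "idle-no-work",
    "executing-thread-cpu",      # the "good" time
    "executing-thread-noncpu",   # I/O, GPU, GIL contention, oversubscription
    "executing-spill",
    "executing-other",
):
    # Pre-register each series so it shows up at scrape time with value 0
    thread_seconds.labels(activity=activity)
```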

The question

One of the first questions that most users want answered is likely this:

  1. How efficient is the dask cluster?
    How does my end-to-end runtime compare to a perfect runtime?

Where a perfect runtime is achieved when the computation:

  • never halts to wait for data transfers
  • is never slowed down by multithreading context switches or the GIL
  • never has to deal with spill/unspill
  • never pauses due to memory pressure
  • never loses data
  • never has to wait for more compute requests from the scheduler

In other words, the perfect runtime is the runtime you would get on the single-threaded dask scheduler, running on a host with enough RAM to fit the whole problem in memory and an infinitely fast scheduling algorithm, divided by the number of cores on the cluster.
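To make the headline number concrete, the efficiency falls out of this definition as a simple ratio; the numbers below are made up purely for illustration:

```python
# Illustrative numbers only: efficiency is "good" CPU seconds divided by the
# thread-seconds available on the cluster over the whole run.
good_cpu_seconds = 3600        # sum of in-thread CPU time across all tasks
n_threads = 10                 # cluster-wide worker threads
wall_clock_seconds = 600       # end-to-end runtime of the computation

efficiency = good_cpu_seconds / (n_threads * wall_clock_seconds)
print(f"{efficiency:.0%}")     # -> 60%
```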

After the above is answered, performance analysis forks into two questions:
2. Where my cluster is efficient, on which tasks does it spend most of its time? Can I change the algorithm to speed it up?
3. Where my cluster is inefficient, where does the inefficiency come from?

I feel that, at the moment, question (2) is the only one we're answering, through the metric of runtime per task prefix.
Questions (1) and (3) could be answered by a single metric with an appropriate breakdown.

Design of the metric

I would like to add a new cumulative time metric on each worker, whose components by definition add up, each second, to the number of threads on the worker (a rough code sketch follows the list):

  1. time spent by each thread running successful tasks
    = number of tasks in executing or resumed(executing->fetch) state with a final transition to memory, multiplied by the number of seconds they stayed in the state(s). For long-running tasks, add the time before the call to secede().
  2. time spent by each thread running cancelled and erred tasks
    = number of tasks in cancelled(executing) state, plus tasks in executing or resumed(executing->fetch) state with a final transition to erred. For long-running tasks, add the time before the call to secede().
  3. time when threads were idle because the worker is paused
    = number of threads - (1) - (2) if paused else 0
  4. time when threads were idle, but there were tasks waiting for data
    = min(number of threads - (1) - (2) - (3), number of tasks in waiting state)
  5. time when threads were idle and not enough tasks were waiting for data either, and the worker is running
    = number of threads - (1) - (2) - (3) - (4)
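A deliberately naive sketch of how these five buckets could be attributed per unit of elapsed time is below. All names are hypothetical; the real implementation would attribute time retrospectively from worker state transitions (so that successful vs. erred/cancelled execution can actually be told apart) rather than poll a snapshot like this:

```python
def bucket_thread_seconds(ws, elapsed: float) -> dict:
    """Split ``ws.nthreads * elapsed`` thread-seconds into the five buckets.

    ``ws`` stands in for a worker state snapshot; its attributes are made up.
    """
    total = ws.nthreads * elapsed
    executing_ok = min(total, ws.n_executing_successful * elapsed)         # (1)
    executing_bad = min(
        total - executing_ok, ws.n_executing_cancelled_or_erred * elapsed  # (2)
    )
    idle = total - executing_ok - executing_bad
    paused = idle if ws.paused else 0.0                                    # (3)
    waiting = min(idle - paused, ws.n_tasks_waiting_for_data * elapsed)    # (4)
    starved = idle - paused - waiting                                      # (5)
    return {
        "executing-successful": executing_ok,
        "executing-cancelled-or-erred": executing_bad,
        "idle-paused": paused,
        "idle-waiting-for-data": waiting,
        "idle-no-work": starved,
    }
```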

(5) is something that sits between the client and the scheduler - further investigation is out of scope for this ticket.
(4) can be investigated with #7217 (not in the same metric / plot).
(3) needs to be investigated with memory metrics - we've got plenty of those.
(2) should hopefully be rare enough not to warrant further breakdown, but if needed it can follow the same treatment as (1).
(1) can be additively broken down, per task, as follows:

1a. Time between the moment when the task transitioned to executing and when execute() actually started. This can be caused by an unresponsive event loop.
1b. Time spent deserializing the run_spec
1c. Time spent unspilling dependencies
1d. Threadpool spinup and thread sync: between the call to self.loop.run_in_executor in the main thread and the start of the actual user code in the thread
1e. Time in thread, spent actually running the user code: time.thread_time()
1f. Time in thread, spent in I/O, GPU, GIL contention, or running more threads than there are CPUs available (in Linux top: load average higher than the number of CPUs): time.perf_counter() - time.thread_time()
1g. Time between the end of the user code in the thread and the return to the main thread in Worker.execute() - again, this can be high if the event loop is unresponsive
1h. Time spent spilling the output

All of the above can be individually metered. However, I suspect that the only material ones will be (1c), (1e), (1f), and (1h).
I think it's safe to clump together (1a), (1b), (1d), and (1g) in a single "other" timing.
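(1e) and (1f) in particular can be separated with the two stock timers, as long as both are read from inside the thread that runs the user code. A minimal sketch, not the actual Worker.execute() code path:

```python
import time


def run_and_meter(fn, *args, **kwargs):
    # time.thread_time() counts CPU time of the *current* thread only, so both
    # clocks must be read in the same thread that executes the user code.
    wall0 = time.perf_counter()
    cpu0 = time.thread_time()
    result = fn(*args, **kwargs)
    cpu = time.thread_time() - cpu0                # (1e) "good" CPU time
    non_cpu = (time.perf_counter() - wall0) - cpu  # (1f) I/O, GPU, GIL, oversubscription
    return result, cpu, non_cpu
```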

Note that the above granular breakdown is only for finished tasks. Because of the separation between successful and erred/cancelled tasks, and because we can't read time.thread_time() until the user code has finished, currently running tasks would need a single generic measure, equal to the total of (1) and (2), which unlike everything else is not ever-increasing but goes up and down over time.

What do we do with the metric?

The above breakdown of (1) describes information that we can collect at task level on the workers. The disaggregated data would be quite overwhelming in cardinality.

I think it's best to aggregate it to a per-worker total and have that total sent to the scheduler through the heartbeat, where it is aggregated again and exported to Prometheus.
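In code, that pipeline could look roughly like this; the attribute and message names are made up and are not the actual heartbeat schema:

```python
# Worker side: fold the per-task timings into per-activity totals and ship
# only the deltas accrued since the previous heartbeat.
def heartbeat_payload(worker) -> dict:
    deltas = {
        activity: total - worker.last_sent_thread_seconds.get(activity, 0.0)
        for activity, total in worker.cumulative_thread_seconds.items()
    }
    worker.last_sent_thread_seconds = dict(worker.cumulative_thread_seconds)
    return {"thread_seconds": deltas}


# Scheduler side: accumulate into a single cluster-wide total per activity, so
# the Prometheus cardinality is one series per activity rather than per worker.
def handle_heartbeat(scheduler, payload: dict) -> None:
    for activity, seconds in payload["thread_seconds"].items():
        scheduler.cumulative_thread_seconds[activity] = (
            scheduler.cumulative_thread_seconds.get(activity, 0.0) + seconds
        )
```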

I'm typically wary of weighing down the heartbeat with new information, but I think there is good reason here:

  • Collecting on the scheduler means you won't lose data in case of worker shutdown or abrupt termination
  • Cardinality when exporting to Prometheus from the scheduler is 1 instead of however many workers are on the cluster
  • The scheduler knows things that the workers don't - see below.

Where do we go from here?

These are additional deliverables on top of the core Prometheus metric described above:

  • Write a context manager on the client that builds a pie chart of the time spent during a computation
  • Break down initial and final times spent during a compute() call: optimization, graph upload, initial scheduler time, and final gather.
  • Separately measure time spent recomputing keys in case of abrupt worker termination. This could be straightforwardly added by having the scheduler send a flag to the worker, e.g. {op: compute-task, is_recompute: true}. It would effectively be a point (6) above.
  • Figure out a CUDA-specific extension to separate "good" GPU time from the rest of non-CPU in-thread time

CC @fjetter @ntabris

@gjoseph92 (Collaborator) commented:

Thanks for writing this up, @crusaderky. This sounds exactly like a metric I've wanted to have for a long time. As you said, it's currently difficult enough to answer "how well is dask utilizing my resources". It's often impossible to answer, "why is dask not utilizing my resources well—how is it spending time when not running tasks?"

One thing we've noticed from looking at metrics on Coiled is that the worker event loop is blocked a lot more often than you'd expect. It's not uncommon to see >~20% of workers not respond to a given /metrics scrape. When implementing these metrics, I think we always need to assume that other things (GIL, GC) might be blocking during a given span of time we measure. And we'll need to ensure that the metrics are still valid even with pauses between their collection, and incomplete reporting from all workers.

Implementation-wise, do you think we should do #7364 first, to make writing these metrics more convenient?

> I think it's best to aggregate it to a per-worker total and have that total sent to the scheduler through the heartbeat, where it is aggregated again and exported to Prometheus

I'm skeptical of needing to do this. Maybe we'll find that it's necessary/advantageous, but I definitely think that the first implementation of the metric should be Prometheus-only, just from the workers, aggregated to worker level (not task level). If we find that the things you've mentioned are true (we're losing data from worker termination—but then the scheduler wouldn't get the data either; high cardinality—doesn't seem higher cardinality than all the other metrics we're exporting per-worker), then we can push it over the heartbeat, but starting without heartbeats seems much simpler.

@crusaderky (Collaborator, Author) commented:

> we're losing data from worker termination—but then the scheduler wouldn't get the data either

If you send the data through the heartbeat, then when a worker dies the scheduler only misses the data accrued since the previous heartbeat.
The problem is when you don't have prometheus and you just want to collect the data before and after a compute, like MemorySampler does. If a worker dies in such a design, it will look like your end-to-end computation got faster since it will look like you never spent any work on the dead worker.
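The pattern I have in mind is roughly the sketch below; get_cluster_thread_seconds() is a hypothetical helper, not an existing API:

```python
def thread_seconds_spent(client, compute_callable):
    # Bracket a computation with a "before" and an "after" snapshot, the way
    # MemorySampler does for memory. All helper names here are hypothetical.
    before = get_cluster_thread_seconds(client)
    compute_callable()
    after = get_cluster_thread_seconds(client)
    # If the counters live only on the workers, a worker that died mid-way
    # simply drops out of `after` and its time silently vanishes from the sum.
    return {k: after[k] - before.get(k, 0.0) for k in after}
```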

@crusaderky (Collaborator, Author) commented:

> Implementation-wise, do you think we should do #7364 first, to make writing these metrics more convenient?

I don't think the throwaway work is enough to justify a blocker, IMHO.

@gjoseph92 (Collaborator) commented:

> The problem is when you don't have prometheus and you just want to collect the data before and after a compute

Of course—if you don't have prometheus, then a prometheus metric isn't very useful to you.

I think this is the main argument for pushing it over the heartbeat: making the information available natively on the dashboard, or through a MemorySampler type thing.

We'll probably want this eventually, but I'm saying we should be able to start and iterate just with Prometheus. Exposing it over the heartbeat, making a bokeh dashboard, making a client utility, etc. could be separate steps, no?

@crusaderky (Collaborator, Author) commented:

> We'll probably want this eventually, but I'm saying we should be able to start and iterate just with Prometheus. Exposing it over the heartbeat, making a bokeh dashboard, making a client utility, etc. could be separate steps, no?

Yes indeed.

@crusaderky (Collaborator, Author) commented:

I opened #7665 to list all possible follow-ups to #7586.
I think we should re-scope this ticket so that it covers the bare minimum of follow-ups that make the functionality useful to end users; I would suggest:
