Description
Dask users expect some sort of grouping of tasks that is more granular than the lifetime of a whole cluster but more coarse than TaskGroup. Currently, this is delivered through Computation objects, which (AFAIK) are used by third parties only (such as Coiled).
A Computation is defined as the union of all TaskGroups that were created since the cluster last became idle. Unlike TaskGroups, which are forgotten as soon as all of their tasks are forgotten, Computations are retained semi-indefinitely in a long fixed-size deque (and retain a strong reference to all the forgotten TaskGroups).
Computations fall short of their intended goal:
- There are many edge cases where the concept of "continuous time where the cluster is doing something" falls apart and you have computations that should be intuitively obsolete but that still accrue work; this causes overlap which is hard to understand:
- Worker crash causes computations to overlap #7825
- Task prefix collision causes weird behaviour in Computations #7787
- It's been observed that there must be other use cases that have yet to be identified
- They can't support multi-tenancy: if two independent clients submit work at the same time, it will end up in the same computation
- They can't support nesting; as a Dask user I would like to have a fairly large grouping of tasks, which breaks down into subgroupings
I propose scrapping the Computation class and replacing it with a Span class.
What follows is an early design draft and an invitation to discussion.
High level design
- A span is defined by the client using a context manager;
- The context manager can be nested;
- Each client automatically defines its own root span when it's initialised, setting it to its client_id
- If a client is created by get_client() inside a task, it instead inherits the parent task's span.
Information can be retrieved from Spans by SchedulerPlugins, like it happens today for Computations. Built-in use of Spans (e.g. by the Bokeh dashboard) is out of scope for the time being.
Usage example
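import dask
import dask.array as da
from distributed import Client

client = Client()

# dask.span is the API proposed in this issue; it can also be used as a decorator
@dask.span("base_collection")
def base_collection():
    a = da.random.random(...)
    return a + 1

with dask.span("my_workflow"):
    a = base_collection()
    with dask.span("step2"):
        b = a.sum()
    c = b * 2  # inside "my_workflow" but outside "step2"

c.compute()

Low level design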
dask.span()
The new context manager dask.span is a simple wrapper around dask.annotate.
A new annotation span is a tuple of all span tags so far. dask.span appends to the end of it on enter and removes from the end on exit.
In the above example, layers will have the following annotations:
- random: {"span": (<client id>, "my_workflow", "base_collection")}
- add: {"span": (<client id>, "my_workflow", "base_collection")}
- sum: {"span": (<client id>, "my_workflow", "step2")}
- mul: {"span": (<client id>, "my_workflow")}
In the unhappy event that optimization strips away all task annotations, either the client or the scheduler will re-add the client id. This is a temporary hack - annotations should never be stripped to begin with.
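The append-on-enter, remove-on-exit behaviour can be modelled without Dask at all. The sketch below is a stand-alone illustration, not the proposed implementation: it keeps the tuple in a contextvars.ContextVar instead of going through dask.annotate, and the names span, current_span and the placeholder client id "client-abc" are assumptions made for the example.

```python
import contextvars
from contextlib import contextmanager

# Hypothetical root span; in the proposal this would be the real client id.
_current_span = contextvars.ContextVar("span", default=("client-abc",))


def current_span():
    """Return the tuple of span tags active at this point."""
    return _current_span.get()


@contextmanager
def span(tag):
    """Append `tag` to the active span on enter, drop it again on exit."""
    token = _current_span.set(_current_span.get() + (tag,))
    try:
        yield _current_span.get()
    finally:
        _current_span.reset(token)


# Objects built by @contextmanager also work as decorators, which mirrors
# the @dask.span("base_collection") usage in the example above.
@span("base_collection")
def base_collection():
    return current_span()


with span("my_workflow"):
    inner = base_collection()
    with span("step2"):
        step2 = current_span()
    outer = current_span()
```

Here inner, step2 and outer end up with the same tuples as the random/add, sum and mul layers listed above.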
Server side
Everything about spans can be encapsulated in a module that is separate from distributed.scheduler.
This module defines a new SchedulerPlugin, SpanPlugin, and a standalone class Span.
SpanPlugin contains the structures
- SpanPlugin.spans: dict[tuple[str, ...], Span]
- SpanPlugin.span_search: dict[str, list[Span]] - this is a convenience facility to make search of span tags O(1)
Each Span object contains links to its direct children plus arbitrary metadata.
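A rough sketch of how the two dictionaries and the parent/child links could fit together is shown below. It is deliberately not a SchedulerPlugin: SpanRegistry and ensure_span are hypothetical names, and indexing each span in span_search under its own last tag is an assumption, since the draft does not say which tags are indexed.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass(eq=False)
class Span:
    id: tuple                                      # e.g. (client_id, "my_workflow")
    children: list = field(default_factory=list)   # direct children only
    metadata: dict = field(default_factory=dict)   # arbitrary metadata


class SpanRegistry:
    """Hypothetical stand-in for the bookkeeping part of SpanPlugin."""

    def __init__(self):
        self.spans = {}                        # tuple[str, ...] -> Span
        self.span_search = defaultdict(list)   # tag -> list[Span]

    def ensure_span(self, span_id):
        """Return the Span for span_id, creating it and its ancestors if needed."""
        if span_id in self.spans:
            return self.spans[span_id]
        span = Span(span_id)
        self.spans[span_id] = span
        # Assumption: a span is searchable by its own (last) tag.
        self.span_search[span_id[-1]].append(span)
        if len(span_id) > 1:
            self.ensure_span(span_id[:-1]).children.append(span)
        return span


registry = SpanRegistry()
leaf = registry.ensure_span(("client-abc", "my_workflow", "step2"))
```

Creating the leaf span also creates ("client-abc",) and ("client-abc", "my_workflow"), so every span is reachable from its client's root.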
When update_graph() receives a snippet of source code, the SpanPlugin posts it to all leaf Spans that are attached to the tasks.
Unlike Computation objects, Spans don't link to TaskGroups. Instead, individual tasks can contribute to them through SpanPlugin.transition(). This allows recording, e.g.:
- current task counts by prefix and state
- cumulative task counts by prefix
- when the span was first submitted by the client
- when the span was first received by the scheduler
- when the first task of the span was sent to a worker
- when the first task of the span started computing on a worker
- when the last task of the span finished computing on a worker
- when the last task of the span was forgotten by the scheduler
- Discuss: what other notable events should be posted to the Span objects?
These pieces of information should be each separated into
- an attribute, which records data from the span itself when it's directly mentioned by the span annotation by the tasks;
- a property, which recursively aggregates the attribute of the span itself and the property of its children.
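The attribute/property split might look roughly like this for cumulative task counts by prefix. This is a minimal sketch: the Span class here is a cut-down stand-in, and own_task_counts and task_counts are hypothetical names.

```python
from collections import Counter


class Span:
    def __init__(self, id):
        self.id = id
        self.children = []
        # Attribute: only tasks whose span annotation is exactly this span.
        self.own_task_counts = Counter()

    @property
    def task_counts(self):
        """Own counts plus, recursively, the counts of all descendants."""
        total = Counter(self.own_task_counts)
        for child in self.children:
            total.update(child.task_counts)
        return total


workflow = Span(("client-abc", "my_workflow"))
base = Span(workflow.id + ("base_collection",))
step2 = Span(workflow.id + ("step2",))
workflow.children += [base, step2]

# Mirrors the layers of the usage example, one task per layer.
base.own_task_counts.update(["random", "add"])
step2.own_task_counts.update(["sum"])
workflow.own_task_counts.update(["mul"])
```

Computing the aggregate on the fly keeps writes cheap, since a task transition only touches the one span it is annotated with.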
Fine performance metrics
On workers, execute fine performance metrics (#7665 (comment)) shall change
from {("execute", prefix, activity, unit): value}
to {("execute", span, prefix, activity, unit): value}
e.g. from {("execute", "random", "thread-cpu", "seconds"): 12.3}
to {("execute", (<client id>, "my_workflow", "base_collection"), "random", "thread-cpu", "seconds"): 12.3}
This won't substantially change the cardinality of the data, unless the workflow creates and destroys many clients for some weird reason.
When metrics are transferred from worker to scheduler through the heartbeat,
- execute metrics are apportioned to their leaf-level scheduler.Span object
- Nothing special happens to gather_dep and get_data metrics
A @property on the Span objects allows recursively aggregating on the fly the metrics from children and grandchildren.
Note: the heartbeat may arrive when all tasks that point to a Span have already been forgotten.
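The heartbeat handling described above could be sketched as follows. The function name apportion, the nested-dict layout of span_metrics and the shape of the gather_dep key are assumptions for illustration; only the execute key shape comes from this proposal.

```python
from collections import defaultdict


def apportion(heartbeat_metrics, span_metrics):
    """Add execute metrics from one heartbeat to their leaf-level span."""
    for key, value in heartbeat_metrics.items():
        if key[0] != "execute":
            continue  # gather_dep / get_data: nothing special happens here
        _, span_id, prefix, activity, unit = key
        span_metrics[span_id][prefix, activity, unit] += value


# span id -> {(prefix, activity, unit): value}; an entry is created on demand,
# so a late heartbeat still has somewhere to land (an assumption of this sketch).
span_metrics = defaultdict(lambda: defaultdict(float))

leaf = ("client-abc", "my_workflow", "base_collection")
heartbeat = {
    ("execute", leaf, "random", "thread-cpu", "seconds"): 1.5,
    ("gather_dep", "network", "seconds"): 0.5,  # hypothetical key shape
}
apportion(heartbeat, span_metrics)
apportion({("execute", leaf, "random", "thread-cpu", "seconds"): 2.25}, span_metrics)
```

After the two heartbeats the leaf span holds the summed thread-cpu seconds, and the gather_dep entry has not been attributed to any span.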
Lifecycle
Spans are modest in size and in most cases can just be retained indefinitely.
This is potentially problematic for long-lived servers which receive connections from many short-lived clients.
To solve this problem, we could define an expiration timeout for each Span, e.g. 6 hours, that starts ticking when the last task belonging to it is forgotten by the scheduler.
When a Span is forgotten, this should also clear all matching fine performance metrics from the scheduler and the workers.
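One way the expiration timeout could work is sketched below. SpanExpiry and its method names are hypothetical, the clock is injectable only to make the example testable, and clearing the matching metrics on scheduler and workers is left to the caller.

```python
import time


class SpanExpiry:
    """Track how long each span has been without any live task."""

    def __init__(self, timeout=6 * 3600.0, clock=time.monotonic):
        self.timeout = timeout
        self.clock = clock
        self._idle_since = {}  # span id -> time its last task was forgotten

    def on_last_task_forgotten(self, span_id):
        # The timeout starts ticking now.
        self._idle_since[span_id] = self.clock()

    def on_new_task(self, span_id):
        # New work for the span cancels a pending expiration.
        self._idle_since.pop(span_id, None)

    def pop_expired(self):
        """Return and stop tracking the spans whose timeout has elapsed."""
        now = self.clock()
        expired = [s for s, t in self._idle_since.items() if now - t >= self.timeout]
        for span_id in expired:
            del self._idle_since[span_id]
        return expired


# Usage with a fake clock so that no real waiting is involved.
now = [0.0]
expiry = SpanExpiry(timeout=10.0, clock=lambda: now[0])
expiry.on_last_task_forgotten(("client-abc",))
now[0] = 5.0
early = expiry.pop_expired()
now[0] = 10.0
late = expiry.pop_expired()
```

A periodic callback on the scheduler could call pop_expired() and then delete the returned spans together with their fine performance metrics.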
Deprecation cycle
We would like to retire Computation objects.
Who uses them today? What kind of deprecation cycle do we need to adopt?
Implementation steps
(to be replaced with links to PRs / sub-issues)
- [dask/dask] dask.span context manager
- SpanPlugin and Span classes
- post notable events to Span
- clients to define a default span = (client_id, )
- update_graph to force span=(client id, ) if span annotations have been stripped by optimization
- propagate span context to tasks (to be used by worker clients)
- Post execute fine performance metrics to spans
- Deprecate Computation objects
- Delete spans that received no activity for more than X hours