User-defined spans #7860

@crusaderky

Description

Dask users expect some sort of grouping of tasks that is finer-grained than the lifetime of a whole cluster but coarser than a TaskGroup. Currently, this is delivered through Computation objects, which (AFAIK) are used only by third parties (such as Coiled).

A Computation is defined as the union of all TaskGroups that were created since the cluster last became idle. Unlike TaskGroups, which are forgotten as soon as all of their tasks are forgotten, Computations are retained semi-indefinitely in a long, fixed-size deque (and retain strong references to all of the forgotten TaskGroups).

Computations fall short of their intended goal:

  • There are many edge cases where the concept of "continuous time where the cluster is doing something" falls apart, and you end up with computations that should intuitively be obsolete but still accrue work; this causes overlap which is hard to understand.
  • They can't support multi-tenancy: if two independent clients submit work at the same time, it all ends up in the same computation.
  • They can't support nesting: as a Dask user, I would like to have a fairly large grouping of tasks which breaks down into subgroupings.

I propose scrapping the Computation class and replacing it with a Span class.
What follows is an early design draft and an invitation to discussion.

High level design

  • A span is defined by the client using a context manager
  • The context manager can be nested
  • Each client automatically defines its own root span when it's initialised, named after its client_id
  • If a client is created by get_client() inside a task, it instead inherits the parent task's span

Information can be retrieved from Spans by SchedulerPlugins, as happens today for Computations. Built-in use of Spans (e.g. by the Bokeh dashboard) is out of scope for the time being.

Usage example

import dask
import dask.array as da
from distributed import Client

client = Client()

@dask.span("base_collection")
def base_collection():
    a = da.random.random(...)
    return a + 1

with dask.span("my_workflow"):
    a = base_collection()
    with dask.span("step2"):
        b = a.sum()
    c = b * 2

c.compute()

Low level design

dask.span()

The new context manager dask.span is a simple wrapper around dask.annotate.
A new annotation, span, is a tuple of all span tags defined so far: dask.span appends a tag to the end of it on enter and removes it from the end on exit.

In the above example, layers will have the following annotations:

  • random: {"span": (<client id>, "my_workflow", "base_collection")}
  • add: {"span": (<client id>, "my_workflow", "base_collection")}
  • sum: {"span": (<client id>, "my_workflow", "step2")}
  • mul: {"span": (<client id>, "my_workflow")}
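
A minimal sketch of what dask.span could look like, assuming annotations can be read back through dask.config (which is where dask.annotate stores them); neither the name nor the implementation is existing API:

from contextlib import contextmanager

import dask

@contextmanager
def span(tag: str):
    # Tags accumulated by enclosing span() blocks (or the client's root span)
    current = dask.config.get("annotations.span", default=())
    # Append on enter; dask.annotate restores the previous value on exit
    with dask.annotate(span=current + (tag,)):
        yield

Since @contextmanager-based context managers can also be used as decorators, this covers the @dask.span("base_collection") usage in the example above.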

In the unhappy event that optimization strips away all task annotations, either the client or the scheduler will re-add the client id. This is a temporary hack; annotations should never be stripped to begin with.
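
As an illustration, the scheduler-side fallback could look like this (a sketch; the helper and its arguments are hypothetical, not the actual update_graph() signature):

def restore_root_span(annotations: dict, keys: set, client_id: str) -> None:
    # Hypothetical helper called from update_graph(): make sure every task
    # carries at least the client's root span if optimization stripped it
    span_annotations = annotations.setdefault("span", {})
    for key in keys:
        span_annotations.setdefault(key, (client_id,))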

Server side

Everything about spans can be encapsulated in a module that is separate from distributed.scheduler.

This module defines a new SchedulerPlugin, SpanPlugin, and a standalone class Span.

SpanPlugin contains the following structures:

  • SpanPlugin.spans: dict[tuple[str, ...], Span]
  • SpanPlugin.span_search: dict[str, list[Span]] - a convenience index that makes searching spans by tag O(1)

Each Span object contains links to its direct children plus arbitrary metadata.
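
A rough sketch of the two classes under the above assumptions (all names and fields are illustrative, not an existing distributed API):

from __future__ import annotations

from collections import defaultdict

from distributed.diagnostics.plugin import SchedulerPlugin


class Span:
    id: tuple[str, ...]   # e.g. ("<client id>", "my_workflow", "step2")
    children: list[Span]  # direct children only
    metadata: dict        # arbitrary metadata, e.g. source code snippets

    def __init__(self, id: tuple[str, ...]):
        self.id = id
        self.children = []
        self.metadata = {}


class SpanPlugin(SchedulerPlugin):
    spans: dict[tuple[str, ...], Span]
    span_search: dict[str, list[Span]]  # tag -> all spans with that tag

    def __init__(self):
        self.spans = {}
        self.span_search = defaultdict(list)

    def get_or_create(self, id: tuple[str, ...]) -> Span:
        # Create the whole ancestry chain the first time a span id is seen,
        # indexing each span by its own (last) tag
        try:
            return self.spans[id]
        except KeyError:
            span = self.spans[id] = Span(id)
            self.span_search[id[-1]].append(span)
            if len(id) > 1:
                self.get_or_create(id[:-1]).children.append(span)
            return span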

When update_graph() receives a snippet of source code, the SpanPlugin posts it to all leaf Spans that are attached to the tasks.

Unlike Computation objects, Spans don't link to TaskGroups. Instead, individual tasks can contribute to them through SpanPlugin.transition(), as sketched after this list. This allows recording, e.g.:

  • current task counts by prefix and state
  • cumulative task counts by prefix
  • when the span was first submitted by the client
  • when the span was first received by the scheduler
  • when the first task of the span was sent to a worker
  • when the first task of the span started computing on a worker
  • when the last task of the span finished computing on a worker
  • when the last task of the span was forgotten by the scheduler
  • Discuss: what other notable events should be posted to the Span objects?
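
A hedged sketch of such a transition hook, assuming the plugin stored a reference to the scheduler in its start() method; the _own_* attributes are invented for illustration:

from time import time

from distributed.diagnostics.plugin import SchedulerPlugin


class SpanPlugin(SchedulerPlugin):
    ...  # structures as in the sketch above

    def transition(self, key, start, finish, *args, **kwargs):
        ts = self.scheduler.tasks.get(key)
        if ts is None or not ts.annotations or "span" not in ts.annotations:
            return
        span = self.get_or_create(ts.annotations["span"])
        prefix = ts.prefix.name
        # current task counts by prefix and state (naive; a real
        # implementation would guard the task's initial state)
        span._own_counts[prefix, start] -= 1
        span._own_counts[prefix, finish] += 1
        # cumulative task counts by prefix
        if finish == "memory":
            span._own_cumulative_counts[prefix] += 1
        # notable events, e.g. first task sent to a worker
        if finish == "processing" and span._own_started is None:
            span._own_started = time()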

These pieces of information should each be separated into

  • an attribute, which records data for the span itself when tasks reference it directly in their span annotation;
  • a property, which recursively aggregates the attribute of the span itself and the property of its children.
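
For example, the "first task sent to a worker" timestamp could be split as follows (a sketch; names are illustrative):

from __future__ import annotations


class Span:
    def __init__(self):
        self.children: list[Span] = []
        # attribute: set by SpanPlugin.transition() for this span only
        self._own_started: float | None = None

    @property
    def started(self) -> float | None:
        # property: earliest start among this span and all of its descendants
        candidates = [self._own_started] + [c.started for c in self.children]
        return min((t for t in candidates if t is not None), default=None)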

Fine performance metrics

On workers, the execute fine performance metrics (#7665 (comment)) will change

from {("execute", prefix, activity, unit): value}
to {("execute", span, prefix, activity, unit): value}

e.g. from {("execute", "random", "thread-cpu", "seconds"): 12.3}
to {("execute", (<client id>, "my_workflow", "base_collection"), "random", "thread-cpu", "seconds"): 12.3}

This won't substantially change the cardinality of the data, unless the workflow creates and destroys many clients for some weird reason.

When metrics are transferred from worker to scheduler through the heartbeat,

  • execute metrics are apportioned to their leaf-level scheduler.Span object
  • Nothing special happens to gather_dep and get_data metrics

A @property on the Span objects allows recursively aggregating the metrics from children and grandchildren on the fly.
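
A sketch of that property, assuming each Span stores its apportioned execute metrics in an _own_metrics dict keyed like the worker metrics above (the names are illustrative):

from collections import Counter


class Span:
    ...  # id, children, etc. as in the sketches above

    @property
    def cumulative_worker_metrics(self) -> dict:
        # Sum this span's own execute metrics with those of all children
        # and grandchildren; matching keys have their values added together
        out = Counter(self._own_metrics)
        for child in self.children:
            out.update(child.cumulative_worker_metrics)
        return dict(out)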

Note: the heartbeat may arrive when all tasks that point to a Span have already been forgotten.

Lifecycle

Spans are modest in size and in most cases can just be retained indefinitely.
This is potentially problematic for long-lived servers which receive connections from many short-lived clients.

To solve this problem, we could define an expiration timeout for each Span, e.g. 6 hours, that starts ticking when the last task belonging to it is forgotten by the scheduler.
When a Span is forgotten, this should also clear all matching fine performance metrics from the scheduler and the workers.
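
A hypothetical expiry sweep, e.g. run periodically on the scheduler; it assumes each Span records a last_forgotten timestamp when its last task is forgotten:

from time import time


def prune_spans(plugin: SpanPlugin, timeout: float = 6 * 3600) -> None:
    # Drop spans that have seen no activity for longer than `timeout` seconds
    now = time()
    for span_id, span in list(plugin.spans.items()):
        if span.last_forgotten is not None and now - span.last_forgotten > timeout:
            del plugin.spans[span_id]
            # here we would also clear the matching fine performance
            # metrics from the scheduler and the workers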

Deprecation cycle

We would like to retire Computation objects.
Who uses them today? What kind of deprecation cycle do we need to adopt?

Implementation steps

(to be replaced with links to PRs / sub-issues)

  • [dask/dask] dask.span context manager
  • SpanPlugin and Span classes
  • post notable events to Span
  • clients to define a default span = (client_id, )
  • update_graph to force span=(client id, ) if span annotations have been stripped by optimization
  • propagate span context to tasks (to be used by worker clients)
  • Post execute fine performance metrics to spans
  • Deprecate Computation objects
  • Delete spans that received no activity for more than X hours

CC @fjetter @hendrikmakait @ntabris
