
Conversation

@gjoseph92
Collaborator

An idea for an alternative architecture for #7586. See #7586 (comment) for motivation.

Core ideas:

  • Rip off the OTel tracing API with a recursive Span data structure that forms a tree of spans (a minimal sketch follows this list). This could hopefully be swapped for the OTel API itself in the future.
  • Spans are associated with asyncio Tasks via contextvars. We call start on the span before launching the task (so we can track event loop overhead)
  • State machine events have a span field (like stimulus_id, as @fjetter said). Async callbacks like execute explicitly fill in the span field on the Event they return.
  • When the ExecuteSuccessEvent, etc. is processed, we call stop on the span (again, so we can track event loop overhead)
  • span.flat() flattens the span's tree, giving us a simple breakdown of time
  • Convert span.flat() -> DigestEvent to digest the metrics via the normal Instructions infrastructure
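
Concretely, the Span piece might look something like the minimal sketch below. This is purely illustrative (field names, get_span, as_current, and flat are assumptions matching the pseudocode further down, not a finished API):

```python
from __future__ import annotations

import contextvars
import time
from contextlib import contextmanager
from dataclasses import dataclass, field

# The span currently associated with the running asyncio task.
_current_span: contextvars.ContextVar[Span | None] = contextvars.ContextVar(
    "current_span", default=None
)


@dataclass
class Span:
    name: tuple[str, ...]
    attributes: dict = field(default_factory=dict)
    children: list[Span] = field(default_factory=list)
    start_time: float | None = None
    stop_time: float | None = None

    def start(self) -> None:
        self.start_time = time.perf_counter()

    def stop(self) -> None:
        self.stop_time = time.perf_counter()

    @property
    def elapsed(self) -> float:
        assert self.start_time is not None and self.stop_time is not None
        return self.stop_time - self.start_time

    @contextmanager
    def as_current(self):
        # Make this span the parent of any spans created while it's active.
        token = _current_span.set(self)
        try:
            yield self
        finally:
            _current_span.reset(token)

    def flat(self) -> dict[tuple[str, ...], float]:
        """Flatten the tree into {qualified name: seconds}. The entry for the
        span itself holds time not accounted for by children, i.e. overhead."""
        out: dict[tuple[str, ...], float] = {}
        children_total = 0.0
        for child in self.children:
            for name, secs in child.flat().items():
                out[self.name + name] = out.get(self.name + name, 0.0) + secs
            children_total += child.elapsed
        out[self.name] = self.elapsed - children_total
        return out


def get_span(name: str | tuple[str, ...]) -> Span:
    """Create a Span, attached as a child of the currently-active span."""
    span = Span(name if isinstance(name, tuple) else (name,))
    if (parent := _current_span.get()) is not None:
        parent.children.append(span)
    return span
```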

In pseudocode (referring to @crusaderky's comment on #7586 that I now can't find, maybe deleted?):

```python
def handle_stimulus(self, *stims):
    instructions = self.state.handle_stimulus(*stims)

    for inst in instructions:
        if isinstance(inst, GatherDep):
            # Start the span *before* creating the task, so time spent waiting
            # on the event loop shows up as overhead on the span.
            span = get_span("gather-dep")
            span.start()
            with span.as_current():
                task = asyncio.create_task(
                    self.gather_dep(
                        inst.worker,
                        ...,
                        span=span,
                    ),
                )
        elif isinstance(inst, Execute):
            span = get_span(("execute", key_split(inst.key)))
            span.start()
            with span.as_current():
                task = asyncio.create_task(
                    self.execute(
                        inst.key,
                        ...,
                        span=span,
                    ),
                )


def _handle_execute_success(self, ev: ExecuteSuccessEvent) -> RecsInstrs:
    ...
    # This is where we'd call ev.span.stop() (again, so event loop overhead is
    # captured), then convert the flattened span tree into DigestMetric
    # instructions:
    instructions.extend(
        convert_span_to_digest_metric_instrs(ev.span)
    )
    # ->
    # [
    #     DigestMetric(("execute", "task-prefix", "deserialize"), 0.5),
    #     DigestMetric(("execute", "task-prefix", "disk-read"), 2.0),
    #     DigestMetric(("execute", "task-prefix", "cpu-thread"), 4.0),
    #     DigestMetric(("execute", "task-prefix", "cpu"), 1.0),
    #     ^ non-thread CPU time (GIL?)
    #     DigestMetric(("execute", "task-prefix"), 1.0),
    #     ^ this last one is "overhead", aka event loop: the difference
    #       between start and stop of the overall span and the total time
    #       spent in sub-spans.
    # ]
    return recs, instructions


async def execute(..., span):
    ...
    await run_in_executor(
        apply_function_simple
    )
    ...
    return ExecuteSuccessEvent(span=span)


def apply_function_simple(...):
    # Runs in the executor thread; meter() records a child span under the
    # current execute span.
    with meter("cpu"):
        func()
```
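
And one possible shape for the conversion helper used in _handle_execute_success above (again just a sketch; DigestMetric is assumed to be an instruction carrying a name tuple and a float):

```python
def convert_span_to_digest_metric_instrs(span: Span) -> list[DigestMetric]:
    # span.flat() already charges each span's unaccounted time to the span
    # itself, so the root entry naturally becomes the "overhead" / event loop
    # metric shown in the comment above.
    return [DigestMetric(name, seconds) for name, seconds in span.flat().items()]
```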

cc @hendrikmakait @fjetter @crusaderky

The review comment below is on this excerpt from the PR diff:

```python
self.fast_metrics.log_read(nbytes)
with meter("memory-read"):
    nbytes = cast(int, self.fast.weights[key])
    self.fast_metrics.log_read(nbytes)
```
@crusaderky
Collaborator (Mar 7, 2023)

fast_metrics / slow_metrics is a big chunk of ad-hoc code that I could remove in #7586. The key feature I leveraged (which I don't think is possible in this design) was being able to capture the metrics multiple times.

In other words: if, downstream of this PR, I want to have a holistic view of all activity of the SpillBuffer, I need to

  1. I must make 100% sure that I'm doing something with the metrics after all points of access to the SpillBuffer. I suppose we could add an assert_root: bool = False parameter to meter() to make this easier.
    For example, in #7586 (Fine performance metrics for execute, gather_dep, etc.), SpillBuffer.cumulative_metrics (which is conveniently posted to the worker's Prometheus API) also captures scatter activity, which I don't bother with in the worker state machine.
  2. I must perform a full scan of all elements in Worker.digests_total, search the tuples for the keywords relevant to the SpillBuffer, and extrapolate from there (see the sketch at the end of this comment). This assumes that all components that access the SpillBuffer (WorkerStateMachine, WorkerMemoryManager, Worker.get_data) post their metrics to Worker.digests_total in the same way.

Both of the above are feasible - but also quite inconvenient.

Another issue here is that this design has no support for non-time metrics. with meter("memory-read"): doesn't do anything above (reading from SpillBuffer.fast takes nanoseconds). #7586 uses the memory-read tag to post bytes and count metrics to Worker.digests_total, which can in turn be used to calculate cache hit ratios. In this design, you were forced to keep a secondary ad-hoc metric storage system (SpillBuffer.fast_metrics / SpillBuffer.slow_metrics).
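
For concreteness, the kind of scan described in point 2 might look like this (hypothetical code; it assumes all components post tuple-shaped names containing the relevant keyword):

```python
def spillbuffer_disk_write_seconds(worker) -> float:
    # Recover a SpillBuffer-wide total by scanning every entry of
    # Worker.digests_total for the activity keyword, regardless of whether it
    # was posted by execute, gather_dep, get_data, or the memory manager.
    return sum(
        value
        for name, value in worker.digests_total.items()
        if isinstance(name, tuple) and "disk-write" in name
    )
```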

@gjoseph92
Collaborator Author

> fast_metrics / slow_metrics is a big chunk of ad-hoc code that I could remove in #7586

I don't personally mind leaving these existing ad-hoc metrics in place. As I said, t1 - t0 is extremely cheap. If it were a choice between two systems that were each easier to understand and one that did everything but was harder to understand, my preference would be for the two simpler ones.

For SpillBuffer specifically (I haven't looked closely), aren't fast_metrics/slow_metrics basically just an aggregation of the fine-grained spans we'd be collecting? We'd be tracing read/write times/counts/bytes broken out by activity and key prefix. fast_metrics/slow_metrics to me look like that same data, just summed over activity and prefix.

Intuitively to me, if we want coarser metrics that are just an aggregation of finer ones, that seems pretty tractable. I haven't thought about exactly how we'd implement it, but either doing an aggregation during span processing, or just letting the metrics system (Prometheus) do the sum, seems reasonable.

@gjoseph92
Collaborator Author

Here's a way we can get aggregated metrics, like we're currently doing with SpillBuffer. I haven't actually removed the fast/slow metrics yet, but I imagine this would allow us to: 8f9fafe

Basically

  1. spans can hold arbitrary metadata, like OTel
  2. we set the metadata aggregate=True on spans that we want aggregate metrics from
  3. our span-processing logic on the worker knows that when a span has aggregate=True, it exports a metric both for the fully-qualified name ("transition", "x", "released->memory", "disk-write") and the non-qualified name disk-write (see the sketch after this list).
  4. Now we have both a cumulative disk-write metric across all tasks and operations, as well as disk-write broken down by task and operation. This generalizes to anything else we might want to trace.
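
A sketch of what that span-processing step could look like, reusing the hypothetical Span and DigestMetric names from above (the aggregate attribute and the walk are assumptions, not the actual commit):

```python
def export_metrics(span: Span, prefix: tuple = ()) -> list[DigestMetric]:
    # Walk the span tree. Each span's "own" time (not covered by children) is
    # exported under its fully-qualified name; spans flagged aggregate=True
    # are exported a second time under their bare name (e.g. ("disk-write",)),
    # which gives the cumulative, component-wide metric for free.
    qualified = prefix + span.name
    own = span.elapsed - sum(child.elapsed for child in span.children)
    metrics = [DigestMetric(qualified, own)]
    if span.attributes.get("aggregate"):
        metrics.append(DigestMetric(span.name, own))
    for child in span.children:
        metrics.extend(export_metrics(child, prefix=qualified))
    return metrics
```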

@crusaderky
Collaborator

One of the biggest critiques of #7586 was that there are too many .add_callback() wrappers around the code.
I don't think this PR makes it any better - it just does less.

@crusaderky
Collaborator

Ultimately, #7586 in the current incarnation says:

If you want to meter anything, you need at top level a .add_callback() context manager to tell it what to do when it acquires the metrics. There's also an optional system, which builds on top of it, where metrics are recorded in a temporary local list so that you can post-process them later on instead of immediately publishing them.

Whereas this PR says:

All metrics are recorded in a local list. After you finish recording them, you must do something with this list.

Which, to me, feels like a bit of a chicken-and-egg situation.

@gjoseph92
Collaborator Author

> Another issue here is that this design has no support for non-time metrics

Sorry, I didn't add this because this PR already went far further into implementation than I'd intended. But it's pretty straightforward, borrowing the idea of attributes on spans: you just add non-time metrics (like disk bytes read) as attributes on spans. Then in to_digest_metrics, you aggregate them and convert them to DigestMetrics, just like the timings (see the sketch below).
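
A minimal sketch of what that could look like, again reusing the hypothetical Span from above (function and attribute names are made up):

```python
def metered_disk_read(path: str) -> bytes:
    # Record a non-time metric by stashing it as an attribute on the span.
    span = get_span("disk-read")  # becomes a child of the current span
    span.start()
    with open(path, "rb") as f:
        data = f.read()
    span.stop()
    span.attributes["bytes"] = len(data)
    span.attributes["count"] = 1
    return data


def attributes_to_digest_metrics(span: Span, prefix: tuple = ()) -> list[DigestMetric]:
    # Convert span attributes into DigestMetric instructions alongside the
    # timings, e.g. ("execute", "task-prefix", "disk-read", "bytes").
    qualified = prefix + span.name
    metrics = [
        DigestMetric(qualified + (attr,), value)
        for attr, value in span.attributes.items()
    ]
    for child in span.children:
        metrics.extend(attributes_to_digest_metrics(child, prefix=qualified))
    return metrics
```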

> it just does less

To be clear, this PR is just meant to be an illustration of an architecture. Not everything from #7586 is traced here; it's meant to show just enough to get a feel for the API and confirm it actually works.

@github-actions
Contributor

github-actions bot commented Mar 8, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

26 files +1   26 suites +1   14h 55m 25s ⏱️ +3h 27m 17s
3 502 tests +11   3 156 ✔️ -229   104 💤 +1   163 ❌ +160   79 🔥 +79
44 266 runs +2 300   38 306 ✔️ -1 662   2 078 💤 +84   2 855 ❌ +2 851   1 027 🔥 +1 027

For more details on these failures and errors, see this check.

Results for commit c0fe05f. ± Comparison against base commit 310fc95.

This pull request skips 1 test.
distributed.tests.test_worker_client ‑ test_submit_different_names

@gjoseph92
Collaborator Author

I'm hoping that having an explicit Span data structure like this might also give us a nice way to handle #7601:

  1. Keep track of all currently-running spans (global contextvar, etc.). Probably just currently-running root spans, but generally have a way to ask "which spans that we care about are active right now?"
  2. On every metrics scrape, ingest the elapsed-so-far time of the currently-active spans, then emit metrics as usual
  3. Do something to ensure we don't double-count time when the span ends (e.g. move its start up to "now"?). I haven't decided on the details of how we'd represent this, but it's pretty straightforward (rough sketch below).
  4. When in-progress spans actually end, ingest them as usual; any time we've already ingested due to a metrics scrape is not double-counted.

Mostly I think having the data structure of spans (and an automatic way to keep track of the currently-active ones) is what makes this a pretty simple change.
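
A rough sketch of what steps 2-3 might look like at scrape time (everything here, including the registry of running spans, is hypothetical):

```python
import time

# Step 1: some global registry of currently-running root spans.
_active_root_spans = set()  # of Span objects


def ingest_partial_spans(digests_total: dict) -> None:
    # Step 2: on each metrics scrape, fold the elapsed-so-far time of every
    # in-progress root span into the digests.
    now = time.perf_counter()
    for span in _active_root_spans:
        if span.start_time is None:
            continue
        digests_total[span.name] = (
            digests_total.get(span.name, 0.0) + now - span.start_time
        )
        # Step 3: move the span's start up to "now", so that when it finally
        # stops (step 4) only the remaining, not-yet-ingested time is added.
        span.start_time = now
```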
