
Set the key kwarg when submitting to dask client for a richer dask UI #1218

Closed
@dojeda

Description

This issue comes from a discussion on the Slack channel.

I wanted a finer-grained display of the tasks running in a particular flow that has about 19 tasks, many of them mapped with .map.
Currently, the Dask UI groups every task under either the run_task or the run_fn name, so all tasks are mixed together. I thought it would be useful to separate them by task name.

As a small proof of concept, one can create a custom executor and use it instead of DaskExecutor when running the flow:

import uuid
from typing import Any, Callable, List

from distributed import Future, fire_and_forget, worker_client
from prefect import context
from prefect.engine.executors import DaskExecutor


class CustomDaskExecutor(DaskExecutor):

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Future:
        if 'key' not in kwargs and 'task' in kwargs:
            # approach 1: derive the key prefix from the task class name;
            # the uuid suffix keeps each submitted key unique
            kwargs['key'] = type(kwargs['task']).__name__ + '-' + str(uuid.uuid4())
        return super().submit(fn, *args, **kwargs)

    def map(self, fn: Callable, *args: Any) -> List[Future]:
        if not args:
            return []

        # approach 2: map() receives no kwargs, so read the full task name
        # from the prefect context and fall back to a generic 'mapped' prefix
        key = context.get('task_full_name', 'mapped')
        if self.is_started and hasattr(self, "client"):
            futures = self.client.map(fn, *args, key=key, pure=False)
        elif self.is_started:
            with worker_client(separate_thread=True) as client:
                futures = client.map(fn, *args, key=key, pure=False)
                return client.gather(futures)
        else:
            raise ValueError("This executor has not been started.")

        fire_and_forget(futures)
        return futures

This works, and it illustrates two approaches. The first uses the "task" kwarg to extract the task class name. This is possible in submit because the FlowRunner fills in this kwarg. It is not possible in map, because map is called by the TaskRunner and the original executor signature does not accept any kwargs. This is why there is a second approach, which reads the task name from the prefect context.
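To make the naming scheme from approach 1 concrete, here is a minimal standard-library sketch of how keys of the form `<ClassName>-<uuid>` fall into per-task groups. The `Extract` and `Transform` classes and the `make_key` and `key_prefix` helpers are hypothetical, and `key_prefix` is a deliberately simplified assumption: Dask's own key-splitting logic is more involved than taking the text before the first hyphen.

```python
import uuid
from collections import Counter


class Extract:
    """Hypothetical stand-in for a Prefect Task subclass."""


class Transform:
    """Hypothetical stand-in for a Prefect Task subclass."""


def make_key(task: object) -> str:
    """Build a unique key of the form '<ClassName>-<uuid4>' (approach 1)."""
    return f"{type(task).__name__}-{uuid.uuid4()}"


def key_prefix(key: str) -> str:
    """Simplified assumption: the group name is the text before the first '-'."""
    return key.split("-", 1)[0]


# Three submissions: two of one task class, one of another.
keys = [make_key(t) for t in (Extract(), Extract(), Transform())]

# Count how many keys land in each group.
groups = Counter(key_prefix(k) for k in keys)
print(dict(groups))
```

Each key stays unique because of the uuid suffix, while keys from the same task class share a prefix and therefore a group.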

The result in the Dask UI looks like this:

Before: [image]

After: [image]

Both approaches are useful, but one problem remains: in this example I have to reimplement map rather than delegate to the parent class implementation, as I do for submit.

Ideally, DaskExecutor would implement this directly, removing the need for a custom executor (I have no other need to customize it).
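One possible shape for such built-in support is sketched below, with a single helper that picks the name for both submit and map. This is only an illustration under stated assumptions, not Prefect's implementation: `FakeClient` is a hypothetical stand-in for a Dask client that records the keys it receives and runs work inline, `SketchExecutor` is a hypothetical executor, and a plain dict stands in for the prefect context.

```python
import uuid
from typing import Any, Callable, Dict, List, Optional


class FakeClient:
    """Hypothetical stand-in for a Dask client: records keys, runs work inline."""

    def __init__(self) -> None:
        self.seen_keys: List[Optional[str]] = []

    def submit(self, fn: Callable, *args: Any, key: Optional[str] = None, **kwargs: Any) -> Any:
        self.seen_keys.append(key)
        return fn(*args, **kwargs)

    def map(self, fn: Callable, *iterables: Any, key: Optional[str] = None) -> List[Any]:
        self.seen_keys.append(key)
        return [fn(*items) for items in zip(*iterables)]


class SketchExecutor:
    """Hypothetical executor that names Dask keys after the running task."""

    def __init__(self, client: FakeClient, context: Dict[str, str]) -> None:
        self.client = client
        self.context = context  # stand-in for the prefect context

    def _task_name(self, kwargs: Dict[str, Any]) -> str:
        # Prefer the 'task' kwarg (approach 1), else the context (approach 2).
        task = kwargs.get("task")
        if task is not None:
            return type(task).__name__
        return self.context.get("task_full_name", "mapped")

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
        # Respect a caller-supplied key; otherwise build '<name>-<uuid4>'.
        key = kwargs.pop("key", None) or f"{self._task_name(kwargs)}-{uuid.uuid4()}"
        return self.client.submit(fn, *args, key=key, **kwargs)

    def map(self, fn: Callable, *args: Any) -> List[Any]:
        if not args:
            return []
        # map receives no kwargs, so the name can only come from the context.
        return self.client.map(fn, *args, key=self._task_name({}))


class Double:
    """Hypothetical stand-in for a Prefect Task subclass."""


client = FakeClient()
executor = SketchExecutor(client, {"task_full_name": "Double[mapped]"})
single = executor.submit(lambda x, task: x * 2, 21, task=Double())
mapped = executor.map(lambda x: x + 1, [1, 2, 3])
```

With the naming logic in one helper, a subclass would no longer need to copy the body of map just to pass a key.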
