Description
This issue comes from a discussion on the Slack channel.
I wanted a finer-grained display of the tasks running in a particular flow that has about 19 tasks, many of them mapped with `.map`.
Currently, the dask-ui shows every task under either a `run_task` or a `run_fn` name, mixing all tasks together. I thought it would be useful to separate them by task name.
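Roughly speaking, the Dask dashboard groups tasks by a prefix of their key, and by default Dask derives a key from the function name plus a unique token. Because every Prefect task runs through the same function, all keys share the `run_task` prefix. The sketch below is a simplified stand-in for Dask's own prefix logic (it only keeps the text before the first `-`). The helpers `key_prefix` and `group_by_prefix` and the task names are hypothetical, for illustration only.

```python
import uuid
from collections import Counter
from typing import Iterable


def key_prefix(key: str) -> str:
    # Simplified stand-in for how the dashboard labels a key:
    # keep only the text before the first "-".
    return key.split("-", 1)[0]


def group_by_prefix(keys: Iterable[str]) -> Counter:
    # Count how many keys fall under each dashboard group.
    return Counter(key_prefix(k) for k in keys)


# Hypothetical task names standing in for the flow's tasks.
task_names = ["ExtractTask", "ExtractTask", "TransformTask", "LoadTask"]

# Default-style keys: the shared function name plus a unique token.
default_keys = ["run_task-" + uuid.uuid4().hex for _ in task_names]

# Name-based keys, in the style of approach 1 of the proof of concept.
named_keys = [name + "-" + str(uuid.uuid4()) for name in task_names]

print(group_by_prefix(default_keys))  # Counter({'run_task': 4})
print(group_by_prefix(named_keys))
```

With default keys everything collapses into one group; with name-based keys each task name gets its own group.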
A small proof of concept that separates tasks by name is a custom executor, used instead of DaskExecutor when running the flow:
```python
import uuid
from typing import Any, Callable, List

from distributed import Future, fire_and_forget, worker_client
from prefect import context
from prefect.engine.executors import DaskExecutor


class CustomDaskExecutor(DaskExecutor):
    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Future:
        if 'key' not in kwargs and 'task' in kwargs:
            # approach 1: use the kwargs to extract the task class name;
            # the uuid suffix keeps each submitted key unique
            kwargs['key'] = kwargs['task'].__class__.__name__ + '-' + str(uuid.uuid4())
        return super().submit(fn, *args, **kwargs)

    def map(self, fn: Callable, *args: Any) -> List[Future]:
        if not args:
            return []

        # approach 2: use the context full task name;
        # a string key is used by Dask as a prefix for each mapped key
        key = context.get('task_full_name', 'mapped')
        if self.is_started and hasattr(self, "client"):
            futures = self.client.map(fn, *args, key=key, pure=False)
        elif self.is_started:
            # running inside a worker: gather before the worker client closes
            with worker_client(separate_thread=True) as client:
                futures = client.map(fn, *args, key=key, pure=False)
                return client.gather(futures)
        else:
            raise ValueError("This executor has not been started.")

        fire_and_forget(futures)
        return futures
```
This works, and exemplifies two approaches. The first is to use the `task` kwarg to extract the task class name. This is possible in the `submit` function because the flow runner fills in this kwarg. However, it is not possible in the `map` function, because `map` is called by the task runner and the original executor signature does not accept any kwargs. This is why there is a second approach, using the Prefect context.
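The two approaches can be mimicked without Prefect or Dask. In this minimal sketch, `ToyExecutor` is a hypothetical stand-in for `DaskExecutor` that runs functions eagerly and only records the key each call would get, and a `contextvars.ContextVar` plays the role of `prefect.context`. `submit` reads the name from the `task` kwarg (approach 1); `map`, which receives no kwargs, falls back on the context (approach 2). Mapped keys use an index instead of a uuid so they stay readable.

```python
import contextvars
import uuid
from typing import Any, Callable, List

# Hypothetical stand-in for prefect.context["task_full_name"].
task_full_name = contextvars.ContextVar("task_full_name", default="mapped")


class ToyExecutor:
    """Runs functions eagerly and records the key each call would get."""

    def __init__(self) -> None:
        self.keys: List[str] = []

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
        # Approach 1: the caller passes the task object as a kwarg.
        task = kwargs.get("task")
        name = type(task).__name__ if task is not None else fn.__name__
        self.keys.append(name + "-" + str(uuid.uuid4()))
        return fn(*args, **kwargs)

    def map(self, fn: Callable, *args: Any) -> List[Any]:
        # Approach 2: no kwargs available, so read the name from the context.
        prefix = task_full_name.get()
        results = []
        for i, call_args in enumerate(zip(*args)):
            self.keys.append(f"{prefix}-{i}")
            results.append(fn(*call_args))
        return results


class ExtractTask:
    """Hypothetical task class."""


def run_task(x: int, task: Any = None) -> int:
    return x * 2


ex = ToyExecutor()
print(ex.submit(run_task, 3, task=ExtractTask()))  # 6

token = task_full_name.set("double")
try:
    print(ex.map(lambda x: x + 1, [1, 2, 3]))  # [2, 3, 4]
finally:
    task_full_name.reset(token)

print(ex.keys[0].split("-", 1)[0])  # ExtractTask
print(ex.keys[1:])  # ['double-0', 'double-1', 'double-2']
```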
The result of running the flow with `CustomDaskExecutor`, as seen on the dask-ui, looks like this:
Both approaches are useful, but the problem remains that, in this example, I have to rewrite the `map` function instead of using the parent class implementation, as I am currently doing with the `submit` function.
Ideally, the DaskExecutor could implement this directly, removing the need for a custom executor (I don't have a particular need to customize it any further).
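One possible shape for built-in support, sketched under assumptions: both `submit` and `map` ask a single overridable hook for the key prefix, so naming changes in one place and `map` never has to be copied. `BaseToyExecutor`, `TaskNameExecutor` and `key_prefix` are hypothetical names, not Prefect's API, and the sketch assumes `map` also accepts an optional `task` kwarg, which the current executor signature does not.

```python
import uuid
from typing import Any, Callable, List, Optional


class BaseToyExecutor:
    """Hypothetical base executor: naming lives in one overridable hook."""

    def __init__(self) -> None:
        self.keys: List[str] = []

    def key_prefix(self, fn: Callable, task: Optional[Any] = None) -> str:
        # Default: group by function name, like run_task/run_fn today.
        return fn.__name__

    def submit(self, fn: Callable, *args: Any, **kwargs: Any) -> Any:
        prefix = self.key_prefix(fn, kwargs.get("task"))
        self.keys.append(prefix + "-" + str(uuid.uuid4()))
        return fn(*args, **kwargs)

    def map(self, fn: Callable, *args: Any, task: Optional[Any] = None) -> List[Any]:
        # Assumed optional `task` kwarg; it is consumed here, not passed to fn.
        prefix = self.key_prefix(fn, task)
        results = []
        for i, call_args in enumerate(zip(*args)):
            self.keys.append(f"{prefix}-{i}")
            results.append(fn(*call_args))
        return results


class TaskNameExecutor(BaseToyExecutor):
    # Only the hook is overridden; submit and map are inherited unchanged.
    def key_prefix(self, fn: Callable, task: Optional[Any] = None) -> str:
        if task is not None:
            return type(task).__name__
        return super().key_prefix(fn, task)


class LoadTask:
    """Hypothetical task class."""


def run_task(x: int, task: Any = None) -> int:
    return x + 1


base, named = BaseToyExecutor(), TaskNameExecutor()
for executor in (base, named):
    executor.submit(run_task, 1, task=LoadTask())
    executor.map(run_task, [1, 2], task=LoadTask())

print({k.split("-", 1)[0] for k in base.keys})   # {'run_task'}
print({k.split("-", 1)[0] for k in named.keys})  # {'LoadTask'}
```

The design choice is that the subclass overrides one small method instead of duplicating the Dask-specific branches of `map`.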