Allow @task.kubernetes to derive pod name based on decorated function (or more flexibility to override the behaviour) #44779

@wasabigeek

Description

I'd like @task.kubernetes to derive the task_id / pod name from the decorated function's name. To get that today, I have to pass both explicitly as decorator arguments, which is a bit repetitive:

@task.kubernetes(task_id="some_func", name=f"{some_prefix}_some_func")
def some_func():
    pass

We managed to do something like the above but it was more involved than expected:

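from typing import Any, Callable

# Import paths assumed from Airflow 2.x and the cncf.kubernetes provider; adjust for your versions.
from airflow.decorators.base import TaskDecorator, task_decorator_factory
from airflow.providers.cncf.kubernetes.decorators.kubernetes import _KubernetesDecoratedOperator

# get_prefix() is our own helper (not shown here).

# inheriting from a private class, not ideal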
class DefaultKubernetesDecoratedOperator(_KubernetesDecoratedOperator):
    custom_operator_name = "@default_kubernetes_task" # the operator removes the decorator so this is required

    def __init__(self, **kwargs: Any) -> None:
        if "task_id" not in kwargs and "python_callable" in kwargs:
            task_id = kwargs["python_callable"].__name__
            kwargs["task_id"] = task_id

        if "task_id" in kwargs and "name" not in kwargs:
            name = f"{get_prefix()}-{kwargs['task_id']}"
            kwargs["name"] = name

        super().__init__(**kwargs)


def default_kubernetes_task(
    python_callable: Callable[[], None] | None = None,
    multiple_outputs: bool | None = None,
    **kwargs: Any,
) -> TaskDecorator:
    return task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=DefaultKubernetesDecoratedOperator,
        **kwargs,
    )

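Initially I tried wrapping task.kubernetes itself. However, the operator scrubs the decorator out of the function's source code before running it in the pod, and that scrubbing relies on a hardcoded decorator name. The wrapper's decorator line is therefore left in the source, and the pod fails with NameError: name 'default_kubernetes_task' is not defined: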

from typing import Any, Callable

from airflow.decorators import task

def default_kubernetes_task(
    **task_kwargs: Any,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(func: Callable[..., Any]) -> Any:
        if "task_id" not in task_kwargs:
            task_kwargs["task_id"] = func.__name__

        if "name" not in task_kwargs:
            name = f"{get_prefix()}-{task_kwargs['task_id']}"
            task_kwargs["name"] = name

        return task.kubernetes(**task_kwargs)(func)

    return decorator
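
To illustrate the failure mode, here is a minimal, self-contained sketch that does not use Airflow at all. `strip_decorator` and `runs_in_pod` are hypothetical, simplified stand-ins (not Airflow's actual implementation): the first drops decorator lines by name, the second executes the remaining source in an empty namespace, roughly as a pod without the DAG module would.

```python
import textwrap


def strip_decorator(source: str, decorator_name: str) -> str:
    """Drop any line that applies ``decorator_name`` (hypothetical, simplified)."""
    kept = []
    for line in source.splitlines():
        stripped = line.strip()
        # Match "@name" or "@name(...)" only; multi-line decorator calls are not handled.
        if stripped == decorator_name or stripped.startswith(decorator_name + "("):
            continue
        kept.append(line)
    return "\n".join(kept)


def runs_in_pod(source: str) -> str:
    """Execute the source in an empty namespace and report what happened."""
    try:
        exec(source, {})
    except NameError as err:
        return f"NameError: {err}"
    return "ok"


SOURCE = textwrap.dedent(
    '''
    @default_kubernetes_task(namespace="default")
    def some_func():
        pass
    '''
).strip()

# Stripping the hardcoded name leaves the wrapper's decorator line in place...
unchanged = strip_decorator(SOURCE, "@task.kubernetes")
# ...whereas stripping the name actually used in the source removes it.
cleaned = strip_decorator(SOURCE, "@default_kubernetes_task")

print(runs_in_pod(unchanged))
print(runs_in_pod(cleaned))
```

With the hardcoded name, the decorator line survives and executing the source raises the NameError; once the name used in the source is stripped, the function definition runs cleanly.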

It'd be nice if there were a more official way to achieve the above (or something similar). Perhaps:

  • @task.kubernetes could accept an override for the decorated_operator_class?
  • a list of custom operator names to remove could be set in the configuration?
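
As a rough idea of what the first option might look like from the user's side, here is a toy sketch in plain Python. Everything in it is hypothetical: `kubernetes_task`, `BaseDecoratedOperator`, `NameDerivingOperator` and the `team` prefix are illustrative stand-ins, not Airflow APIs, and unlike Airflow this toy builds the operator at decoration time.

```python
from __future__ import annotations

from typing import Any, Callable


class BaseDecoratedOperator:
    """Toy stand-in for the provider's decorated operator (not Airflow code)."""

    def __init__(
        self,
        *,
        python_callable: Callable[..., Any],
        task_id: str,
        name: str | None = None,
    ) -> None:
        self.python_callable = python_callable
        self.task_id = task_id
        self.name = name


class NameDerivingOperator(BaseDecoratedOperator):
    """Fills in the pod name from task_id when the caller gives none."""

    def __init__(self, **kwargs: Any) -> None:
        # "team" is a placeholder for whatever prefix helper a project uses.
        kwargs.setdefault("name", f"team-{kwargs['task_id']}")
        super().__init__(**kwargs)


def kubernetes_task(
    *,
    decorated_operator_class: type[BaseDecoratedOperator] = BaseDecoratedOperator,
    **kwargs: Any,
) -> Callable[[Callable[..., Any]], BaseDecoratedOperator]:
    """Hypothetical decorator that accepts an operator class override."""

    def decorator(func: Callable[..., Any]) -> BaseDecoratedOperator:
        # Copy the kwargs so reusing the decorator does not leak a task_id.
        op_kwargs = dict(kwargs)
        op_kwargs.setdefault("task_id", func.__name__)
        return decorated_operator_class(python_callable=func, **op_kwargs)

    return decorator


@kubernetes_task(decorated_operator_class=NameDerivingOperator)
def some_func() -> None:
    pass


print(some_func.task_id, some_func.name)
```

The point of the sketch is only that the subclass carries the naming logic, so the decorator keeps its own name and no private class or factory has to be re-implemented by the user.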

Use case/motivation

I'd like @task.kubernetes to derive the task_id / pod name from the decorated function's name. More generally, it could be useful to allow the behaviour of task.kubernetes to be overridden.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
