
Conversation

@subkanthi
Contributor

@subkanthi subkanthi commented Nov 21, 2021

Adds a LocalKubernetesExecutor, similar to the CeleryKubernetesExecutor.
Tested by running the webserver and scheduler and connecting to a minikube k8s cluster.
Note: This PR does not update the helm charts.

closes: #16185
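The routing idea behind such a hybrid executor can be sketched roughly like this (a minimal illustration, not the PR's actual code; the class and method names below are made up for the example):

```python
# Hypothetical sketch (NOT the actual Airflow implementation) of the idea
# behind LocalKubernetesExecutor: tasks whose queue matches a configured
# "kubernetes" queue are sent to the KubernetesExecutor, everything else
# runs on the LocalExecutor. All names below are illustrative.

KUBERNETES_QUEUE = "kubernetes"  # assumed value of a kubernetes_queue setting

class LocalKubernetesExecutorSketch:
    def __init__(self, local_executor, kubernetes_executor):
        self.local_executor = local_executor
        self.kubernetes_executor = kubernetes_executor

    def _router(self, queue):
        # Route purely on the task's queue, mirroring CeleryKubernetesExecutor.
        if queue == KUBERNETES_QUEUE:
            return self.kubernetes_executor
        return self.local_executor

    def queue_command(self, task_id, queue=None):
        self._router(queue).append((task_id, queue))

# Stand-in "executors" that just record what was queued.
local, kube = [], []
executor = LocalKubernetesExecutorSketch(local, kube)
executor.queue_command("t1")                      # goes to local
executor.queue_command("t2", queue="kubernetes")  # goes to kubernetes
```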



@potiuk
Member

potiuk commented Nov 21, 2021

Nice one. I think LocalK8S is quite a useful executor to have :).

@ashb
Member

ashb commented Nov 22, 2021

I'm a bit worried about the possible combinations that we run into if we add this. Is it maybe time to resurrect your multi-executor @potiuk?

@potiuk
Member

potiuk commented Nov 22, 2021

I'm a bit worried about the possible combinations that we run into if we add this. Is it maybe time to resurrect your multi-executor @potiuk?

I think the benefits of any common code for a "multi" executor are not that big - I still think having a few dedicated, specialized executors that you can use straight away, and that are well tested and supported by the Helm Chart, outweighs the "generic" solution.

There are really few combinations that make sense here - you wouldn't want to support Sequential + Kubernetes or Local + Celery. I think the only "reasonable" combination is <x>Kubernetes, and we basically exhaust all of the combinations already with this one.

The fact that nobody proposed it for about a year since we had CeleryKubernetes makes it likely that we have about a year again before someone wants another combination (if at all). I think it's not worth doing a generic one in this case.

@subkanthi subkanthi requested a review from potiuk as a code owner November 22, 2021 18:36
@BasPH
Contributor

BasPH commented Nov 25, 2021

I'm a bit worried about the possible combinations that we run into if we add this. Is it maybe time to resurrect your multi-executor @potiuk?

To avoid this situation, what do you think of configuring multiple executors, and have an argument on the BaseOperator to select your executor of choice? The selection of executor with these "multi"-executors via the queue argument feels somewhat implicit to me + you avoid ending up with X+Y+Z-executors in the future.
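The explicit alternative being proposed could look roughly like this (a hypothetical sketch; at this point `executor` is not a real BaseOperator argument, and all names are illustrative):

```python
# Hypothetical sketch of explicit per-task executor selection: executors are
# registered by name, and each task carries an `executor` argument instead
# of being routed implicitly via its queue. Illustrative names throughout.

EXECUTORS = {
    "local": [],        # stand-ins that record dispatched task ids
    "kubernetes": [],
}
DEFAULT_EXECUTOR = "local"

def dispatch(task_id, executor=None):
    # Explicit selection: the task names its executor, or falls back to the default.
    name = executor or DEFAULT_EXECUTOR
    if name not in EXECUTORS:
        raise ValueError(f"Unknown executor: {name}")
    EXECUTORS[name].append(task_id)

dispatch("extract")                           # default executor
dispatch("heavy_job", executor="kubernetes")  # explicit selection
```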

@potiuk
Member

potiuk commented Nov 25, 2021

To avoid this situation, what do you think of configuring multiple executors, and have an argument on the BaseOperator to select your executor of choice? The selection of executor with these "multi"-executors via the queue argument feels somewhat implicit to me + you avoid ending up with X+Y+Z-executors in the future.

Yep. Might be a good idea to do it explicitly.

@subkanthi
Contributor Author

subkanthi commented Nov 26, 2021

To avoid this situation, what do you think of configuring multiple executors, and have an argument on the BaseOperator to select your executor of choice? The selection of executor with these "multi"-executors via the queue argument feels somewhat implicit to me + you avoid ending up with X+Y+Z-executors in the future.

Yep. Might be a good idea to do it explicitly.

@potiuk, thoughts? Does it make sense to continue with this? I wanted to add the Helm charts. Or, if the direction is to work on the multiple-executors approach (I could work on that too), please let me know.

@potiuk potiuk reopened this Jan 24, 2022
@potiuk
Member

potiuk commented Jan 24, 2022

This one needs rebase!

@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Jan 24, 2022
@subkanthi
Contributor Author

This one needs rebase!

Please let me know if there is anything else outstanding for this PR.

Member

@potiuk potiuk left a comment


This looks good. Any other approvals?

@subkanthi
Contributor Author

This looks good. Any other approvals?

Can this be merged? It's all green now, thanks.

@potiuk
Member

potiuk commented Feb 17, 2022

One comment before merge (and @subkanthi - maybe you can take a closer look while you are at it).

There is a bug with CeleryKubernetesExecutor reported recently in #21548 which might also affect LocalKubernetesExecutor. Would it be possible for you to take a look, @subkanthi - maybe you will be able to fix it as a general case? And maybe also check whether there are other places where we have "hard-coded" executor names.

This was one thing that I was afraid of when we discussed a "Generic Combined Executor" - it seems there is at least this one place, pointed out in #21548, where we implemented logic based on the type of executor configured rather than the executor actually used when the task was executed.

I think that LocalKubernetesExecutor might suffer from the same problem actually - because file_task_handler was not touched by this PR.

Member

@potiuk potiuk left a comment


Waiting for response from @subkanthi on the potential bug.

@arkadiusz-bach

You will probably have the same issue as the one that I described here: #21225 (comment)

It is causing a lot of pain on CeleryKubernetesExecutor, when for example tasks are waiting for celery to autoscale, because KubernetesExecutor resets all of the tasks that are in the queued state back to scheduled, not only those that have the kubernetes queue assigned.
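The fix implied here would be to filter by queue before resetting. Roughly, as a hypothetical sketch (not Airflow's actual scheduler code; task instances are modelled as plain dicts):

```python
# Hypothetical sketch: when the KubernetesExecutor half of a hybrid executor
# resets "stuck" queued tasks back to scheduled, it should only consider
# tasks assigned to its own queue, leaving celery/local tasks (e.g. those
# waiting for autoscaling) alone. Illustrative data model throughout.

def tasks_to_reset(queued_task_instances, kubernetes_queue="kubernetes"):
    # Only reset tasks that actually belong to the kubernetes queue.
    return [
        ti for ti in queued_task_instances
        if ti["queue"] == kubernetes_queue
    ]

queued = [
    {"task_id": "a", "queue": "kubernetes"},
    {"task_id": "b", "queue": "default"},  # e.g. waiting for celery to autoscale
]
```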

@subkanthi
Contributor Author

subkanthi commented Feb 18, 2022

Waiting for response from @subkanthi on the potential bug.

I'm actually stuck on another issue. I'm pretty sure this used to work before: for some reason k8s_object is a str and it ends up in the else block.

    k8s_object = obj.get("pod_override", None)

    if k8s_legacy_object and k8s_object:
        raise AirflowConfigException(
            "Can not have both a legacy and new "
            "executor_config object. Please delete the KubernetesExecutor "
            "dict and only use the pod_override kubernetes.client.models.V1Pod "
            "object."
        )
    if not k8s_object and not k8s_legacy_object:
        return None

    print("TYPE" + str(type(k8s_object)))  # debug: shows k8s_object arriving as str
    if isinstance(k8s_object, k8s.V1Pod):
        return k8s_object
    elif isinstance(k8s_legacy_object, dict):
        warnings.warn(
            'Using a dictionary for the executor_config is deprecated and will soon be removed. '
            'Please use a `kubernetes.client.models.V1Pod` class with a "pod_override" key '
            'instead.',
            category=DeprecationWarning,
        )
        return PodGenerator.from_legacy_obj(obj)
    else:
        raise TypeError(
            'Cannot convert a non-kubernetes.client.models.V1Pod object into a KubernetesExecutorConfig'
        )
    kube_exec_config_special = {
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base", image="python"
                    ),
                ]
            )
        )
    }
    @task(
        executor_config=kube_exec_config_special,
        queue='kubernetes',
        task_id='task_with_kubernetes_executor',
    )
    def task_with_template():
        print_stuff()

The problem seems to be that for some reason executor_config gets unpickled as a str. I'm trying to figure out the right combination of conditions under which it happens.
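One way to surface that symptom early would be a defensive type check before the pod_override lookup. A hypothetical sketch (the real logic lives in PodGenerator; the function name here is made up):

```python
# Hypothetical guard for the symptom described above: executor_config
# arriving as a str (e.g. after a bad serialization round-trip) instead of
# a dict carrying a V1Pod under "pod_override".

def get_pod_override(executor_config):
    if isinstance(executor_config, str):
        # Fail loudly at the source instead of falling through to the
        # generic "cannot convert" TypeError later on.
        raise TypeError(
            "executor_config was deserialized as str; expected a dict "
            "with a 'pod_override' key"
        )
    return executor_config.get("pod_override")
```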

@potiuk
Member

potiuk commented Feb 26, 2022

The problem seems to be that for some reason executor_config gets unpickled as a str. I'm trying to figure out the right combination of conditions under which it happens.

Let me know when to re-review it :)

@subkanthi
Contributor Author

#21548

@potiuk
I'm not sure how to reproduce the bug with this line:
elif conf.get('core', 'executor') == 'KubernetesExecutor':
I can't find a workflow or configuration where FileTaskHandler is used directly. I tried elasticsearch remote logging, and since the _read function is overridden there, it doesn't call the _read function in FileTaskHandler.

It could be either cloudwatch or s3, but I see the _read function overridden in both of those task handlers.
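The hard-coded check discussed above could be generalized to cover the hybrid executors. Roughly, as a hypothetical sketch of the idea (not the actual fix that landed):

```python
# Hypothetical generalization of the hard-coded check
#   conf.get('core', 'executor') == 'KubernetesExecutor'
# so that hybrid executors are matched too. A suffix check covers
# KubernetesExecutor, CeleryKubernetesExecutor and LocalKubernetesExecutor,
# all of whose tasks may have run in a Kubernetes pod.

def may_run_on_kubernetes(executor_name: str) -> bool:
    return executor_name.endswith("KubernetesExecutor")
```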

@potiuk
Member

potiuk commented Mar 5, 2022

I think we should be good to go for that one then.

@potiuk potiuk merged commit 652b859 into apache:main Mar 5, 2022
@hendoxc

hendoxc commented Mar 12, 2022

Hey, what's the timeline for this to be added to the helm chart? I'd like to test this out right away.

@potiuk
Member

potiuk commented Mar 12, 2022

Hey, what's the timeline for this to be added to the helm chart? I'd like to test this out right away.

Maybe you can try updating it in the chart and provide it as a PR? That would be a great contribution back to the community, and you would become one of the (almost) 2000 contributors.

Note that this executor will only be released in 2.3.0 - it is in development only, so it makes perfect sense to add it to the chart in main soon, BTW. PRs for it are most welcome.

@subkanthi
Contributor Author

I can work on this in a week from now.

@ephraimbuddy ephraimbuddy added the type:new-feature Changelog: New Features label Apr 8, 2022

Labels

full tests needed We need to run full set of tests for this PR to merge type:new-feature Changelog: New Features


Development

Successfully merging this pull request may close these issues.

Combined Executor - KubernetesExecutor + LocalExecutor