
@nitinmuteja

@nitinmuteja nitinmuteja commented May 6, 2022

closes: #23529



@nitinmuteja nitinmuteja requested a review from jedcunningham as a code owner May 6, 2022 13:50
@boring-cyborg boring-cyborg bot added provider:cncf-kubernetes Kubernetes (k8s) provider related issues area:providers labels May 6, 2022
@boring-cyborg

boring-cyborg bot commented May 6, 2022

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contribution Guide (https://github.com/apache/airflow/blob/main/CONTRIBUTING.rst).
Here are some useful points:

  • Pay attention to the quality of your code (flake8, mypy and type annotations). Our pre-commits will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide. Consider adding an example DAG that shows how users should use it.
  • Consider using the Breeze environment for testing locally; it’s a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

@nitinmuteja
Author

Added test for the change.

@potiuk
Member

potiuk commented May 9, 2022

Cool. Running tests and hopefully will include it in the next provider's release.

@eladkal
Contributor

eladkal commented May 9, 2022

How can we template resources? It's not a string. It's a list of V1ResourceRequirements objects.

Maybe I'm missing something here, but I would love to see an actual test that templates this field.

@eladkal
Contributor

eladkal commented May 9, 2022

Do we mean something like this?

    resources = [
        V1ResourceRequirements(
            requests={"cpu": "{{ dag_run.conf['request_cpu'] }}", "memory": "8G"},
            limits={"cpu": "16000m", "memory": "128G"},
        )
    ]

@nitinmuteja
Author

@eladkal yes. This is the expected behaviour.
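
To make the question above concrete, here is a minimal, stdlib-only sketch of recursive template rendering. It is not Airflow's implementation and does not use Jinja: the `render` function, the `Opaque` class, and the simplified `{{ name }}` placeholder syntax are all assumptions made for illustration. It shows the general idea that a renderer which only walks strings, lists, tuples, and dicts will render a plain nested dict but leave the attributes of an arbitrary object untouched.

```python
import re
from typing import Any, Dict

# Matches simple "{{ name }}" placeholders only; real Jinja supports far more.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: Dict[str, str]) -> Any:
    """Recursively render placeholders in strings, lists, tuples and dicts."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, (list, tuple)):
        return type(value)(render(item, context) for item in value)
    if isinstance(value, dict):
        return {key: render(item, context) for key, item in value.items()}
    # Anything else (for example an arbitrary object) is returned untouched.
    return value


class Opaque:
    """Hypothetical object whose attributes the renderer knows nothing about."""

    def __init__(self, requests: Dict[str, str]) -> None:
        self.requests = requests


context = {"request_cpu": "500m"}
as_dict = render({"requests": {"cpu": "{{ request_cpu }}", "memory": "8G"}}, context)
as_object = render(Opaque({"cpu": "{{ request_cpu }}"}), context)

print(as_dict["requests"]["cpu"])    # the placeholder has been substituted
print(as_object.requests["cpu"])     # still the raw template string
```

Under these assumptions, the object case needs some extra hook that tells the renderer which attributes to descend into.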

@talnagar
Contributor

I tried locally with a kind cluster and the templating didn't take. @nitinmuteja can you supply a usage example for it?
This is what I tried, using example_kubernetes_executor.py from the example_dags in the repo:

    ....
    with DAG(
        dag_id='example_kubernetes_executor',
        schedule_interval=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=['example3'],
        params={
            'mem': Param(default='512Mi', type='string', description="memory"),
        }
    ....
        # Use k8s_client.V1ResourceRequirements to define resource limits
        k8s_resource_requirements = k8s.V1ResourceRequirements(
            requests={'memory': "{{ params['mem'] }}"}, limits={'memory': '512Mi'}
        )

@eladkal
Contributor

eladkal commented May 10, 2022

@nitinmuteja It would be great if you could add a test that verifies the templating, similar to:

    def test_templates(self, _, create_task_instance_of_operator):
        dag_id = 'TestGoogleCloudStorageToGoogleCloudStorageTransferOperator_test_templates'
        ti = create_task_instance_of_operator(
            CloudDataTransferServiceGCSToGCSOperator,
            dag_id=dag_id,
            source_bucket='{{ dag.dag_id }}',
            destination_bucket='{{ dag.dag_id }}',
            description='{{ dag.dag_id }}',
            object_conditions={'exclude_prefixes': ['{{ dag.dag_id }}']},
            gcp_conn_id='{{ dag.dag_id }}',
            task_id=TASK_ID,
        )
        ti.render_templates()
        assert dag_id == ti.task.source_bucket
        assert dag_id == ti.task.destination_bucket
        assert dag_id == ti.task.description
        assert dag_id == ti.task.object_conditions['exclude_prefixes'][0]
        assert dag_id == ti.task.gcp_conn_id

@potiuk
Member

potiuk commented May 10, 2022

@nitinmuteja -> last call before preparing the new round of providers - do you have time/capability to add the test as @eladkal asked?

@nitinmuteja
Author

I can add it today.

@potiuk
Member

potiuk commented May 11, 2022

Cool. I can hold on then :)

@nitinmuteja
Author

@talnagar I have added test cases for the requirements.

@nitinmuteja
Author

@potiuk please check the changes. I created a subclass of the k8s class and added class attributes so that limits and requests are template fields.

Member

@jedcunningham jedcunningham left a comment


I'm not really on board with having our own V1ResourceRequirements - we intentionally moved away from this pattern for a reason.

I haven't tried it, but would this pattern work instead (making this a documentation issue)?

    resources = k8s.V1ResourceRequirements(requests={'memory': '100Mi'})
    # Trailing comma makes this a one-element tuple rather than a plain string.
    resources.template_fields = ('requests',)

    KPO(
        ...,
        resources=resources
    )
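
One detail worth checking in any variant of this pattern is the shape of the `template_fields` value. In Python, parentheses alone do not create a tuple; the trailing comma does. The short sketch below uses only plain Python (the variable names are illustrative) to show the difference and why it matters for code that loops over field names.

```python
# A parenthesised string is still a string; the trailing comma makes the tuple.
not_a_tuple = ('requests')
one_tuple = ('requests',)

print(type(not_a_tuple).__name__)  # str
print(type(one_tuple).__name__)    # tuple

# Iterating shows why this matters for a loop over field names:
# the string yields single characters, the tuple yields the field name.
print(list(not_a_tuple)[:3])  # ['r', 'e', 'q']
print(list(one_tuple))        # ['requests']
```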

@eladkal
Contributor

eladkal commented May 11, 2022

I'm not really on board with having our own V1ResourceRequirements - we intentionally moved away from this pattern for a reason.

I agree

@potiuk
Member

potiuk commented May 11, 2022

Agreed - this one will wait and move to the next round of providers.

@nitinmuteja
Author

I'm not really on board with having our own V1ResourceRequirements - we intentionally moved away from this pattern for a reason.

@jedcunningham we are just inheriting from the object and will continue using it. The only advantage this gives us is that the props can be templated.

@nitinmuteja
Author

nitinmuteja commented May 12, 2022

I'm not really on board with having our own V1ResourceRequirements - we intentionally moved away from this pattern for a reason.

I haven't tried it, but would this pattern work instead (making this a documentation issue)?

    resources = k8s.V1ResourceRequirements(requests={'memory': '100Mi'})
    # Trailing comma makes this a one-element tuple rather than a plain string.
    resources.template_fields = ('requests',)

    KPO(
        ...,
        resources=resources
    )

This one does not work, as the external V1ResourceRequirements does not have a template_fields class attribute. I can only think of inheritance as a stable solution for this. Let me know if you folks can think of another solution.

@nitinmuteja nitinmuteja requested a review from jedcunningham May 12, 2022 05:26
@lior1990

Suggestion: you can modify _render_nested_template_fields in KubernetesPodOperator and add the following code, right after the custom code that is handling k8s.V1EnvVar:

if id(content) not in seen_oids and isinstance(content, k8s.V1ResourceRequirements):
    seen_oids.add(id(content))
    self._do_render_template_fields(content, ("limits", "requests"), context, jinja_env, seen_oids)
    return
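
The shape of this suggestion can be sketched without Airflow or the Kubernetes client. Everything in the block below is a toy stand-in: `FakeResourceRequirements`, `MiniRenderer`, and the regex-based `{{ name }}` substitution are assumptions for illustration, not the real classes or Jinja. The point is the special case in `render_nested`: a third-party type that declares no `template_fields` is given an explicit list of attributes to render, and `seen_oids` records object ids so the same object is not processed twice.

```python
import re
from typing import Any, Dict, Iterable, Set

_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class FakeResourceRequirements:
    """Hypothetical stand-in for a third-party model with no template_fields."""

    def __init__(self, requests: Dict[str, str], limits: Dict[str, str]) -> None:
        self.requests = requests
        self.limits = limits


class MiniRenderer:
    """Toy renderer; not Airflow's implementation."""

    def render(self, value: Any, context: Dict[str, str], seen_oids: Set[int]) -> Any:
        if isinstance(value, str):
            return _PLACEHOLDER.sub(lambda m: context[m.group(1)], value)
        if isinstance(value, dict):
            return {k: self.render(v, context, seen_oids) for k, v in value.items()}
        if isinstance(value, list):
            return [self.render(v, context, seen_oids) for v in value]
        # Objects are mutated in place via their nested fields.
        self.render_nested(value, context, seen_oids)
        return value

    def render_fields(
        self, obj: Any, fields: Iterable[str], context: Dict[str, str], seen_oids: Set[int]
    ) -> None:
        for name in fields:
            setattr(obj, name, self.render(getattr(obj, name), context, seen_oids))

    def render_nested(self, content: Any, context: Dict[str, str], seen_oids: Set[int]) -> None:
        # Special case: a known type gets an explicit list of fields to render.
        if id(content) not in seen_oids and isinstance(content, FakeResourceRequirements):
            seen_oids.add(id(content))
            self.render_fields(content, ("limits", "requests"), context, seen_oids)
            return
        # Default: only objects that declare template_fields are rendered.
        if id(content) in seen_oids:
            return
        seen_oids.add(id(content))
        fields = getattr(content, "template_fields", None)
        if fields:
            self.render_fields(content, fields, context, seen_oids)


resources = FakeResourceRequirements(
    requests={"memory": "{{ mem }}"}, limits={"memory": "512Mi"}
)
MiniRenderer().render([resources], {"mem": "256Mi"}, set())
print(resources.requests["memory"])
```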

@akakakakakaa
Contributor

akakakakakaa commented May 23, 2022

In addition to what @lior1990 mentioned previously:

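from typing import TYPE_CHECKING, Any, Sequence

from kubernetes.client import models as k8s

# Import path for the cncf.kubernetes provider as of this discussion.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

if TYPE_CHECKING:
    import jinja2

    from airflow.utils.context import Context


class CustomKubernetesPodOperator(KubernetesPodOperator):
    template_fields: Sequence[str] = KubernetesPodOperator.template_fields + ('k8s_resources', )

    def _render_nested_template_fields(
        self,
        content: Any,
        context: 'Context',
        jinja_env: "jinja2.Environment",
        seen_oids: set,
    ) -> None:
        # Render limits and requests of V1ResourceRequirements explicitly.
        if id(content) not in seen_oids and isinstance(content, k8s.V1ResourceRequirements):
            seen_oids.add(id(content))
            self._do_render_template_fields(content, ("limits", "requests"), context, jinja_env, seen_oids)
            return

        super()._render_nested_template_fields(content, context, jinja_env, seen_oids)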

In KubernetesPodOperator, resources is assigned to k8s_resources:

self.k8s_resources = convert_resources(resources) if resources else {}

So k8s_resources, rather than resources, should be added to template_fields.
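
The naming point can be illustrated with a small stdlib-only sketch. `ToyOperator` and `templated_values` are hypothetical names, and the `dict(...)` copy merely stands in for whatever conversion the real operator performs. The sketch assumes a renderer that looks up each entry of `template_fields` with `getattr` on the instance, so the entry has to match the attribute actually stored, not the constructor argument.

```python
from typing import Dict, Optional, Tuple


class ToyOperator:
    """Hypothetical operator that stores its `resources` argument under another name."""

    template_fields: Tuple[str, ...] = ("k8s_resources",)

    def __init__(self, resources: Optional[Dict[str, str]] = None) -> None:
        # Mirrors the pattern quoted above: the argument is converted and
        # stored as `k8s_resources`; no `resources` attribute is ever set.
        self.k8s_resources = dict(resources) if resources else {}


def templated_values(task: object, fields: Tuple[str, ...]) -> Dict[str, object]:
    """Look up each named field on the instance, as a field-driven renderer would."""
    return {name: getattr(task, name) for name in fields}


task = ToyOperator(resources={"memory": "{{ mem }}"})

# The stored attribute name resolves.
print(templated_values(task, ToyOperator.template_fields))

# The constructor argument name does not exist on the instance.
try:
    templated_values(task, ("resources",))
except AttributeError as exc:
    print(f"lookup failed: {exc}")
```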

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jul 26, 2022
@github-actions github-actions bot closed this Aug 1, 2022

Linked issue: Provide resources attribute in KubernetesPodOperator to be templated

7 participants