Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion airflow/providers/cncf/kubernetes/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Breaking changes

Previously KubernetesPodOperator considered some settings from the Airflow config's ``kubernetes`` section. Such consideration was deprecated in 4.1.0 and is now removed. If you previously relied on the Airflow config, and you want client generation to have non-default configuration, you will need to define your configuration in an Airflow connection and set KPO to use the connection. See kubernetes provider documentation on defining a kubernetes Airflow connection for details.

Drop support for providing ``resource`` as dict in ``KubernetesPodOperator``. You should use ``container_resources`` with ``V1ResourceRequirements``.

Features
~~~~~~~~

Expand Down Expand Up @@ -97,6 +99,10 @@ Bug Fixes
* ``Revert "Fix await_container_completion condition (#23883)" (#24474)``
* ``Update providers to use functools compat for ''cached_property'' (#24582)``

Misc
~~~~
* ``Rename 'resources' arg in Kub op to k8s_resources (#24673)``

.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
* ``Only assert stuff for mypy when type checking (#24937)``
Expand All @@ -105,7 +111,6 @@ Bug Fixes
* ``Move provider dependencies to inside provider folders (#24672)``
* ``Use our yaml util in all providers (#24720)``
* ``Remove 'hook-class-names' from provider.yaml (#24702)``
* ``Rename 'resources' arg in Kub op to k8s_resources (#24673)``

4.1.0
.....
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,6 @@ def convert_volume_mount(volume_mount) -> k8s.V1VolumeMount:
return _convert_kube_model_object(volume_mount, k8s.V1VolumeMount)


def convert_resources(resources) -> k8s.V1ResourceRequirements:
"""
Converts an airflow Resources object into a k8s.V1ResourceRequirements

:param resources:
:return: k8s.V1ResourceRequirements
"""
if isinstance(resources, dict):
from airflow.providers.cncf.kubernetes.backcompat.pod import Resources
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also remove this module?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't find any more usage in the code to convert_resources so I removed it.
I think we should remove the whole backward compact module but it needs more close look on the other functions


resources = Resources(**resources)
return _convert_kube_model_object(resources, k8s.V1ResourceRequirements)


def convert_port(port) -> k8s.V1ContainerPort:
"""
Converts an airflow Port object into a k8s.V1ContainerPort
Expand Down
21 changes: 8 additions & 13 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
convert_image_pull_secrets,
convert_pod_runtime_info_env,
convert_port,
convert_resources,
convert_toleration,
convert_volume,
convert_volume_mount,
Expand Down Expand Up @@ -213,21 +212,17 @@ def __init__(
pod_runtime_info_envs: list[k8s.V1EnvVar] | None = None,
termination_grace_period: int | None = None,
configmaps: list[str] | None = None,
resources: dict[str, Any] | None = None,
**kwargs,
) -> None:

if isinstance(resources, k8s.V1ResourceRequirements):
warnings.warn(
# TODO: remove in provider 6.0.0 release. This is a mitigate step to advise users to switch to the
# container_resources parameter.
if isinstance(kwargs.get("resources"), k8s.V1ResourceRequirements):
raise AirflowException(
"Specifying resources for the launched pod with 'resources' is deprecated. "
"Use 'container_resources' instead.",
category=DeprecationWarning,
stacklevel=2,
"Use 'container_resources' instead."
)
container_resources = resources
resources = None

super().__init__(resources=resources, **kwargs)
super().__init__(**kwargs)
self.kubernetes_conn_id = kubernetes_conn_id
self.do_xcom_push = do_xcom_push
self.image = image
Expand Down Expand Up @@ -263,7 +258,7 @@ def __init__(
self.node_selector = {}
self.annotations = annotations or {}
self.affinity = convert_affinity(affinity) if affinity else {}
self.k8s_resources = convert_resources(container_resources) if container_resources else {}
self.container_resources = container_resources
self.config_file = config_file
self.image_pull_secrets = convert_image_pull_secrets(image_pull_secrets) if image_pull_secrets else []
self.service_account_name = service_account_name
Expand Down Expand Up @@ -553,7 +548,7 @@ def build_pod_request_obj(self, context: Context | None = None) -> k8s.V1Pod:
command=self.cmds,
ports=self.ports,
image_pull_policy=self.image_pull_policy,
resources=self.k8s_resources,
resources=self.container_resources,
volume_mounts=self.volume_mounts,
args=self.arguments,
env=self.env_vars,
Expand Down
24 changes: 23 additions & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ def setUp(self):
"command": ["bash", "-cx"],
"env": [],
"envFrom": [],
"resources": {},
"name": "base",
"ports": [],
"volumeMounts": [],
Expand Down Expand Up @@ -1150,3 +1149,26 @@ def get_op():
with pytest.raises(AirflowException):
k.execute(context)
create_mock.assert_called_once()

def test_using_resources(self):
exception_message = (
"Specifying resources for the launched pod with 'resources' is deprecated. "
"Use 'container_resources' instead."
)
with pytest.raises(AirflowException, match=exception_message):
resources = k8s.V1ResourceRequirements(
requests={"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
limits={"memory": "64Mi", "cpu": 0.25, "nvidia.com/gpu": None, "ephemeral-storage": "2Gi"},
)
KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels=self.labels,
name="test-" + str(random.randint(0, 1000000)),
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
resources=resources,
)
31 changes: 0 additions & 31 deletions kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def setUp(self):
"command": ["bash", "-cx"],
"env": [],
"envFrom": [],
"resources": {},
"name": "base",
"ports": [],
"volumeMounts": [],
Expand Down Expand Up @@ -194,36 +193,6 @@ def test_pod_node_selectors(self):
self.expected_pod["spec"]["nodeSelector"] = node_selectors
assert self.expected_pod == actual_pod

def test_pod_resources(self):
resources = {
"limit_cpu": 0.25,
"limit_memory": "64Mi",
"limit_ephemeral_storage": "2Gi",
"request_cpu": "250m",
"request_memory": "64Mi",
"request_ephemeral_storage": "1Gi",
}
k = KubernetesPodOperator(
namespace="default",
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name="test",
task_id="task",
in_cluster=False,
do_xcom_push=False,
container_resources=resources,
)
context = create_context(k)
k.execute(context)
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
self.expected_pod["spec"]["containers"][0]["resources"] = {
"requests": {"memory": "64Mi", "cpu": "250m", "ephemeral-storage": "1Gi"},
"limits": {"memory": "64Mi", "cpu": 0.25, "ephemeral-storage": "2Gi"},
}
assert self.expected_pod == actual_pod

def test_pod_affinity(self):
affinity = {
"nodeAffinity": {
Expand Down