Skip to content

Commit

Permalink
Use "remote" pod when patching KPO pod as "checked" (#23676)
Browse files Browse the repository at this point in the history
When patching as "checked", we have to use the current version of the pod otherwise we may get an error when trying to patch it, e.g.:

```
Operation cannot be fulfilled on pods \"test-kubernetes-pod-db9eedb7885c40099dd40cd4edc62415\": the object has been modified; please apply your changes to the latest version and try again"
```

This error would not cause a failure of the task, since errors in `cleanup` are suppressed.  However, it would fail to patch.

I believe one scenario when the pod may be updated is when retrieving xcom, since the sidecar is terminated after extracting the value.

Concerning some changes in the tests re the "already_checked" label, it was added to a few "expected pods" recently, when we changed it to patch even in the case of a successful pod.

Since we are changing the "patch" code to patch with the latest read on the pod that we have (i.e. using the `remote_pod` variable), and no longer the pod object stored on `k.pod`, the label no longer shows up in those tests (that's because in k.pod isn't actually a read of the remote pod, but just happens to get mutated in the patch function before it is used to actually patch the pod).

Further, since the `remote_pod` is a local variable, we can't observe it in tests.  So we have to read the pod using k8s api. _But_, our "find pod" function excludes "already checked" pods!  So we have to make this configurable.

So, now we have a proper integration test for the "already_checked" behavior (there was already a unit test).
  • Loading branch information
dstandish authored May 26, 2022
1 parent ef3f530 commit 6bbe015
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 9 deletions.
14 changes: 9 additions & 5 deletions airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,9 @@ def client(self) -> CoreV1Api:
kwargs.update(in_cluster=self.in_cluster)
return kube_client.get_kube_client(**kwargs)

def find_pod(self, namespace, context) -> Optional[k8s.V1Pod]:
def find_pod(self, namespace, context, *, exclude_checked=True) -> Optional[k8s.V1Pod]:
"""Returns an already-running pod for this task instance if one exists."""
label_selector = self._build_find_pod_label_selector(context)
label_selector = self._build_find_pod_label_selector(context, exclude_checked=exclude_checked)
pod_list = self.client.list_namespaced_pod(
namespace=namespace,
label_selector=label_selector,
Expand Down Expand Up @@ -412,7 +412,7 @@ def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
pod_phase = remote_pod.status.phase if hasattr(remote_pod, 'status') else None
if not self.is_delete_operator_pod:
with _suppress(Exception):
self.patch_already_checked(pod)
self.patch_already_checked(remote_pod)
if pod_phase != PodPhase.SUCCEEDED:
if self.log_events_on_failure:
with _suppress(Exception):
Expand All @@ -436,10 +436,14 @@ def process_pod_deletion(self, pod):
else:
self.log.info("skipping deleting pod: %s", pod.metadata.name)

def _build_find_pod_label_selector(self, context: Optional[dict] = None) -> str:
def _build_find_pod_label_selector(self, context: Optional[dict] = None, *, exclude_checked=True) -> str:
labels = self._get_ti_pod_labels(context, include_try_number=False)
label_strings = [f'{label_id}={label}' for label_id, label in sorted(labels.items())]
return ','.join(label_strings) + f',{self.POD_CHECKED_KEY}!=True,!airflow-worker'
labels_value = ','.join(label_strings)
if exclude_checked:
labels_value += f',{self.POD_CHECKED_KEY}!=True'
labels_value += ',!airflow-worker'
return labels_value

def _set_name(self, name):
if name is None:
Expand Down
55 changes: 52 additions & 3 deletions kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,8 @@ def test_config_path_move(self):
context = create_context(k)
k.execute(context)
expected_pod = copy(self.expected_pod)
expected_pod['metadata']['labels']['already_checked'] = 'True'
actual_pod = self.api_client.sanitize_for_serialization(k.pod)
assert expected_pod == actual_pod
assert actual_pod == expected_pod

def test_working_pod(self):
k = KubernetesPodOperator(
Expand Down Expand Up @@ -210,6 +209,57 @@ def test_delete_operator_pod(self):
assert self.expected_pod['spec'] == actual_pod['spec']
assert self.expected_pod['metadata']['labels'] == actual_pod['metadata']['labels']

def test_already_checked_on_success(self):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
label, whether pod is successful or not.
"""
pod_name = "test-" + str(random.randint(0, 1000000))
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["echo 10"],
labels={"foo": "bar"},
name=pod_name,
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
)
context = create_context(k)
k.execute(context)
actual_pod = k.find_pod('default', context, exclude_checked=False)
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
assert actual_pod['metadata']['labels']['already_checked'] == 'True'

def test_already_checked_on_failure(self):
"""
When ``is_delete_operator_pod=False``, pod should have 'already_checked'
label, whether pod is successful or not.
"""
pod_name = "test-" + str(random.randint(0, 1000000))
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["lalala"],
labels={"foo": "bar"},
name=pod_name,
task_id="task" + self.get_current_task_name(),
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
)
context = create_context(k)
with pytest.raises(AirflowException):
k.execute(context)
actual_pod = k.find_pod('default', context, exclude_checked=False)
actual_pod = self.api_client.sanitize_for_serialization(actual_pod)
status = next(iter(filter(lambda x: x['name'] == 'base', actual_pod['status']['containerStatuses'])))
assert status['state']['terminated']['reason'] == 'Error'
assert actual_pod['metadata']['labels']['already_checked'] == 'True'

def test_pod_hostnetwork(self):
k = KubernetesPodOperator(
namespace='default',
Expand Down Expand Up @@ -763,7 +813,6 @@ def test_full_pod_spec(self):
'kubernetes_pod_operator': 'True',
'task_id': mock.ANY,
'try_number': '1',
'already_checked': 'True',
}
assert k.pod.spec.containers[0].env == [k8s.V1EnvVar(name="env_name", value="value")]
assert result == {"hello": "world"}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ def test_volume_mount(self):
expected_pod['spec']['volumes'] = [
{'name': 'test-volume', 'persistentVolumeClaim': {'claimName': 'test-volume'}}
]
expected_pod['metadata']['labels']['already_checked'] = 'True'
assert expected_pod == actual_pod

def test_run_as_user_root(self):
Expand Down

0 comments on commit 6bbe015

Please sign in to comment.