diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 7c6e0d8852e5f3..4301a54f02922a 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -482,6 +482,7 @@ def _change_state( self.running.remove(key) except KeyError: self.log.debug("TI key not in running, not adding to event_buffer: %s", key) + return # If we don't have a TI state, look it up from the db. event_buffer expects the TI state if state is None: diff --git a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 495c09ea23e7c3..42d2a97dffed15 100644 --- a/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -273,16 +273,6 @@ def process_status( (pod_name, namespace, TaskInstanceState.FAILED, annotations, resource_version) ) elif status == "Succeeded": - # We get multiple events once the pod hits a terminal state, and we only want to - # send it along to the scheduler once. - # If our event type is DELETED, or the pod has a deletion timestamp, we've already - # seen the initial Succeeded event and sent it along to the scheduler. - if event["type"] == "DELETED" or pod.metadata.deletion_timestamp: - self.log.info( - "Skipping event for Succeeded pod %s - event for this pod already sent to executor", - pod_name, - ) - return self.log.info("Event: %s Succeeded, annotations: %s", pod_name, annotations_string) self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version)) elif status == "Running": diff --git a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py index 5240bf0faecb2c..07b42ee3fc22a2 100644 --- a/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/tests/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -784,6 +784,21 @@ def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_ finally: executor.end() + @pytest.mark.db_test + @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") + @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") + def test_change_state_key_not_in_running(self, mock_get_kube_client, mock_kubernetes_job_watcher): + executor = self.kubernetes_executor + executor.start() + try: + key = ("dag_id", "task_id", "run_id", "try_number1") + executor.running = set() + executor._change_state(key, State.SUCCESS, "pod_name", "default") + assert executor.event_buffer.get(key) is None + assert executor.running == set() + finally: + executor.end() + @pytest.mark.db_test @pytest.mark.parametrize( "multi_namespace_mode_namespace_list, watchers_keys", @@ -1858,14 +1873,6 @@ def test_process_status_succeeded(self): # We don't know the TI state, so we send in None self.assert_watcher_queue_called_once_with_state(None) - def test_process_status_succeeded_dedup_timestamp(self): - self.pod.status.phase = "Succeeded" - self.pod.metadata.deletion_timestamp = timezone.utcnow() - self.events.append({"type": "MODIFIED", "object": self.pod}) - - self._run() - self.watcher.watcher_queue.put.assert_not_called() - @pytest.mark.parametrize( "ti_state", [