Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow/providers/cncf/kubernetes/operators/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,9 @@ def execute_sync(self, context: Context):
)

if self.do_xcom_push:
self.pod_manager.await_xcom_sidecar_container_start(pod=self.pod)
self.pod_manager.await_xcom_sidecar_container_start(
pod=self.pod, startup_timeout=self.startup_timeout_seconds
)
result = self.extract_xcom(pod=self.pod)
istio_enabled = self.is_istio_enabled(self.pod)
self.remote_pod = self.pod_manager.await_pod_completion(
Expand Down
15 changes: 14 additions & 1 deletion airflow/providers/cncf/kubernetes/utils/pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -721,14 +721,27 @@ def read_pod(self, pod: V1Pod) -> V1Pod:
except HTTPError as e:
raise AirflowException(f"There was an error reading the kubernetes API: {e}")

def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
def await_xcom_sidecar_container_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add test cases for this change? For example, call this method with a short timeout while the sidecar container never reaches the running state, and then check that the exception is raised.

"""
Await xcom sidecar container to start running.

:param pod:
:param startup_timeout: Timeout (in seconds) for startup of the pod
"""
self.log.info("Checking if xcom sidecar container is started.")
start_time = time.time()
for attempt in itertools.count():
if self.container_is_running(pod, PodDefaults.SIDECAR_CONTAINER_NAME):
self.log.info("The xcom sidecar container is started.")
break
if not attempt:
self.log.warning("The xcom sidecar container is not yet started.")
if time.time() - start_time >= startup_timeout:
msg = (
f"Xcom sidecar container took longer than {startup_timeout} seconds to start. "
"Check the container events in kubernetes to determine why."
)
Comment on lines +740 to +743
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we dump the events as well? That would be a better indicator than having users check manually in Kubernetes.

raise AirflowException(msg)
time.sleep(1)

def extract_xcom(self, pod: V1Pod) -> str:
Expand Down
2 changes: 1 addition & 1 deletion tests/providers/cncf/kubernetes/operators/test_pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -1507,7 +1507,7 @@ def test_get_logs_but_not_for_base_container(
# check that KPO waits for the base container to complete before proceeding to extract XCom
mock_await_container_completion.assert_called_once_with(pod=pod, container_name="base")
# check that we wait for the xcom sidecar to start before extracting XCom
mock_await_xcom_sidecar.assert_called_once_with(pod=pod)
mock_await_xcom_sidecar.assert_called_once_with(pod=pod, startup_timeout=120)

@patch(HOOK_CLASS, new=MagicMock)
@patch(KUB_OP_PATH.format("find_pod"))
Expand Down