Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,8 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any:
raise
finally:
self._clean(event=event, context=context, result=xcom_sidecar_output)
if self.do_xcom_push:
return xcom_sidecar_output
Comment on lines +938 to +939
Copy link
Contributor

@johnslavik johnslavik Dec 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be illegal syntax in a future Python version. We'll need to take that return out of finally.
https://peps.python.org/pep-0765/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AutomationDev85 the more I revisit the core the more I think this patch was not good. Actually you wanted to push XCom but with this you are masking the exception raise.

Can you re-work? I assume you to manually xcom_push() and then raise in case of error, else return XCom in this way only on success but not in finally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnslavik thanks for identifying this issue!! I created a PR to fix this issue: #58998

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume cause is that the ruff rule https://docs.astral.sh/ruff/rules/return-in-try-except-finally/ (SIM107) is not enabled in the repo, that should have catched this. Will put this on my bucket list to enable... or if somebody else wants to make a PR I assume this is an important check we are missing atm.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will put this on my bucket list to enable... or if somebody else wants to make a PR I assume this is an important check we are missing atm.

#59019

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. Thanks @johnslavik !


def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
if self.pod is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2797,16 +2797,18 @@ def test_async_kpo_wait_termination_before_cleanup_on_success(
}

k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=do_xcom_push)
k.trigger_reentry({}, success_event)
result = k.trigger_reentry({}, success_event)

# check if it gets the pod
mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE)

# assert that the xcom are extracted/not extracted
if do_xcom_push:
mock_extract_xcom.assert_called_once()
assert result == mock_extract_xcom.return_value
else:
mock_extract_xcom.assert_not_called()
assert result is None

# check if it waits for the pod to complete
assert read_pod_mock.call_count == 3
Expand Down