Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 0 additions & 30 deletions RELEASE_NOTES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,6 @@ aligning with the broader asset-aware execution model introduced in Airflow 3.0.
Behaviour change in ``xcom_pull``
"""""""""""""""""""""""""""""""""

**Pulling without setting ``task_ids``**:

In Airflow 2, the ``xcom_pull()`` method allowed pulling XComs by key without specifying task_ids, despite the fact that the underlying
DB model defines task_id as part of the XCom primary key. This created ambiguity: if two tasks pushed XComs with the same key,
``xcom_pull()`` would pull whichever one happened to be first, leading to unpredictable behavior.
Expand All @@ -354,34 +352,6 @@ Should be updated to::
kwargs["ti"].xcom_pull(task_ids="task1", key="key")


**Return Type Change for Single Task ID**:

In Airflow 2, when using ``xcom_pull()`` with a single task ID in a list (e.g., ``task_ids=["task1"]``), it would return a ``LazyXComSelectSequence``
object containing one value. In Airflow 3.0.0, this behavior was changed to return the value directly.

So, if you previously used:

.. code-block:: python

xcom_values = kwargs["ti"].xcom_pull(task_ids=["task1"], key="key")
xcom_value = xcom_values[0] # Access the first value

You would now get the value directly, rather than a sequence containing one value.

.. code-block:: python

xcom_value = kwargs["ti"].xcom_pull(task_ids=["task1"], key="key")

The previous behaviour (returning list when passed a list) will be restored in Airflow 3.0.1 to maintain backward compatibility.

However, it is recommended to be explicit about your intentions when using ``task_ids`` (after the fix in 3.0.1):

- If you want a single value, use ``task_ids="task1"``
- If you want a sequence, use ``task_ids=["task1"]``

This makes the code more explicit and easier to understand.


Removed Configuration Keys
"""""""""""""""""""""""""""

Expand Down
4 changes: 2 additions & 2 deletions reproducible_build.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
release-notes-hash: 77a6fba681cf21973ca9712136d1b51a
source-date-epoch: 1745327923
release-notes-hash: df3b67b987fd909d16f6158df78b3813
source-date-epoch: 1745660315
6 changes: 5 additions & 1 deletion task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ def xcom_pull(
if run_id is None:
run_id = self.run_id

single_task_requested = isinstance(task_ids, (str, type(None)))
single_map_index_requested = isinstance(map_indexes, (int, type(None), ArgNotSet))

if task_ids is None:
# default to the current task if not provided
task_ids = [self.task_id]
Expand Down Expand Up @@ -363,8 +366,9 @@ def xcom_pull(
else:
xcoms.append(value)

if len(xcoms) == 1:
if single_task_requested and single_map_index_requested:
return xcoms[0]

return xcoms

def xcom_push(self, key: str, value: Any):
Expand Down
50 changes: 49 additions & 1 deletion task-sdk/tests/task_sdk/execution_time/test_task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1267,7 +1267,7 @@ def test_xcom_pull(
):
"""
Test that a task makes an expected call to the Supervisor to pull XCom values
based on various task_ids and map_indexes configurations.
based on various task_ids, map_indexes, and xcom_values configurations.
"""
map_indexes_kwarg = {} if map_indexes is NOTSET else {"map_indexes": map_indexes}
task_ids_kwarg = {} if task_ids is NOTSET else {"task_ids": task_ids}
Expand Down Expand Up @@ -1313,6 +1313,54 @@ def execute(self, context):
),
)

@pytest.mark.parametrize(
"task_ids, map_indexes, expected_value",
[
pytest.param("task_a", 0, {"a": 1, "b": 2}, id="task_id is str, map_index is int"),
pytest.param("task_a", [0], [{"a": 1, "b": 2}], id="task_id is str, map_index is list"),
pytest.param("task_a", None, {"a": 1, "b": 2}, id="task_id is str, map_index is None"),
pytest.param("task_a", NOTSET, {"a": 1, "b": 2}, id="task_id is str, map_index is ArgNotSet"),
pytest.param(["task_a"], 0, [{"a": 1, "b": 2}], id="task_id is list, map_index is int"),
pytest.param(["task_a"], [0], [{"a": 1, "b": 2}], id="task_id is list, map_index is list"),
pytest.param(["task_a"], None, [{"a": 1, "b": 2}], id="task_id is list, map_index is None"),
pytest.param(
["task_a"], NOTSET, [{"a": 1, "b": 2}], id="task_id is list, map_index is ArgNotSet"
),
pytest.param(None, 0, {"a": 1, "b": 2}, id="task_id is None, map_index is int"),
pytest.param(None, [0], [{"a": 1, "b": 2}], id="task_id is None, map_index is list"),
pytest.param(None, None, {"a": 1, "b": 2}, id="task_id is None, map_index is None"),
pytest.param(None, NOTSET, {"a": 1, "b": 2}, id="task_id is None, map_index is ArgNotSet"),
],
)
def test_xcom_pull_return_values(
self,
create_runtime_ti,
mock_supervisor_comms,
task_ids,
map_indexes,
expected_value,
):
"""
Tests return value of xcom_pull under various combinations of task_ids and map_indexes.
The above test covers the expected calls to supervisor comms.
"""

class CustomOperator(BaseOperator):
def execute(self, context):
print("This is a custom operator")

test_task_id = "pull_task"
task = CustomOperator(task_id=test_task_id)
runtime_ti = create_runtime_ti(task=task)

value = {"a": 1, "b": 2}
# API server returns serialised value for xcom result, staging it in that way
xcom_value = BaseXCom.serialize_value(value)
mock_supervisor_comms.get_message.return_value = XComResult(key="key", value=xcom_value)

returned_xcom = runtime_ti.xcom_pull(key="key", task_ids=task_ids, map_indexes=map_indexes)
assert returned_xcom == expected_value

def test_get_param_from_context(
self, mocked_parse, make_ti_context, mock_supervisor_comms, create_runtime_ti
):
Expand Down