Skip to content

Support pulling multiple XCom values #45243

@kaxil

Description

@kaxil

Currently, we only support pulling single XCom value from ti.xcom_pull in the Task SDK.

Also port tests from

def test_xcom_pull(self, dag_maker):
"""Test xcom_pull, using different filtering methods."""
with dag_maker(dag_id="test_xcom") as dag:
task_1 = EmptyOperator(task_id="test_xcom_1")
task_2 = EmptyOperator(task_id="test_xcom_2")
dagrun = dag_maker.create_dagrun(start_date=timezone.datetime(2016, 6, 1, 0, 0, 0))
ti1 = dagrun.get_task_instance(task_1.task_id)
# Push a value
ti1.xcom_push(key="foo", value="bar")
# Push another value with the same key (but by a different task)
XCom.set(key="foo", value="baz", task_id=task_2.task_id, dag_id=dag.dag_id, run_id=dagrun.run_id)
# Pull with no arguments
result = ti1.xcom_pull()
assert result is None
# Pull the value pushed most recently by any task.
result = ti1.xcom_pull(key="foo")
assert result in "baz"
# Pull the value pushed by the first task
result = ti1.xcom_pull(task_ids="test_xcom_1", key="foo")
assert result == "bar"
# Pull the value pushed by the second task
result = ti1.xcom_pull(task_ids="test_xcom_2", key="foo")
assert result == "baz"
# Pull the values pushed by both tasks & Verify Order of task_ids pass & values returned
result = ti1.xcom_pull(task_ids=["test_xcom_1", "test_xcom_2"], key="foo")
assert result == ["bar", "baz"]
def test_xcom_pull_mapped(self, dag_maker, session):
with dag_maker(dag_id="test_xcom", session=session):
# Use the private _expand() method to avoid the empty kwargs check.
# We don't care about how the operator runs here, only its presence.
task_1 = EmptyOperator.partial(task_id="task_1")._expand(EXPAND_INPUT_EMPTY, strict=False)
EmptyOperator(task_id="task_2")
dagrun = dag_maker.create_dagrun(start_date=timezone.datetime(2016, 6, 1, 0, 0, 0))
ti_1_0 = dagrun.get_task_instance("task_1", session=session)
ti_1_0.map_index = 0
ti_1_1 = session.merge(TI(task_1, run_id=dagrun.run_id, map_index=1, state=ti_1_0.state))
session.flush()
ti_1_0.xcom_push(key=XCOM_RETURN_KEY, value="a", session=session)
ti_1_1.xcom_push(key=XCOM_RETURN_KEY, value="b", session=session)
ti_2 = dagrun.get_task_instance("task_2", session=session)
assert set(ti_2.xcom_pull(["task_1"], session=session)) == {"a", "b"} # Ordering not guaranteed.
assert ti_2.xcom_pull(["task_1"], map_indexes=0, session=session) == ["a"]
assert ti_2.xcom_pull(map_indexes=[0, 1], session=session) == ["a", "b"]
assert ti_2.xcom_pull("task_1", map_indexes=[1, 0], session=session) == ["b", "a"]
assert ti_2.xcom_pull(["task_1"], map_indexes=[0, 1], session=session) == ["a", "b"]
assert ti_2.xcom_pull("task_1", map_indexes=1, session=session) == "b"
assert list(ti_2.xcom_pull("task_1", session=session)) == ["a", "b"]
def test_xcom_pull_after_success(self, create_task_instance):
"""
tests xcom set/clear relative to a task in a 'success' rerun scenario
"""
key = "xcom_key"
value = "xcom_value"
ti = create_task_instance(
dag_id="test_xcom",
schedule="@monthly",
task_id="test_xcom",
pool="test_xcom",
serialized=True,
)
ti.run(mark_success=True)
ti.xcom_push(key=key, value=value)
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
ti.run()
# Check that we do not clear Xcom until the task is certain to execute
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
# Xcom shouldn't be cleared if the task doesn't execute, even if dependencies are ignored
ti.run(ignore_all_deps=True, mark_success=True)
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
# Xcom IS finally cleared once task has executed
ti.run(ignore_all_deps=True)
assert ti.xcom_pull(task_ids="test_xcom", key=key) is None
def test_xcom_pull_after_deferral(self, create_task_instance, session):
"""
tests xcom will not clear before a task runs its next method after deferral.
"""
key = "xcom_key"
value = "xcom_value"
ti = create_task_instance(
dag_id="test_xcom",
schedule="@monthly",
task_id="test_xcom",
pool="test_xcom",
)
ti.run(mark_success=True)
ti.xcom_push(key=key, value=value)
ti.next_method = "execute"
session.merge(ti)
session.commit()
ti.run(ignore_all_deps=True)
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
def test_xcom_pull_different_logical_date(self, create_task_instance):
"""
tests xcom fetch behavior with different logical dates, using
both xcom_pull with "include_prior_dates" and without
"""
key = "xcom_key"
value = "xcom_value"
ti = create_task_instance(
dag_id="test_xcom",
schedule="@monthly",
task_id="test_xcom",
pool="test_xcom",
)
exec_date = ti.dag_run.logical_date
ti.run(mark_success=True)
ti.xcom_push(key=key, value=value)
assert ti.xcom_pull(task_ids="test_xcom", key=key) == value
ti.run()
exec_date += datetime.timedelta(days=1)
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST} if AIRFLOW_V_3_0_PLUS else {}
dr = ti.task.dag.create_dagrun(
run_id="test2",
data_interval=(exec_date, exec_date),
state=None,
**triggered_by_kwargs,
)
ti = TI(task=ti.task, run_id=dr.run_id)
ti.run()
# We have set a new logical date (and did not pass in
# 'include_prior_dates'which means this task should now have a cleared
# xcom value
assert ti.xcom_pull(task_ids="test_xcom", key=key) is None
# We *should* get a value using 'include_prior_dates'
assert ti.xcom_pull(task_ids="test_xcom", key=key, include_prior_dates=True) == value
def test_xcom_pull_different_run_ids(self, create_task_instance):
"""
tests xcom fetch behavior w/different run ids
"""
key = "xcom_key"
task_id = "test_xcom"
diff_run_id = "diff_run_id"
same_run_id_value = "xcom_value_same_run_id"
diff_run_id_value = "xcom_value_different_run_id"
ti_same_run_id = create_task_instance(
dag_id="test_xcom",
task_id=task_id,
)
ti_same_run_id.run(mark_success=True)
ti_same_run_id.xcom_push(key=key, value=same_run_id_value)
ti_diff_run_id = create_task_instance(
dag_id="test_xcom",
task_id=task_id,
run_id=diff_run_id,
)
ti_diff_run_id.run(mark_success=True)
ti_diff_run_id.xcom_push(key=key, value=diff_run_id_value)
assert (
ti_same_run_id.xcom_pull(run_id=ti_same_run_id.dag_run.run_id, task_ids=task_id, key=key)
== same_run_id_value
)
assert (
ti_same_run_id.xcom_pull(run_id=ti_diff_run_id.dag_run.run_id, task_ids=task_id, key=key)
== diff_run_id_value
)
def test_xcom_push_flag(self, dag_maker):
"""
Tests the option for Operators to push XComs
"""
value = "hello"
task_id = "test_no_xcom_push"
with dag_maker(dag_id="test_xcom", serialized=True):
# nothing saved to XCom
task = PythonOperator(
task_id=task_id,
python_callable=lambda: value,
do_xcom_push=False,
)
ti = dag_maker.create_dagrun(logical_date=timezone.utcnow()).task_instances[0]
ti.task = task
ti.run()
assert ti.xcom_pull(task_ids=task_id, key=XCOM_RETURN_KEY) is None

Metadata

Metadata

Assignees

Type

No type

Projects

Status

Done

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions