
Airflow 3: xcom_pull behaves differently in Airflow 2 and 3 #49540

@kosteev

Description


Apache Airflow version

3.0.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

With the following DAG:

from __future__ import annotations

import logging
from datetime import datetime

from airflow.decorators import task

from airflow.models.dag import DAG


log = logging.getLogger(__name__)


with DAG(
    "xcom_test",
    schedule="* * * * *",  # Override to match your needs.
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["example", "sheets"],
) as dag:

    @task
    def xcom_string():
        return "string"

    @task
    def xcom_dict():
        return {"a": "b"}

    @task
    def xcom_list():
        return [("a", "b"), ("c", "d")]

    @task
    def read_xcom(**kwargs):
        # For each upstream task, pull its return value twice: once with the
        # task_id as a plain string, once with a one-element task_ids list.
        xcom_string_task_value = kwargs["task_instance"].xcom_pull("xcom_string")
        print("xcom_string_task", type(xcom_string_task_value), xcom_string_task_value)

        xcom_string_task_value_ids = kwargs["task_instance"].xcom_pull(task_ids=["xcom_string"], key="return_value")
        print("xcom_string_task_ids", type(xcom_string_task_value_ids), xcom_string_task_value_ids, xcom_string_task_value_ids[0])

        xcom_dict_task_value = kwargs["task_instance"].xcom_pull("xcom_dict")
        print("xcom_dict_task", type(xcom_dict_task_value), xcom_dict_task_value)

        xcom_dict_task_value_ids = kwargs["task_instance"].xcom_pull(task_ids=["xcom_dict"], key="return_value")
        print("xcom_dict_task_ids", type(xcom_dict_task_value_ids), xcom_dict_task_value_ids, xcom_dict_task_value_ids[0])

        xcom_list_task_value = kwargs["task_instance"].xcom_pull("xcom_list")
        print("xcom_list_task", type(xcom_list_task_value), xcom_list_task_value)

        xcom_list_task_value_ids = kwargs["task_instance"].xcom_pull(task_ids=["xcom_list"], key="return_value")
        print("xcom_list_task_ids", type(xcom_list_task_value_ids), xcom_list_task_value_ids, xcom_list_task_value_ids[0])

    xcom_string = xcom_string()
    xcom_dict = xcom_dict()
    xcom_list = xcom_list()

    read_xcom_task = read_xcom()
    # Run the producers in sequence so all three XComs exist before read_xcom runs.
    xcom_string >> xcom_dict >> xcom_list >> read_xcom_task

In Airflow 2, the output is:

[2025-04-22, 08:13:20 UTC] {logging_mixin.py:190} INFO - xcom_string_task <class 'str'> string
[2025-04-22, 08:13:20 UTC] {logging_mixin.py:190} INFO - xcom_string_task_ids <class 'airflow.models.xcom.LazyXComSelectSequence'> LazySelectSequence([1 item]) string
[2025-04-22, 08:13:20 UTC] {logging_mixin.py:190} INFO - xcom_dict_task <class 'dict'> {'a': 'b'}
[2025-04-22, 08:13:20 UTC] {logging_mixin.py:190} INFO - xcom_dict_task_ids <class 'airflow.models.xcom.LazyXComSelectSequence'> LazySelectSequence([1 item]) {'a': 'b'}
[2025-04-22, 08:13:20 UTC] {logging_mixin.py:190} INFO - xcom_list_task <class 'list'> [['a', 'b'], ['c', 'd']]
[2025-04-22, 08:13:20 UTC] {logging_mixin.py:190} INFO - xcom_list_task_ids <class 'airflow.models.xcom.LazyXComSelectSequence'> LazySelectSequence([1 item]) [['a', 'b'], ['c', 'd']]
[2025-04-22, 08:13:20 UTC] {python.py:240} INFO - Done. Returned value was: None
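Note that the xcom_list task returned tuples, yet Airflow 2 logs [['a', 'b'], ['c', 'd']]. A likely explanation, assuming Airflow 2's default XCom backend stores return values as JSON, is that JSON has arrays but no tuple type, so nested tuples come back as lists. The standard library alone shows the effect:

```python
import json

# Value returned by the xcom_list task in the DAG above.
returned = [("a", "b"), ("c", "d")]

# JSON has no tuple type: each tuple is written as an array and read back as a list.
round_tripped = json.loads(json.dumps(returned))

print(round_tripped)  # [['a', 'b'], ['c', 'd']]
```

This is independent of the Airflow 2 vs 3 difference reported here; it only explains why both the string-pull and list-pull lines show lists.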

In Airflow 3:

[Image: Airflow 3 task log output]

What you think should happen instead?

The behavior should be the same in Airflow 2 and 3.
When task_ids is passed as a one-element list (task_ids=["<task_id>"]), Airflow 2 returns a list of values (a LazyXComSelectSequence), whereas Airflow 3 returns the value itself.
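To make the impact concrete, here is a pure-Python model of the two behaviors described above. FakeTaskInstance, XCOM_STORE and pull_each are hypothetical names used only for illustration; they are not Airflow APIs. The model assumes, as the report implies, that a plain string task_id returns the value itself in both versions, and it only unwraps one-element lists for "Airflow 3" because that is the only case the report describes. pull_each is a possible version-agnostic workaround: pull one task_id at a time as a string and build the list yourself.

```python
from __future__ import annotations

from typing import Any, Iterable

# Hypothetical stand-in for the XCom table: task_id -> "return_value" XCom,
# with values shaped as in the Airflow 2 log above.
XCOM_STORE: dict[str, Any] = {
    "xcom_string": "string",
    "xcom_dict": {"a": "b"},
    "xcom_list": [["a", "b"], ["c", "d"]],
}


class FakeTaskInstance:
    """Hypothetical model of xcom_pull, not the Airflow API.

    pull_list_as_list=True models Airflow 2 (a list of task_ids yields a list
    of values); False models the reported Airflow 3 behavior (a one-element
    list yields the value itself).
    """

    def __init__(self, pull_list_as_list: bool) -> None:
        self.pull_list_as_list = pull_list_as_list

    def xcom_pull(self, task_ids: str | list[str], key: str = "return_value") -> Any:
        if isinstance(task_ids, str):
            return XCOM_STORE[task_ids]  # assumed identical in both versions
        values = [XCOM_STORE[t] for t in task_ids]
        if not self.pull_list_as_list and len(values) == 1:
            return values[0]  # reported Airflow 3 behavior
        return values


def pull_each(ti: Any, task_ids: Iterable[str], key: str = "return_value") -> list[Any]:
    # Pull each task_id as a plain string and collect the values ourselves,
    # so the result is always a list regardless of how list pulls behave.
    return [ti.xcom_pull(task_ids=t, key=key) for t in task_ids]


airflow2 = FakeTaskInstance(pull_list_as_list=True)
airflow3 = FakeTaskInstance(pull_list_as_list=False)

# Indexing [0], as read_xcom does, means different things under the two models.
print(airflow2.xcom_pull(task_ids=["xcom_string"])[0])  # string
print(airflow3.xcom_pull(task_ids=["xcom_string"])[0])  # s
print(airflow3.xcom_pull(task_ids=["xcom_list"])[0])  # ['a', 'b']
try:
    airflow3.xcom_pull(task_ids=["xcom_dict"])[0]
except KeyError as exc:
    print("KeyError:", exc)  # KeyError: 0

# The per-task pull gives the same result under both models.
for task_id in XCOM_STORE:
    assert pull_each(airflow2, [task_id]) == pull_each(airflow3, [task_id])
```

Inside the DAG, the helper would be called as pull_each(kwargs["task_instance"], ["xcom_string"]). The trade-off is one xcom_pull call per task_id instead of a single call with a list, which may mean more round trips.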

How to reproduce

See the section above.

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata


Assignees: No one assigned

Labels: area:core, kind:bug, needs-triage, priority:medium

Type: No type

Projects: No projects

Relationships: None yet

Development: No branches or pull requests
