-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIP-72: Allow pushing and pulling XCom from Task Context #45075
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ed34297 to
3a60948
Compare
kaxil
added a commit
that referenced
this pull request
Dec 20, 2024
It fixes the following bug
```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
...
```
To reproduce just run `tutorial_dag` or the following minimal dag:
```python
import pendulum
from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
"sdk_tutorial_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task
```
I need this fix for #45075 (part of the getting [Task Context working with AIP-72](#44481))
dc62eb8 to
cc78190
Compare
kaxil
commented
Dec 24, 2024
ashb
approved these changes
Dec 24, 2024
Member
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few nits
Closed
4 tasks
HariGS-DB
pushed a commit
to HariGS-DB/airflow
that referenced
this pull request
Jan 16, 2025
Part of apache#44481 There is a lot of cleanup to do but I wanted to get a basic DAG that uses XCom working first. Example DAG used: `tutorial_dag` ```py from __future__ import annotations # [START tutorial] # [START import_module] import json import textwrap import pendulum # The DAG object; we'll need this to instantiate a DAG from airflow.models.dag import DAG # Operators; we need this to operate! from airflow.providers.standard.operators.python import PythonOperator # [END import_module] # [START instantiate_dag] with DAG( "tutorial_dag", # [START default_args] # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={"retries": 2}, # [END default_args] description="DAG tutorial", schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: # [END instantiate_dag] # [START documentation] dag.doc_md = __doc__ # [END documentation] # [START extract_function] def extract(**kwargs): ti = kwargs["ti"] data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' ti.xcom_push("order_data", data_string) # [END extract_function] # [START transform_function] def transform(**kwargs): ti = kwargs["ti"] extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data") order_data = json.loads(extract_data_string) total_order_value = 0 for value in order_data.values(): total_order_value += value total_value = {"total_order_value": total_order_value} total_value_json_string = json.dumps(total_value) ti.xcom_push("total_order_value", total_value_json_string) # [END transform_function] # [START load_function] def load(**kwargs): ti = kwargs["ti"] total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value") total_order_value = json.loads(total_value_string) print(total_order_value) # [END load_function] # [START main_flow] extract_task = PythonOperator( task_id="extract", python_callable=extract, ) extract_task.doc_md = textwrap.dedent( """\ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. This data is then put into xcom, so that it can be processed by the next task. """ ) transform_task = PythonOperator( task_id="transform", python_callable=transform, ) transform_task.doc_md = textwrap.dedent( """\ #### Transform task A simple Transform task which takes in the collection of order data from xcom and computes the total order value. This computed value is then put into xcom, so that it can be processed by the next task. """ ) load_task = PythonOperator( task_id="load", python_callable=load, ) load_task.doc_md = textwrap.dedent( """\ #### Load task A simple Load task which takes in the result of the Transform task, by reading it from xcom and instead of saving it to end user review, just prints it out. """ ) extract_task >> transform_task >> load_task ``` --- <img width="1703" alt="image" src="https://github.com/user-attachments/assets/10025ef4-0410-4c2a-9bb6-1e68f51a8805" /> <img width="1710" alt="image" src="https://github.com/user-attachments/assets/201b61c0-3998-4b06-b0d4-2145120321f8" /> --- <img width="1721" alt="image" src="https://github.com/user-attachments/assets/dd9c50e3-20c5-4762-99f9-c02a8c16732e" />
got686-yandex
pushed a commit
to got686-yandex/airflow
that referenced
this pull request
Jan 30, 2025
It fixes the following bug
```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
...
```
To reproduce just run `tutorial_dag` or the following minimal dag:
```python
import pendulum
from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
"sdk_tutorial_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task
```
I need this fix for apache#45075 (part of the getting [Task Context working with AIP-72](apache#44481))
got686-yandex
pushed a commit
to got686-yandex/airflow
that referenced
this pull request
Jan 30, 2025
Part of apache#44481 There is a lot of cleanup to do but I wanted to get a basic DAG that uses XCom working first. Example DAG used: `tutorial_dag` ```py from __future__ import annotations # [START tutorial] # [START import_module] import json import textwrap import pendulum # The DAG object; we'll need this to instantiate a DAG from airflow.models.dag import DAG # Operators; we need this to operate! from airflow.providers.standard.operators.python import PythonOperator # [END import_module] # [START instantiate_dag] with DAG( "tutorial_dag", # [START default_args] # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={"retries": 2}, # [END default_args] description="DAG tutorial", schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: # [END instantiate_dag] # [START documentation] dag.doc_md = __doc__ # [END documentation] # [START extract_function] def extract(**kwargs): ti = kwargs["ti"] data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' ti.xcom_push("order_data", data_string) # [END extract_function] # [START transform_function] def transform(**kwargs): ti = kwargs["ti"] extract_data_string = ti.xcom_pull(task_ids="extract", key="order_data") order_data = json.loads(extract_data_string) total_order_value = 0 for value in order_data.values(): total_order_value += value total_value = {"total_order_value": total_order_value} total_value_json_string = json.dumps(total_value) ti.xcom_push("total_order_value", total_value_json_string) # [END transform_function] # [START load_function] def load(**kwargs): ti = kwargs["ti"] total_value_string = ti.xcom_pull(task_ids="transform", key="total_order_value") total_order_value = json.loads(total_value_string) print(total_order_value) # [END load_function] # [START main_flow] extract_task = PythonOperator( task_id="extract", python_callable=extract, ) extract_task.doc_md = textwrap.dedent( """\ #### Extract task A simple Extract task to get data ready for the rest of the data pipeline. In this case, getting data is simulated by reading from a hardcoded JSON string. This data is then put into xcom, so that it can be processed by the next task. """ ) transform_task = PythonOperator( task_id="transform", python_callable=transform, ) transform_task.doc_md = textwrap.dedent( """\ #### Transform task A simple Transform task which takes in the collection of order data from xcom and computes the total order value. This computed value is then put into xcom, so that it can be processed by the next task. """ ) load_task = PythonOperator( task_id="load", python_callable=load, ) load_task.doc_md = textwrap.dedent( """\ #### Load task A simple Load task which takes in the result of the Transform task, by reading it from xcom and instead of saving it to end user review, just prints it out. """ ) extract_task >> transform_task >> load_task ``` --- <img width="1703" alt="image" src="https://github.com/user-attachments/assets/10025ef4-0410-4c2a-9bb6-1e68f51a8805" /> <img width="1710" alt="image" src="https://github.com/user-attachments/assets/201b61c0-3998-4b06-b0d4-2145120321f8" /> --- <img width="1721" alt="image" src="https://github.com/user-attachments/assets/dd9c50e3-20c5-4762-99f9-c02a8c16732e" />
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
May 28, 2025
It fixes the following bug
```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
...
```
To reproduce just run `tutorial_dag` or the following minimal dag:
```python
import pendulum
from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
"sdk_tutorial_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task
```
I need this fix for apache/airflow#45075 (part of the getting [Task Context working with AIP-72](apache/airflow#44481))
GitOrigin-RevId: 0465cdd5ce883d62d9c4d01ca7251e5781477cbd
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
Sep 23, 2025
It fixes the following bug
```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
...
```
To reproduce just run `tutorial_dag` or the following minimal dag:
```python
import pendulum
from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
"sdk_tutorial_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task
```
I need this fix for apache/airflow#45075 (part of the getting [Task Context working with AIP-72](apache/airflow#44481))
GitOrigin-RevId: 0465cdd5ce883d62d9c4d01ca7251e5781477cbd
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
Oct 21, 2025
It fixes the following bug
```python
{"timestamp":"2024-12-20T10:38:56.890735","logger":"task","error_detail":
[{"exc_type":"RecursionError","exc_value":"maximum recursion depth exceeded in comparison","syntax_error":null,"is_cause":false,"frames":
[
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":382,"name":"main"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":317,"name":"run"},
{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":378,"name":"wrapper"},
{"filename":"/opt/airflow/providers/src/airflow/providers/standard/operators/python.py","lineno":182,"name":"execute"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":660,"name":"__setattr__"},
{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/definitions/baseoperator.py","lineno":1126,"name":"_set_xcomargs_dependency"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":132,"name":"apply_upstream_relationship"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":121,"name":"iter_xcom_references"},
{"filename":"/opt/airflow/airflow/models/xcom_arg.py","lineno":118,"name":"iter_xcom_references"},
...
```
To reproduce just run `tutorial_dag` or the following minimal dag:
```python
import pendulum
from airflow.models.dag import DAG
from airflow.providers.standard.operators.python import PythonOperator
with DAG(
"sdk_tutorial_dag",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
dag.doc_md = __doc__
def extract(**kwargs):
ti = kwargs["ti"]
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
ti.xcom_push("order_data", data_string)
extract_task = PythonOperator(
task_id="extract",
python_callable=extract,
)
extract_task
```
I need this fix for apache/airflow#45075 (part of the getting [Task Context working with AIP-72](apache/airflow#44481))
GitOrigin-RevId: 0465cdd5ce883d62d9c4d01ca7251e5781477cbd
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Part of #44481
There is a lot of cleanup to do but I wanted to get a basic DAG that uses XCom working first.
Example DAG used:
tutorial_dag^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.