-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIP-72: Add support for get_current_context in Task SDK
#45486
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
06b7acd to
66526e0
Compare
66526e0 to
9627d5a
Compare
9f96a41 to
407002b
Compare
uranusjr
reviewed
Jan 9, 2025
uranusjr
approved these changes
Jan 9, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
407002b to
14de0a6
Compare
kaxil
added a commit
to astronomer/airflow
that referenced
this pull request
Jan 9, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
kaxil
added a commit
to astronomer/airflow
that referenced
this pull request
Jan 9, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
kaxil
added a commit
that referenced
this pull request
Jan 9, 2025
I had added `create_runtime_ti` in #45486, this PR modifies the fixture a little bit and uses it more broadly.
amoghrajesh
reviewed
Jan 9, 2025
Comment on lines
+101
to
+113
| This fixture sets up a `RuntimeTaskInstance` with default or custom `TIRunContext` and `StartupDetails`, | ||
| making it easy to simulate task execution scenarios in tests. | ||
|
|
||
| Example usage: :: | ||
|
|
||
| def test_custom_task_instance(create_runtime_ti): | ||
| class MyTaskOperator(BaseOperator): | ||
| def execute(self, context): | ||
| assert context["dag_run"].run_id == "test_run" | ||
|
|
||
| task = MyTaskOperator(task_id="test_task") | ||
| ti = create_runtime_ti(task, context_from_server=make_ti_context(run_id="test_run")) | ||
| # Further test logic... |
Contributor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Amazing!
karenbraganz
pushed a commit
to karenbraganz/airflow
that referenced
this pull request
Jan 13, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
karenbraganz
pushed a commit
to karenbraganz/airflow
that referenced
this pull request
Jan 13, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
HariGS-DB
pushed a commit
to HariGS-DB/airflow
that referenced
this pull request
Jan 16, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
HariGS-DB
pushed a commit
to HariGS-DB/airflow
that referenced
this pull request
Jan 16, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
got686-yandex
pushed a commit
to got686-yandex/airflow
that referenced
this pull request
Jan 30, 2025
closes apache#45234 I am putting the logic for `set_current_context` in `execution_time/context.py`. I didn't want to put `_CURRENT_CONTEXT` in `task_sdk/src/airflow/sdk/definitions/contextmanager.py` to avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (via `get_current_context` in the Standard Provider) in their Task. Upcoming PRs: - Move most of the internal stuff in Task SDK to a separate module. - Use `create_runtime_ti` fixture more widely in tests --- Tested with the following DAG: ```py import pendulum from airflow.decorators import dag, task from airflow.providers.standard.operators.python import get_current_context @dag( schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) def x_get_context(): @task def template_test(data_interval_end): context = get_current_context() # Will print `2024-10-10 00:00:00+00:00`. # Note how we didn't pass this value when calling the task. Instead # it was passed by the decorator from the context print(f"data_interval_end: {data_interval_end}") # Will print the full context dict print(f"context: {context}") template_test() x_get_context() ``` <img width="1703" alt="image" src="https://github.com/user-attachments/assets/2763963a-d299-412f-bee3-3b20904ca7c8" />
got686-yandex
pushed a commit
to got686-yandex/airflow
that referenced
this pull request
Jan 30, 2025
I had added `create_runtime_ti` in apache#45486, this PR modifies the fixture a little bit and uses it more broadly.
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
May 28, 2025
I had added `create_runtime_ti` in apache/airflow#45486, this PR modifies the fixture a little bit and uses it more broadly. GitOrigin-RevId: 0e114c2a70642c17b0be7ff76c5d86f4145253b7
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
Sep 23, 2025
I had added `create_runtime_ti` in apache/airflow#45486, this PR modifies the fixture a little bit and uses it more broadly. GitOrigin-RevId: 0e114c2a70642c17b0be7ff76c5d86f4145253b7
kosteev
pushed a commit
to GoogleCloudPlatform/composer-airflow
that referenced
this pull request
Oct 21, 2025
I had added `create_runtime_ti` in apache/airflow#45486, this PR modifies the fixture a little bit and uses it more broadly. GitOrigin-RevId: 0e114c2a70642c17b0be7ff76c5d86f4145253b7
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Labels
area:task-execution-interface-aip72
AIP-72: Task Execution Interface (TEI) aka Task SDK
area:task-sdk
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
closes #45234
I am putting the logic for
set_current_contextinexecution_time/context.py. I didn't want to put_CURRENT_CONTEXTintask_sdk/src/airflow/sdk/definitions/contextmanager.pyto avoid execution logic in a user-facing module but I couldn't think of another way to store it from execution & allow retrieving (viaget_current_contextin the Standard Provider) in their Task.Upcoming PRs:
create_runtime_tifixture more widely in testsTested with the following DAG:
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.