-
Notifications
You must be signed in to change notification settings - Fork 16.4k
AIP 72: Handling "deferrable" tasks in execution_api and task SDK #44241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A good start!
kaxil
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Approving pre-emptively as the code looks good but the tests needs a bit more work.
1de3561 to
1051fc5
Compare
|
Squashed commits into 1 and resolved conflicts. Doing one final pass now |
1051fc5 to
c2b6003
Compare
187c6ee to
5bdb4e1
Compare
5bdb4e1 to
4bd9d05
Compare
…ache#44241) closes: apache#44137 Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
…ache#44241) closes: apache#44137 Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
closes: #44137
This PR is trying to port the "deferral" logic from airflow 2 to the airflow 3 (execution api + task sdk)
Summary of changes:
Server side changes (execution api):
TIDeferredStatePayloadinti_update_state-> covered by unit test:test_ti_update_state_to_deferreda. Didn't piggy back on
ti.defer_task()as it extracts the trigger out ofTaskDeferredexception. It is much more expensive to send across multiple models likeTaskInstance,TaskDeferred,Triggerinstead of just the required minimal propertiesb.
returningand not proceeding with query execution as we already do it above https://github.com/apache/airflow/pull/44241/files#diff-d44a72566870079ee943e24bac2af74fb84c426c54d210561a251549a7078ed7L129Client side changes (task sdk):
HTTP client:
Added a new function defer that sends a patch request to the
task-instances/{id}/state executionapi with payload:PatchTIToDeferredComms:
Defining a new data model to send a request to patch ti as "deferred" from task runner to supervisor:
PatchTIToDeferred(Added to ToSupervisor)Supervisor:
_final_stateto support@propertyfinal_statewhich is final state of a TIa. Added a setter to set values for this final state for cases like deferred so that the finish is not called for tasks those aren't in terminal stage: https://github.com/apache/airflow/pull/44241/files#diff-c2651fdee1a25e091e2a9d4f937f8032ca3d289d0de76f38ed88aee5df0f880dL392-L394
finish()whenfinal_stateis notTerminalTIStatehandle_requeststo receive requests from task runner and forwarding the message to http client to call deferTaskRunner:
Task runner executes: ti.task.execute and raises
TaskDeferredfor deferral. This sends a request to supervisor usingSUPERVISOR_COMMSHow was this tested?
test_handle_requestscovers the supervisor + client side of things along with a mock of the message from task runnertest_ti_update_state_to_deferredcovers the scenario for execution APItest_run_deferred_basictests if the DAG raised a task exception and sent the right message across the SUPERVISOR_COMMS or not.Additional sanity tests
Testing the "task runner" entries and exits (client side)
Wrote a async operator DAG:

Breakpoint inside task_runner.py#run
Validated the exception entrypoint with all the variables to be set as required by running the test:

test_run_basicTesting the DB state in the server side (execution API)
Used the test:
test_ti_update_state_to_deferredDebugger inside
ti_update_statemethodValidated the

TaskDeferredand related logicChecked state of the DB on execution

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rstor{issue_number}.significant.rst, in newsfragments.