Skip to content

Conversation

@amoghrajesh
Copy link
Contributor

@amoghrajesh amoghrajesh commented Dec 13, 2024

related: #44414

Server side changes (execution api):

  1. Handling TIRescheduleStatePayload in ti_update_state -> covered by unit test: test_ti_update_state_to_reschedule
  2. Defining a datamodel for TIRescheduleStatePayload and adding it to the discriminator: ti_state_discriminator

Client side changes (task sdk):

HTTP client:

Added a new function defer that sends a patch request to the task-instances/{id}/state execution api with payload: RescheduleTask

Comms:

Defining a new data model to send a request to patch ti as "up_for_reschedule" from task runner to supervisor: RescheduleTask (Added to ToSupervisor)

Supervisor:

  1. Extended handle_requests to receive requests from task runner and forwarding the message to http client to call reschedule.

TaskRunner:

Task runner executes: ti.task.execute and raises AirflowRescheduleException for rescheduling. This sends a request to supervisor using SUPERVISOR_COMMS

Example DAG with which this was tested:

from datetime import datetime, timedelta
from airflow.exceptions import AirflowRescheduleException
from airflow.decorators import dag, task
from airflow.utils import timezone


@dag()
def reschedule_dag():
    @task
    def raise_reschedule_exception(**kwargs):
        raise AirflowRescheduleException(reschedule_date=timezone.datetime(2025, 1, 1))

    raise_reschedule_exception()


reschedule_dag()

^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@amoghrajesh amoghrajesh marked this pull request as ready for review December 13, 2024 10:58
@amoghrajesh amoghrajesh requested review from ashb and kaxil December 13, 2024 10:58
@amoghrajesh amoghrajesh added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Dec 13, 2024
@amoghrajesh amoghrajesh self-assigned this Dec 13, 2024
@amoghrajesh amoghrajesh changed the title AIP-72: Handling up_for_retry task instance states AIP-72: Handling up_for_reschedule task instance states Dec 13, 2024
@amoghrajesh amoghrajesh changed the title AIP-72: Handling up_for_reschedule task instance states AIP-72: Handling "up_for_reschedule" task instance states Dec 13, 2024
@amoghrajesh amoghrajesh requested a review from ashb December 16, 2024 12:45
@amoghrajesh amoghrajesh requested a review from kaxil December 17, 2024 07:39
@amoghrajesh amoghrajesh requested a review from XD-DENG as a code owner December 17, 2024 16:13
@amoghrajesh amoghrajesh requested a review from kaxil December 18, 2024 07:01
Copy link
Member

@kaxil kaxil left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Approving (lgtm high-level), but please address the comments

@amoghrajesh
Copy link
Contributor Author

amoghrajesh commented Dec 18, 2024

@kaxil merging this one. I think I have handled the comments on this

@amoghrajesh amoghrajesh merged commit 084218b into apache:main Dec 18, 2024
50 checks passed
@amoghrajesh amoghrajesh deleted the AIP72-task-reschedule branch December 18, 2024 14:54
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK

Development

Successfully merging this pull request may close these issues.

4 participants