2 changes: 1 addition & 1 deletion airflow-core/docs/img/diagram_task_lifecycle.md5sum
@@ -1 +1 @@
-081791ede03f02f9a4f0284eb1a85974
+f129c61777c2b56652b2c68d36c309ee
Binary file modified airflow-core/docs/img/diagram_task_lifecycle.png
2 changes: 1 addition & 1 deletion airflow-core/docs/img/diagram_task_lifecycle.py
@@ -193,7 +193,7 @@ def generate_task_lifecycle_diagram():
cond_fail_mark >> Edge(label="NO") >> cond_clear_mark
cond_fail_mark >> Edge(label="YES") >> state_failed
cond_clear_mark >> Edge(label="NO") >> cond_task_error
-cond_clear_mark >> Edge(label="YES") >> state_restarting >> state_up_for_retry
+cond_clear_mark >> Edge(label="YES") >> state_restarting >> state_none
cond_task_error >> Edge(label="NO") >> cond_task_complete_2
cond_task_error >> Edge(label="YES") >> cond_retriable
cond_retriable >> Edge(label="NO") >> state_failed
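For readers without the regenerated PNG, this edge is the only behavioral change in the lifecycle diagram: a task that is cleared while it is running now flows from RESTARTING back to the no-status (None) state, from which the scheduler can queue it again, rather than to up_for_retry. A minimal, illustrative sketch of the two paths (plain strings instead of Airflow's TaskInstanceState, not Airflow code):

```python
# Illustrative only: the lifecycle path of a task cleared while running,
# before and after this change. Strings stand in for TaskInstanceState values;
# None stands in for the "no status" state, as it does in Airflow itself.
OLD_PATH = ("running", "restarting", "up_for_retry")
NEW_PATH = ("running", "restarting", None)

assert OLD_PATH[:2] == NEW_PATH[:2]  # only the final edge in the diagram differs
```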
13 changes: 13 additions & 0 deletions airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -771,6 +771,7 @@ def process_executor_events(
TaskInstanceState.SUCCESS,
TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING,
):
tis_with_right_state.append(ti_key)

@@ -859,6 +860,7 @@ def process_executor_events(
TaskInstanceState.SCHEDULED,
TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING,
TaskInstanceState.RESTARTING,
)
ti_requeued = (
ti.queued_by_job_id != job_id # Another scheduler has queued this task again
@@ -916,6 +918,17 @@
),
)
executor.send_callback(request)

# Handle cleared tasks that were successfully terminated by executor
if ti.state == TaskInstanceState.RESTARTING and state == TaskInstanceState.SUCCESS:
cls.logger().info(
"Task %s was cleared and successfully terminated. Setting to scheduled for retry.", ti
)
# Adjust max_tries to allow retry beyond normal limits (like clearing does)
ti.max_tries = ti.try_number + ti.task.retries
ti.set_state(None)
continue

ti.handle_failure(error=msg, session=session)

return len(event_buffer)
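The new branch above covers a task that was cleared while running (state RESTARTING) once the executor reports its old attempt as finished: instead of routing the event through ti.handle_failure(...), the scheduler widens max_tries the same way clearing does and returns the task to the no-status state so it can be scheduled again. A much-simplified sketch of that control flow, using a plain dict as a hypothetical stand-in for a TaskInstance (this is not the scheduler's actual code or data model):

```python
# Much-simplified sketch of the RESTARTING + SUCCESS branch added above.
RESTARTING = "restarting"
SUCCESS = "success"


def process_event(ti: dict, reported_state: str) -> None:
    """Mimic the tail of process_executor_events for one buffered executor event."""
    if ti["state"] == RESTARTING and reported_state == SUCCESS:
        # Cleared-while-running task whose old attempt the executor terminated:
        # widen the retry budget (as clearing does) and hand it back to the scheduler
        # instead of treating the unexpected terminal state as a failure.
        ti["max_tries"] = ti["try_number"] + ti["retries"]
        ti["state"] = None
        return
    # In the real code every other case falls through to ti.handle_failure, which may
    # retry or fail the task; "failed" is only a placeholder in this sketch.
    ti["state"] = "failed"


ti = {"state": RESTARTING, "try_number": 4, "max_tries": 3, "retries": 2}
process_event(ti, SUCCESS)
assert ti["state"] is None and ti["max_tries"] == 6
```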
67 changes: 67 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -333,6 +333,73 @@ def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_maker):
any_order=True,
)

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest", spec=TaskCallbackRequest)
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_restarting_cleared_task(
self, mock_stats_incr, mock_task_callback, dag_maker
):
"""
Test processing of RESTARTING task instances by scheduler's _process_executor_events.

Simulates the complete flow when a running task is cleared:
1. Task is RUNNING and has exhausted retries (try_number > max_tries)
2. User clears the task → state becomes RESTARTING
3. Executor successfully terminates the task → reports SUCCESS
4. Scheduler processes the event and sets task to None (scheduled)
5. max_tries is adjusted to allow retry beyond normal limits

This test prevents regression of issue #55045 where RESTARTING tasks
would get stuck due to scheduler not processing executor events.
"""
dag_id = "test_restarting_max_tries"
task_id = "test_task"

session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc="/test_path1/", max_active_runs=1):
task1 = EmptyOperator(task_id=task_id, retries=2)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)

# Set up exhausted task scenario: try_number > max_tries
ti1.state = TaskInstanceState.RESTARTING # Simulates cleared running task
ti1.try_number = 4 # Already tried 4 times
ti1.max_tries = 3 # Originally only allowed 3 tries
session.merge(ti1)
session.commit()

# Verify task is in RESTARTING state and eligible for retry
assert ti1.state == TaskInstanceState.RESTARTING
assert ti1.is_eligible_to_retry() is True, "RESTARTING should bypass max_tries"

# Set up scheduler and executor
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock(spec=TaskCallbackRequest)
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
job_runner = SchedulerJobRunner(scheduler_job)

# Simulate executor reporting task completion (this triggers the bug scenario)
executor.event_buffer[ti1.key] = State.SUCCESS, None

# Process the executor event
job_runner._process_executor_events(executor=executor, session=session)
ti1.refresh_from_db(session=session)

assert ti1.state is None, "Task should be set to None (scheduled) state after RESTARTING processing"

# Verify max_tries was adjusted to allow retry
expected_max_tries = 4 + 2
assert ti1.max_tries == expected_max_tries, (
f"max_tries should be adjusted to {expected_max_tries}, got {ti1.max_tries}"
)

# Verify task is now eligible for retry despite being previously exhausted
assert ti1.is_eligible_to_retry() is True, (
"Task should be eligible for retry after max_tries adjustment"
)

# Verify try_number wasn't changed (scheduler doesn't increment it here)
assert ti1.try_number == 4, "try_number should remain unchanged"

@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
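The new test's assertions hinge on how retry eligibility relates to max_tries. The sketch below walks through the same numbers with a simplified stand-in for TaskInstance.is_eligible_to_retry (the real method has more cases, for example the test's own assertion that a RESTARTING task bypasses max_tries entirely):

```python
# Simplified eligibility rule for illustration only; not Airflow's implementation.
from dataclasses import dataclass


@dataclass
class FakeTI:
    try_number: int
    max_tries: int
    retries: int

    def is_eligible_to_retry(self) -> bool:
        # Illustrative rule: retries are configured and the try budget is not exhausted.
        return self.retries > 0 and self.try_number <= self.max_tries


# The scenario the test sets up: four attempts against an original budget of three.
ti = FakeTI(try_number=4, max_tries=3, retries=2)
assert not ti.is_eligible_to_retry()

# After the scheduler widens the budget (try_number + retries) the task can retry again,
# while try_number itself is left untouched, matching the test's final assertions.
ti.max_tries = ti.try_number + ti.retries
assert ti.is_eligible_to_retry()
assert ti.try_number == 4
```

In a working Airflow development environment, running only this test should be possible with `pytest airflow-core/tests/unit/jobs/test_scheduler_job.py -k restarting_cleared_task`.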