Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def heartbeat(
session.merge(self)
previous_heartbeat = self.latest_heartbeat

if self.state in (JobState.SHUTDOWN, JobState.RESTARTING):
if self.state == JobState.RESTARTING:
# TODO: Make sure it is AIP-44 compliant
self.kill()

Expand Down
1 change: 0 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@
"restarting": "violet",
"running": "lime",
"scheduled": "tan",
"shutdown": "blue",
"skipped": "hotpink",
"success": "green",
"up_for_reschedule": "turquoise",
Expand Down
10 changes: 0 additions & 10 deletions airflow/utils/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ class JobState(str, Enum):

RUNNING = "running"
SUCCESS = "success"
SHUTDOWN = "shutdown"
RESTARTING = "restarting"
FAILED = "failed"

Expand All @@ -51,7 +50,6 @@ class TaskInstanceState(str, Enum):
QUEUED = "queued" # Executor has enqueued the task
RUNNING = "running" # Task is executing
SUCCESS = "success" # Task completed
SHUTDOWN = "shutdown" # External request to shut down (e.g. marked failed when running)
RESTARTING = "restarting" # External request to restart (e.g. cleared when running)
FAILED = "failed" # Task errored out
UP_FOR_RETRY = "up_for_retry" # Task failed but has retries left
Expand Down Expand Up @@ -95,7 +93,6 @@ class State:
REMOVED = TaskInstanceState.REMOVED
SCHEDULED = TaskInstanceState.SCHEDULED
QUEUED = TaskInstanceState.QUEUED
SHUTDOWN = TaskInstanceState.SHUTDOWN
RESTARTING = TaskInstanceState.RESTARTING
UP_FOR_RETRY = TaskInstanceState.UP_FOR_RETRY
UP_FOR_RESCHEDULE = TaskInstanceState.UP_FOR_RESCHEDULE
Expand All @@ -120,7 +117,6 @@ class State:
TaskInstanceState.QUEUED: "gray",
TaskInstanceState.RUNNING: "lime",
TaskInstanceState.SUCCESS: "green",
TaskInstanceState.SHUTDOWN: "blue",
TaskInstanceState.RESTARTING: "violet",
TaskInstanceState.FAILED: "red",
TaskInstanceState.UP_FOR_RETRY: "gold",
Expand Down Expand Up @@ -169,7 +165,6 @@ def color_fg(cls, state):
TaskInstanceState.SCHEDULED,
TaskInstanceState.QUEUED,
TaskInstanceState.RUNNING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.RESTARTING,
TaskInstanceState.UP_FOR_RETRY,
TaskInstanceState.UP_FOR_RESCHEDULE,
Expand All @@ -195,11 +190,6 @@ def color_fg(cls, state):
A list of states indicating that a task or dag is a success state.
"""

terminating_states = frozenset([TaskInstanceState.SHUTDOWN, TaskInstanceState.RESTARTING])
"""
A list of states indicating that a task has been terminated.
"""

adoptable_states = frozenset(
[TaskInstanceState.QUEUED, TaskInstanceState.RUNNING, TaskInstanceState.RESTARTING]
)
Expand Down
1 change: 0 additions & 1 deletion airflow/www/jest-setup.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ global.stateColors = {
restarting: "violet",
running: "lime",
scheduled: "tan",
shutdown: "blue",
skipped: "hotpink",
success: "green",
up_for_reschedule: "turquoise",
Expand Down
1 change: 0 additions & 1 deletion airflow/www/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ def get_try_count(try_number: int, state: State):
TaskInstanceState.SCHEDULED,
TaskInstanceState.DEFERRED,
TaskInstanceState.RUNNING,
TaskInstanceState.SHUTDOWN,
TaskInstanceState.RESTARTING,
None,
TaskInstanceState.SUCCESS,
Expand Down
Binary file modified docs/apache-airflow/img/task_lifecycle_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 2 additions & 2 deletions tests/cli/commands/test_jobs_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.utils.session import create_session
from airflow.utils.state import State
from airflow.utils.state import JobState, State
from tests.test_utils.db import clear_db_jobs


Expand Down Expand Up @@ -109,7 +109,7 @@ def test_should_ignore_not_running_jobs(self):
for _ in range(3):
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job)
scheduler_job.state = State.SHUTDOWN
scheduler_job.state = JobState.FAILED
session.add(scheduler_job)
scheduler_jobs.append(scheduler_job)
job_runners.append(job_runner)
Expand Down
8 changes: 4 additions & 4 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
from airflow.utils.types import DagRunType
from tests.listeners import dag_listener
from tests.listeners.test_listeners import get_listener_manager
Expand Down Expand Up @@ -4559,7 +4559,7 @@ def test_find_zombies(self, load_examples):

local_job = Job(dag_id=ti.dag_id)
LocalTaskJobRunner(job=local_job, task_instance=ti)
local_job.state = State.SHUTDOWN
local_job.state = TaskInstanceState.FAILED

session.add(local_job)
session.flush()
Expand Down Expand Up @@ -4622,7 +4622,7 @@ def test_zombie_message(self, load_examples):
ti.queued_by_job_id = 999

local_job = Job(dag_id=ti.dag_id)
local_job.state = State.SHUTDOWN
local_job.state = TaskInstanceState.FAILED

session.add(local_job)
session.flush()
Expand Down Expand Up @@ -4683,7 +4683,7 @@ def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_proce

local_job = Job(dag_id=ti.dag_id)
LocalTaskJobRunner(job=local_job, task_instance=ti)
local_job.state = State.SHUTDOWN
local_job.state = JobState.FAILED
session.add(local_job)
session.flush()

Expand Down
6 changes: 3 additions & 3 deletions tests/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,8 +336,8 @@ def test_dagrun_deadlock(self, session):
dr.update_state(session=session)
assert dr.state == DagRunState.FAILED

def test_dagrun_no_deadlock_with_shutdown(self, session):
dag = DAG("test_dagrun_no_deadlock_with_shutdown", start_date=DEFAULT_DATE)
def test_dagrun_no_deadlock_with_restarting(self, session):
dag = DAG("test_dagrun_no_deadlock_with_restarting", start_date=DEFAULT_DATE)
with dag:
op1 = EmptyOperator(task_id="upstream_task")
op2 = EmptyOperator(task_id="downstream_task")
Expand All @@ -351,7 +351,7 @@ def test_dagrun_no_deadlock_with_shutdown(self, session):
start_date=DEFAULT_DATE,
)
upstream_ti = dr.get_task_instance(task_id="upstream_task")
upstream_ti.set_state(TaskInstanceState.SHUTDOWN, session=session)
upstream_ti.set_state(TaskInstanceState.RESTARTING, session=session)

dr.update_state()
assert dr.state == DagRunState.RUNNING
Expand Down
2 changes: 0 additions & 2 deletions tests/www/views/test_views_cluster_activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ def test_historical_metrics_data(admin_client, session, time_machine):
"restarting": 0,
"running": 0,
"scheduled": 0,
"shutdown": 0,
"skipped": 0,
"success": 2,
"up_for_reschedule": 0,
Expand Down Expand Up @@ -144,7 +143,6 @@ def test_historical_metrics_data_date_filters(admin_client, session):
"restarting": 0,
"running": 0,
"scheduled": 0,
"shutdown": 0,
"skipped": 0,
"success": 0,
"up_for_reschedule": 0,
Expand Down
2 changes: 1 addition & 1 deletion tests/www/views/test_views_home.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def test_home(capture_templates, admin_client):
'"null": "lightblue", "queued": "gray", '
'"removed": "lightgrey", "restarting": "violet", "running": "lime", '
'"scheduled": "tan", '
'"shutdown": "blue", "skipped": "hotpink", '
'"skipped": "hotpink", '
'"success": "green", "up_for_reschedule": "turquoise", '
'"up_for_retry": "gold", "upstream_failed": "orange"};'
)
Expand Down