Skip to content

Commit

Permalink
AIP84: Check standalone_dag_processor config in get_airflow_health() …
Browse files Browse the repository at this point in the history
…and update health endpoint (#44383)

* AIP84 health endpoint skip dag processor

* AIP84 health endpoint skip dag processor

* fixing test_airflow_health tests and adding new tests

* AIP84 health endpoint skip dag processor

* AIP84 health endpoint skip dag processor

* fixing test_airflow_health tests and adding new tests

* fix static tests

* Using only one model class HealthInfoSchema

* fixing static checks

* implement review comments

* fix static tests
  • Loading branch information
vatsrahul1001 authored Nov 29, 2024
1 parent b882246 commit c4d44e7
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 32 deletions.
40 changes: 23 additions & 17 deletions airflow/api/common/airflow_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from typing import Any

from airflow.configuration import conf
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
Expand All @@ -28,13 +29,13 @@

def get_airflow_health() -> dict[str, Any]:
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
dag_processor_enabled = conf.getboolean("scheduler", "standalone_dag_processor")
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
latest_triggerer_heartbeat = None
latest_dag_processor_heartbeat = None

scheduler_status = UNHEALTHY
triggerer_status: str | None = UNHEALTHY
dag_processor_status: str | None = UNHEALTHY

try:
latest_scheduler_job = SchedulerJobRunner.most_recent_job()
Expand All @@ -58,18 +59,6 @@ def get_airflow_health() -> dict[str, Any]:
except Exception:
metadatabase_status = UNHEALTHY

try:
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()

if latest_dag_processor_job:
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
if latest_dag_processor_job.is_alive():
dag_processor_status = HEALTHY
else:
dag_processor_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
Expand All @@ -80,10 +69,27 @@ def get_airflow_health() -> dict[str, Any]:
"status": triggerer_status,
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
},
"dag_processor": {
}

if dag_processor_enabled:
latest_dag_processor_heartbeat = None
dag_processor_status: str | None = UNHEALTHY

try:
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()

if latest_dag_processor_job:
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
if latest_dag_processor_job.is_alive():
dag_processor_status = HEALTHY
else:
dag_processor_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status["dag_processor"] = {
"status": dag_processor_status,
"latest_dag_processor_heartbeat": latest_dag_processor_heartbeat,
},
}
}

return airflow_health_status
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/datamodels/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,4 @@ class HealthInfoSchema(BaseModel):
metadatabase: BaseInfoSchema
scheduler: SchedulerInfoSchema
triggerer: TriggererInfoSchema
dag_processor: DagProcessorInfoSchema
dag_processor: DagProcessorInfoSchema | None = None
5 changes: 3 additions & 2 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7586,13 +7586,14 @@ components:
triggerer:
$ref: '#/components/schemas/TriggererInfoSchema'
dag_processor:
$ref: '#/components/schemas/DagProcessorInfoSchema'
anyOf:
- $ref: '#/components/schemas/DagProcessorInfoSchema'
- type: 'null'
type: object
required:
- metadatabase
- scheduler
- triggerer
- dag_processor
title: HealthInfoSchema
description: Schema for the Health endpoint.
HistoricalMetricDataResponse:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/core_api/routes/public/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")


@monitor_router.get("/health")
def get_health() -> HealthInfoSchema:
@monitor_router.get("/health", response_model=HealthInfoSchema, response_model_exclude_unset=True)
def get_health():
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
11 changes: 9 additions & 2 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2886,11 +2886,18 @@ export const $HealthInfoSchema = {
$ref: "#/components/schemas/TriggererInfoSchema",
},
dag_processor: {
$ref: "#/components/schemas/DagProcessorInfoSchema",
anyOf: [
{
$ref: "#/components/schemas/DagProcessorInfoSchema",
},
{
type: "null",
},
],
},
},
type: "object",
required: ["metadatabase", "scheduler", "triggerer", "dag_processor"],
required: ["metadatabase", "scheduler", "triggerer"],
title: "HealthInfoSchema",
description: "Schema for the Health endpoint.",
} as const;
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ export type HealthInfoSchema = {
metadatabase: BaseInfoSchema;
scheduler: SchedulerInfoSchema;
triggerer: TriggererInfoSchema;
dag_processor: DagProcessorInfoSchema;
dag_processor?: DagProcessorInfoSchema | null;
};

/**
Expand Down
5 changes: 2 additions & 3 deletions airflow/ui/src/pages/Dashboard/Health/Health.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ export const Health = () => {
status={data?.triggerer.status}
title="Triggerer"
/>
{/* TODO: Update this to match the API when we move the config check to the API level */}
{data?.dag_processor.status === undefined ? undefined : (
{data?.dag_processor ? (
<HealthTag
isLoading={isLoading}
latestHeartbeat={data.dag_processor.latest_dag_processor_heartbeat}
status={data.dag_processor.status}
title="Dag Processor"
/>
)}
) : undefined}
</HStack>
</Box>
);
Expand Down
116 changes: 112 additions & 4 deletions tests/api/common/test_airflow_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
get_airflow_health,
)

from tests_common.test_utils.config import conf_vars

pytestmark = pytest.mark.db_test


@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_only_metadatabase_healthy(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()
expected_status = {
Expand All @@ -50,8 +55,11 @@ def test_get_airflow_health_only_metadatabase_healthy(
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=Exception)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_metadatabase_unhealthy(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()

Expand All @@ -65,6 +73,45 @@ def test_get_airflow_health_metadatabase_unhealthy(
assert health_status == expected_status


@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
def test_get_airflow_health_only_metadatabase_healthy_with_dag_processor_disabled(
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()
expected_status = {
"metadatabase": {"status": HEALTHY},
"scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
"triggerer": {"status": None, "latest_triggerer_heartbeat": None},
}

assert health_status == expected_status


@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=Exception)
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
def test_get_airflow_health_metadatabase_unhealthy_with_dag_processor_disabled(
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": UNHEALTHY},
"scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
"triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None},
}

assert health_status == expected_status


LATEST_SCHEDULER_JOB_MOCK = MagicMock()
LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat = datetime.now()
LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True)
Expand All @@ -76,8 +123,11 @@ def test_get_airflow_health_metadatabase_unhealthy(
)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_scheduler_healthy_no_triggerer(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()

Expand All @@ -94,6 +144,32 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer(
assert health_status == expected_status


@patch(
"airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
return_value=LATEST_SCHEDULER_JOB_MOCK,
)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
def test_get_airflow_health_scheduler_healthy_no_triggerer__with_dag_processor_disabled(
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": HEALTHY},
"scheduler": {
"status": HEALTHY,
"latest_scheduler_heartbeat": LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat.isoformat(),
},
"triggerer": {"status": None, "latest_triggerer_heartbeat": None},
}

assert health_status == expected_status


LATEST_TRIGGERER_JOB_MOCK = MagicMock()
LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat = datetime.now()
LATEST_TRIGGERER_JOB_MOCK.is_alive = MagicMock(return_value=True)
Expand All @@ -112,8 +188,11 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer(
"airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()

Expand All @@ -131,3 +210,32 @@ def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
}

assert health_status == expected_status


@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None)
@patch(
"airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
return_value=LATEST_TRIGGERER_JOB_MOCK,
)
@patch(
"airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
)
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record_with_dag_processor_disabled(
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
):
health_status = get_airflow_health()

expected_status = {
"metadatabase": {"status": HEALTHY},
"scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
"triggerer": {
"status": HEALTHY,
"latest_triggerer_heartbeat": LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat.isoformat(),
},
}

assert health_status == expected_status
52 changes: 52 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,55 @@ def test_unhealthy_metadatabase_status(self, most_recent_job_mock, test_client):

assert body["metadatabase"]["status"] == "unhealthy"
assert body["scheduler"]["latest_scheduler_heartbeat"] is None

@mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
def test_health_with_dag_processor(self, mock_get_airflow_health, test_client):
mock_get_airflow_health.return_value = {
"metadatabase": {"status": HEALTHY},
"scheduler": {
"status": HEALTHY,
"latest_scheduler_heartbeat": "2024-11-23T11:09:16.663124+00:00",
},
"triggerer": {
"status": HEALTHY,
"latest_triggerer_heartbeat": "2024-11-23T11:09:15.815483+00:00",
},
"dag_processor": {
"status": HEALTHY,
"latest_dag_processor_heartbeat": "2024-11-23T11:09:15.815483+00:00",
},
}

response = test_client.get("/public/monitor/health")

assert response.status_code == 200
body = response.json()

assert "dag_processor" in body
assert body["metadatabase"]["status"] == HEALTHY
assert body["scheduler"]["status"] == HEALTHY
assert body["triggerer"]["status"] == HEALTHY

@mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
def test_health_without_dag_processor(self, mock_get_airflow_health, test_client):
mock_get_airflow_health.return_value = {
"metadatabase": {"status": HEALTHY},
"scheduler": {
"status": HEALTHY,
"latest_scheduler_heartbeat": "2024-11-23T11:09:16.663124+00:00",
},
"triggerer": {
"status": HEALTHY,
"latest_triggerer_heartbeat": "2024-11-23T11:09:15.815483+00:00",
},
}

response = test_client.get("/public/monitor/health")

assert response.status_code == 200
body = response.json()

assert "dag_processor" not in body
assert body["metadatabase"]["status"] == HEALTHY
assert body["scheduler"]["status"] == HEALTHY
assert body["triggerer"]["status"] == HEALTHY

0 comments on commit c4d44e7

Please sign in to comment.