Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP84: Check standalone_dag_processor config in get_airflow_health() and update health endpoint #44383

Merged
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
57c411c
AIP84 health endpoint skip dag processor
vatsrahul1001 Nov 26, 2024
b2ed185
AIP84 health endpoint skip dag processor
vatsrahul1001 Nov 26, 2024
54ac2c3
fixing test_airflow_health tests and adding new tests
vatsrahul1001 Nov 26, 2024
c70bc8a
AIP84 health endpoint skip dag processor
vatsrahul1001 Nov 26, 2024
14fd129
AIP84 health endpoint skip dag processor
vatsrahul1001 Nov 26, 2024
4932f69
fixing test_airflow_health tests and adding new tests
vatsrahul1001 Nov 26, 2024
ecf8713
Merge branch 'AIP84-health-endpoint-skip-dag-processor' of github.com…
vatsrahul1001 Nov 26, 2024
e59cf5f
fix static tests
vatsrahul1001 Nov 26, 2024
0c6dace
Merge branch 'main' into AIP84-health-endpoint-skip-dag-processor
vatsrahul1001 Nov 26, 2024
7b2903b
merge from main
vatsrahul1001 Nov 27, 2024
46ad731
Merge branch 'main' into AIP84-health-endpoint-skip-dag-processor
vatsrahul1001 Nov 27, 2024
c91b356
Using only one model class HealthInfoSchema
vatsrahul1001 Nov 27, 2024
2b84b85
Merge branch 'AIP84-health-endpoint-skip-dag-processor' of github.com…
vatsrahul1001 Nov 27, 2024
d742cdb
Merge branch 'main' into AIP84-health-endpoint-skip-dag-processor
vatsrahul1001 Nov 27, 2024
e269147
fixing static checks
vatsrahul1001 Nov 27, 2024
08f4a67
Merge branch 'AIP84-health-endpoint-skip-dag-processor' of github.com…
vatsrahul1001 Nov 27, 2024
db9d816
Merge branch 'main' into AIP84-health-endpoint-skip-dag-processor
vatsrahul1001 Nov 27, 2024
c50f2ca
implement review comments
vatsrahul1001 Nov 28, 2024
5839e06
Merge branch 'main' into AIP84-health-endpoint-skip-dag-processor
vatsrahul1001 Nov 28, 2024
47d3013
fix static tests
vatsrahul1001 Nov 28, 2024
3d27d9e
Merge branch 'AIP84-health-endpoint-skip-dag-processor' of github.com…
vatsrahul1001 Nov 28, 2024
c144062
Merge branch 'main' into AIP84-health-endpoint-skip-dag-processor
vatsrahul1001 Nov 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 23 additions & 17 deletions airflow/api/common/airflow_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from typing import Any

from airflow.configuration import conf
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
Expand All @@ -28,13 +29,13 @@

def get_airflow_health() -> dict[str, Any]:
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
dag_processor_enabled = conf.getboolean("scheduler", "standalone_dag_processor")
metadatabase_status = HEALTHY
latest_scheduler_heartbeat = None
latest_triggerer_heartbeat = None
latest_dag_processor_heartbeat = None

scheduler_status = UNHEALTHY
triggerer_status: str | None = UNHEALTHY
dag_processor_status: str | None = UNHEALTHY

try:
latest_scheduler_job = SchedulerJobRunner.most_recent_job()
Expand All @@ -58,18 +59,6 @@ def get_airflow_health() -> dict[str, Any]:
except Exception:
metadatabase_status = UNHEALTHY

try:
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()

if latest_dag_processor_job:
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
if latest_dag_processor_job.is_alive():
dag_processor_status = HEALTHY
else:
dag_processor_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status = {
"metadatabase": {"status": metadatabase_status},
"scheduler": {
Expand All @@ -80,10 +69,27 @@ def get_airflow_health() -> dict[str, Any]:
"status": triggerer_status,
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
},
"dag_processor": {
}

if dag_processor_enabled:
latest_dag_processor_heartbeat = None
dag_processor_status: str | None = UNHEALTHY

try:
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()

if latest_dag_processor_job:
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
if latest_dag_processor_job.is_alive():
dag_processor_status = HEALTHY
else:
dag_processor_status = None
except Exception:
metadatabase_status = UNHEALTHY

airflow_health_status["dag_processor"] = {
"status": dag_processor_status,
"latest_dag_processor_heartbeat": latest_dag_processor_heartbeat,
},
}
}

return airflow_health_status
8 changes: 7 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

from pydantic import model_serializer

from airflow.api_fastapi.core_api.base import BaseModel


Expand Down Expand Up @@ -49,4 +51,8 @@ class HealthInfoSchema(BaseModel):
metadatabase: BaseInfoSchema
scheduler: SchedulerInfoSchema
triggerer: TriggererInfoSchema
dag_processor: DagProcessorInfoSchema
dag_processor: DagProcessorInfoSchema | None = None
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved

@model_serializer()
def serialize(self):
    """Serialize the model, leaving out ``dag_processor`` when it is ``None``."""
    serialized = {}
    for field_name, field_value in self:
        # Only an unset dag_processor entry is dropped; every other field is
        # emitted as-is, even when its value is None.
        if field_name == "dag_processor" and field_value is None:
            continue
        serialized[field_name] = field_value
    return serialized
vatsrahul1001 marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 3 additions & 2 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7437,13 +7437,14 @@ components:
triggerer:
$ref: '#/components/schemas/TriggererInfoSchema'
dag_processor:
$ref: '#/components/schemas/DagProcessorInfoSchema'
anyOf:
- $ref: '#/components/schemas/DagProcessorInfoSchema'
- type: 'null'
type: object
required:
- metadatabase
- scheduler
- triggerer
- dag_processor
title: HealthInfoSchema
description: Schema for the Health endpoint.
HistoricalMetricDataResponse:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_fastapi/core_api/routes/public/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@
@monitor_router.get("/health")
def get_health() -> HealthInfoSchema:
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
return HealthInfoSchema.model_validate(airflow_health_status, from_attributes=True)
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 9 additions & 2 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2823,11 +2823,18 @@ export const $HealthInfoSchema = {
$ref: "#/components/schemas/TriggererInfoSchema",
},
dag_processor: {
$ref: "#/components/schemas/DagProcessorInfoSchema",
anyOf: [
{
$ref: "#/components/schemas/DagProcessorInfoSchema",
},
{
type: "null",
},
],
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
},
},
type: "object",
required: ["metadatabase", "scheduler", "triggerer", "dag_processor"],
required: ["metadatabase", "scheduler", "triggerer"],
title: "HealthInfoSchema",
description: "Schema for the Health endpoint.",
} as const;
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ export type HealthInfoSchema = {
metadatabase: BaseInfoSchema;
scheduler: SchedulerInfoSchema;
triggerer: TriggererInfoSchema;
dag_processor: DagProcessorInfoSchema;
dag_processor?: DagProcessorInfoSchema | null;
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
};

/**
Expand Down
9 changes: 4 additions & 5 deletions airflow/ui/src/pages/Dashboard/Health/Health.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ export const Health = () => {
status={data?.triggerer.status}
title="Triggerer"
/>
{/* TODO: Update this to match the API when we move the config check to the API level */}
{data?.dag_processor.status === undefined ? undefined : (
{data && "dag_processor" in data ? (
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
<HealthTag
isLoading={isLoading}
latestHeartbeat={data.dag_processor.latest_dag_processor_heartbeat}
status={data.dag_processor.status}
latestHeartbeat={data.dag_processor?.latest_dag_processor_heartbeat}
status={data.dag_processor?.status}
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
title="Dag Processor"
/>
)}
) : undefined}
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
</HStack>
</Box>
);
Expand Down
122 changes: 118 additions & 4 deletions tests/api/common/test_airflow_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=True)
pierrejeambrun marked this conversation as resolved.
Show resolved Hide resolved
def test_get_airflow_health_only_metadatabase_healthy(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
dag_processor_enabled_mock,
):
health_status = get_airflow_health()
expected_status = {
Expand All @@ -50,8 +54,12 @@ def test_get_airflow_health_only_metadatabase_healthy(
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=Exception)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=True)
def test_get_airflow_health_metadatabase_unhealthy(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
dag_processor_enabled_mock,
):
health_status = get_airflow_health()

Expand All @@ -65,6 +73,47 @@ def test_get_airflow_health_metadatabase_unhealthy(
assert health_status == expected_status


@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=False)
def test_get_airflow_health_only_metadatabase_healthy_with_dag_processor_disabled(
    dag_processor_enabled_mock,
    latest_dag_processor_job_mock,
    latest_triggerer_job_mock,
    latest_scheduler_job_mock,
):
    """Check the health payload when no job records exist and the DAG processor is disabled.

    ``patch`` decorators inject their mocks bottom-up, so the parameters are
    listed in the reverse order of the decorators above.
    """
    health_status = get_airflow_health()
    expected_status = {
        "metadatabase": {"status": HEALTHY},
        "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
        "triggerer": {"status": None, "latest_triggerer_heartbeat": None},
    }

    assert health_status == expected_status
    # With the standalone DAG processor disabled, its job table must not be queried.
    latest_dag_processor_job_mock.assert_not_called()


# ``side_effect`` makes the lookups actually raise; ``return_value=Exception``
# only handed back the exception class and relied on a later AttributeError.
@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", side_effect=Exception)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", side_effect=Exception)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", side_effect=Exception)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=False)
def test_get_airflow_health_metadatabase_unhealthy_with_dag_processor_disabled(
    dag_processor_enabled_mock,
    latest_dag_processor_job_mock,
    latest_triggerer_job_mock,
    latest_scheduler_job_mock,
):
    """Check the health payload when job lookups fail and the DAG processor is disabled.

    ``patch`` decorators inject their mocks bottom-up, so the parameters are
    listed in the reverse order of the decorators above.
    """
    health_status = get_airflow_health()

    expected_status = {
        "metadatabase": {"status": UNHEALTHY},
        "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
        "triggerer": {"status": UNHEALTHY, "latest_triggerer_heartbeat": None},
    }

    assert health_status == expected_status
    # With the standalone DAG processor disabled, its job table must not be queried.
    latest_dag_processor_job_mock.assert_not_called()


LATEST_SCHEDULER_JOB_MOCK = MagicMock()
LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat = datetime.now()
LATEST_SCHEDULER_JOB_MOCK.is_alive = MagicMock(return_value=True)
Expand All @@ -76,8 +125,12 @@ def test_get_airflow_health_metadatabase_unhealthy(
)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=True)
def test_get_airflow_health_scheduler_healthy_no_triggerer(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
dag_processor_enabled_mock,
):
health_status = get_airflow_health()

Expand All @@ -94,6 +147,33 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer(
assert health_status == expected_status


@patch(
    "airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job",
    return_value=LATEST_SCHEDULER_JOB_MOCK,
)
@patch("airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job", return_value=None)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=False)
def test_get_airflow_health_scheduler_healthy_no_triggerer__with_dag_processor_disabled(
    dag_processor_enabled_mock,
    latest_dag_processor_job_mock,
    latest_triggerer_job_mock,
    latest_scheduler_job_mock,
):
    """Check the health payload for a live scheduler, no triggerer record, DAG processor disabled.

    ``patch`` decorators inject their mocks bottom-up, so the parameters are
    listed in the reverse order of the decorators above.
    """
    health_status = get_airflow_health()

    expected_status = {
        "metadatabase": {"status": HEALTHY},
        "scheduler": {
            "status": HEALTHY,
            "latest_scheduler_heartbeat": LATEST_SCHEDULER_JOB_MOCK.latest_heartbeat.isoformat(),
        },
        "triggerer": {"status": None, "latest_triggerer_heartbeat": None},
    }

    assert health_status == expected_status
    # With the standalone DAG processor disabled, its job table must not be queried.
    latest_dag_processor_job_mock.assert_not_called()


LATEST_TRIGGERER_JOB_MOCK = MagicMock()
LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat = datetime.now()
LATEST_TRIGGERER_JOB_MOCK.is_alive = MagicMock(return_value=True)
Expand All @@ -112,8 +192,12 @@ def test_get_airflow_health_scheduler_healthy_no_triggerer(
"airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=True)
def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
latest_scheduler_job_mock, latest_triggerer_job_mock, latest_dag_processor_job_mock
latest_scheduler_job_mock,
latest_triggerer_job_mock,
latest_dag_processor_job_mock,
dag_processor_enabled_mock,
):
health_status = get_airflow_health()

Expand All @@ -131,3 +215,33 @@ def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record(
}

assert health_status == expected_status


@patch("airflow.api.common.airflow_health.SchedulerJobRunner.most_recent_job", return_value=None)
@patch(
    "airflow.api.common.airflow_health.TriggererJobRunner.most_recent_job",
    return_value=LATEST_TRIGGERER_JOB_MOCK,
)
@patch(
    "airflow.api.common.airflow_health.DagProcessorJobRunner.most_recent_job",
    return_value=LATEST_DAG_PROCESSOR_JOB_MOCK,
)
@patch("airflow.api.common.airflow_health.conf.getboolean", return_value=False)
def test_get_airflow_health_triggerer_healthy_no_scheduler_job_record_with_dag_processor_disabled(
    dag_processor_enabled_mock,
    latest_dag_processor_job_mock,
    latest_triggerer_job_mock,
    latest_scheduler_job_mock,
):
    """Check the health payload for a live triggerer, no scheduler record, DAG processor disabled.

    A DAG processor job record is available through the mock, but it must be
    ignored because the standalone DAG processor is disabled.

    ``patch`` decorators inject their mocks bottom-up, so the parameters are
    listed in the reverse order of the decorators above.
    """
    health_status = get_airflow_health()

    expected_status = {
        "metadatabase": {"status": HEALTHY},
        "scheduler": {"status": UNHEALTHY, "latest_scheduler_heartbeat": None},
        "triggerer": {
            "status": HEALTHY,
            "latest_triggerer_heartbeat": LATEST_TRIGGERER_JOB_MOCK.latest_heartbeat.isoformat(),
        },
    }

    assert health_status == expected_status
    # With the standalone DAG processor disabled, its job table must not be queried.
    latest_dag_processor_job_mock.assert_not_called()
54 changes: 54 additions & 0 deletions tests/api_fastapi/core_api/routes/public/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,57 @@ def test_unhealthy_metadatabase_status(self, most_recent_job_mock, test_client):

assert "unhealthy" == body["metadatabase"]["status"]
assert body["scheduler"]["latest_scheduler_heartbeat"] is None


class TestGetHealthDagProcessorEnabled(TestMonitorEndpoint):
    """Check how the health endpoint exposes the optional ``dag_processor`` section."""

    @mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
    def test_health_with_dag_processor(self, mock_get_airflow_health, test_client):
        """The response carries the ``dag_processor`` section when the health data includes it."""
        mock_get_airflow_health.return_value = {
            "metadatabase": {"status": HEALTHY},
            "scheduler": {
                "status": HEALTHY,
                "latest_scheduler_heartbeat": "2024-11-23T11:09:16.663124+00:00",
            },
            "triggerer": {
                "status": HEALTHY,
                "latest_triggerer_heartbeat": "2024-11-23T11:09:15.815483+00:00",
            },
            "dag_processor": {
                "status": HEALTHY,
                "latest_dag_processor_heartbeat": "2024-11-23T11:09:15.815483+00:00",
            },
        }

        response = test_client.get("/public/monitor/health")

        assert response.status_code == 200
        body = response.json()

        assert "dag_processor" in body
        # Verify the section's content too, not just that the key is present.
        assert body["dag_processor"]["status"] == HEALTHY
        assert body["metadatabase"]["status"] == HEALTHY
        assert body["scheduler"]["status"] == HEALTHY
        assert body["triggerer"]["status"] == HEALTHY

    @mock.patch("airflow.api_fastapi.core_api.routes.public.monitor.get_airflow_health")
    def test_health_without_dag_processor(self, mock_get_airflow_health, test_client):
        """The response omits ``dag_processor`` entirely when the health data lacks it."""
        mock_get_airflow_health.return_value = {
            "metadatabase": {"status": HEALTHY},
            "scheduler": {
                "status": HEALTHY,
                "latest_scheduler_heartbeat": "2024-11-23T11:09:16.663124+00:00",
            },
            "triggerer": {
                "status": HEALTHY,
                "latest_triggerer_heartbeat": "2024-11-23T11:09:15.815483+00:00",
            },
        }

        response = test_client.get("/public/monitor/health")

        assert response.status_code == 200
        body = response.json()

        # The key must be absent rather than present with a null value.
        assert "dag_processor" not in body
        assert body["metadatabase"]["status"] == HEALTHY
        assert body["scheduler"]["status"] == HEALTHY
        assert body["triggerer"]["status"] == HEALTHY
Loading