From a7c767d4830c56980cf9033dc567b2f9bbc96b6f Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Fri, 11 Oct 2024 11:51:12 +0100 Subject: [PATCH] Add health info to public fastapi --- .../endpoints/health_endpoint.py | 2 + airflow/api_fastapi/openapi/v1-generated.yaml | 97 +++++++++++++ airflow/api_fastapi/serializers/monitor.py | 52 +++++++ airflow/api_fastapi/views/public/__init__.py | 2 + airflow/api_fastapi/views/public/monitor.py | 30 ++++ airflow/ui/openapi-gen/queries/common.ts | 13 ++ airflow/ui/openapi-gen/queries/prefetch.ts | 11 ++ airflow/ui/openapi-gen/queries/queries.ts | 19 +++ airflow/ui/openapi-gen/queries/suspense.ts | 19 +++ .../ui/openapi-gen/requests/schemas.gen.ts | 134 ++++++++++++++++++ .../ui/openapi-gen/requests/services.gen.ts | 15 ++ airflow/ui/openapi-gen/requests/types.gen.ts | 53 +++++++ 12 files changed, 447 insertions(+) create mode 100644 airflow/api_fastapi/serializers/monitor.py create mode 100644 airflow/api_fastapi/views/public/monitor.py diff --git a/airflow/api_connexion/endpoints/health_endpoint.py b/airflow/api_connexion/endpoints/health_endpoint.py index bdcf3bd93a1e1c..12f2548c0e1283 100644 --- a/airflow/api_connexion/endpoints/health_endpoint.py +++ b/airflow/api_connexion/endpoints/health_endpoint.py @@ -20,11 +20,13 @@ from airflow.api.common.airflow_health import get_airflow_health from airflow.api_connexion.schemas.health_schema import health_schema +from airflow.utils.api_migration import mark_fastapi_migration_done if TYPE_CHECKING: from airflow.api_connexion.types import APIResponse +@mark_fastapi_migration_done def get_health() -> APIResponse: """Return the health of the airflow scheduler, metadatabase and triggerer.""" airflow_health_status = get_airflow_health() diff --git a/airflow/api_fastapi/openapi/v1-generated.yaml b/airflow/api_fastapi/openapi/v1-generated.yaml index 7debfbb1008afd..b7738453137225 100644 --- a/airflow/api_fastapi/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/openapi/v1-generated.yaml @@ -679,8 +679,33 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + /public/monitor/health: + get: + tags: + - Monitor + summary: Get Health + operationId: get_health + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/HealthInfoSchema' components: schemas: + BaseInfoSchema: + properties: + status: + anyOf: + - type: string + - type: 'null' + title: Status + type: object + required: + - status + title: BaseInfoSchema + description: Base status field for metadatabase and scheduler. ConnectionResponse: properties: conn_id: @@ -1272,6 +1297,24 @@ components: - dataset_triggered title: DAGRunTypes description: DAG Run Types for responses. + DagProcessorInfoSchema: + properties: + status: + anyOf: + - type: string + - type: 'null' + title: Status + latest_dag_processor_heartbeat: + anyOf: + - type: string + - type: 'null' + title: Latest Dag Processor Heartbeat + type: object + required: + - status + - latest_dag_processor_heartbeat + title: DagProcessorInfoSchema + description: Schema for DagProcessor info. DagRunState: type: string enum: @@ -1346,6 +1389,24 @@ components: title: Detail type: object title: HTTPValidationError + HealthInfoSchema: + properties: + metadatabase: + $ref: '#/components/schemas/BaseInfoSchema' + scheduler: + $ref: '#/components/schemas/SchedulerInfoSchema' + triggerer: + $ref: '#/components/schemas/TriggererInfoSchema' + dag_processor: + $ref: '#/components/schemas/DagProcessorInfoSchema' + type: object + required: + - metadatabase + - scheduler + - triggerer + - dag_processor + title: HealthInfoSchema + description: Schema for the Health endpoint. HistoricalMetricDataResponse: properties: dag_run_types: @@ -1361,6 +1422,24 @@ components: - task_instance_states title: HistoricalMetricDataResponse description: Historical Metric Data serializer for responses. + SchedulerInfoSchema: + properties: + status: + anyOf: + - type: string + - type: 'null' + title: Status + latest_scheduler_heartbeat: + anyOf: + - type: string + - type: 'null' + title: Latest Scheduler Heartbeat + type: object + required: + - status + - latest_scheduler_heartbeat + title: SchedulerInfoSchema + description: Schema for Scheduler info. TaskInstantState: properties: no_status: @@ -1419,6 +1498,24 @@ components: - deferred title: TaskInstantState description: TaskInstance serializer for responses. + TriggererInfoSchema: + properties: + status: + anyOf: + - type: string + - type: 'null' + title: Status + latest_triggerer_heartbeat: + anyOf: + - type: string + - type: 'null' + title: Latest Triggerer Heartbeat + type: object + required: + - status + - latest_triggerer_heartbeat + title: TriggererInfoSchema + description: Schema for Triggerer info. ValidationError: properties: loc: diff --git a/airflow/api_fastapi/serializers/monitor.py b/airflow/api_fastapi/serializers/monitor.py new file mode 100644 index 00000000000000..0734321a45fd51 --- /dev/null +++ b/airflow/api_fastapi/serializers/monitor.py @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from pydantic import BaseModel + + +class BaseInfoSchema(BaseModel): + """Base status field for metadatabase and scheduler.""" + + status: str | None + + +class SchedulerInfoSchema(BaseInfoSchema): + """Schema for Scheduler info.""" + + latest_scheduler_heartbeat: str | None + + +class TriggererInfoSchema(BaseInfoSchema): + """Schema for Triggerer info.""" + + latest_triggerer_heartbeat: str | None + + +class DagProcessorInfoSchema(BaseInfoSchema): + """Schema for DagProcessor info.""" + + latest_dag_processor_heartbeat: str | None + + +class HealthInfoSchema(BaseModel): + """Schema for the Health endpoint.""" + + metadatabase: BaseInfoSchema + scheduler: SchedulerInfoSchema + triggerer: TriggererInfoSchema + dag_processor: DagProcessorInfoSchema diff --git a/airflow/api_fastapi/views/public/__init__.py b/airflow/api_fastapi/views/public/__init__.py index 9d90a0966802c2..de0a0d0bb4a55d 100644 --- a/airflow/api_fastapi/views/public/__init__.py +++ b/airflow/api_fastapi/views/public/__init__.py @@ -20,6 +20,7 @@ from airflow.api_fastapi.views.public.connections import connections_router from airflow.api_fastapi.views.public.dag_run import dag_run_router from airflow.api_fastapi.views.public.dags import dags_router +from airflow.api_fastapi.views.public.monitor import monitor_router from airflow.api_fastapi.views.public.variables import variables_router from airflow.api_fastapi.views.router import AirflowRouter @@ -30,3 +31,4 @@ public_router.include_router(connections_router) public_router.include_router(variables_router) public_router.include_router(dag_run_router) +public_router.include_router(monitor_router) diff --git a/airflow/api_fastapi/views/public/monitor.py b/airflow/api_fastapi/views/public/monitor.py new file mode 100644 index 00000000000000..07916f6e06d532 --- /dev/null +++ b/airflow/api_fastapi/views/public/monitor.py @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from airflow.api.common.airflow_health import get_airflow_health +from airflow.api_fastapi.serializers.monitor import HealthInfoSchema +from airflow.api_fastapi.views.router import AirflowRouter + +monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor") + + +@monitor_router.get("/health") +async def get_health() -> HealthInfoSchema: + airflow_health_status = get_airflow_health() + return HealthInfoSchema.model_validate(airflow_health_status) diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index aaff196c0791d5..c47cdbb4de9f6a 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -7,6 +7,7 @@ import { DagRunService, DagService, DashboardService, + MonitorService, VariableService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -185,6 +186,18 @@ export const UseDagRunServiceGetDagRunKeyFn = ( }, queryKey?: Array, ) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])]; +export type MonitorServiceGetHealthDefaultResponse = Awaited< + ReturnType +>; +export type MonitorServiceGetHealthQueryResult< + TData = MonitorServiceGetHealthDefaultResponse, + TError = unknown, +> = UseQueryResult; +export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth"; +export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array) => [ + useMonitorServiceGetHealthKey, + ...(queryKey ?? []), +]; export type DagServicePatchDagsMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/prefetch.ts b/airflow/ui/openapi-gen/queries/prefetch.ts index 3e194302f4be04..eaab751fcdd86f 100644 --- a/airflow/ui/openapi-gen/queries/prefetch.ts +++ b/airflow/ui/openapi-gen/queries/prefetch.ts @@ -7,6 +7,7 @@ import { DagRunService, DagService, DashboardService, + MonitorService, VariableService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -229,3 +230,13 @@ export const prefetchUseDagRunServiceGetDagRun = ( queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }), queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }), }); +/** + * Get Health + * @returns HealthInfoSchema Successful Response + * @throws ApiError + */ +export const prefetchUseMonitorServiceGetHealth = (queryClient: QueryClient) => + queryClient.prefetchQuery({ + queryKey: Common.UseMonitorServiceGetHealthKeyFn(), + queryFn: () => MonitorService.getHealth(), + }); diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 19bb17b342a848..4796a9a9531b77 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -12,6 +12,7 @@ import { DagRunService, DagService, DashboardService, + MonitorService, VariableService, } from "../requests/services.gen"; import { DAGPatchBody, DagRunState } from "../requests/types.gen"; @@ -295,6 +296,24 @@ export const useDagRunServiceGetDagRun = < queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options, }); +/** + * Get Health + * @returns HealthInfoSchema Successful Response + * @throws ApiError + */ +export const useMonitorServiceGetHealth = < + TData = Common.MonitorServiceGetHealthDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useQuery({ + queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey), + queryFn: () => MonitorService.getHealth() as TData, + ...options, + }); /** * Patch Dags * Patch multiple DAGs. diff --git a/airflow/ui/openapi-gen/queries/suspense.ts b/airflow/ui/openapi-gen/queries/suspense.ts index 79ad479f0a42fb..732a4fe2ba2d21 100644 --- a/airflow/ui/openapi-gen/queries/suspense.ts +++ b/airflow/ui/openapi-gen/queries/suspense.ts @@ -7,6 +7,7 @@ import { DagRunService, DagService, DashboardService, + MonitorService, VariableService, } from "../requests/services.gen"; import { DagRunState } from "../requests/types.gen"; @@ -290,3 +291,21 @@ export const useDagRunServiceGetDagRunSuspense = < queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData, ...options, }); +/** + * Get Health + * @returns HealthInfoSchema Successful Response + * @throws ApiError + */ +export const useMonitorServiceGetHealthSuspense = < + TData = Common.MonitorServiceGetHealthDefaultResponse, + TError = unknown, + TQueryKey extends Array = unknown[], +>( + queryKey?: TQueryKey, + options?: Omit, "queryKey" | "queryFn">, +) => + useSuspenseQuery({ + queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey), + queryFn: () => MonitorService.getHealth() as TData, + ...options, + }); diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 18df5284651b7b..8c43174f27b794 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -1,5 +1,25 @@ // This file is auto-generated by @hey-api/openapi-ts +export const $BaseInfoSchema = { + properties: { + status: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Status", + }, + }, + type: "object", + required: ["status"], + title: "BaseInfoSchema", + description: "Base status field for metadatabase and scheduler.", +} as const; + export const $ConnectionResponse = { properties: { conn_id: { @@ -973,6 +993,37 @@ export const $DAGRunTypes = { description: "DAG Run Types for responses.", } as const; +export const $DagProcessorInfoSchema = { + properties: { + status: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Status", + }, + latest_dag_processor_heartbeat: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Latest Dag Processor Heartbeat", + }, + }, + type: "object", + required: ["status", "latest_dag_processor_heartbeat"], + title: "DagProcessorInfoSchema", + description: "Schema for DagProcessor info.", +} as const; + export const $DagRunState = { type: "string", enum: ["queued", "running", "success", "failed"], @@ -1059,6 +1110,27 @@ export const $HTTPValidationError = { title: "HTTPValidationError", } as const; +export const $HealthInfoSchema = { + properties: { + metadatabase: { + $ref: "#/components/schemas/BaseInfoSchema", + }, + scheduler: { + $ref: "#/components/schemas/SchedulerInfoSchema", + }, + triggerer: { + $ref: "#/components/schemas/TriggererInfoSchema", + }, + dag_processor: { + $ref: "#/components/schemas/DagProcessorInfoSchema", + }, + }, + type: "object", + required: ["metadatabase", "scheduler", "triggerer", "dag_processor"], + title: "HealthInfoSchema", + description: "Schema for the Health endpoint.", +} as const; + export const $HistoricalMetricDataResponse = { properties: { dag_run_types: { @@ -1077,6 +1149,37 @@ export const $HistoricalMetricDataResponse = { description: "Historical Metric Data serializer for responses.", } as const; +export const $SchedulerInfoSchema = { + properties: { + status: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Status", + }, + latest_scheduler_heartbeat: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Latest Scheduler Heartbeat", + }, + }, + type: "object", + required: ["status", "latest_scheduler_heartbeat"], + title: "SchedulerInfoSchema", + description: "Schema for Scheduler info.", +} as const; + export const $TaskInstantState = { properties: { no_status: { @@ -1152,6 +1255,37 @@ export const $TaskInstantState = { description: "TaskInstance serializer for responses.", } as const; +export const $TriggererInfoSchema = { + properties: { + status: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Status", + }, + latest_triggerer_heartbeat: { + anyOf: [ + { + type: "string", + }, + { + type: "null", + }, + ], + title: "Latest Triggerer Heartbeat", + }, + }, + type: "object", + required: ["status", "latest_triggerer_heartbeat"], + title: "TriggererInfoSchema", + description: "Schema for Triggerer info.", +} as const; + export const $ValidationError = { properties: { loc: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 9a126aef25fbc3..a318a0b2c34ddc 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -27,6 +27,7 @@ import type { GetVariableResponse, GetDagRunData, GetDagRunResponse, + GetHealthResponse, } from "./types.gen"; export class AssetService { @@ -392,3 +393,17 @@ export class DagRunService { }); } } + +export class MonitorService { + /** + * Get Health + * @returns HealthInfoSchema Successful Response + * @throws ApiError + */ + public static getHealth(): CancelablePromise { + return __request(OpenAPI, { + method: "GET", + url: "/public/monitor/health", + }); + } +} diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 45bfa51aec9c4e..d52a9978225b9d 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1,5 +1,12 @@ // This file is auto-generated by @hey-api/openapi-ts +/** + * Base status field for metadatabase and scheduler. + */ +export type BaseInfoSchema = { + status: string | null; +}; + /** * Connection serializer for responses. */ @@ -162,6 +169,14 @@ export type DAGRunTypes = { dataset_triggered: number; }; +/** + * Schema for DagProcessor info. + */ +export type DagProcessorInfoSchema = { + status: string | null; + latest_dag_processor_heartbeat: string | null; +}; + /** * All possible states that a DagRun can be in. * @@ -216,6 +231,16 @@ export type HTTPValidationError = { detail?: Array; }; +/** + * Schema for the Health endpoint. + */ +export type HealthInfoSchema = { + metadatabase: BaseInfoSchema; + scheduler: SchedulerInfoSchema; + triggerer: TriggererInfoSchema; + dag_processor: DagProcessorInfoSchema; +}; + /** * Historical Metric Data serializer for responses. */ @@ -225,6 +250,14 @@ export type HistoricalMetricDataResponse = { task_instance_states: TaskInstantState; }; +/** + * Schema for Scheduler info. + */ +export type SchedulerInfoSchema = { + status: string | null; + latest_scheduler_heartbeat: string | null; +}; + /** * TaskInstance serializer for responses. */ @@ -244,6 +277,14 @@ export type TaskInstantState = { deferred: number; }; +/** + * Schema for Triggerer info. + */ +export type TriggererInfoSchema = { + status: string | null; + latest_triggerer_heartbeat: string | null; +}; + export type ValidationError = { loc: Array; msg: string; @@ -355,6 +396,8 @@ export type GetDagRunData = { export type GetDagRunResponse = DAGRunResponse; +export type GetHealthResponse = HealthInfoSchema; + export type $OpenApiTs = { "/ui/next_run_assets/{dag_id}": { get: { @@ -658,4 +701,14 @@ export type $OpenApiTs = { }; }; }; + "/public/monitor/health": { + get: { + res: { + /** + * Successful Response + */ + 200: HealthInfoSchema; + }; + }; + }; };