Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate health info to fastapi #42938

Merged
merged 2 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow/api_connexion/endpoints/health_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_connexion.schemas.health_schema import health_schema
from airflow.utils.api_migration import mark_fastapi_migration_done

if TYPE_CHECKING:
from airflow.api_connexion.types import APIResponse


@mark_fastapi_migration_done
def get_health() -> APIResponse:
"""Return the health of the airflow scheduler, metadatabase and triggerer."""
airflow_health_status = get_airflow_health()
Expand Down
103 changes: 100 additions & 3 deletions airflow/api_fastapi/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -944,8 +944,33 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/monitor/health:
get:
tags:
- Monitor
summary: Get Health
operationId: get_health
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/HealthInfoSchema'
components:
schemas:
BaseInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
type: object
required:
- status
title: BaseInfoSchema
description: Base status field for metadatabase and scheduler.
ConnectionCollectionResponse:
properties:
connections:
Expand Down Expand Up @@ -1553,6 +1578,24 @@ components:
- dataset_triggered
title: DAGRunTypes
description: DAG Run Types for responses.
DagProcessorInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
latest_dag_processor_heartbeat:
anyOf:
- type: string
- type: 'null'
title: Latest Dag Processor Heartbeat
type: object
required:
- status
- latest_dag_processor_heartbeat
title: DagProcessorInfoSchema
description: Schema for DagProcessor info.
DagRunState:
type: string
enum:
Expand Down Expand Up @@ -1627,22 +1670,58 @@ components:
title: Detail
type: object
title: HTTPValidationError
HealthInfoSchema:
properties:
metadatabase:
$ref: '#/components/schemas/BaseInfoSchema'
scheduler:
$ref: '#/components/schemas/SchedulerInfoSchema'
triggerer:
$ref: '#/components/schemas/TriggererInfoSchema'
dag_processor:
$ref: '#/components/schemas/DagProcessorInfoSchema'
type: object
required:
- metadatabase
- scheduler
- triggerer
- dag_processor
title: HealthInfoSchema
description: Schema for the Health endpoint.
HistoricalMetricDataResponse:
properties:
dag_run_types:
$ref: '#/components/schemas/DAGRunTypes'
dag_run_states:
$ref: '#/components/schemas/DAGRunStates'
task_instance_states:
$ref: '#/components/schemas/TaskInstantState'
$ref: '#/components/schemas/TaskInstanceState'
type: object
required:
- dag_run_types
- dag_run_states
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
TaskInstantState:
SchedulerInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
latest_scheduler_heartbeat:
anyOf:
- type: string
- type: 'null'
title: Latest Scheduler Heartbeat
type: object
required:
- status
- latest_scheduler_heartbeat
title: SchedulerInfoSchema
description: Schema for Scheduler info.
TaskInstanceState:
properties:
no_status:
type: integer
Expand Down Expand Up @@ -1698,8 +1777,26 @@ components:
- upstream_failed
- skipped
- deferred
title: TaskInstantState
title: TaskInstanceState
description: TaskInstance serializer for responses.
TriggererInfoSchema:
properties:
status:
anyOf:
- type: string
- type: 'null'
title: Status
latest_triggerer_heartbeat:
anyOf:
- type: string
- type: 'null'
title: Latest Triggerer Heartbeat
type: object
required:
- status
- latest_triggerer_heartbeat
title: TriggererInfoSchema
description: Schema for Triggerer info.
ValidationError:
properties:
loc:
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_fastapi/serializers/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class DAGRunStates(BaseModel):
failed: int


class TaskInstantState(BaseModel):
class TaskInstanceState(BaseModel):
"""TaskInstance serializer for responses."""

no_status: int
Expand All @@ -60,4 +60,4 @@ class HistoricalMetricDataResponse(BaseModel):

dag_run_types: DAGRunTypes
dag_run_states: DAGRunStates
task_instance_states: TaskInstantState
task_instance_states: TaskInstanceState
52 changes: 52 additions & 0 deletions airflow/api_fastapi/serializers/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pydantic import BaseModel


class BaseInfoSchema(BaseModel):
"""Base status field for metadatabase and scheduler."""

status: str | None


class SchedulerInfoSchema(BaseInfoSchema):
"""Schema for Scheduler info."""

latest_scheduler_heartbeat: str | None


class TriggererInfoSchema(BaseInfoSchema):
"""Schema for Triggerer info."""

latest_triggerer_heartbeat: str | None


class DagProcessorInfoSchema(BaseInfoSchema):
"""Schema for DagProcessor info."""

latest_dag_processor_heartbeat: str | None


class HealthInfoSchema(BaseModel):
"""Schema for the Health endpoint."""

metadatabase: BaseInfoSchema
scheduler: SchedulerInfoSchema
triggerer: TriggererInfoSchema
dag_processor: DagProcessorInfoSchema
2 changes: 2 additions & 0 deletions airflow/api_fastapi/views/public/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from airflow.api_fastapi.views.public.connections import connections_router
from airflow.api_fastapi.views.public.dag_run import dag_run_router
from airflow.api_fastapi.views.public.dags import dags_router
from airflow.api_fastapi.views.public.monitor import monitor_router
from airflow.api_fastapi.views.public.variables import variables_router
from airflow.api_fastapi.views.router import AirflowRouter

Expand All @@ -30,3 +31,4 @@
public_router.include_router(connections_router)
public_router.include_router(variables_router)
public_router.include_router(dag_run_router)
public_router.include_router(monitor_router)
30 changes: 30 additions & 0 deletions airflow/api_fastapi/views/public/monitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from airflow.api.common.airflow_health import get_airflow_health
from airflow.api_fastapi.serializers.monitor import HealthInfoSchema
from airflow.api_fastapi.views.router import AirflowRouter

monitor_router = AirflowRouter(tags=["Monitor"], prefix="/monitor")


@monitor_router.get("/health")
async def get_health() -> HealthInfoSchema:
airflow_health_status = get_airflow_health()
return HealthInfoSchema.model_validate(airflow_health_status)
13 changes: 13 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
Expand Down Expand Up @@ -209,6 +210,18 @@ export const UseDagRunServiceGetDagRunKeyFn = (
},
queryKey?: Array<unknown>,
) => [useDagRunServiceGetDagRunKey, ...(queryKey ?? [{ dagId, dagRunId }])];
export type MonitorServiceGetHealthDefaultResponse = Awaited<
ReturnType<typeof MonitorService.getHealth>
>;
export type MonitorServiceGetHealthQueryResult<
TData = MonitorServiceGetHealthDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const useMonitorServiceGetHealthKey = "MonitorServiceGetHealth";
export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array<unknown>) => [
useMonitorServiceGetHealthKey,
...(queryKey ?? []),
];
export type VariableServicePostVariableMutationResult = Awaited<
ReturnType<typeof VariableService.postVariable>
>;
Expand Down
11 changes: 11 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
Expand Down Expand Up @@ -259,3 +260,13 @@ export const prefetchUseDagRunServiceGetDagRun = (
queryKey: Common.UseDagRunServiceGetDagRunKeyFn({ dagId, dagRunId }),
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }),
});
/**
* Get Health
* @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
export const prefetchUseMonitorServiceGetHealth = (queryClient: QueryClient) =>
queryClient.prefetchQuery({
queryKey: Common.UseMonitorServiceGetHealthKeyFn(),
queryFn: () => MonitorService.getHealth(),
});
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DAGPatchBody, DagRunState, VariableBody } from "../requests/types.gen";
Expand Down Expand Up @@ -331,6 +332,24 @@ export const useDagRunServiceGetDagRun = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
/**
* Get Health
* @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
export const useMonitorServiceGetHealth = <
TData = Common.MonitorServiceGetHealthDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
/**
* Post Variable
* Create a variable.
Expand Down
19 changes: 19 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
DagRunService,
DagService,
DashboardService,
MonitorService,
VariableService,
} from "../requests/services.gen";
import { DagRunState } from "../requests/types.gen";
Expand Down Expand Up @@ -326,3 +327,21 @@ export const useDagRunServiceGetDagRunSuspense = <
queryFn: () => DagRunService.getDagRun({ dagId, dagRunId }) as TData,
...options,
});
/**
* Get Health
* @returns HealthInfoSchema Successful Response
* @throws ApiError
*/
export const useMonitorServiceGetHealthSuspense = <
TData = Common.MonitorServiceGetHealthDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UseMonitorServiceGetHealthKeyFn(queryKey),
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
Loading