Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions airflow-core/docs/core-concepts/dag-run.rst
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,28 @@ Example of a parameterized Dag:

**Note**: The parameters from ``dag_run.conf`` can only be used in a template field of an operator.

Wait for a Dag Run
------------------

Airflow provides an experimental API to **wait for a Dag run to complete**. This is particularly useful when integrating Airflow into external systems or automation pipelines that need to pause execution until a Dag finishes.

The endpoint blocks (by polling) until the specified Dag run reaches a terminal state: ``success``, ``failed``, or ``canceled``.

This endpoint streams responses using the **NDJSON (Newline-Delimited JSON)** format. Each line in the response is a JSON object representing the state of the Dag run at that moment.

For example:

.. code-block:: none

{"state": "running"}
{"state": "success", "results": {"op": 42}}

This allows clients to monitor the run in real time and optionally collect XCom results from specific tasks.

.. note::

This feature is **experimental** and may change or be removed in future Airflow versions.

Using CLI
^^^^^^^^^^^

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,8 @@ paths:
summary: 'Experimental: Wait for a dag run to complete, and return task results
if requested.'
description: "\U0001F6A7 This is an experimental endpoint and may change or\
\ be removed without notice."
\ be removed without notice.Successful response are streamed as newline-delimited\
\ JSON (NDJSON). Each line is a JSON object representing the DAG run state."
operationId: wait_dag_run_until_finished
security:
- OAuth2PasswordBearer: []
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ def trigger_dag_run(
"/{dag_run_id}/wait",
tags=["experimental"],
summary="Experimental: Wait for a dag run to complete, and return task results if requested.",
description="🚧 This is an experimental endpoint and may change or be removed without notice.",
description="🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.",
responses={
**create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
status.HTTP_200_OK: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export const ensureUseDagRunServiceGetDagRunsData = (queryClient: QueryClient, {
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand All @@ -342,7 +342,7 @@ export const ensureUseDagRunServiceWaitDagRunUntilFinishedData = (queryClient: Q
}) => queryClient.ensureQueryData({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export const prefetchUseDagRunServiceGetDagRuns = (queryClient: QueryClient, { d
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand All @@ -342,7 +342,7 @@ export const prefetchUseDagRunServiceWaitDagRunUntilFinished = (queryClient: Que
}) => queryClient.prefetchQuery({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export const useDagRunServiceGetDagRuns = <TData = Common.DagRunServiceGetDagRun
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand All @@ -342,7 +342,7 @@ export const useDagRunServiceWaitDagRunUntilFinished = <TData = Common.DagRunSer
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand Down
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export const useDagRunServiceGetDagRunsSuspense = <TData = Common.DagRunServiceG
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDagRunServiceGetDagRunsKeyFn({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }, queryKey), queryFn: () => DagRunService.getDagRuns({ dagId, dagVersion, endDateGt, endDateGte, endDateLt, endDateLte, limit, logicalDateGt, logicalDateGte, logicalDateLt, logicalDateLte, offset, orderBy, runAfterGt, runAfterGte, runAfterLt, runAfterLte, runIdPattern, runType, startDateGt, startDateGte, startDateLt, startDateLte, state, triggeringUserNamePattern, updatedAtGt, updatedAtGte, updatedAtLt, updatedAtLte }) as TData, ...options });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand All @@ -342,7 +342,7 @@ export const useDagRunServiceWaitDagRunUntilFinishedSuspense = <TData = Common.D
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseDagRunServiceWaitDagRunUntilFinishedKeyFn({ dagId, dagRunId, interval, result }, queryKey), queryFn: () => DagRunService.waitDagRunUntilFinished({ dagId, dagRunId, interval, result }) as TData, ...options });
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ export class DagRunService {

/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand Down Expand Up @@ -1142,7 +1142,7 @@ export class DagRunService {
export class ExperimentalService {
/**
* Experimental: Wait for a dag run to complete, and return task results if requested.
* 🚧 This is an experimental endpoint and may change or be removed without notice.
* 🚧 This is an experimental endpoint and may change or be removed without notice.Successful response are streamed as newline-delimited JSON (NDJSON). Each line is a JSON object representing the DAG run state.
* @param data The data for the request.
* @param data.dagId
* @param data.dagRunId
Expand Down
1 change: 1 addition & 0 deletions clients/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ Class | Method | HTTP request | Description
*DagRunApi* | [**get_dag_runs**](docs/DagRunApi.md#get_dag_runs) | **GET** /api/v2/dags/{dag_id}/dagRuns | Get Dag Runs
*DagRunApi* | [**get_list_dag_runs_batch**](docs/DagRunApi.md#get_list_dag_runs_batch) | **POST** /api/v2/dags/{dag_id}/dagRuns/list | Get List Dag Runs Batch
*DagRunApi* | [**get_upstream_asset_events**](docs/DagRunApi.md#get_upstream_asset_events) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents | Get Upstream Asset Events
*DagRunApi* | [**wait_dag_run_until_finished**](docs/DagRunApi.md#wait_dag_run_until_finished) | **GET** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait | Experimental: Wait for a dag run to complete, and return task results if requested.
*DagRunApi* | [**patch_dag_run**](docs/DagRunApi.md#patch_dag_run) | **PATCH** /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Patch Dag Run
*DagRunApi* | [**trigger_dag_run**](docs/DagRunApi.md#trigger_dag_run) | **POST** /api/v2/dags/{dag_id}/dagRuns | Trigger Dag Run
*DagSourceApi* | [**get_dag_source**](docs/DagSourceApi.md#get_dag_source) | **GET** /api/v2/dagSources/{dag_id} | Get Dag Source
Expand Down