Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
e9e45a2
Added a confirm dialog modal that appears only when the task is in ru…
kncatalan Sep 15, 2025
fb4e814
Removed unnecessary comments
kncatalan Sep 15, 2025
a6606a5
Added clearTask texts in dags.json in the EN locale. Used translate i…
kncatalan Sep 18, 2025
28ccb4a
(NON-WORKING/ Experimental) added a flag in taskinstance.py. backend …
kncatalan Sep 18, 2025
68cfb78
(EXPERIMENTAL) Removed the confirm pop-up and replaced with checkbox …
kncatalan Sep 19, 2025
4dc0cde
Added mirgin-right auto for the checkbox
kncatalan Sep 19, 2025
19bca7c
Change isRunning to task_confirmed_running. Moved the HTTPException t…
kncatalan Sep 22, 2025
0571c4c
Made the checkbox checked on default. is_running_message set to true …
kncatalan Sep 23, 2025
cebfb32
Added the code 400 to line 688.
kncatalan Sep 23, 2025
dd74fb3
Cleaned up the code and attempted to adress the error in the tests, e…
kncatalan Sep 23, 2025
37e0249
Made sure the date variable is not null and changed the string in the…
kncatalan Sep 26, 2025
96e3910
Changed the hardcoded english into an i18n translate. Added preventRu…
kncatalan Sep 26, 2025
fdcaeb7
Renamed the is_running_message to prevent_running_tasks. Removed prev…
kncatalan Sep 29, 2025
e096164
Added a custom airflow exception for clearing running task instance. …
kncatalan Oct 2, 2025
4a04425
Added retry 0 in use clearTaskInstances.
kncatalan Oct 2, 2025
7230e35
Added a conditional for error toast that states the task is running i…
kncatalan Oct 2, 2025
a86bc34
added check for null in error.detail
kncatalan Oct 2, 2025
3b6eade
removed unnecessary imports in task_instances.py
kncatalan Oct 2, 2025
e0687d1
made the prevent_running_task optional to pass checks
kncatalan Oct 3, 2025
c337e13
Merge branch 'main' into add-modal-confirmation-dialog
kncatalan Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
description="(Experimental) Run on the latest bundle version of the dag after "
"clearing the task instances.",
)
prevent_running_task: bool = False

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import TaskNotFound
from airflow.exceptions import AirflowClearRunningTaskException
from airflow.models import Base, DagRun
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
Expand Down Expand Up @@ -689,7 +690,7 @@ def get_mapped_task_instance_try_details(

@task_instances_router.post(
"/clearTaskInstances",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_400_BAD_REQUEST]),
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE)),
Expand Down Expand Up @@ -774,12 +775,16 @@ def post_clear_task_instances(
)

if not dry_run:
clear_task_instances(
task_instances,
session,
DagRunState.QUEUED if reset_dag_runs else False,
run_on_latest_version=body.run_on_latest_version,
)
try:
clear_task_instances(
task_instances,
session,
DagRunState.QUEUED if reset_dag_runs else False,
run_on_latest_version=body.run_on_latest_version,
prevent_running_task=body.prevent_running_task,
)
except AirflowClearRunningTaskException as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) from e

return TaskInstanceCollectionResponse(
task_instances=task_instances,
Expand Down
3 changes: 3 additions & 0 deletions airflow-core/src/airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,6 @@ def __getattr__(name: str):
return AirflowDagCycleException

raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

class AirflowClearRunningTaskException(AirflowException):
"""Raise when the user attempts to clear currently running tasks."""
16 changes: 13 additions & 3 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
from airflow.utils.span_status import SpanStatus
from airflow.utils.sqlalchemy import ExecutorConfigType, ExtendedJSON, UtcDateTime
from airflow.utils.state import DagRunState, State, TaskInstanceState
from typing import Optional

TR = TaskReschedule

Expand Down Expand Up @@ -195,6 +196,7 @@ def clear_task_instances(
session: Session,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
run_on_latest_version: bool = False,
prevent_running_task: Optional[bool] = False,
) -> None:
"""
Clear a set of task instances, but make sure the running ones get killed.
Expand All @@ -215,15 +217,23 @@ def clear_task_instances(
"""
task_instance_ids: list[str] = []
from airflow.models.dagbag import DBDagBag
from airflow.exceptions import AirflowClearRunningTaskException

scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)

if ti.state == TaskInstanceState.RUNNING:
# If a task is cleared when running, set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
ti.state = TaskInstanceState.RESTARTING
if prevent_running_task:
raise AirflowClearRunningTaskException("Task is running, stopping attempt to clear.")
# Prevents the task from re-running and clearing when prevent_running_task is True.

else:
ti.state = TaskInstanceState.RESTARTING
# If a task is cleared when running and the prevent_running_task is false,
# set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
else:
dr = ti.dag_run
if run_on_latest_version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export class ApiError extends Error {
public readonly status: number;
public readonly statusText: string;
public readonly body: unknown;
public readonly detail: string;
public readonly request: ApiRequestOptions;

constructor(request: ApiRequestOptions, response: ApiResult, message: string) {
Expand All @@ -16,6 +17,7 @@ export class ApiError extends Error {
this.status = response.status;
this.statusText = response.statusText;
this.body = response.body;
this.detail = response.body.detail;
this.request = request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2569,6 +2569,7 @@ export class TaskInstanceService {
body: data.requestBody,
mediaType: 'application/json',
errors: {
400: 'Bad Request',
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
"past": "Past",
"queueNew": "Queue up new tasks",
"runOnLatestVersion": "Run with latest bundle version",
"upstream": "Upstream"
"upstream": "Upstream",
"preventRunningTasks": "Prevent rerun if task is running"
}
},
"search": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
const [preventRunningTask, setPreventRunningTask] = useState(true);

const [note, setNote] = useState<string | null>(taskInstance.note);
const { isPending: isPendingPatchDagRun, mutate: mutatePatchTaskInstance } = usePatchTaskInstance({
Expand Down Expand Up @@ -170,6 +171,13 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
) : undefined}
<Checkbox
checked={preventRunningTask}
style={{ marginRight: "auto"}}
onCheckedChange={(event) => setPreventRunningTask(Boolean(event.checked))}
>
{translate("dags:runAndTaskActions.options.preventRunningTasks")}
</Checkbox>
<Button
colorPalette="brand"
disabled={affectedTasks.total_entries === 0}
Expand All @@ -187,6 +195,7 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
only_failed: onlyFailed,
run_on_latest_version: runOnLatestVersion,
task_ids: [[taskId, mapIndex]],
...(preventRunningTask ? { prevent_running_task: true } : {}),
},
});
if (note !== taskInstance.note) {
Expand All @@ -209,4 +218,4 @@ const ClearTaskInstanceDialog = ({ onClose, open, taskInstance }: Props) => {
);
};

export default ClearTaskInstanceDialog;
export default ClearTaskInstanceDialog;
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { toaster } from "src/components/ui";

import { useClearTaskInstancesDryRunKey } from "./useClearTaskInstancesDryRun";
import { usePatchTaskInstanceDryRunKey } from "./usePatchTaskInstanceDryRun";
import { ApiError } from "openapi/requests";

export const useClearTaskInstances = ({
dagId,
Expand All @@ -46,12 +47,22 @@ export const useClearTaskInstances = ({
const queryClient = useQueryClient();
const { t: translate } = useTranslation("dags");

const onError = (error: Error) => {
toaster.create({
description: error.message,
title: translate("dags:runAndTaskActions.clear.error", { type: translate("taskInstance_one") }),
type: "error",
});
const onError = (error: ApiError) => {
if ( error.detail != null && error.detail == "Task is running, stopping attempt to clear." ){
toaster.create({
description: error.detail,
title: translate("dags:runAndTaskActions.clear.error", { type: translate("common:taskInstance_one") }),
type: "error",
});
}

else{
toaster.create({
description: error.message,
title: translate("dags:runAndTaskActions.clear.error", { type: translate("common:taskInstance_one") }),
type: "error",
});
}
};

const onSuccess = async (
Expand Down Expand Up @@ -103,5 +114,8 @@ export const useClearTaskInstances = ({
return useTaskInstanceServicePostClearTaskInstances({
onError,
onSuccess,
// This function uses the mutation function of React
// For showing the error toast immediately, set retry to 0
retry: 0
});
};
30 changes: 30 additions & 0 deletions airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,33 @@
expect(renderDuration(0.000_01)).toBe(undefined);
});
});

import { vi } from "vitest";

describe("getRelativeTime", () => {
const fixedNow = new Date("2024-03-14T10:00:10.000Z");

beforeAll(() => {
vi.useFakeTimers();
vi.setSystemTime(fixedNow);
});

afterAll(() => {
vi.useRealTimers();
});

it("returns relative time for a valid date", () => {
const date = "2024-03-14T10:00:00.000Z";
expect(getRelativeTime(date)).toBe("a few seconds ago");

Check failure on line 76 in airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts

View workflow job for this annotation

GitHub Actions / Basic tests / React UI tests

src/utils/datetimeUtils.test.ts > getRelativeTime > returns relative time for a valid date

ReferenceError: getRelativeTime is not defined ❯ src/utils/datetimeUtils.test.ts:76:5

Check failure on line 76 in airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts

View workflow job for this annotation

GitHub Actions / Basic tests / React UI tests

src/utils/datetimeUtils.test.ts > getRelativeTime > returns relative time for a valid date

ReferenceError: getRelativeTime is not defined ❯ src/utils/datetimeUtils.test.ts:76:5
});

it("returns an empty string for null or undefined dates", () => {
expect(getRelativeTime(null)).toBe("");

Check failure on line 80 in airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts

View workflow job for this annotation

GitHub Actions / Basic tests / React UI tests

src/utils/datetimeUtils.test.ts > getRelativeTime > returns an empty string for null or undefined dates

ReferenceError: getRelativeTime is not defined ❯ src/utils/datetimeUtils.test.ts:80:5

Check failure on line 80 in airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts

View workflow job for this annotation

GitHub Actions / Basic tests / React UI tests

src/utils/datetimeUtils.test.ts > getRelativeTime > returns an empty string for null or undefined dates

ReferenceError: getRelativeTime is not defined ❯ src/utils/datetimeUtils.test.ts:80:5
expect(getRelativeTime(undefined)).toBe("");
});

it("handles future dates", () => {
const futureDate = "2024-03-14T10:00:20.000Z";
expect(getRelativeTime(futureDate)).toBe("in a few seconds");

Check failure on line 86 in airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts

View workflow job for this annotation

GitHub Actions / Basic tests / React UI tests

src/utils/datetimeUtils.test.ts > getRelativeTime > handles future dates

ReferenceError: getRelativeTime is not defined ❯ src/utils/datetimeUtils.test.ts:86:5

Check failure on line 86 in airflow-core/src/airflow/ui/src/utils/datetimeUtils.test.ts

View workflow job for this annotation

GitHub Actions / Basic tests / React UI tests

src/utils/datetimeUtils.test.ts > getRelativeTime > handles future dates

ReferenceError: getRelativeTime is not defined ❯ src/utils/datetimeUtils.test.ts:86:5
});
});
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/ui/src/utils/datetimeUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import dayjs from "dayjs";
import dayjsDuration from "dayjs/plugin/duration";
import tz from "dayjs/plugin/timezone";
import relativeTime from "dayjs/plugin/relativeTime";

dayjs.extend(dayjsDuration);
dayjs.extend(relativeTime);
dayjs.extend(tz);

export const DEFAULT_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss";
Expand Down Expand Up @@ -59,3 +61,10 @@ export const formatDate = (

return dayjs(date).tz(timezone).format(format);
};

export const getRelativeTime = (date: string | null | undefined): string => {
if (date != null && !date) {
return "";
}
return dayjs(date).fromNow();
};
Loading