Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e9e45a2
Added a confirm dialog modal that appears only when the task is in ru…
kncatalan Sep 15, 2025
fb4e814
Removed unnecessary comments
kncatalan Sep 15, 2025
a6606a5
Added clearTask texts in dags.json in the EN locale. Used translate i…
kncatalan Sep 18, 2025
28ccb4a
(NON-WORKING/ Experimental) added a flag in taskinstance.py. backend …
kncatalan Sep 18, 2025
68cfb78
(EXPERIMENTAL) Removed the confirm pop-up and replaced with checkbox …
kncatalan Sep 19, 2025
4dc0cde
Added mirgin-right auto for the checkbox
kncatalan Sep 19, 2025
19bca7c
Change isRunning to task_confirmed_running. Moved the HTTPException t…
kncatalan Sep 22, 2025
0571c4c
Made the checkbox checked on default. is_running_message set to true …
kncatalan Sep 23, 2025
cebfb32
Added the code 400 to line 688.
kncatalan Sep 23, 2025
dd74fb3
Cleaned up the code and attempted to adress the error in the tests, e…
kncatalan Sep 23, 2025
37e0249
Made sure the date variable is not null and changed the string in the…
kncatalan Sep 26, 2025
96e3910
Changed the hardcoded english into an i18n translate. Added preventRu…
kncatalan Sep 26, 2025
fdcaeb7
Renamed the is_running_message to prevent_running_tasks. Removed prev…
kncatalan Sep 29, 2025
e096164
Added a custom airflow exception for clearing running task instance. …
kncatalan Oct 2, 2025
4a04425
Added retry 0 in use clearTaskInstances.
kncatalan Oct 2, 2025
7230e35
Added a conditional for error toast that states the task is running i…
kncatalan Oct 2, 2025
a86bc34
added check for null in error.detail
kncatalan Oct 2, 2025
3b6eade
removed unnecessary imports in task_instances.py
kncatalan Oct 2, 2025
e0687d1
made the prevent_running_task optional to pass checks
kncatalan Oct 3, 2025
c337e13
Merge branch 'main' into add-modal-confirmation-dialog
kncatalan Oct 3, 2025
400d79e
Changed prevent_running_task to Optional[bool]=None to be optional in…
kncatalan Oct 3, 2025
bef19e4
Attempt to resolve static checks which are unsafe assignment and call…
kncatalan Oct 7, 2025
37ca770
Added flag for QUEUED and SCHEDULED in taskinstances.py
kncatalan Oct 10, 2025
37bf1c6
Added a modal for QUEUED and SCHEDULED states.
kncatalan Oct 10, 2025
b126b15
Added the text of the modal in the en locale.
kncatalan Oct 13, 2025
36363cd
Changed the contents of the modal to include the user that started th…
kncatalan Oct 13, 2025
7c29251
Changed taskinstance.py according to the comments, and changed the mo…
kncatalan Oct 13, 2025
8e5a20e
Attempt to resolve static checks
kncatalan Oct 14, 2025
f5286a3
Attempt to resolve assertion errors of prevent_running_task = false i…
kncatalan Oct 14, 2025
fd12fa1
Attempt to fix the restarting state returned by clear_task_instances …
kncatalan Oct 14, 2025
f26ca41
2nd Attempt to resolve static checks
kncatalan Oct 14, 2025
e3e5aae
Used common:error.defaultMessage for the default error toast. Added u…
kncatalan Oct 16, 2025
09e1a4f
Resolve the dateTimeUtils.getRelativeTime not returning an empty stri…
kncatalan Oct 17, 2025
0a5fa3a
Merge branch 'main' into add-checkbox-prevent-running-clear-task
kncatalan Oct 19, 2025
6185e17
Attempt to resolve 7 eslint static checks error. Shortened conditiona…
kncatalan Oct 20, 2025
7740e2f
Resolving eslint errors in UseClearTaskInstance and ClearTaskInstance…
kncatalan Oct 22, 2025
a4b3eae
Attempt to resolve detail in useClearTaskInstances not triggering typ…
kncatalan Oct 22, 2025
39d91cc
Attempt to resolve static checks.
kncatalan Oct 23, 2025
8589078
Merge branch 'main' into add-checkbox-prevent-running-clear-task
kncatalan Oct 23, 2025
fd0fa5a
Merge branch 'main' into add-checkbox-prevent-running-clear-task
kncatalan Oct 24, 2025
e0b4be2
Merge branch 'main' into add-checkbox-prevent-running-clear-task
kncatalan Oct 28, 2025
54a65ae
Merge branch 'main' into add-checkbox-prevent-running-clear-task
kncatalan Nov 2, 2025
9ae6183
Removed kwargs inspect, added back prevent_running_task, and edited t…
kncatalan Nov 4, 2025
a073aef
Changed error code 400 to 409. Added the pre-commit modification.
kncatalan Nov 5, 2025
73c64e2
Fixed missing detail in the error toast.
kncatalan Nov 6, 2025
3f3473c
Merge branch 'main' into add-checkbox-prevent-running-clear-task
kncatalan Nov 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ class ClearTaskInstancesBody(StrictBaseModel):
description="(Experimental) Run on the latest bundle version of the dag after "
"clearing the task instances.",
)
prevent_running_task: bool = False

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7083,6 +7083,12 @@ paths:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'409':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Conflict
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -9605,6 +9611,10 @@ components:
description: (Experimental) Run on the latest bundle version of the dag
after clearing the task instances.
default: false
prevent_running_task:
type: boolean
title: Prevent Running Task
default: false
additionalProperties: false
type: object
title: ClearTaskInstancesBody
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
_patch_ti_validate_request,
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import TaskNotFound
from airflow.exceptions import AirflowClearRunningTaskException, TaskNotFound
from airflow.models import Base, DagRun
from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances
from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH
Expand Down Expand Up @@ -711,7 +711,7 @@ def get_mapped_task_instance_try_details(

@task_instances_router.post(
"/clearTaskInstances",
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]),
dependencies=[
Depends(action_logging()),
Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.TASK_INSTANCE)),
Expand Down Expand Up @@ -805,12 +805,16 @@ def post_clear_task_instances(
)

if not dry_run:
clear_task_instances(
task_instances,
session,
DagRunState.QUEUED if reset_dag_runs else False,
run_on_latest_version=body.run_on_latest_version,
)
try:
clear_task_instances(
task_instances,
session,
DagRunState.QUEUED if reset_dag_runs else False,
run_on_latest_version=body.run_on_latest_version,
prevent_running_task=body.prevent_running_task,
)
except AirflowClearRunningTaskException as e:
raise HTTPException(status.HTTP_409_CONFLICT, str(e)) from e

return TaskInstanceCollectionResponse(
task_instances=[TaskInstanceResponse.model_validate(ti) for ti in task_instances],
Expand Down
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,7 @@ def __getattr__(name: str):
return AirflowDagCycleException

raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


class AirflowClearRunningTaskException(AirflowException):
"""Raise when the user attempts to clear currently running tasks."""
14 changes: 12 additions & 2 deletions airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def clear_task_instances(
session: Session,
dag_run_state: DagRunState | Literal[False] = DagRunState.QUEUED,
run_on_latest_version: bool = False,
prevent_running_task: bool | None = None,
) -> None:
"""
Clear a set of task instances, but make sure the running ones get killed.
Expand All @@ -213,16 +214,25 @@ def clear_task_instances(
:meta private:
"""
task_instance_ids: list[str] = []
from airflow.exceptions import AirflowClearRunningTaskException
from airflow.models.dagbag import DBDagBag

scheduler_dagbag = DBDagBag(load_op_links=False)
for ti in tis:
task_instance_ids.append(ti.id)
ti.prepare_db_for_next_try(session)

if ti.state == TaskInstanceState.RUNNING:
# If a task is cleared when running, set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
if prevent_running_task:
raise AirflowClearRunningTaskException(
"AirflowClearRunningTaskException: Disable 'prevent_running_task' to proceed, or wait until the task is not running, queued, or scheduled state."
)
# Prevents the task from re-running and clearing when prevent_running_task from the frontend and the tas is running is True.

ti.state = TaskInstanceState.RESTARTING
# If a task is cleared when running and the prevent_running_task is false,
# set its state to RESTARTING so that
# the task is terminated and becomes eligible for retry.
else:
dr = ti.dag_run
if run_on_latest_version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1333,6 +1333,11 @@ export const $ClearTaskInstancesBody = {
title: 'Run On Latest Version',
description: '(Experimental) Run on the latest bundle version of the dag after clearing the task instances.',
default: false
},
prevent_running_task: {
type: 'boolean',
title: 'Prevent Running Task',
default: false
}
},
additionalProperties: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2557,6 +2557,7 @@ export class TaskInstanceService {
401: 'Unauthorized',
403: 'Forbidden',
404: 'Not Found',
409: 'Conflict',
422: 'Validation Error'
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ export type ClearTaskInstancesBody = {
* (Experimental) Run on the latest bundle version of the dag after clearing the task instances.
*/
run_on_latest_version?: boolean;
prevent_running_task?: boolean;
};

/**
Expand Down Expand Up @@ -5436,6 +5437,10 @@ export type $OpenApiTs = {
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Conflict
*/
409: HTTPExceptionResponse;
/**
* Validation Error
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@
"past": "Past",
"queueNew": "Queue up new tasks",
"runOnLatestVersion": "Run with latest bundle version",
"upstream": "Upstream"
"upstream": "Upstream",
"preventRunningTasks": "Prevent rerun if task is running"
},
"confirmationDialog": {
"title": "Cannot Clear Task Instance",
"description": "Task is currently in a {{state}} state started by user {{user}} at {{time}}. \nThe user is unable to clear this task until it is done running or a user unchecks the \"Prevent rerun of running tasks\" option in the clear task dialog."
}
},
"search": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { useEffect, useState, useCallback } from "react";
import { VStack, Icon, Text, Spinner } from "@chakra-ui/react";
import { GoAlertFill } from "react-icons/go";
import { useTranslation } from "react-i18next";
import { Button, Dialog } from "src/components/ui";
import { useClearTaskInstancesDryRun } from "src/queries/useClearTaskInstancesDryRun";
import { getRelativeTime } from "src/utils/datetimeUtils";

type Props = {
readonly dagDetails?: {
dagId: string;
dagRunId: string;
downstream?: boolean;
future?: boolean;
mapIndex?: number;
onlyFailed?: boolean;
past?: boolean;
taskId: string;
upstream?: boolean;
};
readonly onClose: () => void;
readonly onConfirm?: () => void;
readonly open: boolean;
readonly preventRunningTask: boolean;
};

const ClearTaskInstanceConfirmationDialog = ({
dagDetails,
onClose,
onConfirm,
open,
preventRunningTask,
}: Props) => {
const { t: translate } = useTranslation();
const { data, isFetching } = useClearTaskInstancesDryRun({
dagId: dagDetails?.dagId ?? "",
options: {
enabled: open && Boolean(dagDetails),
gcTime: 0,
refetchOnMount: "always",
refetchOnWindowFocus: false,
staleTime: 0,
},
requestBody: {
dag_run_id: dagDetails?.dagRunId ?? "",
include_downstream: dagDetails?.downstream,
include_future: dagDetails?.future,
include_past: dagDetails?.past,
include_upstream: dagDetails?.upstream,
only_failed: dagDetails?.onlyFailed,
task_ids: [[dagDetails?.taskId ?? "", dagDetails?.mapIndex ?? 0]],
},
});

const [isReady, setIsReady] = useState(false);

const handleConfirm = useCallback(() => {
if (onConfirm) onConfirm();
onClose();
}, [onConfirm, onClose]);

const taskInstances = data?.task_instances ?? [];
const [firstInstance] = taskInstances;
const taskCurrentState = firstInstance?.state;

useEffect(() => {
if (!isFetching && open && data) {
const isInTriggeringState =
taskCurrentState === "queued" || taskCurrentState === "scheduled";

if (!preventRunningTask || !isInTriggeringState) {
handleConfirm();
} else {
setIsReady(true);
}
}
}, [isFetching, data, open, handleConfirm, taskCurrentState, preventRunningTask]);

return (
<Dialog.Root lazyMount onOpenChange={onClose} open={open}>
<Dialog.Content backdrop>
{isFetching ? (
<VStack align="center" gap={3} justify="center" py={8}>
<Spinner size="lg" />
<Text color="fg.solid" fontSize="md">
{translate("common:task.documentation")}
</Text>
</VStack>
) : isReady ? (
<>
<Dialog.Header>
<VStack align="start" gap={4}>
<Dialog.Title>
<Icon color="tomato" size="lg" pr="2">
<GoAlertFill />
</Icon>
{translate("dags:runAndTaskActions.confirmationDialog.title")}
</Dialog.Title>
<Dialog.Description>
{taskInstances.length > 0 && (
<>
{translate(
"dags:runAndTaskActions.confirmationDialog.description",
{
state: taskCurrentState,
time:
firstInstance?.start_date !== null && firstInstance?.start_date !== undefined
? getRelativeTime(firstInstance.start_date)
: undefined,
user:
(firstInstance?.unixname?.trim().length ?? 0) > 0
? firstInstance?.unixname
: "unknown user",
}
)}
</>
)}
</Dialog.Description>
</VStack>
</Dialog.Header>
<Dialog.Footer>
<Button colorPalette="blue" onClick={onClose}>
{translate("common:modal.confirm")}
</Button>
</Dialog.Footer>
</>
) : null}
</Dialog.Content>
</Dialog.Root>
);
};

export default ClearTaskInstanceConfirmationDialog;
Loading
Loading