-
Notifications
You must be signed in to change notification settings - Fork 16.4k
ECS Executor - add support to adopt orphaned tasks. #36803
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
21f2677
c26a4b1
6765f95
00b17be
11ec507
63b10c5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |
| import time | ||
| from collections import defaultdict, deque | ||
| from copy import deepcopy | ||
| from typing import TYPE_CHECKING | ||
| from typing import TYPE_CHECKING, Sequence | ||
|
|
||
| from botocore.exceptions import ClientError, NoCredentialsError | ||
|
|
||
|
|
@@ -47,12 +47,13 @@ | |
| exponential_backoff_retry, | ||
| ) | ||
| from airflow.providers.amazon.aws.hooks.ecs import EcsHook | ||
| from airflow.stats import Stats | ||
| from airflow.utils import timezone | ||
| from airflow.utils.helpers import merge_dicts | ||
| from airflow.utils.state import State | ||
|
|
||
| if TYPE_CHECKING: | ||
| from airflow.models.taskinstance import TaskInstanceKey | ||
| from airflow.models.taskinstance import TaskInstance, TaskInstanceKey | ||
| from airflow.providers.amazon.aws.executors.ecs.utils import ( | ||
| CommandType, | ||
| ExecutorConfigType, | ||
|
|
@@ -110,6 +111,11 @@ def __init__(self, *args, **kwargs): | |
| self.IS_BOTO_CONNECTION_HEALTHY = False | ||
|
|
||
| self.run_task_kwargs = self._load_run_kwargs() | ||
| self.adopt_task_instances = conf.getboolean( | ||
| CONFIG_GROUP_NAME, | ||
| AllEcsConfigKeys.ADOPT_TASK_INSTANCES, | ||
| fallback=CONFIG_DEFAULTS[AllEcsConfigKeys.ADOPT_TASK_INSTANCES], | ||
| ) | ||
|
|
||
| def start(self): | ||
| """Call this when the Executor is run for the first time by the scheduler.""" | ||
|
|
@@ -393,6 +399,9 @@ def attempt_task_runs(self): | |
| else: | ||
| task = run_task_response["tasks"][0] | ||
| self.active_workers.add_task(task, task_key, queue, cmd, exec_config, attempt_number) | ||
| # Add Fargate task ARN to executor event buffer, which gets saved | ||
| # in TaskInstance.external_executor_id. | ||
| self.event_buffer[task_key] = (State.QUEUED, task.task_arn) | ||
| if failure_reasons: | ||
| self.log.error( | ||
| "Pending ECS tasks failed to launch for the following reasons: %s. Retrying later.", | ||
|
|
@@ -444,6 +453,10 @@ def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, | |
|
|
||
| def end(self, heartbeat_interval=10): | ||
| """Wait for all currently running tasks to end, and don't launch any tasks.""" | ||
| if self.adopt_task_instances: | ||
| self.log.info("Task adoption is enabled, not terminating tasks.") | ||
| return | ||
|
|
||
| try: | ||
| while True: | ||
| self.sync() | ||
|
|
@@ -456,7 +469,11 @@ def end(self, heartbeat_interval=10): | |
| self.log.exception("Failed to end %s", self.__class__.__name__) | ||
|
|
||
| def terminate(self): | ||
| """Kill all ECS processes by calling Boto3's StopTask API.""" | ||
| """Kill all ECS processes by calling Boto3's StopTask API if adopt_task_instances option is False.""" | ||
| if self.adopt_task_instances: | ||
| self.log.info("Task adoption is enabled, not terminating tasks.") | ||
| return | ||
|
|
||
| try: | ||
| for arn in self.active_workers.get_all_arns(): | ||
| self.ecs.stop_task( | ||
|
|
@@ -493,3 +510,49 @@ def get_container(self, container_list): | |
| 'container "name" must be provided in "containerOverrides" configuration' | ||
| ) | ||
| raise KeyError(f"No such container found by container name: {self.container_name}") | ||
|
|
||
| def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]: | ||
ferruzzi marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
| Try to adopt running task instances if adopt_task_instances option is set to True. | ||
|
|
||
| These tasks instances should have an ECS process which can be adopted by the unique task ARN. | ||
|
||
| Anything that is not adopted will be cleared by the scheduler and becomes eligible for re-scheduling. | ||
| """ | ||
| if not self.adopt_task_instances: | ||
| # Do not try to adopt task instances, return all orphaned tasks for clearing. | ||
| return tis | ||
|
|
||
| self.log.warning("Task adoption is on. ECS Executor attempting to adopt tasks.") | ||
|
|
||
| with Stats.timer("ecs_executor.adopt_task_instances.duration"): | ||
| adopted_tis: list[TaskInstance] = [] | ||
|
|
||
| from pprint import pformat | ||
| self.log.warning("tis: \n%s", pformat([vars(ti) for ti in tis])) | ||
|
|
||
| if task_arns := [ti.external_executor_id for ti in tis if ti.external_executor_id]: | ||
| task_descriptions = self.__describe_tasks(task_arns).get("tasks", []) | ||
|
|
||
| for task in task_descriptions: | ||
| ti = [ti for ti in tis if ti.external_executor_id == task.task_arn][0] | ||
| self.active_workers.add_task( | ||
| task, | ||
| ti.key, | ||
| ti.queue, | ||
| ti.command_as_list(), | ||
| ti.executor_config, | ||
| ti.prev_attempted_tries, | ||
| ) | ||
| adopted_tis.append(ti) | ||
|
|
||
| if adopted_tis: | ||
| tasks = [f"{task} in state {task.state}" for task in adopted_tis] | ||
| task_instance_str = "\n\t".join(tasks) | ||
| self.log.info( | ||
| "Adopted the following %d tasks from a dead executor:\n\t%s", | ||
| len(adopted_tis), | ||
| task_instance_str, | ||
| ) | ||
|
|
||
| not_adopted_tis = [ti for ti in tis if ti not in adopted_tis] | ||
| return not_adopted_tis | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have
self.successandself.fail(e.g. link) which handles the other state changes. These are provided by the base executor. I wonder if we should add a new method for putting a task in queued state 🤔 but this might cause weird issues if a provider which contains an executor is installed alongside an older version of airflow... (also, not required for this PR, just thinking out loud)