Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions airflow/providers/amazon/aws/executors/ecs/ecs_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
)
from airflow.providers.amazon.aws.hooks.ecs import EcsHook
from airflow.utils import timezone
from airflow.utils.helpers import merge_dicts
from airflow.utils.state import State

if TYPE_CHECKING:
Expand Down Expand Up @@ -408,8 +409,8 @@ def _run_task(
The command and executor config will be placed in the container-override
section of the JSON request before calling Boto3's "run_task" function.
"""
run_task_api = self._run_task_kwargs(task_id, cmd, queue, exec_config)
boto_run_task = self.ecs.run_task(**run_task_api)
run_task_kwargs = self._run_task_kwargs(task_id, cmd, queue, exec_config)
boto_run_task = self.ecs.run_task(**run_task_kwargs)
run_task_response = BotoRunTaskSchema().load(boto_run_task)
return run_task_response

Expand All @@ -421,17 +422,17 @@ def _run_task_kwargs(

One last chance to modify Boto3's "run_task" kwarg params before it gets passed into the Boto3 client.
"""
run_task_api = deepcopy(self.run_task_kwargs)
container_override = self.get_container(run_task_api["overrides"]["containerOverrides"])
run_task_kwargs = deepcopy(self.run_task_kwargs)
run_task_kwargs = merge_dicts(run_task_kwargs, exec_config)
container_override = self.get_container(run_task_kwargs["overrides"]["containerOverrides"])
container_override["command"] = cmd
container_override.update(exec_config)

# Inject the env variable to configure logging for containerized execution environment
if "environment" not in container_override:
container_override["environment"] = []
container_override["environment"].append({"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"})

return run_task_api
return run_task_kwargs

def execute_async(self, key: TaskInstanceKey, command: CommandType, queue=None, executor_config=None):
"""Save the task to be executed in the next sync by inserting the commands into a queue."""
Expand Down Expand Up @@ -484,6 +485,11 @@ def _load_run_kwargs(self) -> dict:
def get_container(self, container_list):
    """Search the task's container override list for the core Airflow container.

    :param container_list: The ``containerOverrides`` list from the run_task kwargs.
    :return: The override dict whose ``"name"`` matches the configured container name.
    :raises EcsExecutorException: If an override entry is missing the ``"name"`` key.
    :raises KeyError: If no override matches the configured container name.
    """
    for container in container_list:
        try:
            if container["name"] == self.container_name:
                return container
        except KeyError:
            # Every containerOverrides entry must identify its container by name;
            # surface a clear configuration error instead of a bare KeyError.
            raise EcsExecutorException(
                'container "name" must be provided in "containerOverrides" configuration'
            )
    raise KeyError(f"No such container found by container name: {self.container_name}")
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ In the case of conflicts, the order of precedence from lowest to highest is:
3. Load any values provided in the RUN_TASK_KWARGS option if one is
provided.

.. note::
  ``exec_config`` is an optional dictionary parameter that can be provided to operators. In the context of the ECS Executor it represents a ``run_task_kwargs`` configuration, which is applied on top of the ``run_task_kwargs`` specified in the Airflow config above (if present). The merge is recursive, essentially applying Python's ``dict.update`` to each nested dictionary in the configuration; loosely approximated as ``run_task_kwargs.update(exec_config)``.

Required config options:
~~~~~~~~~~~~~~~~~~~~~~~~

Expand All @@ -88,7 +91,7 @@ Optional config options:

- ASSIGN_PUBLIC_IP - Whether to assign a public IP address to the
containers launched by the ECS executor. Defaults to "False".
- CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
- AWS_CONN_ID - The Airflow connection (i.e. credentials) used by the ECS
executor to make API calls to AWS ECS. Defaults to "aws_default".
- LAUNCH_TYPE - Launch type can either be 'FARGATE' OR 'EC2'. Defaults
to "FARGATE".
Expand All @@ -113,6 +116,9 @@ For a more detailed description of available options, including type
hints and examples, see the ``config_templates`` folder in the Amazon
provider package.

.. note::
  ``exec_config`` is an optional dictionary parameter that can be provided to operators. In the context of the ECS Executor it represents a ``run_task_kwargs`` configuration, which is applied on top of the ``run_task_kwargs`` specified in the Airflow config above (if present). The merge is recursive, essentially applying Python's ``dict.update`` to each nested dictionary in the configuration; loosely approximated as ``run_task_kwargs.update(exec_config)``.

.. _dockerfile_for_ecs_executor:

Dockerfile for ECS Executor
Expand Down
218 changes: 218 additions & 0 deletions tests/providers/amazon/aws/executors/ecs/test_ecs_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.providers.amazon.aws.executors.ecs import ecs_executor, ecs_executor_config
from airflow.providers.amazon.aws.executors.ecs.boto_schema import BotoTaskSchema
from airflow.providers.amazon.aws.executors.ecs.ecs_executor import (
Expand Down Expand Up @@ -1156,3 +1157,220 @@ def test_providing_no_capacity_provider_no_lunch_type_no_cluster_default(self, m

task_kwargs = ecs_executor_config.build_task_kwargs()
assert task_kwargs["launchType"] == "FARGATE"

@pytest.mark.parametrize(
    "run_task_kwargs, exec_config, expected_result",
    [
        # No input run_task_kwargs or executor overrides
        (
            {},
            {},
            {
                "taskDefinition": "some-task-def",
                "launchType": "FARGATE",
                "cluster": "some-cluster",
                "platformVersion": "LATEST",
                "count": 1,
                "overrides": {
                    "containerOverrides": [
                        {
                            "command": ["command"],
                            "name": "container-name",
                            "environment": [{"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"}],
                        }
                    ]
                },
                "networkConfiguration": {
                    "awsvpcConfiguration": {
                        "subnets": ["sub1", "sub2"],
                        "securityGroups": ["sg1", "sg2"],
                        "assignPublicIp": "DISABLED",
                    }
                },
            },
        ),
        # run_task_kwargs provided, not exec_config
        (
            {
                "startedBy": "Banana",
                "tags": [{"key": "FOO", "value": "BAR"}],
                "overrides": {
                    "containerOverrides": [
                        {
                            "name": "container-name",
                            "memory": 500,
                            "cpu": 10,
                            "environment": [{"name": "X", "value": "Y"}],
                        }
                    ]
                },
            },
            {},
            {
                "startedBy": "Banana",
                "tags": [{"key": "FOO", "value": "BAR"}],
                "taskDefinition": "some-task-def",
                "launchType": "FARGATE",
                "cluster": "some-cluster",
                "platformVersion": "LATEST",
                "count": 1,
                "overrides": {
                    "containerOverrides": [
                        {
                            "memory": 500,
                            "cpu": 10,
                            "command": ["command"],
                            "name": "container-name",
                            "environment": [
                                {"name": "X", "value": "Y"},
                                # Added by the ecs executor
                                {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"},
                            ],
                        }
                    ]
                },
                # Added by the ecs executor
                "networkConfiguration": {
                    "awsvpcConfiguration": {
                        "subnets": ["sub1", "sub2"],
                        "securityGroups": ["sg1", "sg2"],
                        "assignPublicIp": "DISABLED",
                    }
                },
            },
        ),
        # exec_config provided, no run_task_kwargs
        (
            {},
            {
                "startedBy": "Banana",
                "tags": [{"key": "FOO", "value": "BAR"}],
                "overrides": {
                    "containerOverrides": [
                        {
                            "name": "container-name",
                            "memory": 500,
                            "cpu": 10,
                            "environment": [{"name": "X", "value": "Y"}],
                        }
                    ]
                },
            },
            {
                "startedBy": "Banana",
                "tags": [{"key": "FOO", "value": "BAR"}],
                "taskDefinition": "some-task-def",
                "launchType": "FARGATE",
                "cluster": "some-cluster",
                "platformVersion": "LATEST",
                "count": 1,
                "overrides": {
                    "containerOverrides": [
                        {
                            "memory": 500,
                            "cpu": 10,
                            "command": ["command"],
                            "name": "container-name",
                            "environment": [
                                {"name": "X", "value": "Y"},
                                # Added by the ecs executor
                                {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"},
                            ],
                        }
                    ]
                },
                # Added by the ecs executor
                "networkConfiguration": {
                    "awsvpcConfiguration": {
                        "subnets": ["sub1", "sub2"],
                        "securityGroups": ["sg1", "sg2"],
                        "assignPublicIp": "DISABLED",
                    }
                },
            },
        ),
        # Both run_task_kwargs and executor_config provided. The latter should override the former,
        # following a recursive python dict update strategy
        (
            {
                "startedBy": "Banana",
                "tags": [{"key": "FOO", "value": "BAR"}],
                "taskDefinition": "foobar",
                "overrides": {
                    "containerOverrides": [
                        {
                            "name": "container-name",
                            "memory": 500,
                            "cpu": 10,
                            "environment": [{"name": "X", "value": "Y"}],
                        }
                    ]
                },
            },
            {
                "startedBy": "Fish",
                "tags": [{"key": "X", "value": "Y"}, {"key": "W", "value": "Z"}],
                "overrides": {
                    "containerOverrides": [
                        {
                            "name": "container-name",
                            "memory": 300,
                            "environment": [{"name": "W", "value": "Z"}],
                        }
                    ]
                },
            },
            {
                # tags and startedBy are overridden by exec_config
                "startedBy": "Fish",
                # List types overwrite entirely, as python dict update would do
                "tags": [{"key": "X", "value": "Y"}, {"key": "W", "value": "Z"}],
                # taskDefinition remains since it is not a list type and not overridden by exec config
                "taskDefinition": "foobar",
                "launchType": "FARGATE",
                "cluster": "some-cluster",
                "platformVersion": "LATEST",
                "count": 1,
                "overrides": {
                    "containerOverrides": [
                        {
                            "memory": 300,
                            # cpu is not present because it was missing from the container overrides in
                            # the exec_config
                            "command": ["command"],
                            "name": "container-name",
                            "environment": [
                                # Overridden list type
                                {"name": "W", "value": "Z"},  # Only new env vars present, overwritten
                                # Added by the ecs executor
                                {"name": "AIRFLOW_IS_EXECUTOR_CONTAINER", "value": "true"},
                            ],
                        }
                    ]
                },
                # Added by the ecs executor
                "networkConfiguration": {
                    "awsvpcConfiguration": {
                        "subnets": ["sub1", "sub2"],
                        "securityGroups": ["sg1", "sg2"],
                        "assignPublicIp": "DISABLED",
                    }
                },
            },
        ),
    ],
)
def test_run_task_kwargs_exec_config_overrides(
    self, set_env_vars, run_task_kwargs, exec_config, expected_result
):
    """Verify how ``_run_task_kwargs`` merges config sources into the boto3 run_task kwargs.

    Precedence from lowest to highest: executor defaults, the RUN_TASK_KWARGS
    Airflow config option (supplied here via its environment-variable form),
    then the operator-provided ``exec_config``. Nested dicts merge recursively;
    list values are replaced wholesale, and the executor always injects the
    command and the AIRFLOW_IS_EXECUTOR_CONTAINER environment variable.
    """
    # Simulate the RUN_TASK_KWARGS Airflow config option through its env var form.
    # NOTE(review): this key is not explicitly deleted afterwards — presumably the
    # set_env_vars fixture restores the environment between tests; confirm.
    run_task_kwargs_env_key = f"AIRFLOW__{CONFIG_GROUP_NAME}__{AllEcsConfigKeys.RUN_TASK_KWARGS}".upper()
    os.environ[run_task_kwargs_env_key] = json.dumps(run_task_kwargs)

    mock_ti_key = mock.Mock(spec=TaskInstanceKey)
    command = ["command"]

    executor = AwsEcsExecutor()

    final_run_task_kwargs = executor._run_task_kwargs(mock_ti_key, command, "queue", exec_config)

    assert final_run_task_kwargs == expected_result