Skip to content

Commit

Permalink
Merge pull request #2768 from PrefectHQ/fargate_enhancements
Browse files Browse the repository at this point in the history
Add config validation utility to Fargate Agent
  • Loading branch information
joshmeek authored Jun 15, 2020
2 parents 89e6f69 + f7be4f7 commit 737b01d
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 1 deletion.
2 changes: 2 additions & 0 deletions changes/pr2768.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
enhancement:
- "Add `validate_configuration` utility to Fargate Agent for verifying it can manage tasks properly - [#2768](https://github.com/PrefectHQ/prefect/pull/2768)"
6 changes: 5 additions & 1 deletion docs/orchestration/agents/fargate.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ The Fargate Agent allows for a set of AWS configuration options to be set or pro

While the above configuration options allow for the initialization of the boto3 client, you may also need to specify the arguments that allow for the registering and running of Fargate task definitions. The Fargate Agent makes no assumptions on how your particular AWS configuration is set up and instead has a `kwargs` argument which will accept any arguments for boto3's `register_task_definition` and `run_task` functions.

::: tip Validating Configuration
The Fargate Agent has a utility function [`validate_configuration`](/api/latest/agent/fargate.html#fargateagent) which can be used to test the configuration options set on the agent to ensure is it able to register the task definition and run the task.
:::

Accepted kwargs for [`register_task_definition`](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs.html#ECS.Client.register_task_definition):

```
Expand All @@ -142,7 +146,7 @@ mountPoints list
logConfiguration dict
```

Environment was added to support adding flow level environment variables via the `use_external_kwargs` described later on in the documentation.
Environment was added to support adding flow level environment variables via the `use_external_kwargs` described later on in the documentation.
You should continue to use the `env_vars` kwarg to pass agent level environment variables to your tasks.

This adds support for Native AWS Secrets Manager and/or Parameter Store in your flows.
Expand Down
75 changes: 75 additions & 0 deletions src/prefect/agent/fargate/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
from ast import literal_eval
from typing import Iterable
import uuid

from slugify import slugify

Expand Down Expand Up @@ -683,6 +684,80 @@ def _run_task(

return task["tasks"][0].get("taskArn")

def validate_configuration(self) -> None:
"""
Utility function for testing Agent's configuration. This function is helpful in
determining if the provided configuration for the Agent is able to register a
task definition and then subsequently run the task.
"""
task_name = f"prefect-test-task-{str(uuid.uuid4())[:8]}"

# Populate container definition with provided kwargs
flow_container_definitions_kwargs = copy.deepcopy(
self.container_definitions_kwargs
)

container_definitions = [
{
"name": "test-container",
"image": "busybox",
"command": ["/bin/sh", "-c", "echo 'I am alive!'"],
"environment": [],
"secrets": [],
"mountPoints": [],
"logConfiguration": {},
"essential": True,
}
]

base_envar_keys = [x["name"] for x in container_definitions[0]["environment"]] # type: ignore
container_definitions_environment = [
x
for x in flow_container_definitions_kwargs.get("environment", [])
if x["name"] not in base_envar_keys
]
container_definitions[0]["environment"].extend( # type: ignore
container_definitions_environment
)
container_definitions[0]["secrets"] = flow_container_definitions_kwargs.get(
"secrets", []
)
container_definitions[0]["mountPoints"] = flow_container_definitions_kwargs.get(
"mountPoints", []
)
container_definitions[0][
"logConfiguration"
] = flow_container_definitions_kwargs.get("logConfiguration", {})

# Register task definition
flow_task_definition_kwargs = copy.deepcopy(self.task_definition_kwargs)

if self.launch_type:
flow_task_definition_kwargs["requiresCompatibilities"] = [self.launch_type]

self.logger.info("Testing task definition registration...")
self.boto3_client.register_task_definition(
family=task_name,
containerDefinitions=container_definitions,
networkMode="awsvpc",
**flow_task_definition_kwargs,
)
self.logger.info("Task definition registration successful")

# Run task
flow_task_run_kwargs = copy.deepcopy(self.task_run_kwargs)

if self.launch_type:
flow_task_run_kwargs["launchType"] = self.launch_type

self.logger.info("Testing task run...")
task = self.boto3_client.run_task(
taskDefinition=task_name,
overrides={"containerOverrides": []},
**flow_task_run_kwargs,
)
self.logger.info(f"Task run {task['tasks'][0].get('taskArn')} successful")


if __name__ == "__main__":
FargateAgent().start()
41 changes: 41 additions & 0 deletions tests/agent/test_fargate_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1973,3 +1973,44 @@ def test_fargate_agent_start_max_polls_zero(monkeypatch, runner_token):
assert on_shutdown.call_count == 1
assert agent_process.call_count == 0
assert heartbeat.call_count == 0


def test_agent_configuration_utility(monkeypatch, runner_token):
boto3_client = MagicMock()

boto3_client.run_task.return_value = {"tasks": [{"taskArn": "test"}]}

monkeypatch.setattr("boto3.client", MagicMock(return_value=boto3_client))

kwarg_dict = {
"cluster": "cluster",
"networkConfiguration": {
"awsvpcConfiguration": {
"subnets": ["subnet"],
"assignPublicIp": "DISABLED",
"securityGroups": ["security_group"],
}
},
}

agent = FargateAgent(
aws_access_key_id="id",
aws_secret_access_key="secret",
aws_session_token="token",
region_name="region",
**kwarg_dict
)
agent.validate_configuration()

assert boto3_client.register_task_definition.called
assert boto3_client.run_task.called
assert boto3_client.run_task.call_args[1]["cluster"] == "cluster"
assert "prefect-test-task" in boto3_client.run_task.call_args[1]["taskDefinition"]
assert boto3_client.run_task.call_args[1]["launchType"] == "FARGATE"
assert boto3_client.run_task.call_args[1]["networkConfiguration"] == {
"awsvpcConfiguration": {
"subnets": ["subnet"],
"assignPublicIp": "DISABLED",
"securityGroups": ["security_group"],
}
}

0 comments on commit 737b01d

Please sign in to comment.