
Event driven dra #58


Draft: wants to merge 71 commits into base: main

Commits (71)
6c27582
Call creator lambda with suffix
heerener Jun 11, 2025
171926d
correct ClusterName in the json file
WeinaJi Jun 11, 2025
1420e79
move to include_lustre flag instead of dev flag
heerener Jun 11, 2025
06e9546
Bugfix in pcluster_manager
heerener Jun 11, 2025
8a4a664
Too much popping; it's not bubble wrap!
heerener Jun 11, 2025
4fa930f
Cleaner way than multiple pops
heerener Jun 11, 2025
915cd36
dynamodb: cluster actions pt 1
heerener Jun 11, 2025
a748229
Register and delete cluster in dynamo
heerener Jun 11, 2025
192ea9f
Lock library
heerener Jun 6, 2025
5ab499b
release_subnets: debug logging
heerener Jun 11, 2025
132ae90
cluster_name -> cluster.name for 80_cloudwatch_agent_config_prolog.sh…
heerener Jun 11, 2025
a8ef26f
pcluster_delete: debug logging
heerener Jun 11, 2025
3e20bc4
dynamodb actions on Table, not on dynamodb resource
heerener Jun 11, 2025
033a559
Register cluster with all parameters
heerener Jun 11, 2025
031614f
No waiting for fsx/dra, exit after first fsx precreate
heerener Jun 11, 2025
c2a3bcc
Log dra_ready event
heerener Jun 11, 2025
ba9a513
Handle EFS events
heerener Jun 11, 2025
89272ba
get_fsx_name: argument correction
heerener Jun 12, 2025
2b14a61
test_delete: assert dynamodb also cleared
heerener Jun 12, 2025
f53b904
test_{vlab,project}_id_not_specified: add path to event
heerener Jun 12, 2025
3cbc750
Change path for DRA callback and cluster comparison
heerener Jun 12, 2025
1437c65
WIP: fix POST tests
heerener Jun 13, 2025
b9e53da
WIP: create eventbridge timed rule
heerener Jun 16, 2025
81fc662
add benchmarks environment
heerener Jun 13, 2025
ca6d02d
Custom AMI ID from env var
heerener Jun 16, 2025
6ea1541
bump aws-parallelcluster
heerener Jun 16, 2025
b02d4c0
Infra bucket as env var
heerener Jun 16, 2025
edf19f3
bucket prefix
heerener Jun 16, 2025
206d7c9
replace instead of lstrip
heerener Jun 16, 2025
6e3a21b
Regular checks to precreate DRAs one at a time
heerener Jun 17, 2025
782264b
Small fixes and more logging
heerener Jun 18, 2025
3332921
Claim cluster + fixes for get unclaimed clusters
heerener Jun 18, 2025
5c4d51f
Debug logging
heerener Jun 18, 2025
6eb8b30
Bugfixes in pcluster_manager
heerener Jun 18, 2025
c5193b6
Sort out which functions need to do what
heerener Jun 18, 2025
88e0991
Log response
heerener Jun 18, 2025
9a45775
Fix easy-to-fix failing tests and add new dynamo actions test
heerener Jun 19, 2025
68294d8
New DRA check tests
heerener Jun 19, 2025
a5c3c8a
Process all clusters on DRA call
heerener Jun 19, 2025
8bda0a6
shared is defined in the filesystems data
heerener Jun 19, 2025
a87118b
aws-parallelcluster back to 3.13.0
heerener Jun 19, 2025
ad658ae
claimed should be provisioning_launched
heerener Jun 19, 2025
493a66a
Response for all unclaimed clusters
heerener Jun 19, 2025
3b61363
Too much s3://
heerener Jun 19, 2025
df06814
fsx name in pcluster config cannot be more than 30 characters
heerener Jun 19, 2025
8b2a72b
Create multiple DRAs per filesystem
heerener Jun 19, 2025
bee6584
Create / delete eventbridge rule and delete fsx on cluster delete
heerener Jun 19, 2025
fca67e7
WIP: README updates
heerener Jun 19, 2025
bf9c144
Log eventbridge target
heerener Jun 20, 2025
30cd391
DRA: ClientRequestToken per mountpoint
heerener Jun 20, 2025
5bd46d7
fs_name == cluster.name
heerener Jun 20, 2025
f5dfa04
Bugfix: dynamodb_resource
heerener Jun 20, 2025
19a2c3a
Don't delete eventbridge rule for now
heerener Jun 20, 2025
3cb6ab8
One fsx will do
heerener Jun 20, 2025
c839f5d
Return ssh key secret ARNs in get cluster info
heerener Jun 20, 2025
4db3822
Attempt to correct FsxLustre cluster settings
heerener Jun 20, 2025
bda7dd6
Make get cluster more robust
heerener Jun 20, 2025
dbe556d
Fix DRA mountpoints
heerener Jun 20, 2025
a93f9a0
Store sim pubkey after generating it
heerener Jun 20, 2025
c2e6577
Fix some failing tests
heerener Jun 23, 2025
18908db
aws-parallelcluster-3.13.1 for benchmarks sandbox
heerener Jun 23, 2025
55682ba
No more suffix
heerener Jun 24, 2025
d211380
Shorter DRA creation token
heerener Jun 24, 2025
50ac1b8
AWS doesn't like long strings
heerener Jun 25, 2025
6a4ee86
mount the whole scratch bucket, that's the whole point of the benchma…
heerener Jun 25, 2025
926ce39
Track cluster creation time and use it in the DRA create token
heerener Jun 26, 2025
bbcd4a8
Decimal -> int
heerener Jun 26, 2025
046138e
Fix typo
heerener Jun 26, 2025
9fe8f5f
Bugfix
heerener Jun 26, 2025
916ccf1
Add cluster name tag on fsx and dra
heerener Jun 27, 2025
5737c71
Try with EfaEnabled=False for FSx
heerener Jun 27, 2025
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -6,7 +6,7 @@ on:
environment:
description: Which environment to push the image to
type: choice
options: [aws-sandbox-hpc, public-ecr]
options: [aws-sandbox-hpc, public-ecr, aws-sandbox-benchmarks]
required: true
default: aws-sandbox-hpc
push:
2 changes: 1 addition & 1 deletion Dockerfile
@@ -3,7 +3,7 @@ LABEL org.opencontainers.image.source="https://github.com/openbraininstitute/hpc

ARG SETUPTOOLS_SCM_PRETEND_VERSION

RUN dnf -y install nodejs findutils && python3 -m pip install aws-parallelcluster==3.13.0 awslambdaric
RUN dnf -y install nodejs findutils && python3 -m pip install aws-parallelcluster==3.13.1 awslambdaric

ADD hpc_provisioner /opt/hpc_provisioner
RUN python3 -m pip install /opt/hpc_provisioner
22 changes: 21 additions & 1 deletion README.md
@@ -6,15 +6,18 @@ The HPC Resource Provisioner is a small application that offers an API to manage

There's a GitHub workflow called `Build and Release HPC Resource Provisioner` that makes releasing new versions easy. Select the branch you want to create a release from and an AWS environment to push the image to, then run the workflow.

Once released, you'll still have to deploy it - this is not done automatically!

## Manual Usage

If you have awscli configured to connect to the sandbox environment, it's fairly easy to get the necessary variables in your shell. Make sure the keypair for sandbox is the first entry in `~/.aws/credentials`.

`source ./sandbox.sh`
Adapt `sandbox.sh` to fit your sandbox environment, then run `source ./sandbox.sh`

Deploying a new cluster: you can run this command multiple times; as long as you specify the same `vlab_id` and `project_id`, it will not deploy additional clusters.

Parameters (specify them in alphabetical order!):
* benchmark: optional: benchmark mode. This will give you access to the full scratch bucket and metrics.
* dev: optional: dev mode, for when you need features that are currently still in development.
* include_lustre: optional: defaults to true: set to false if you don't need Lustre; this speeds up deployment and is a lot cheaper.
* project_id: required: string to identify your project. Will be part of the cluster name.
@@ -104,6 +107,23 @@ python3.12 -m venv venv
pip install -e 'hpc_provisioner[test]'
pytest hpc_provisioner
```

### Architecture

The resource provisioner is set up as a pair of lambdas behind an API gateway. The API gateway forwards to the first lambda, which takes care of most of the work. When the time comes to actually deploy the cluster, it calls the second lambda asynchronously to run the actual `create_cluster` command, because that call blocks for longer than an API gateway is allowed to take to respond.
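
As an illustration, the hand-off to the second lambda could look roughly like the sketch below. The boto3 call itself is real, but the function name and payload shape are hypothetical, not taken from this repository:

```python
import json

import boto3

lambda_client = boto3.client("lambda")


def launch_cluster_creation(cluster_spec: dict) -> None:
    """Fire-and-forget hand-off to the creator lambda (sketch).

    InvocationType="Event" makes invoke() return immediately (HTTP 202),
    so the API-facing lambda can answer within the API gateway timeout
    while create_cluster keeps running in the second lambda.
    """
    lambda_client.invoke(
        FunctionName="hpc-resource-creator",  # hypothetical function name
        InvocationType="Event",  # async invocation: don't wait for the result
        Payload=json.dumps(cluster_spec),
    )
```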

What happens when a user requests a cluster:
* handlers.py: the main handler function `pcluster_handler` inspects the request and routes it to the correct function based on the HTTP method and the path
* handlers.py: `pcluster_create_request_handler` will (see the sketch after this list):
  * register the cluster in dynamodb,
  * create the EventBridge rule to check at regular intervals whether a filesystem or a cluster needs to be created (not implemented yet),
  * pre-create the necessary SSH keys (one for the admin user, one for the sim user) and store them in secretsmanager,
  * return a response to the user containing the secretsmanager ARNs for these SSH keys.
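
Put together, a heavily simplified sketch of that create flow. The `create_eventbridge_dra_checking_rule` and `create_secret` helpers exist in `aws_queries.py` in this PR; the parsing, registration, and key-generation helpers are hypothetical stand-ins:

```python
import boto3

from hpc_provisioner.aws_queries import (
    create_eventbridge_dra_checking_rule,
    create_secret,
)


def pcluster_create_request_handler(event: dict) -> dict:
    cluster = parse_cluster_request(event)  # hypothetical request parser
    register_cluster_in_dynamodb(cluster)  # hypothetical wrapper around dynamodb_actions

    # EventBridge rule that periodically drives filesystem/cluster creation
    create_eventbridge_dra_checking_rule(boto3.client("events"))

    # One key for the admin user, one for the sim user; the "" / "_sim"
    # secret-name suffixes follow get_secrets_for_cluster in aws_queries.py.
    # This assumes create_secret returns the secretsmanager response dict.
    sm = boto3.client("secretsmanager")
    admin = create_secret(sm, cluster.vlab_id, cluster.project_id,
                          cluster.name, generate_ssh_key())  # hypothetical keygen
    sim = create_secret(sm, cluster.vlab_id, cluster.project_id,
                        f"{cluster.name}_sim", generate_ssh_key())

    return {
        "statusCode": 200,
        "body": {
            "admin_user_private_ssh_key_arn": admin["ARN"],
            "private_ssh_key_arn": sim["ARN"],
        },
    }
```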

At this point the interaction with the user is done and the lambda exits. The next activity is triggered by the EventBridge rule, which calls the /pcluster/dra endpoint to trigger a check:
* handlers.py: the main handler function `pcluster_handler` inspects the request and routes it to the correct function based on the HTTP method and the path
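
A condensed sketch of that routing step (the event fields match API gateway proxy events, but apart from `pcluster_create_request_handler` the handler names here are hypothetical; the real `pcluster_handler` covers more routes and error handling):

```python
def pcluster_create_request_handler(event: dict) -> dict:  # real handler in handlers.py
    ...


def pcluster_dra_check_handler(event: dict) -> dict:  # hypothetical name
    ...


def pcluster_describe_handler(event: dict) -> dict:  # hypothetical name
    ...


def pcluster_handler(event: dict, _context) -> dict:
    # API gateway proxy events carry the HTTP method and the resource path
    method = event.get("httpMethod", "")
    path = event.get("path", "")

    if method == "POST" and path.endswith("/pcluster/dra"):
        # fired periodically by the EventBridge rule
        return pcluster_dra_check_handler(event)
    if method == "POST" and path.endswith("/pcluster"):
        return pcluster_create_request_handler(event)
    if method == "GET" and path.endswith("/pcluster"):
        return pcluster_describe_handler(event)

    return {"statusCode": 400, "body": f"Unsupported route: {method} {path}"}
```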


# Acknowledgment

The development of this software was supported by funding to the Blue Brain Project,
5 changes: 5 additions & 0 deletions hpc_provisioner/pyproject.toml
@@ -8,6 +8,7 @@ dependencies = [
"aws-parallelcluster",
"boto3",
"cryptography",
"python_dynamodb_lock",
[Review comment by the author: Remove again]

"PyYAML"
]
authors = [{"name" = "Fernando Pereira", "email" = "fernando.pereira@epfl.ch"},
@@ -58,9 +59,13 @@ addopts = ["--import-mode=importlib", "-vv", "--disable-warnings", "--cov=hpc_pr
[tool.pytest_env]
SBO_NEXUSDATA_BUCKET = "s3://sbonexusdata-test"
CONTAINERS_BUCKET = "s3://sboinfrastructureassets-test/containers"
INFRA_ASSETS_BUCKET = "s3://sboinfrastructureassets-test"
SCRATCH_BUCKET = "s3://scratch-test"
EFA_SG_ID = "sg-123456789"
FSX_POLICY_ARN = "arn:aws:iam::123456:policy/fsx_policy"
SUFFIX = "dev"
FS_SUBNET_IDS = '["subnet-1234"]'
FS_SG_ID = "sg-123456789"
EVENTBRIDGE_ROLE_ARN = "arn:eventbridge_role"
API_GW_STAGE_ARN = "arn:api_gw_stage"
PCLUSTER_AMI_ID = "ami-12345"
179 changes: 116 additions & 63 deletions hpc_provisioner/src/hpc_provisioner/aws_queries.py
@@ -10,19 +10,27 @@
from hpc_provisioner.constants import (
BILLING_TAG_KEY,
BILLING_TAG_VALUE,
DRA_CHECKING_RULE_NAME,
PROJECT_TAG_KEY,
VLAB_TAG_KEY,
)
from hpc_provisioner.dynamodb_actions import (
SubnetAlreadyRegisteredException,
dynamodb_client,
dynamodb_resource,
free_subnet,
get_cluster_by_name,
get_registered_subnets,
get_subnet,
register_subnet,
)
from hpc_provisioner.logging_config import LOGGING_CONFIG
from hpc_provisioner.utils import get_fs_sg_id, get_fs_subnet_ids
from hpc_provisioner.utils import (
get_api_gw_arn,
get_eventbridge_role_arn,
get_fs_sg_id,
get_fs_subnet_ids,
)

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("hpc-resource-provisioner")
@@ -99,6 +107,20 @@ def create_secret(sm_client, vlab_id, project_id, secret_name, secret_value):
return secret


def get_secrets_for_cluster(sm_client, cluster_name: str) -> dict:
response = {"ssh_user": "sim"}
for secret_data in [
{"response_key": "admin_user_private_ssh_key_arn", "secret_suffix": ""},
{"response_key": "private_ssh_key_arn", "secret_suffix": "_sim"},
]:
secret = get_secret(
sm_client=sm_client, secret_name=f"{cluster_name}{secret_data['secret_suffix']}"
)
response[secret_data["response_key"]] = secret["ARN"]

return response


def get_secret(sm_client, secret_name):
existing_secrets = sm_client.list_secrets(Filters=[{"Key": "name", "Values": [secret_name]}])
if secret_list := existing_secrets.get("SecretList", []):
Expand Down Expand Up @@ -142,8 +164,10 @@ def get_security_group(ec2_client) -> str:


def release_subnets(cluster_name: str) -> None:
logger.debug(f"Releasing subnets for {cluster_name}")
client = dynamodb_client()
registered_subnets = get_registered_subnets(client)
logger.debug(f"Registered subnets: {registered_subnets}")
claimed_subnets = [
subnet for subnet in registered_subnets if registered_subnets[subnet] == cluster_name
]
Expand Down Expand Up @@ -294,46 +318,33 @@ def list_existing_stacks(cf_client):
return existing_stack_names


def get_fsx_name(shared: bool, fs_name: str, cluster: Optional[Cluster]) -> str:
if shared:
return fs_name
else:
return f"{fs_name}-{cluster.name}"


def create_fsx(
fsx_client,
fs_name: str,
shared: bool = True,
cluster: Optional[Cluster] = None,
cluster: Cluster,
) -> Dict:
"""
Create an FSX filesystem if it doesn't exist yet

:param fs_name: name to identify the filesystem (e.g. "scratch", "projects", ...)
:param shared: whether the filesystem is shared among all pclusters or specific to one cluster
:param vlab_id: vlab of the cluster to which the filesystem will be attached.
:param project_id: project of the cluster to which the filesystem will be attached.
"""

logger.debug(
f"Creating fsx with name {fs_name}, shared {shared}, and cluster {cluster}",
f"Creating fsx with name {fs_name} and cluster {cluster}",
)
tags = [
{"Key": BILLING_TAG_KEY, "Value": BILLING_TAG_VALUE},
{"Key": VLAB_TAG_KEY, "Value": cluster.vlab_id},
{"Key": PROJECT_TAG_KEY, "Value": cluster.project_id},
{"Key": "parallelcluster:cluster-name", "Value": cluster.name},
]
token = get_fsx_name(shared, fs_name, cluster)
logger.debug(f"Token: {token}")
tags.append({"Key": "Name", "Value": token})

if not shared:
tags.append({"Key": VLAB_TAG_KEY, "Value": cluster.vlab_id})
tags.append({"Key": PROJECT_TAG_KEY, "Value": cluster.project_id})

tags.append({"Key": "Name", "Value": fs_name})
logger.debug(f"Tags: {tags}")

fs = fsx_client.create_file_system(
ClientRequestToken=token,
ClientRequestToken=fs_name,
FileSystemType="LUSTRE",
StorageCapacity=19200,
StorageType="SSD",
@@ -346,7 +357,7 @@ def create_fsx(
"DeploymentType": "PERSISTENT_2",
"PerUnitStorageThroughput": 250,
"DataCompressionType": "LZ4",
"EfaEnabled": True,
"EfaEnabled": False,
# "LogConfiguration": { # TODO do we want this?
# "Level": "DISABLED" | "WARN_ONLY" | "ERROR_ONLY" | "WARN_ERROR",
# "Destination": "string",
@@ -376,22 +387,32 @@ def get_fsx_by_id(fsx_client, filesystem_id: str) -> Optional[dict]:
go_on = next_token is not None


def get_fsx(
fsx_client, shared: bool, fs_name: str, vlab_id: str, project_id: str
) -> Optional[dict]:
full_fs_name = get_fsx_name(shared, fs_name, vlab_id, project_id)
def list_all_fsx(fsx_client) -> list:
all_fsx = []
go_on = True
next_token = None
while go_on:
if next_token:
file_systems = fsx_client.describe_file_systems(NextToken=next_token)
else:
file_systems = fsx_client.describe_file_systems()
for fs in file_systems["FileSystems"]:
if any([t["Value"] == full_fs_name for t in fs["Tags"] if t["Key"] == "Name"]):
return fs
next_token = file_systems.get("NextToken")
logger.debug(f"Checking for next_token in file systems: {file_systems}")
go_on = next_token is not None
all_fsx.extend(file_systems["FileSystems"])
return all_fsx


def list_all_dras_for_fsx(fsx_client, filesystem_id) -> list:
return fsx_client.describe_data_repository_associations(
Filters=[{"Name": "file-system-id", "Values": [filesystem_id]}]
).get("Associations", [])


def get_fsx(fsx_client, fs_name: str) -> Optional[dict]:
for fsx in list_all_fsx(fsx_client):
if any([t["Value"] == fs_name for t in fsx["Tags"] if t["Key"] == "Name"]):
return fsx
return None


@@ -400,12 +421,11 @@ def create_dra(
filesystem_id: str,
mountpoint: str,
bucket: str,
vlab_id: str,
project_id: str,
cluster: Cluster,
writable: bool = False,
) -> dict:
logger.debug(
f"Creating DRA for fs {filesystem_id}, mount {bucket} at {mountpoint}, for {vlab_id}-{project_id}, writable {writable}"
f"Creating DRA for fs {filesystem_id}, mount {bucket} at {mountpoint}, for {cluster.vlab_id}-{cluster.project_id}, writable {writable}"
)
s3_config = {
"AutoImportPolicy": { # from S3 to FS
@@ -421,6 +441,13 @@
s3_config["AutoExportPolicy"] = {"Events": ["NEW", "CHANGED", "DELETED"]}

logger.debug(f"s3 config: {s3_config}")
dynamo_cluster = get_cluster_by_name(
dynamodb_resource=dynamodb_resource(), cluster_name=cluster.name
)
if not dynamo_cluster:
raise RuntimeError(f"Clould not retrieve cluster {cluster.name} from dynamodb")
if dynamo_cluster["creation_time"] == 0:
raise ValueError(f"Creation time for {cluster.name} is 0; something is wrong")

dra = fsx_client.create_data_repository_association(
FileSystemId=filesystem_id,
@@ -429,12 +456,13 @@
BatchImportMetaDataOnCreate=True,
ImportedFileChunkSize=1024,
S3=s3_config,
ClientRequestToken=f"{filesystem_id}-{vlab_id}-{project_id}",
ClientRequestToken=f"{dynamo_cluster['creation_time']}-{cluster.vlab_id[:21]}-{cluster.project_id[:21]}-{mountpoint.split('/')[-1]}",
Tags=[
{"Key": "Name", "Value": f"{filesystem_id}-{mountpoint}"},
{"Key": BILLING_TAG_KEY, "Value": BILLING_TAG_VALUE},
{"Key": VLAB_TAG_KEY, "Value": vlab_id},
{"Key": PROJECT_TAG_KEY, "Value": project_id},
{"Key": VLAB_TAG_KEY, "Value": cluster.vlab_id},
{"Key": PROJECT_TAG_KEY, "Value": cluster.project_id},
{"Key": "parallelcluster:cluster-name", "Value": cluster.name},
],
)

@@ -450,9 +478,7 @@ def get_dra_by_id(fsx_client, dra_id: str) -> Optional[dict]:


def get_dra(fsx_client, filesystem_id: str, mountpoint: str) -> Optional[dict]:
dras = fsx_client.describe_data_repository_associations(
Filters=[{"Name": "file-system-id", "Values": [filesystem_id]}]
)
dras = list_all_dras_for_fsx(fsx_client=fsx_client, filesystem_id=filesystem_id)

try:
dra = next(dra for dra in dras if dra["FileSystemPath"] == mountpoint)
@@ -461,32 +487,59 @@ def get_dra(fsx_client, filesystem_id: str, mountpoint: str) -> Optional[dict]:
return None


def wait_for_fsx(fsx_client, filesystem_id, target_status="AVAILABLE", timeout=300) -> None:
fsx = {"Lifecycle": "UNKNOWN"}
start = time.time()
while fsx["Lifecycle"] != target_status and time.time() < start + timeout:
fsx = get_fsx_by_id(fsx_client=fsx_client, filesystem_id=filesystem_id)
if not fsx:
raise RuntimeError(f"FSx with id {filesystem_id} not found")
def eventbridge_dra_checking_rule_exists(eb_client):
response = eb_client.list_rules(NamePrefix="resource_provisioner", Limit=100)
return any(rule["Name"] == DRA_CHECKING_RULE_NAME for rule in response.get("Rules", []))

if fsx["Lifecycle"] != target_status:
raise RuntimeError(
f"FSx {filesystem_id} did not reach status {target_status} within {timeout} seconds"
)

def create_eventbridge_dra_checking_rule(eb_client):
if eventbridge_dra_checking_rule_exists(eb_client):
return

eb_client.put_rule(
Name=DRA_CHECKING_RULE_NAME,
ScheduleExpression="rate(5 minutes)",
State="ENABLED",
Description="Periodically check for DRAs and fire resource creator",
RoleArn=get_eventbridge_role_arn(),
Tags=[
{"Key": BILLING_TAG_KEY, "Value": BILLING_TAG_VALUE},
],
)

create_eventbridge_target(eb_client)


def wait_for_dra(fsx_client, dra_id, target_status="AVAILABLE", timeout=600) -> None:
dra = {"Lifecycle": "UNKNOWN"}
logger.debug(f"Waiting for dra {dra_id} for {timeout} seconds")
start = time.time()
while dra.get("Lifecycle") != target_status and time.time() < start + timeout:
dra = get_dra_by_id(fsx_client=fsx_client, dra_id=dra_id)
logger.debug(f"DRA: {dra}")
if not dra:
raise RuntimeError(f"DRA with id {dra_id} not found")
time.sleep(10)

if dra["Lifecycle"] != target_status:
raise RuntimeError(
f"DRA {dra_id} did not reach status {target_status} within {timeout} seconds"
def create_eventbridge_target(eb_client):
target = {
"Id": "hpc-resource-provisioner",
"Arn": f"{get_api_gw_arn()}production/POST/hpc-provisioner/dra",
"RoleArn": get_eventbridge_role_arn(),
}
logger.debug(f"Creating eventbridge target: {target}")
eb_client.put_targets(
Rule=DRA_CHECKING_RULE_NAME,
Targets=[
target,
],
)


def delete_eventbridge_dra_checking_rule(eb_client):
logger.debug("Deleting eventbridge DRA checking rule")
existing_targets = eb_client.list_targets_by_rule(Rule=DRA_CHECKING_RULE_NAME).get(
"Targets", []
)
if len(existing_targets) > 0:
logger.debug(f"Clearing targets first: {existing_targets}")
eb_client.remove_targets(
Rule=DRA_CHECKING_RULE_NAME, Ids=[target["Id"] for target in existing_targets]
)

logger.debug("Deleting rule")
response = eb_client.delete_rule(Name=DRA_CHECKING_RULE_NAME)
logger.debug(f"Delete response: {response}")


def delete_fsx(fsx_client, filesystem_id: str) -> None:
fsx_client.delete_file_system(FileSystemId=filesystem_id, ClientRequestToken=filesystem_id)