Skip to content

Commit

Permalink
feat: check and auto-create S3 artifact bucket if missing (#34)
Browse files Browse the repository at this point in the history
* feat: check and auto-create S3 artifact bucket if missing

Adds the feature that:
* Checks related S3 storage for the bucket named by the default_artifact_root config option
* If create_artifact_root_if_not_exists==True and default bucket does not exist, attempts to create the bucket

Supporting this is an S3 bucket wrapper to help with checking bucket existence/creating new buckets, plus unit/integration tests.

Also included here was some refactoring to make the main function of the Operator read a bit clearer, packaging logic into some verbosely named helpers.

 Closes #23

* feat: add more unit tests for bucket creation

Also refactors/fixes bucket creation logic

* fix: typos in services/s3.py, related tests

* fix: linting/fmt

* refactor: move test_s3.py to unit test folder

* feat: add integration test for automatic bucket creation

* fix: errors in test_default_bucket_created

* fix: linting

* fix: integration test for github actions

Using AWS client from some virtualized environments prevents the client from inferring the region, resulting in an error.  This change sets the region explicitly.

* fix: formatting

* fix: formatting

* Revert "fix: formatting"

This reverts commit fbee1a8.

* fix: formatting

* fix: formatting. again...
  • Loading branch information
ca-scribner authored Apr 4, 2022
1 parent fc8a8da commit 9430475
Show file tree
Hide file tree
Showing 8 changed files with 581 additions and 83 deletions.
6 changes: 6 additions & 0 deletions charms/mlflow-server/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
# See LICENSE file for licensing details.
#
options:
create_default_artifact_root_if_missing:
description: |
If True, charm will try to create the default_artifact_root bucket in S3 if it does not
exist. If False and the bucket does not exist, the charm will enter Blocked status
type: boolean
default: true
default_artifact_root:
description: |
The name of the default bucket mlflow uses for artifacts, if not specified by the workflow
Expand Down
1 change: 1 addition & 0 deletions charms/mlflow-server/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
boto3
ops==1.2.0
oci-image==1.0.0
ops-lib-mysql
Expand Down
159 changes: 104 additions & 55 deletions charms/mlflow-server/src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import json
import logging
import re
from base64 import b64encode

from oci_image import OCIImageResource, OCIImageResourceError
Expand All @@ -28,7 +27,7 @@
get_interfaces,
)

DB_NAME = "mlflow"
from services.s3 import S3BucketWrapper, validate_s3_bucket_name


class Operator(CharmBase):
Expand All @@ -42,6 +41,7 @@ def __init__(self, *args):

self.image = OCIImageResource(self, "oci-image")
self.log = logging.getLogger(__name__)
self.charm_name = self.model.app.name

for event in [
self.on.install,
Expand Down Expand Up @@ -94,52 +94,27 @@ def main(self, event):
Runs at install, update, config change and relation change.
"""
try:
self.model.unit.status = MaintenanceStatus("Validating inputs and computing pod spec")

self._check_leader()
default_artifact_root = validate_s3_bucket_name(self.config["default_artifact_root"])
interfaces = self._get_interfaces()
image_details = self._check_image_details()
except CheckFailedError as check_failed:
self.model.unit.status = check_failed.status
self.model.unit.message = check_failed.msg
return

self._configure_mesh(interfaces)
config = self.model.config
charm_name = self.model.app.name
mysql = self._configure_mysql()
obj_storage = _get_obj_storage(interfaces)
secrets = self._define_secrets(obj_storage=obj_storage, mysql=mysql)

mysql = self.model.relations["db"]
if len(mysql) > 1:
self.model.unit.status = BlockedStatus("Too many mysql relations")
return

try:
mysql = mysql[0]
unit = list(mysql.units)[0]
mysql = mysql.data[unit]
mysql["database"]
except (IndexError, KeyError):
self.model.unit.status = WaitingStatus("Waiting for mysql relation data")
return
default_artifact_root = self._validate_default_s3_bucket(obj_storage)

if not ((obj_storage := interfaces["object-storage"]) and obj_storage.get_data()):
self.model.unit.status = WaitingStatus("Waiting for object-storage relation data")
self._configure_mesh(interfaces)
except CheckFailedError as check_failed:
self.model.unit.status = check_failed.status
self.model.unit.message = check_failed.msg
return

self.model.unit.status = MaintenanceStatus("Setting pod spec")

obj_storage = list(obj_storage.get_data().values())[0]
secrets = [
{
"name": f"{charm_name}-minio-secret",
"data": _minio_credentials_dict(obj_storage=obj_storage),
},
{
"name": f"{charm_name}-seldon-init-container-s3-credentials",
"data": _seldon_credentials_dict(obj_storage=obj_storage),
},
{"name": f"{charm_name}-db-secret", "data": _db_secret_dict(mysql=mysql)},
]

config = self.model.config
self.model.pod.set_spec(
{
"version": 3,
Expand All @@ -157,8 +132,8 @@ def main(self, event):
f"s3://{default_artifact_root}/",
],
"envConfig": {
"db-secret": {"secret": {"name": f"{charm_name}-db-secret"}},
"aws-secret": {"secret": {"name": f"{charm_name}-minio-secret"}},
"db-secret": {"secret": {"name": f"{self.charm_name}-db-secret"}},
"aws-secret": {"secret": {"name": f"{self.charm_name}-minio-secret"}},
"AWS_DEFAULT_REGION": "us-east-1",
"MLFLOW_S3_ENDPOINT_URL": "http://{service}.{namespace}:{port}".format(
**obj_storage
Expand Down Expand Up @@ -236,6 +211,22 @@ def _configure_mesh(self, interfaces):
}
)

def _configure_mysql(self):
    """Return the MySQL relation data for the single expected `db` relation.

    Raises:
        CheckFailedError(BlockedStatus): if more than one mysql relation exists.
        CheckFailedError(WaitingStatus): if the relation or its data is not yet available.
    """
    relations = self.model.relations["db"]
    if len(relations) > 1:
        raise CheckFailedError("Too many mysql relations", BlockedStatus)

    try:
        relation = relations[0]
        unit = list(relation.units)[0]
        data = relation.data[unit]
        # Sentinel access: raises KeyError until the relation data is populated
        data["database"]
        return data
    except (IndexError, KeyError) as err:
        # Chain explicitly so the triggering IndexError/KeyError stays visible in logs
        raise CheckFailedError("Waiting for mysql relation data", WaitingStatus) from err

def _check_leader(self):
if not self.unit.is_leader():
# We can't do anything useful when not the leader, so do nothing.
Expand All @@ -257,21 +248,59 @@ def _check_image_details(self):
raise CheckFailedError(f"{e.status.message}", e.status_type)
return image_details


def validate_s3_bucket_name(name):
"""Validates the name as a valid S3 bucket name, raising a CheckFailedError if invalid."""
# regex from https://stackoverflow.com/a/50484916/5394584
if re.match(
r"(?=^.{3,63}$)(?!^(\d+\.)+\d+$)(^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])$)",
name,
):
return name
else:
msg = (
f"Invalid value for config default_artifact_root '{name}'"
f" - value must be a valid S3 bucket name"
def _validate_default_s3_bucket(self, obj_storage):
    """Validates the default S3 store, ensuring bucket is accessible and creating if needed.

    Args:
        obj_storage: unpacked object-storage relation data (access-key, secret-key,
            service, port).

    Returns:
        The validated bucket name from the default_artifact_root config option.

    Raises:
        CheckFailedError(BlockedStatus): if the bucket name is invalid, or the bucket is
            inaccessible and cannot (or may not) be created.
    """
    # Validate the bucket name
    bucket_name = self.config["default_artifact_root"]
    if not validate_s3_bucket_name(bucket_name):
        msg = (
            f"Invalid value for config default_artifact_root '{bucket_name}'"
            f" - value must be a valid S3 bucket name"
        )
        raise CheckFailedError(msg, BlockedStatus)

    # Ensure the bucket exists, creating it if missing and allowed by config
    s3_wrapper = S3BucketWrapper(
        access_key=obj_storage["access-key"],
        secret_access_key=obj_storage["secret-key"],
        s3_service=obj_storage["service"],
        s3_port=obj_storage["port"],
    )

    if s3_wrapper.check_if_bucket_accessible(bucket_name):
        return bucket_name

    if not self.config["create_default_artifact_root_if_missing"]:
        raise CheckFailedError(
            "Error with default S3 artifact store - bucket not accessible or does not exist."
            " Set create_default_artifact_root_if_missing=True to automatically create a "
            "missing default bucket",
            BlockedStatus,
        )

    try:
        s3_wrapper.create_bucket(bucket_name)
        return bucket_name
    except Exception as e:
        # Fix: original message omitted the closing quote after the exception text
        raise CheckFailedError(
            "Error with default S3 artifact store - bucket not accessible or "
            f"cannot be created. Caught error: '{str(e)}'",
            BlockedStatus,
        )

def _define_secrets(self, obj_storage, mysql):
    """Returns needed secrets in pod_spec.kubernetesResources.secrets format."""
    prefix = self.charm_name
    minio_secret = {
        "name": f"{prefix}-minio-secret",
        "data": _minio_credentials_dict(obj_storage=obj_storage),
    }
    seldon_secret = {
        "name": f"{prefix}-seldon-init-container-s3-credentials",
        "data": _seldon_credentials_dict(obj_storage=obj_storage),
    }
    db_secret = {
        "name": f"{prefix}-db-secret",
        "data": _db_secret_dict(mysql=mysql),
    }
    return [minio_secret, seldon_secret, db_secret]


class CheckFailedError(Exception):
Expand All @@ -294,7 +323,7 @@ def _b64_encode_dict(d):
def _minio_credentials_dict(obj_storage):
"""Returns a dict of minio credentials with the values base64 encoded."""
minio_credentials = {
"AWS_ENDPOINT_URL": f"http://{obj_storage['service']}.{obj_storage['namespace']}:{obj_storage['port']}",
"AWS_ENDPOINT_URL": f"http://{obj_storage['service']}:{obj_storage['port']}",
"AWS_ACCESS_KEY_ID": obj_storage["access-key"],
"AWS_SECRET_ACCESS_KEY": obj_storage["secret-key"],
"USE_SSL": str(obj_storage["secure"]).lower(),
Expand Down Expand Up @@ -325,5 +354,25 @@ def _db_secret_dict(mysql):
return _b64_encode_dict(db_secret)


def _get_obj_storage(interfaces):
    """Unpacks and returns the object-storage relation data.

    Raises CheckFailedError if an anticipated error occurs.
    """
    relation = interfaces["object-storage"]
    if not (relation and relation.get_data()):
        raise CheckFailedError("Waiting for object-storage relation data", WaitingStatus)

    try:
        # Exactly one related unit is expected; take its data bag
        return list(relation.get_data().values())[0]
    except Exception as e:
        raise CheckFailedError(
            f"Unexpected error unpacking object storage data - data format not "
            f"as expected. Caught exception: '{str(e)}'",
            BlockedStatus,
        )


if __name__ == "__main__":
main(Operator)
83 changes: 83 additions & 0 deletions charms/mlflow-server/src/services/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Wrapper for basic accessing and validating of S3 Buckets."""

import re
from typing import Union

import boto3
import botocore.client
import botocore.exceptions


class S3BucketWrapper:
    """Wrapper for basic accessing and validating of S3 Buckets."""

    def __init__(
        self, access_key: str, secret_access_key: str, s3_service: str, s3_port: Union[str, int]
    ):
        """Store S3 connection settings; no connection is opened until `client` is used.

        Args:
            access_key: S3 access key id.
            secret_access_key: S3 secret access key.
            s3_service: hostname of the S3 service.
            s3_port: port of the S3 service (str or int; stored as str).
        """
        self.access_key: str = access_key
        self.secret_access_key: str = secret_access_key
        self.s3_service: str = s3_service
        # Normalized to str so s3_url formats uniformly regardless of input type
        self.s3_port: str = str(s3_port)

        # Lazily-created boto3 client; fix: annotation now admits the None initial value
        self._client: Union[botocore.client.BaseClient, None] = None

    def check_if_bucket_accessible(self, bucket_name):
        """Checks if a bucket exists and is accessible, returning True if both are satisfied.

        Will return False if we encounter a botocore.exceptions.ClientError, which could be
        due to the bucket not existing, the client session not having permission to access the
        bucket, or some other error with the client.
        """
        try:
            self.client.head_bucket(Bucket=bucket_name)
            return True
        except botocore.exceptions.ClientError:
            return False

    def create_bucket_if_missing(self, bucket_name):
        """Creates the bucket bucket_name if it does not exist, raising an error if it cannot.

        This method tries to access the bucket, assuming that if it is inaccessible it does
        not exist (this is a required assumption as inaccessible buckets look the same as
        those that do not exist). If inaccessible, we try to create_bucket and do not catch
        any exceptions that result from the call.
        """
        if self.check_if_bucket_accessible(bucket_name=bucket_name):
            return

        self.create_bucket(bucket_name=bucket_name)

    def create_bucket(self, bucket_name):
        """Create a bucket via the client."""
        self.client.create_bucket(Bucket=bucket_name)

    @property
    def client(self) -> botocore.client.BaseClient:
        """Returns an open boto3 client, creating and caching one if needed."""
        if self._client is None:
            self._client = boto3.client(
                "s3",
                endpoint_url=self.s3_url,
                aws_access_key_id=self.access_key,
                aws_secret_access_key=self.secret_access_key,
            )
        return self._client

    @property
    def s3_url(self):
        """Returns the S3 url."""
        return f"http://{self.s3_service}:{self.s3_port}"


def validate_s3_bucket_name(name):
    """Returns True if name is a valid S3 bucket name, else False."""
    # regex from https://stackoverflow.com/a/50484584/5394584
    # (pieces concatenate to the exact original pattern)
    pattern = (
        r"(?=^.{3,63}$)"
        r"(?!^(\d+\.)+\d+$)"
        r"(^(([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])\.)*"
        r"([a-z0-9]|[a-z0-9][a-z0-9\-]*[a-z0-9])$)"
    )
    return re.match(pattern, name) is not None
2 changes: 2 additions & 0 deletions charms/mlflow-server/test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
black
flake8
pytest
pytest-mock
pytest-lazy-fixture
Loading

0 comments on commit 9430475

Please sign in to comment.