Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(components): AWS SageMaker - Add optional parameter to allow training component to accept parameters related to Debugger #4283

Merged
merged 13 commits into from
Aug 19, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 102 additions & 13 deletions components/aws/sagemaker/common/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import re
import json
from pathlib2 import Path
from enum import Enum, auto

import boto3
import botocore
Expand Down Expand Up @@ -88,6 +89,10 @@ def get_component_version():
return component_version


def print_log_header(header_len, title=""):
    """Log a horizontal divider of asterisks, optionally with a centered title.

    Args:
        header_len: approximate total width of the divider; half this many
            asterisks are emitted on each side of the title.
        title: optional text placed between the two runs of asterisks.
    """
    padding = '*' * (header_len // 2)
    logging.info(f"{padding}{title}{padding}")

def print_logs_for_job(cw_client, log_grp, job_name):
"""Gets the CloudWatch logs for SageMaker jobs"""
try:
Expand Down Expand Up @@ -205,6 +210,22 @@ def create_training_job_request(args):

enable_spot_instance_support(request, args)

### Update DebugHookConfig and DebugRuleConfigurations
if args['debug_hook_config']:
request['DebugHookConfig'] = args['debug_hook_config']
else:
request.pop('DebugHookConfig')

if args['debug_rule_config']:
request['DebugRuleConfigurations'] = args['debug_rule_config']
else:
request.pop('DebugRuleConfigurations')

if args['tensorboard_output_config']:
request['TensorBoardOutputConfig'] = args['tensorboard_output_config']
else:
request.pop('TensorBoardOutputConfig')

### Update tags
for key, val in args['tags'].items():
request['Tags'].append({'Key': key, 'Value': val})
Expand All @@ -229,18 +250,86 @@ def create_training_job(client, args):


def wait_for_training_job(client, training_job_name, poll_interval=30):
    """Block until a SageMaker training job reaches a terminal state.

    Polls ``describe_training_job`` every ``poll_interval`` seconds and logs
    the current status.

    Args:
        client: boto3 SageMaker client (or any object exposing
            ``describe_training_job(TrainingJobName=...)``).
        training_job_name: name of the training job to watch.
        poll_interval: seconds to sleep between status polls.

    Raises:
        Exception: if the job ends with status ``Failed``; the SageMaker
            ``FailureReason`` is logged before raising.
    """
    while True:
        response = client.describe_training_job(TrainingJobName=training_job_name)
        status = response['TrainingJobStatus']
        if status == 'Completed':
            logging.info("Training job ended with status: " + status)
            break
        if status == 'Failed':
            message = response['FailureReason']
            logging.info(f'Training failed with the following error: {message}')
            raise Exception('Training job failed')
        logging.info("Training job is still in status: " + status)
        time.sleep(poll_interval)


def wait_for_debug_rules(client, training_job_name, poll_interval=30):
    """Poll a training job until every attached Debugger rule has finished.

    Exits immediately when the job has no ``DebugRuleEvaluationStatuses``.
    While any rule is still ``InProgress`` the per-rule status is logged on
    each poll; once all rules settle, a final status summary is printed.

    Args:
        client: boto3 SageMaker client (or compatible stub).
        training_job_name: name of the training job whose rules are watched.
        poll_interval: seconds to sleep between status polls.
    """
    header_logged = False
    while True:
        response = client.describe_training_job(TrainingJobName=training_job_name)
        if 'DebugRuleEvaluationStatuses' not in response:
            break
        if not header_logged:
            logging.info("Polling for status of all debug rules:")
            header_logged = True
        if DebugRulesStatus.status(response) != DebugRulesStatus.INPROGRESS:
            logging.info("Rules have ended with status:\n")
            print_debug_rule_status(response, True)
            break
        print_debug_rule_status(response)
        time.sleep(poll_interval)


class DebugRulesStatus(Enum):
    """Aggregate evaluation status across all Debugger rules of a job."""

    COMPLETED = auto()
    ERRORED = auto()
    INPROGRESS = auto()

    @classmethod
    def status(cls, response):
        """Fold the per-rule statuses in *response* into one aggregate value.

        Args:
            response: a ``describe_training_job`` response containing a
                ``DebugRuleEvaluationStatuses`` list.

        Returns:
            ``INPROGRESS`` if any rule is still running (this dominates),
            ``ERRORED`` if none are running but at least one errored,
            otherwise ``COMPLETED``.
        """
        has_error = False
        for debug_rule in response['DebugRuleEvaluationStatuses']:
            rule_status = debug_rule['RuleEvaluationStatus']
            if rule_status == "Error":
                has_error = True
            if rule_status == "InProgress":
                # Any in-flight rule means the overall evaluation is not done.
                return cls.INPROGRESS
        return cls.ERRORED if has_error else cls.COMPLETED


def print_debug_rule_status(response, last_print=False):
    """Log one line per Debugger rule with its current evaluation status.

    Rules whose status is "Error" are logged at ERROR level; all others at
    INFO. When ``last_print`` is True, each rule's ``StatusDetails`` (if
    present) is logged on a following line and entries are separated by a
    blank line. A closing divider is always logged via ``print_log_header``.

    Example output when last_print is False:
        INFO:root: - LossNotDecreasing: InProgress
        INFO:root: - Overtraining: NoIssuesFound
        ERROR:root:- CustomGradientRule: Error

    Example output when last_print is True:
        INFO:root: - LossNotDecreasing: IssuesFound
        INFO:root: - RuleEvaluationConditionMet: Evaluation of the rule LossNotDecreasing at step 10 resulted in the condition being met

        INFO:root: - Overtraining: NoIssuesFound

        ERROR:root:- CustomGradientRule: Error
        ERROR:root: - ClientError: Required key source_s3_uri not found in rule parameters map for rule configuration CustomGradientRule.

    Args:
        response: ``describe_training_job`` response containing a
            ``DebugRuleEvaluationStatuses`` list.
        last_print: True for the final summary (adds details + spacing).
    """
    for debug_rule in response['DebugRuleEvaluationStatuses']:
        # Blank-line separator between rules, but only in the final summary.
        line_ending = "\n" if last_print else ""
        if 'StatusDetails' in debug_rule:
            status_details = f"- {debug_rule['StatusDetails'].rstrip()}{line_ending}"
            # The separator moves onto the details line, so clear it here.
            line_ending = ""
        else:
            status_details = ""
        rule_status = f"- {debug_rule['RuleConfigurationName']}: {debug_rule['RuleEvaluationStatus']}{line_ending}"
        # Errored rules surface at ERROR level so they stand out in the log.
        log = logging.error if debug_rule['RuleEvaluationStatus'] == "Error" else logging.info
        log(f" {rule_status}")
        if last_print and status_details:
            log(f" {status_details}")
    print_log_header(50)


def get_model_artifacts_from_job(client, job_name):
Expand Down Expand Up @@ -1082,4 +1171,4 @@ def write_output(output_path, output_value, json_encode=False):
write_value = json.dumps(output_value) if json_encode else output_value

Path(output_path).parent.mkdir(parents=True, exist_ok=True)
Path(output_path).write_text(write_value)
Path(output_path).write_text(write_value)
5 changes: 4 additions & 1 deletion components/aws/sagemaker/common/train.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@ VpcConfig:
StoppingCondition:
MaxRuntimeInSeconds: 86400
MaxWaitTimeInSeconds: 86400
DebugHookConfig: {}
DebugRuleConfigurations: []
TensorBoardOutputConfig: {}
CheckpointConfig:
S3Uri: ''
LocalPath: ''
Tags: []
EnableNetworkIsolation: True
EnableInterContainerTrafficEncryption: False
EnableManagedSpotTraining: False
EnableManagedSpotTraining: False
4 changes: 2 additions & 2 deletions components/aws/sagemaker/tests/integration_tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

1. In the following Python script, change the bucket name and run the [`s3_sample_data_creator.py`](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker#the-sample-dataset) to create an S3 bucket with the sample mnist dataset in the region where you want to run the tests.
2. To prepare the dataset for the SageMaker GroundTruth Component test, follow the steps in the [GroundTruth Sample README](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/ground_truth_pipeline_demo#prep-the-dataset-label-categories-and-ui-template).
3. To prepare the processing script for the SageMaker Processing Component tests, upload the `scripts/kmeans_preprocessing.py` script to your bucket. This can be done by replacing `<my-bucket> with your bucket name and running `aws s3 cp scripts/kmeans_preprocessing.py s3://<my-bucket>/mnist_kmeans_example/processing_code/kmeans_preprocessing.py`
3. To prepare the processing script for the SageMaker Processing Component tests, upload the `scripts/kmeans_preprocessing.py` script to your bucket. This can be done by replacing `<my-bucket>` with your bucket name and running `aws s3 cp scripts/kmeans_preprocessing.py s3://<my-bucket>/mnist_kmeans_example/processing_code/kmeans_preprocessing.py`


## Step to run integration tests
Expand All @@ -22,4 +22,4 @@
1. Navigate to the root of this github directory.
1. Run `docker build . -f components/aws/sagemaker/tests/integration_tests/Dockerfile -t amazon/integration_test`
1. Run the image, injecting your environment variable files:
1. Run `docker run --env-file components/aws/sagemaker/tests/integration_tests/.env amazon/integration_test`
1. Run `docker run --env-file components/aws/sagemaker/tests/integration_tests/.env amazon/integration_test`
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
from utils import sagemaker_utils


@pytest.mark.parametrize(
"test_file_dir",
[
pytest.param(
"resources/config/kmeans-mnist-endpoint", marks=pytest.mark.canary_test
)
],
)
def run_predict_mnist(boto3_session, endpoint_name, download_dir):
""" https://github.com/awslabs/amazon-sagemaker-examples/blob/a8c20eeb72dc7d3e94aaaf28be5bf7d7cd5695cb
/sagemaker-python-sdk/1P_kmeans_lowlevel/kmeans_mnist_lowlevel.ipynb """
Expand Down Expand Up @@ -40,14 +48,6 @@ def np2csv(arr):
return json.loads(response["Body"].read().decode())


@pytest.mark.parametrize(
"test_file_dir",
[
pytest.param(
"resources/config/kmeans-mnist-endpoint", marks=pytest.mark.canary_test
)
],
)
def test_create_endpoint(
kfp_client, experiment_id, boto3_session, sagemaker_client, test_file_dir
):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from utils import minio_utils
from utils import sagemaker_utils


@pytest.mark.parametrize(
"test_file_dir",
[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,14 @@
pytest.param(
"resources/config/simple-mnist-training", marks=pytest.mark.canary_test
),
pytest.param("resources/config/fsx-mnist-training", marks=pytest.mark.fsx_test),
pytest.param(
"resources/config/xgboost-mnist-trainingjob-debugger",
marks=pytest.mark.canary_test
),
pytest.param(
"resources/config/fsx-mnist-training",
marks=pytest.mark.fsx_test
),
"resources/config/spot-sample-pipeline-training",
],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@
from utils import minio_utils


@pytest.mark.parametrize(
"test_file_dir",
[
pytest.param(
"resources/config/create-workteam",
marks=pytest.mark.canary_test
)
],
)
def create_workteamjob(
kfp_client, experiment_id, region, sagemaker_client, test_file_dir, download_dir
):
Expand Down Expand Up @@ -42,10 +51,6 @@ def create_workteamjob(
return workteam_name, workflow_json


@pytest.mark.parametrize(
"test_file_dir",
[pytest.param("resources/config/create-workteam", marks=pytest.mark.canary_test)],
)
def test_workteamjob(
kfp_client, experiment_id, region, sagemaker_client, test_file_dir
):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
PipelineDefinition: resources/definition/training_pipeline.py
TestName: xgboost-mnist-trainingjob-debugger
Timeout: 86400
ExpectedTrainingImage: ((XGBOOST_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3
Arguments:
region: ((REGION))
image: ((XGBOOST_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3
training_input_mode: File
hyperparameters:
max_depth: "5"
eta: "0.2"
gamma: "4"
min_child_weight: "6"
silent: "0"
subsample: "0.7"
num_round: "51"
channels:
- ChannelName: train
CompressionType: None
ContentType: text/csv
DataSource:
S3DataSource:
S3DataType: S3Prefix
S3DataDistributionType: FullyReplicated
S3Uri: s3://((DATA_BUCKET))/mnist_kmeans_example/input/valid_data.csv
instance_type: ml.m4.xlarge
instance_count: 1
volume_size: 5
max_run_time: 3600
model_artifact_path: s3://((DATA_BUCKET))/xgboost-debugger/output
network_isolation: "True"
traffic_encryption: "False"
spot_instance: "False"
max_wait_time: 3600
checkpoint_config: "{}"
debug_hook_config:
S3OutputPath: s3://((DATA_BUCKET))/xgboost-debugger/hookconfig
CollectionConfigurations:
- CollectionName: "feature_importance"
CollectionParameters:
save_interval: "5"
- CollectionName: "losses"
CollectionParameters:
save_interval: "500"
- CollectionName: "average_shap"
CollectionParameters:
save_interval: "5"
- CollectionName: "metrics"
CollectionParameters:
save_interval: "5"
debug_rule_config:
- RuleConfigurationName: LossNotDecreasing
RuleEvaluatorImage: ((BUILTIN_RULE_IMAGE))
RuleParameters:
collection_names: metrics
num_steps: "10"
rule_to_invoke: LossNotDecreasing
role: ((ROLE_ARN))

Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@ def training_pipeline(
max_run_time="",
model_artifact_path="",
output_encryption_key="",
vpc_security_group_ids="",
vpc_subnets="",
network_isolation="",
traffic_encryption="",
spot_instance="",
max_wait_time="",
checkpoint_config="{}",
vpc_security_group_ids="",
vpc_subnets="",
checkpoint_config="",
debug_hook_config="",
debug_rule_config="",
tensorboard_output_config="",
role="",
):
sagemaker_train_op(
Expand All @@ -43,13 +46,16 @@ def training_pipeline(
max_run_time=max_run_time,
model_artifact_path=model_artifact_path,
output_encryption_key=output_encryption_key,
vpc_security_group_ids=vpc_security_group_ids,
vpc_subnets=vpc_subnets,
network_isolation=network_isolation,
traffic_encryption=traffic_encryption,
spot_instance=spot_instance,
max_wait_time=max_wait_time,
checkpoint_config=checkpoint_config,
vpc_security_group_ids=vpc_security_group_ids,
vpc_subnets=vpc_subnets,
debug_hook_config=debug_hook_config,
debug_rule_config=debug_rule_config,
tensorboard_output_config=tensorboard_output_config,
role=role,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import shutil

from sagemaker.amazon.amazon_estimator import get_image_uri
Copy link
Contributor

@akartsky akartsky Aug 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That deprecated version is for SageMaker SDK 2.0 @akartsky

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right we can choose to keep using the old version

for some reason i thought this was added to _utils.py file as a new dependency so i thought of using the latest one https://github.com/kubeflow/pipelines/blob/master/components/aws/sagemaker/common/_utils.py#L39

from sagemaker.debugger import get_rule_container_image_uri


def get_region():
Expand Down Expand Up @@ -42,8 +43,8 @@ def get_fsx_id():
return os.environ.get("FSX_ID")


def get_algorithm_image_registry(region, algorithm):
return get_image_uri(region, algorithm).split(".")[0]
def get_algorithm_image_registry(region, algorithm, repo_version=1):
    """Return the registry (AWS account id) part of a built-in algorithm image URI.

    Args:
        region: AWS region of the image registry.
        algorithm: name of the SageMaker built-in algorithm (e.g. "kmeans").
        repo_version: image repository version tag passed to ``get_image_uri``.
    """
    image_uri = get_image_uri(region, algorithm, repo_version)
    return image_uri.split(".")[0]


def run_command(cmd, *popenargs, **kwargs):
Expand Down Expand Up @@ -81,6 +82,8 @@ def replace_placeholders(input_filename, output_filename):
"((ROLE_ARN))": get_role_arn(),
"((DATA_BUCKET))": get_s3_data_bucket(),
"((KMEANS_REGISTRY))": get_algorithm_image_registry(region, "kmeans"),
"((XGBOOST_REGISTRY))": get_algorithm_image_registry(region, "xgboost", "1.0-1"),
"((BUILTIN_RULE_IMAGE))": get_rule_container_image_uri(region),
"((FSX_ID))": get_fsx_id(),
"((FSX_SUBNET))": get_fsx_subnet(),
"((FSX_SECURITY_GROUP))": get_fsx_security_group(),
Expand All @@ -97,7 +100,7 @@ def replace_placeholders(input_filename, output_filename):
with open(output_filename, "w") as f:
f.write(filedata)
return output_filename


def load_params(file_name):
with open(file_name, "r") as f:
Expand Down
Loading