diff --git a/samples/contrib/aws-samples/README.md b/samples/contrib/aws-samples/README.md
index 77179d1fbb6..a5a1a9ab451 100644
--- a/samples/contrib/aws-samples/README.md
+++ b/samples/contrib/aws-samples/README.md
@@ -118,20 +118,8 @@ There are two ways you can give them access to SageMaker.
 
 ## Inputs to the pipeline
 
-### Sample MNIST dataset
-
-The following commands will copy the data extraction pre-processing script to an S3 bucket which we will use to store artifacts for the pipeline.
-
-1. [Create a bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html) in `us-east-1` region if you don't have one already.
-For the purposes of this demonstration, all resources will be created in the us-east-1 region.
-
-2. Upload the `mnist-kmeans-sagemaker/kmeans_preprocessing.py` file to your bucket with the prefix `mnist_kmeans_example/processing_code/kmeans_preprocessing.py`.
-This can be done with the following command, replacing `<your-bucket-name>` with the name of the bucket you previously created in `us-east-1`:
-   ```
-   aws s3 cp mnist-kmeans-sagemaker/kmeans_preprocessing.py s3://<your-bucket-name>/mnist_kmeans_example/processing_code/kmeans_preprocessing.py
-   ```
-
 ### Role Input
+**Note:** Ignore this section if you plan to run the [titanic-survival-prediction](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/titanic-survival-prediction) example.
 
 This role is used by SageMaker jobs created by the KFP to access the S3 buckets and other AWS resources.
 Run these commands to create the sagemaker-execution-role.
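For reference, the role-creation commands referred to above sit outside this hunk. Below is a minimal sketch of equivalent setup in Python with boto3 (assumptions: the role name `sagemaker-execution-role` comes from the README; the trust policy and the two managed policies are common choices for this sample, not confirmed by the diff):

```python
import json
import boto3

iam = boto3.client("iam")

# Trust policy that lets the SageMaker service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "sagemaker.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

role = iam.create_role(
    RoleName="sagemaker-execution-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Managed policies granting SageMaker and S3 access (assumed; tighten as needed).
for policy_arn in (
    "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    "arn:aws:iam::aws:policy/AmazonS3FullAccess",
):
    iam.attach_role_policy(RoleName="sagemaker-execution-role", PolicyArn=policy_arn)

# The printed ARN is what the samples expect as their role_arn input.
print(role["Role"]["Arn"])
```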
diff --git a/samples/contrib/aws-samples/mnist-kmeans-sagemaker/README.md b/samples/contrib/aws-samples/mnist-kmeans-sagemaker/README.md
index 56a9695603d..b0b3f372454 100644
--- a/samples/contrib/aws-samples/mnist-kmeans-sagemaker/README.md
+++ b/samples/contrib/aws-samples/mnist-kmeans-sagemaker/README.md
@@ -4,8 +4,21 @@ The `mnist-classification-pipeline.py` sample runs a pipeline to train a classification model using K-Means with the MNIST dataset on SageMaker.
 
 ## Prerequisites
 
+### Setup K8s cluster and authentication
 Make sure you have the setup explained in this [README.md](https://github.com/kubeflow/pipelines/blob/master/samples/contrib/aws-samples/README.md)
 
+### Sample MNIST dataset
+
+The following commands will copy the data extraction pre-processing script to an S3 bucket which we will use to store artifacts for the pipeline.
+
+1. [Create a bucket](https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.html) in the `us-east-1` region if you don't have one already.
+For the purposes of this demonstration, all resources will be created in the us-east-1 region.
+
+2. Upload the `mnist-kmeans-sagemaker/kmeans_preprocessing.py` file to your bucket with the prefix `mnist_kmeans_example/processing_code/kmeans_preprocessing.py`.
+This can be done with the following command, replacing `<your-bucket-name>` with the name of the bucket you previously created in `us-east-1`:
+   ```
+   aws s3 cp mnist-kmeans-sagemaker/kmeans_preprocessing.py s3://<your-bucket-name>/mnist_kmeans_example/processing_code/kmeans_preprocessing.py
+   ```
 
 ## Compiling the pipeline template
 
@@ -20,32 +33,49 @@ dsl-compile --py mnist-classification-pipeline.py --output mnist-classification-pipeline.tar.gz
 
 Open the Kubeflow pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.
 
-The pipeline requires several arguments, replace `role_arn` and data path with your settings.
+Provide the `role_arn` and `bucket_name` you created as pipeline inputs.
 
 Once the pipeline is done, you can go to `batch_transform_ouput` to check your batch prediction results.
-You will have an model endpoint in service. Please remember to clean it up.
+You will also have a model endpoint in service. Refer to the [Prediction section](#Prediction) below to run predictions against your deployed model via the endpoint. Please remember to clean up the endpoint.
 
 ## Prediction
 
-Open SageMaker [console](https://us-east-1.console.aws.amazon.com/sagemaker/home?region=us-east-1#/endpoints) and find your endpoint name, You can call endpoint in this way. Please check dataset section to get `train_set`.
+1. Find your endpoint name either by:
+   - Opening the SageMaker [console](https://us-east-1.console.aws.amazon.com/sagemaker/home?region=us-east-1#/endpoints), or
+   - Clicking the `sagemaker-deploy-model-endpoint_name` under `Output artifacts` of the `SageMaker - Deploy Model` component of the pipeline run
+
+2. Set up AWS credentials with `sagemaker:InvokeEndpoint` access ([sample commands](https://sagemaker.readthedocs.io/en/stable/workflows/kubernetes/using_amazon_sagemaker_components.html#configure-permissions-to-run-predictions)).
+3. Update the `ENDPOINT_NAME` variable in the script below.
+4. Run the script below to invoke the endpoint.
 
 ```python
 import json
 import io
 import boto3
+import pickle
+import urllib.request
+import gzip
+import numpy
+
+ENDPOINT_NAME="<your-endpoint-name>"
 
-# Simple function to create a csv from our numpy array
+# Simple function to create a csv from a numpy array
 def np2csv(arr):
     csv = io.BytesIO()
     numpy.savetxt(csv, arr, delimiter=',', fmt='%g')
     return csv.getvalue().decode().rstrip()
 
-runtime = boto3.Session().client('sagemaker-runtime')
+# Prepare input for the model
+urllib.request.urlretrieve("http://deeplearning.net/data/mnist/mnist.pkl.gz", "mnist.pkl.gz")
+with gzip.open('mnist.pkl.gz', 'rb') as f:
+    train_set, _, _ = pickle.load(f, encoding='latin1')
 
 payload = np2csv(train_set[0][30:31])
-response = runtime.invoke_endpoint(EndpointName='Endpoint-20190502202738-LDKG',
+# Run prediction against the endpoint created by the pipeline
+runtime = boto3.Session(region_name='us-east-1').client('sagemaker-runtime')
+response = runtime.invoke_endpoint(EndpointName=ENDPOINT_NAME,
                                    ContentType='text/csv',
                                    Body=payload)
 result = json.loads(response['Body'].read().decode())
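The hunk above ends with `result` decoded but not yet inspected. Assuming the response format of the built-in SageMaker KMeans algorithm (a top-level `predictions` list whose entries carry `closest_cluster` and `distance_to_cluster`), a short continuation could print the assignments:

```python
# Continuation of the prediction script above (a sketch; assumes the built-in
# KMeans response shape:
# {"predictions": [{"closest_cluster": ..., "distance_to_cluster": ...}]}).
for prediction in result["predictions"]:
    print(
        f"closest_cluster={prediction['closest_cluster']}, "
        f"distance_to_cluster={prediction['distance_to_cluster']}"
    )
```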
diff --git a/samples/contrib/aws-samples/mnist-kmeans-sagemaker/mnist-classification-pipeline.py b/samples/contrib/aws-samples/mnist-kmeans-sagemaker/mnist-classification-pipeline.py
index 69fd4ac7d39..4395a14760d 100644
--- a/samples/contrib/aws-samples/mnist-kmeans-sagemaker/mnist-classification-pipeline.py
+++ b/samples/contrib/aws-samples/mnist-kmeans-sagemaker/mnist-classification-pipeline.py
@@ -2,20 +2,27 @@
 
 import kfp
-import json
-import copy
 from kfp import components
 from kfp import dsl
 
-sagemaker_hpo_op = components.load_component_from_file("../../../../components/aws/sagemaker/hyperparameter_tuning/component.yaml")
-sagemaker_process_op = components.load_component_from_file("../../../../components/aws/sagemaker/process/component.yaml")
-sagemaker_train_op = components.load_component_from_file("../../../../components/aws/sagemaker/train/component.yaml")
-sagemaker_model_op = components.load_component_from_file("../../../../components/aws/sagemaker/model/component.yaml")
-sagemaker_deploy_op = components.load_component_from_file("../../../../components/aws/sagemaker/deploy/component.yaml")
-sagemaker_batch_transform_op = components.load_component_from_file("../../../../components/aws/sagemaker/batch_transform/component.yaml")
-
-# Update this to match the name of your bucket
-my_bucket_name = "my-bucket"
+sagemaker_hpo_op = components.load_component_from_file(
+    "../../../../components/aws/sagemaker/hyperparameter_tuning/component.yaml"
+)
+sagemaker_process_op = components.load_component_from_file(
+    "../../../../components/aws/sagemaker/process/component.yaml"
+)
+sagemaker_train_op = components.load_component_from_file(
+    "../../../../components/aws/sagemaker/train/component.yaml"
+)
+sagemaker_model_op = components.load_component_from_file(
+    "../../../../components/aws/sagemaker/model/component.yaml"
+)
+sagemaker_deploy_op = components.load_component_from_file(
+    "../../../../components/aws/sagemaker/deploy/component.yaml"
+)
+sagemaker_batch_transform_op = components.load_component_from_file(
+    "../../../../components/aws/sagemaker/batch_transform/component.yaml"
+)
 
 
 def processing_input(input_name, s3_uri, local_path):
@@ -26,7 +33,6 @@ def processing_input(input_name, s3_uri, local_path):
             "LocalPath": local_path,
             "S3DataType": "S3Prefix",
             "S3InputMode": "File",
-            "S3CompressionType": "None",
         },
     }
 
@@ -45,220 +51,124 @@ def processing_output(output_name, s3_uri, local_path):
 def training_input(input_name, s3_uri):
     return {
         "ChannelName": input_name,
-        "DataSource": {
-            "S3DataSource": {
-                "S3Uri": s3_uri,
-                "S3DataType": "S3Prefix",
-                "S3DataDistributionType": "FullyReplicated",
-            }
-        },
-        "CompressionType": "None",
-        "RecordWrapperType": "None",
-        "InputMode": "File",
+        "DataSource": {"S3DataSource": {"S3Uri": s3_uri, "S3DataType": "S3Prefix"}},
     }
 
 
-hpoChannels = [
-    training_input(
-        "train", f"s3://{my_bucket_name}/mnist_kmeans_example/input/train_data"
-    ),
-    training_input(
-        "test", f"s3://{my_bucket_name}/mnist_kmeans_example/input/test_data"
-    ),
-]
-trainChannels = [
-    training_input(
-        "train", f"s3://{my_bucket_name}/mnist_kmeans_example/input/train_data"
-    )
-]
-
-
 @dsl.pipeline(
     name="MNIST Classification pipeline",
     description="MNIST Classification using KMEANS in SageMaker",
 )
-def mnist_classification(
-    region="us-east-1",
-    # General component inputs
-    output_encryption_key="",
-    instance_type="ml.m5.2xlarge",
-    instance_count=1,
-    volume_size=50,
-    max_run_time=3600,
-    endpoint_url="",
-    network_isolation=True,
-    traffic_encryption=False,
-    role_arn="",
-    # Pre-processing inputs
-    process_instance_type="ml.m5.large",
-    process_instance_count=1,
-    process_entrypoint=["python", "/opt/ml/processing/code/kmeans_preprocessing.py"],
-    process_image="763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.5.0-cpu-py36-ubuntu16.04",
-    process_input_config=[
-        processing_input(
-            "mnist_tar",
-            "s3://sagemaker-sample-data-us-east-1/algorithms/kmeans/mnist/mnist.pkl.gz",
-            "/opt/ml/processing/input",
-        ),
-        processing_input(
-            "source_code",
-            f"s3://{my_bucket_name}/mnist_kmeans_example/processing_code/kmeans_preprocessing.py",
-            "/opt/ml/processing/code",
-        ),
-    ],
-    process_output_config=[
-        processing_output(
-            "train_data",
-            f"s3://{my_bucket_name}/mnist_kmeans_example/input/",
-            "/opt/ml/processing/output_train/",
-        ),
-        processing_output(
-            "test_data",
-            f"s3://{my_bucket_name}/mnist_kmeans_example/input/",
-            "/opt/ml/processing/output_test/",
-        ),
-        processing_output(
-            "valid_data",
-            f"s3://{my_bucket_name}/mnist_kmeans_example/input/",
-            "/opt/ml/processing/output_valid/",
-        ),
-    ],
-    # HyperParameter Tuning inputs
-    hpo_strategy="Bayesian",
-    hpo_metric_name="test:msd",
-    hpo_metric_type="Minimize",
-    hpo_early_stopping_type="Off",
-    hpo_static_parameters={"k": "10", "feature_dim": "784"},
-    hpo_integer_parameters=[
-        {"Name": "mini_batch_size", "MinValue": "500", "MaxValue": "600"},
-        {"Name": "extra_center_factor", "MinValue": "10", "MaxValue": "20"},
-    ],
-    hpo_continuous_parameters=[],
-    hpo_categorical_parameters=[
-        {"Name": "init_method", "Values": ["random", "kmeans++"]}
-    ],
-    hpo_channels=hpoChannels,
-    hpo_spot_instance=False,
-    hpo_max_wait_time=3600,
-    hpo_checkpoint_config={},
-    hpo_max_num_jobs=9,
-    hpo_max_parallel_jobs=3,
-    # Training inputs
-    train_image="382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1",
-    train_input_mode="File",
-    train_output_location=f"s3://{my_bucket_name}/mnist_kmeans_example/output",
-    train_channels=trainChannels,
-    train_spot_instance=False,
-    train_max_wait_time=3600,
-    train_checkpoint_config={},
-    # Batch transform inputs
-    batch_transform_instance_type="ml.m4.xlarge",
-    batch_transform_input=f"s3://{my_bucket_name}/mnist_kmeans_example/input",
-    batch_transform_data_type="S3Prefix",
-    batch_transform_content_type="text/csv",
-    batch_transform_compression_type="None",
-    batch_transform_ouput=f"s3://{my_bucket_name}/mnist_kmeans_example/output",
-    batch_transform_max_concurrent=4,
-    batch_transform_max_payload=6,
-    batch_strategy="MultiRecord",
-    batch_transform_split_type="Line",
-):
+def mnist_classification(role_arn="", bucket_name=""):
+    # Common component inputs
+    region = "us-east-1"
+    instance_type = "ml.m5.2xlarge"
+    train_image = "382416733822.dkr.ecr.us-east-1.amazonaws.com/kmeans:1"
+
+    # Training input and output location based on bucket name
+    hpo_channels = [
+        training_input("train", f"s3://{bucket_name}/mnist_kmeans_example/train_data"),
+        training_input("test", f"s3://{bucket_name}/mnist_kmeans_example/test_data"),
+    ]
+    train_channels = [
+        training_input("train", f"s3://{bucket_name}/mnist_kmeans_example/train_data")
+    ]
+    train_output_location = f"s3://{bucket_name}/mnist_kmeans_example/output"
+
     process = sagemaker_process_op(
         role=role_arn,
         region=region,
-        image=process_image,
+        image="763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.5.0-cpu-py36-ubuntu16.04",
         instance_type=instance_type,
-        instance_count=process_instance_count,
-        volume_size=volume_size,
-        max_run_time=max_run_time,
-        container_entrypoint=process_entrypoint,
-        input_config=process_input_config,
-        output_config=process_output_config
+        container_entrypoint=[
+            "python",
+            "/opt/ml/processing/code/kmeans_preprocessing.py",
+        ],
+        input_config=[
+            processing_input(
+                "mnist_tar",
+                "s3://sagemaker-sample-data-us-east-1/algorithms/kmeans/mnist/mnist.pkl.gz",
+                "/opt/ml/processing/input",
+            ),
+            processing_input(
+                "source_code",
+                f"s3://{bucket_name}/mnist_kmeans_example/processing_code/kmeans_preprocessing.py",
+                "/opt/ml/processing/code",
+            ),
+        ],
+        output_config=[
+            processing_output(
+                "train_data",
+                f"s3://{bucket_name}/mnist_kmeans_example/",
+                "/opt/ml/processing/output_train/",
+            ),
+            processing_output(
+                "test_data",
+                f"s3://{bucket_name}/mnist_kmeans_example/",
+                "/opt/ml/processing/output_test/",
+            ),
+            processing_output(
+                "valid_data",
+                f"s3://{bucket_name}/mnist_kmeans_example/input/",
+                "/opt/ml/processing/output_valid/",
+            ),
+        ],
     )
 
     hpo = sagemaker_hpo_op(
         region=region,
-        endpoint_url=endpoint_url,
         image=train_image,
-        training_input_mode=train_input_mode,
-        strategy=hpo_strategy,
-        metric_name=hpo_metric_name,
-        metric_type=hpo_metric_type,
-        early_stopping_type=hpo_early_stopping_type,
-        static_parameters=hpo_static_parameters,
-        integer_parameters=hpo_integer_parameters,
-        continuous_parameters=hpo_continuous_parameters,
-        categorical_parameters=hpo_categorical_parameters,
+        metric_name="test:msd",
+        metric_type="Minimize",
+        static_parameters={"k": "10", "feature_dim": "784"},
+        integer_parameters=[
+            {"Name": "mini_batch_size", "MinValue": "500", "MaxValue": "600"},
+            {"Name": "extra_center_factor", "MinValue": "10", "MaxValue": "20"},
+        ],
+        categorical_parameters=[
+            {"Name": "init_method", "Values": ["random", "kmeans++"]}
+        ],
         channels=hpo_channels,
         output_location=train_output_location,
-        output_encryption_key=output_encryption_key,
         instance_type=instance_type,
-        instance_count=instance_count,
-        volume_size=volume_size,
-        max_num_jobs=hpo_max_num_jobs,
-        max_parallel_jobs=hpo_max_parallel_jobs,
-        max_run_time=max_run_time,
-        network_isolation=network_isolation,
-        traffic_encryption=traffic_encryption,
-        spot_instance=hpo_spot_instance,
-        max_wait_time=hpo_max_wait_time,
-        checkpoint_config=hpo_checkpoint_config,
+        max_num_jobs=3,
+        max_parallel_jobs=2,
         role=role_arn,
     ).after(process)
 
     training = sagemaker_train_op(
         region=region,
-        endpoint_url=endpoint_url,
         image=train_image,
-        training_input_mode=train_input_mode,
         hyperparameters=hpo.outputs["best_hyperparameters"],
         channels=train_channels,
         instance_type=instance_type,
-        instance_count=instance_count,
-        volume_size=volume_size,
-        max_run_time=max_run_time,
         model_artifact_path=train_output_location,
-        output_encryption_key=output_encryption_key,
-        network_isolation=network_isolation,
-        traffic_encryption=traffic_encryption,
-        spot_instance=train_spot_instance,
-        max_wait_time=train_max_wait_time,
-        checkpoint_config=train_checkpoint_config,
         role=role_arn,
     )
 
     create_model = sagemaker_model_op(
         region=region,
-        endpoint_url=endpoint_url,
         model_name=training.outputs["job_name"],
         image=training.outputs["training_image"],
        model_artifact_url=training.outputs["model_artifact_url"],
-        network_isolation=network_isolation,
         role=role_arn,
     )
 
-    prediction = sagemaker_deploy_op(
-        region=region, endpoint_url=endpoint_url, model_name_1=create_model.output,
+    sagemaker_deploy_op(
+        region=region, model_name_1=create_model.output,
     )
 
-    batch_transform = sagemaker_batch_transform_op(
+    sagemaker_batch_transform_op(
         region=region,
-        endpoint_url=endpoint_url,
         model_name=create_model.output,
-        instance_type=batch_transform_instance_type,
-        instance_count=instance_count,
-        max_concurrent=batch_transform_max_concurrent,
-        max_payload=batch_transform_max_payload,
-        batch_strategy=batch_strategy,
-        input_location=batch_transform_input,
-        data_type=batch_transform_data_type,
-        content_type=batch_transform_content_type,
-        split_type=batch_transform_split_type,
-        compression_type=batch_transform_compression_type,
-        output_location=batch_transform_ouput,
+        instance_type=instance_type,
+        batch_strategy="MultiRecord",
+        input_location=f"s3://{bucket_name}/mnist_kmeans_example/input",
+        content_type="text/csv",
+        split_type="Line",
+        output_location=f"s3://{bucket_name}/mnist_kmeans_example/output",
    )
 
 
 if __name__ == "__main__":
     kfp.compiler.Compiler().compile(mnist_classification, __file__ + ".zip")
-
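Both READMEs ask you to clean up the model endpoint once you are done. A minimal cleanup sketch with boto3 (assumptions: default AWS credentials for `us-east-1`; `<your-endpoint-name>` is a hypothetical placeholder for the endpoint created by the deploy step, and the backing endpoint config is looked up rather than guessed):

```python
import boto3

sm = boto3.client("sagemaker", region_name="us-east-1")
endpoint_name = "<your-endpoint-name>"  # hypothetical placeholder

# Find the endpoint config backing the endpoint before deleting anything.
config_name = sm.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]

sm.delete_endpoint(EndpointName=endpoint_name)
sm.delete_endpoint_config(EndpointConfigName=config_name)
```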