Website restructure and Dynamic EBS volumes for karpenter userdata #221

Merged 5 commits on Jun 10, 2023
2 changes: 1 addition & 1 deletion analytics/cdk/stream-emr-on-eks/.gitignore
@@ -38,4 +38,4 @@ Config
**/.coverage
**/.coveragerc
**/coverage-reports/
*.jar
*.jar
2 changes: 1 addition & 1 deletion analytics/cdk/stream-emr-on-eks/Dockerfile
@@ -2,4 +2,4 @@ FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-6.5.0:latest
USER root
RUN pip3 install --upgrade boto3 pandas numpy
COPY spark-sql-kinesis_2.12-1.2.0_spark-3.0.jar ${SPARK_HOME}/jars/
USER hadoop:hadoop
USER hadoop:hadoop
1 change: 0 additions & 1 deletion analytics/cdk/stream-emr-on-eks/LICENSE
@@ -12,4 +12,3 @@ FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

46 changes: 23 additions & 23 deletions analytics/cdk/stream-emr-on-eks/README.md
@@ -6,51 +6,51 @@ It includes sample data, Kafka producer simulator, and a consumer example that c
The infrastructure deployment includes the following:
- A new S3 bucket to store sample data and stream job code
- An EKS cluster v1.24 in a new VPC across 2 AZs
- The cluster has 2 default managed node groups: the On-Demand node group scales from 1 to 5 instances, and the Spot node group scales from 1 to 30.
- The cluster has 2 default managed node groups: the On-Demand node group scales from 1 to 5 instances, and the Spot node group scales from 1 to 30.
- It also has a Fargate profile labelled with the value `serverless`
- An EMR virtual cluster in the same VPC
- The virtual cluster links to `emr` namespace
- The virtual cluster links to `emr` namespace
- The namespace accommodates two types of Spark jobs, i.e. jobs run on the managed node group or serverless jobs on Fargate
- All EMR on EKS configurations are in place, including fine-grained access controls for pods via the AWS-native IAM roles for service accounts (IRSA)
- An MSK cluster in the same VPC with 2 brokers in total. Kafka version is 2.8.1
- A Cloud9 IDE as the command line environment in the demo.
- A Cloud9 IDE as the command line environment in the demo.
- Kafka Client tool will be installed on the Cloud9 IDE
- An EMR on EC2 cluster with managed scaling enabled.
- 1 primary and 1 core node, both r5.xlarge.
- configured to run one Spark job at a time.
- can scale from 1 to 10 core + task nodes
- mounted EFS for checkpointing test/demo (a bootstrap action)
- mounted EFS for checkpointing test/demo (a bootstrap action)

## Spark examples - read stream from MSK
Spark consumer applications reading from Amazon MSK:

* [1. Run a job with EMR on EKS](#1-submit-a-job-with-emr-on-eks)
* [2. Same job with Fargate on EMR on EKS](#2-EMR-on-EKS-with-Fargate)
* [3. Same job with EMR on EC2](#3-optional-Submit-step-to-EMR-on-EC2)
* [1. Run a job with EMR on EKS](#1-submit-a-job-with-emr-on-eks)
* [2. Same job with Fargate on EMR on EKS](#2-EMR-on-EKS-with-Fargate)
* [3. Same job with EMR on EC2](#3-optional-Submit-step-to-EMR-on-EC2)

## Spark examples - read stream from Kinesis
* [1. (Optional) Build a custom docker image](#1-optional-Build-custom-docker-image)
* [2. Run a job with kinesis-sql connector](#2-Use-kinesis-sql-connector)
* [3. Run a job with Spark's DStream](#3-use-spark-s-dstream)
* [1. (Optional) Build a custom docker image](#1-optional-Build-custom-docker-image)
* [2. Run a job with kinesis-sql connector](#2-Use-kinesis-sql-connector)
* [3. Run a job with Spark's DStream](#3-use-spark-s-dstream)

## Deploy Infrastructure

The provisioning takes about 30 minutes to complete.
The provisioning takes about 30 minutes to complete.
Two ways to deploy:
1. AWS CloudFormation template (CFN)
1. AWS CloudFormation template (CFN)
2. [AWS Cloud Development Kit (AWS CDK)](https://docs.aws.amazon.com/cdk/latest/guide/home.html).

### CloudFormation Deployment

| Region | Launch Template |
| --------------------------- | ----------------------- |
| --------------------------- | ----------------------- |
**US East (N. Virginia)**| [![Deploy to AWS](source/app_resources/00-deploy-to-aws.png)](https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/quickcreate?stackName=emr-stream-demo&templateURL=https://blogpost-sparkoneks-us-east-1.s3.amazonaws.com/emr-stream-demo/v2.0.0/emr-stream-demo.template)
**US East (N. Virginia)**| [![Deploy to AWS](source/app_resources/00-deploy-to-aws.png)](https://console.aws.amazon.com/cloudformation/home?region=us-east-1#/stacks/quickcreate?stackName=emr-stream-demo&templateURL=https://blogpost-sparkoneks-us-east-1.s3.amazonaws.com/emr-stream-demo/v2.0.0/emr-stream-demo.template)

* To launch in a different AWS Region, check out the following customization section, or use the CDK deployment option.
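If you prefer the AWS CLI over the console button, an equivalent quick-create call is sketched below. The template URL is the one from the table above; the stack name, Region, and capability flags are assumptions to check against the template before launching.

```bash
# Sketch: launch the same CloudFormation template from the CLI (flags are assumptions; verify against the template)
aws cloudformation create-stack \
  --stack-name emr-stream-demo \
  --template-url https://blogpost-sparkoneks-us-east-1.s3.amazonaws.com/emr-stream-demo/v2.0.0/emr-stream-demo.template \
  --capabilities CAPABILITY_IAM CAPABILITY_NAMED_IAM CAPABILITY_AUTO_EXPAND \
  --region us-east-1
```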

### Customization
You can customize the solution, for example to target a different Region, then generate the CFN templates in the Region you need:
You can customize the solution, for example to target a different Region, then generate the CFN templates in the Region you need:
```bash
export BUCKET_NAME_PREFIX=<my-bucket-name> # bucket where customized code will reside
export AWS_REGION=<your-region>
@@ -71,7 +71,7 @@ echo -e "\nIn web browser, paste the URL to launch the template: https://console

### CDK Deployment

#### Prerequisites
#### Prerequisites
Install the following tools:
1. [Python 3.6 +](https://www.python.org/downloads/).
2. [Node.js 10.3.0 +](https://nodejs.org/en/)
@@ -92,12 +92,12 @@ cdk deploy
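The hunk above ends at `cdk deploy`; for orientation, a minimal sketch of the usual CDK workflow for this project follows. The virtual-environment and bootstrap steps are assumptions based on a standard CDK Python app (requirements.txt installs the package itself via `-e .`); the collapsed block contains the authoritative commands.

```bash
# Sketch of a typical CDK deployment flow for this project (exact commands live in the collapsed block above)
cd analytics/cdk/stream-emr-on-eks
python3 -m venv .venv && source .venv/bin/activate   # assumed: standard CDK Python project setup
pip install -r requirements.txt                      # requirements.txt installs the CDK app itself (-e .)
npm install -g aws-cdk                               # CDK CLI, per the Node.js prerequisite
cdk bootstrap aws://<ACCOUNT_ID>/<AWS_REGION>
cdk deploy
```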

The following `post-deployment.sh` runs on Linux, not on macOS. Modify the script if needed.

1. Open the "Kafka Client" IDE in the Cloud9 console. Create one if it doesn't exist yet:
1. Open the "Kafka Client" IDE in the Cloud9 console. Create one if it doesn't exist yet:
```
VPC prefix: 'emr-stream-demo'
Instance Type: 't3.small'
```
2. [Attach the IAM role that contains `Cloud9Admin` to your IDE](https://catalog.us-east-1.prod.workshops.aws/workshops/d90c2f2d-a84b-4e80-b4f9-f5cee0614426/en-US/30-emr-serverless/31-set-up-env#setup-cloud9-ide).
2. [Attach the IAM role that contains `Cloud9Admin` to your IDE](https://catalog.us-east-1.prod.workshops.aws/workshops/d90c2f2d-a84b-4e80-b4f9-f5cee0614426/en-US/30-emr-serverless/31-set-up-env#setup-cloud9-ide).

3. Turn off AWS managed temporary credentials in Cloud9:
```bash
@@ -127,7 +127,7 @@ kafka_2.12-2.8.1/bin/kafka-console-consumer.sh \
```
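The project's own commands are in the collapsed block above; one common way to turn off the managed temporary credentials is sketched here. The environment-ID lookup and the credential-file cleanup are assumptions, not the repo's exact script.

```bash
# Sketch: disable Cloud9 managed temporary credentials so the attached IAM role takes effect
C9_ENV_ID=$(aws cloud9 list-environments --query 'environmentIds[0]' --output text)
aws cloud9 update-environment --environment-id $C9_ENV_ID --managed-credentials-action DISABLE
rm -f ${HOME}/.aws/credentials   # drop any cached temporary credentials
```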

## MSK integration
### 1. Submit a job with EMR on EKS
### 1. Submit a job with EMR on EKS

- [Sample job](deployment/app_code/job/msk_consumer.py) to consume data stream in MSK
- Submit the job:
@@ -145,7 +145,7 @@ aws emr-containers start-job-run \
--configuration-overrides '{
"applicationConfiguration": [
{
"classification": "spark-defaults",
"classification": "spark-defaults",
"properties": {
"spark.kubernetes.driver.podTemplateFile":"s3://'$S3BUCKET'/app_code/job/driver_template.yaml","spark.kubernetes.executor.podTemplateFile":"s3://'$S3BUCKET'/app_code/job/executor_template.yaml"
}
@@ -193,7 +193,7 @@ aws emr-containers start-job-run \
"sparkSubmitParameters": "--conf spark.jars.packages=org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7 --conf spark.cleaner.referenceTracking.cleanCheckpoints=true --conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=2 --conf spark.kubernetes.driver.label.type=serverless --conf spark.kubernetes.executor.label.type=serverless"}}' \
--configuration-overrides '{
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/emreksfg-log/"}}}'
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/emreksfg-log/"}}}'
```
### Verify the job is running on EKS Fargate
```bash
@@ -211,7 +211,7 @@ kafka_2.12-2.8.1/bin/kafka-console-consumer.sh \

```bash
cluster_id=$(aws emr list-clusters --cluster-states WAITING --query 'Clusters[?Name==`emr-stream-demo`].Id' --output text)
MSK_SERVER=$(echo $MSK_SERVER | cut -d',' -f 2)
MSK_SERVER=$(echo $MSK_SERVER | cut -d',' -f 2)

aws emr add-steps \
--cluster-id $cluster_id \
@@ -252,7 +252,7 @@ docker push $ECR_URL/emr6.5_custom_boto3
```
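The collapsed block above ends with a `docker push`; the usual build-and-push sequence for the Dockerfile in this folder looks roughly like the following. The repository name comes from that push line, while the ECR login flow and the assumption that the repository already exists are mine to verify.

```bash
# Sketch: build the custom EMR 6.5 image (Dockerfile above) and push it to your ECR repository
export ECR_URL="${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com"
# log in to your own registry, and to the EMR-owned registry referenced in the Dockerfile's FROM line
aws ecr get-login-password --region $AWS_REGION | docker login --username AWS --password-stdin $ECR_URL
aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 895885662937.dkr.ecr.us-west-2.amazonaws.com
docker build -t $ECR_URL/emr6.5_custom_boto3 .   # assumes the emr6.5_custom_boto3 repository already exists in ECR
docker push $ECR_URL/emr6.5_custom_boto3
```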

### 2. Use kinesis-sql connector
This demo uses the `com.qubole.spark/spark-sql-kinesis_2.12/1.2.0-spark_3.0` connector to interact with Kinesis.
This demo uses the `com.qubole.spark/spark-sql-kinesis_2.12/1.2.0-spark_3.0` connector to interact with Kinesis.

To enable job-level access control, i.e. the [IRSA feature](https://docs.aws.amazon.com/eks/latest/userguide/iam-roles-for-service-accounts.html), we forked the [kinesis-sql git repo](https://github.com/aws-samples/kinesis-sql) and recompiled a new jar after upgrading the AWS Java SDK. The custom docker build above picks up the upgraded connector automatically.
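As a rough illustration of what a structured-streaming job built on this connector looks like, the sketch below uses the connector's `kinesis` source format. The stream name, Region, and sink are placeholders rather than the sample job's actual values.

```python
# Sketch: reading a Kinesis stream with the kinesis-sql connector (placeholder stream/Region; not the sample job itself)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("KinesisSqlSketch").getOrCreate()

events = (spark.readStream
    .format("kinesis")
    .option("streamName", "my-demo-stream")                           # placeholder stream name
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
    .option("startingposition", "TRIM_HORIZON")
    .load()
    .selectExpr("CAST(data AS STRING) AS data"))                      # the connector delivers the payload in a `data` column

query = (events.writeStream
    .format("console")
    .outputMode("append")
    .start())
query.awaitTermination()
```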

@@ -321,7 +321,7 @@ aws emr-containers start-job-run \
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/kinesis-fargate-log/"}
}
}'
}'
```

## Useful commands
2 changes: 1 addition & 1 deletion analytics/cdk/stream-emr-on-eks/app.py
@@ -32,4 +32,4 @@
CfnOutput(eks_stack, "FargateVirtualClusterId",value=eks_stack.EMRFargateVC)
CfnOutput(eks_stack, "EMRExecRoleARN", value=eks_stack.EMRExecRole)

app.synth()
app.synth()
Original file line number Diff line number Diff line change
@@ -6,10 +6,10 @@ if [[ -z "$1" || -z "$2" ]]
echo "Missing mandatory arguments: File system ID, region"
exit 1
fi

# get file system id from input argument
fs_id=$1

# get region from input argument
region_id=$2

@@ -22,7 +22,7 @@ do
times=$(( $times + 1 ))
echo Attempt $times at verifying efs $fs_id is available...
done

# verify mount target is ready
times=0
echo
@@ -32,21 +32,21 @@ do
times=$(( $times + 1 ))
echo Attempt $times at verifying efs $fs_id mount target is available...
done

# create local path to mount efs
sudo mkdir -p /efs

# mount efs
until sudo mount -t nfs4 \
-o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 \
$fs_id.efs.$region_id.amazonaws.com:/ \
/efs; do echo "Shared filesystem not ready yet..." ; sleep 5; done

cd /efs

# give hadoop user permission to efs directory
sudo chown -R hadoop:hadoop .

if grep $fs_id /proc/mounts; then
echo "File system is mounted successfully."
else
Original file line number Diff line number Diff line change
@@ -2,4 +2,4 @@ apiVersion: v1
kind: Pod
spec:
nodeSelector:
eks.amazonaws.com/capacityType: SPOT
eks.amazonaws.com/capacityType: SPOT
Original file line number Diff line number Diff line change
@@ -16,8 +16,8 @@
.option("startingOffsets", "latest") \
.option("auto.offset.reset", "latest") \
.load() \
.selectExpr("decode(CAST(value AS STRING),'utf-8') as value")
.selectExpr("decode(CAST(value AS STRING),'utf-8') as value")

taxiRidesSchema = StructType([ \
StructField("rideId", LongType()), StructField("isStart", StringType()), \
StructField("endTime", TimestampType()), StructField("startTime", TimestampType()), \
@@ -28,9 +28,9 @@

def parse_data_from_kafka_message(sdf, schema):
assert sdf.isStreaming == True, "DataFrame doesn't receive streaming data"
col = split(sdf['value'], ',')
for idx, field in enumerate(schema):
sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
col = split(sdf['value'], ',')
for idx, field in enumerate(schema):
sdf = sdf.withColumn(field.name, col.getItem(idx).cast(field.dataType))
if field.name=="timestamp":
sdf = sdf.withColumn(field.name, current_timestamp())
return sdf.select([field.name for field in schema])
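A short usage sketch for the helper above; the name of the source DataFrame is assumed, since its assignment sits outside this hunk.

```python
# Sketch: apply the parser to the raw Kafka stream and write the typed columns to the console
rides = parse_data_from_kafka_message(kafka_df, taxiRidesSchema)   # kafka_df: the streaming DataFrame loaded above (name assumed)

query = (rides.writeStream
    .format("console")
    .outputMode("append")
    .start())
query.awaitTermination()
```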
Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@
# StreamName=stream_name,
# Data=json.dumps(message),
# PartitionKey='part_key')


# start Spark process, read from kinesis
appName = "PythonStreamingKinesisAsl"
@@ -60,4 +60,4 @@ def format_sample(x):
parsed.pprint()

ssc.start()
ssc.awaitTermination()
ssc.awaitTermination()
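The creation of the `parsed` DStream sits above this hunk; with the spark-streaming-kinesis-asl package it is typically built along these lines. The stream name, Region, and intervals are assumptions.

```python
# Sketch: building the Kinesis DStream that feeds parsed.pprint() above (values are placeholders)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

sc = SparkContext(appName="PythonStreamingKinesisAsl")
ssc = StreamingContext(sc, batchDuration=10)                  # 10-second micro-batches (assumed)

lines = KinesisUtils.createStream(
    ssc, "PythonStreamingKinesisAsl", "my-demo-stream",       # app name from the script; stream name is a placeholder
    "https://kinesis.us-east-1.amazonaws.com", "us-east-1",
    InitialPositionInStream.LATEST, checkpointInterval=10)

parsed = lines.map(format_sample)                             # format_sample is defined in the script above
parsed.pprint()
```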
Original file line number Diff line number Diff line change
@@ -24,4 +24,4 @@ aws emr-containers start-job-run \
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/kinesis-fargate-log/"}
}
}'
}'
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@
StructField("message_type", StringType()),
StructField("count", IntegerType())])


kinesis.selectExpr('CAST(data AS STRING)')\
.select(from_json('data', schema).alias('data'))\
.select('data.*')\
Original file line number Diff line number Diff line change
@@ -24,4 +24,4 @@ aws emr-containers start-job-run \
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://'${S3BUCKET}'/elasticmapreduce/kinesis-fargate-log/"}
}
}'
}'
9 changes: 4 additions & 5 deletions analytics/cdk/stream-emr-on-eks/deployment/build-s3-dist.sh
@@ -137,7 +137,7 @@ sed -i '' -e $replace $template_dist_dir/*.template
if [ -z "$4" ]; then
replace="s/%%TEMPLATE_OUTPUT_BUCKET%%/$1"-"${AWS_REGION}/g"
else
replace="s/%%TEMPLATE_OUTPUT_BUCKET%%/$4/g"
replace="s/%%TEMPLATE_OUTPUT_BUCKET%%/$4/g"
fi

echo "sed -i '' -e $replace $template_dist_dir/*.template"
@@ -185,7 +185,7 @@ for d in `find . -mindepth 1 -maxdepth 1 -type d`; do
cd $staging_dist_dir/$fname
rm -rf $venv_folder
zip -grq $staging_dist_dir/$fname.zip .

elif ls *.js 1> /dev/null 2>&1; then
echo "===================================="
echo "This is Node runtime"
@@ -207,7 +207,7 @@ for d in `find . -mindepth 1 -maxdepth 1 -type d`; do
# Zip the artifact
echo "zip -r $staging_dist_dir/$fname"
zip -rq $staging_dist_dir/$fname.zip .
fi
fi

cd $staging_dist_dir
# Copy the zipped artifact from /staging to /regional-s3-assets
@@ -230,7 +230,7 @@ for d in `find . -mindepth 1 -maxdepth 1`; do
pfname="$(basename -- $d)"
fname="$(echo $pfname | sed -e 's/asset./asset/g')"
mv $d $build_dist_dir/$fname
done
done

echo "------------------------------------------------------------------------------"
echo "[Cleanup] Remove temporary files"
Expand All @@ -239,4 +239,3 @@ echo "--------------------------------------------------------------------------
# Delete the temporary /staging folder
echo "rm -rf $staging_dist_dir"
rm -rf $staging_dist_dir
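Based on the argument handling visible in this hunk ($1 as the bucket prefix and $4 as an optional template bucket), a typical invocation is sketched below; the meaning of the second and third positional arguments is an assumption, since the script's usage header is collapsed.

```bash
# Sketch: package the assets for S3 distribution (argument order assumed; check the script header)
cd deployment
./build-s3-dist.sh <bucket-name-prefix> <solution-name> <version> [template-output-bucket]
# e.g. ./build-s3-dist.sh my-solutions-bucket emr-stream-demo v2.0.0
```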

Original file line number Diff line number Diff line change
@@ -5,8 +5,8 @@ them for use with the AWS Solutions publishing pipeline. This function performs

#### Lambda function preparation

Replaces the AssetParameter-style properties that identify source code for Lambda functions with the common variables
used by the AWS Solutions publishing pipeline.
Replaces the AssetParameter-style properties that identify source code for Lambda functions with the common variables
used by the AWS Solutions publishing pipeline.

- `Code.S3Bucket` is assigned the `%%BUCKET_NAME%%` placeholder value.
- `Code.S3Key` is assigned the `%%SOLUTION_NAME%%`/`%%VERSION%%` placeholder value.
@@ -98,8 +98,8 @@ After CloudFormation deployment:

#### Template cleanup

Cleans-up the parameters section and improves readability by removing the AssetParameter-style fields that would have
been used to specify Lambda source code properties. This allows solution-specific parameters to be highlighted and
Cleans-up the parameters section and improves readability by removing the AssetParameter-style fields that would have
been used to specify Lambda source code properties. This allows solution-specific parameters to be highlighted and
removes unnecessary clutter.

Before:
@@ -149,4 +149,4 @@ After:
```

***
&copy; Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
&copy; Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Original file line number Diff line number Diff line change
@@ -49,7 +49,7 @@ fs.readdirSync(global_s3_assets).forEach(file => {
const rsrctype=[
"AWS::Lambda::Function",
"AWS::Lambda::LayerVersion",
"Custom::CDKBucketDeployment",
"Custom::CDKBucketDeployment",
"AWS::CloudFormation::Stack",
"AWS::CloudFront::Distribution"
]
@@ -68,7 +68,7 @@ fs.readdirSync(global_s3_assets).forEach(file => {
else if (fn.Properties.hasOwnProperty('Content') && fn.Properties.Content.hasOwnProperty('S3Bucket')) {
// Set Lambda::LayerVersion S3 bucket reference
fn.Properties.Content.S3Key = `%%SOLUTION_NAME%%/%%VERSION%%/asset`+fn.Properties.Content.S3Key;
fn.Properties.Content.S3Bucket = {'Fn::Sub': '%%BUCKET_NAME%%-${AWS::Region}'};
fn.Properties.Content.S3Bucket = {'Fn::Sub': '%%BUCKET_NAME%%-${AWS::Region}'};
}
else if (fn.Properties.hasOwnProperty('SourceBucketNames')) {
// Set CDKBucketDeployment S3 bucket reference
@@ -112,9 +112,9 @@ fs.readdirSync(global_s3_assets).forEach(file => {
}
}
});

//6. Output modified template file
const output_template = JSON.stringify(template, null, 2);
fs.writeFileSync(`${global_s3_assets}/${file}`, output_template);
}
});
});
2 changes: 1 addition & 1 deletion analytics/cdk/stream-emr-on-eks/requirements.txt
@@ -1,2 +1,2 @@
-e .
# pytest
# pytest