Add AWS EMR and Athena components #1286

Merged May 7, 2019 (3 commits; changes shown from 1 commit)
26 changes: 26 additions & 0 deletions components/aws/athena/Dockerfile
@@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


FROM ubuntu:16.04

RUN apt-get update -y && apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip

RUN easy_install pip

RUN pip install boto3==1.9.130 pathlib2

COPY query/src/query.py .

ENV PYTHONPATH /app

ENTRYPOINT [ "bash" ]
35 changes: 35 additions & 0 deletions components/aws/athena/query/component.yaml
@@ -0,0 +1,35 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Athena - Query
description: |
  A Kubeflow Pipeline component to submit a query to Amazon Web Services Athena
  service and dump outputs to AWS S3.

inputs:
- {name: region, description: 'The Athena region in which to handle the request.'}
- {name: database, description: 'The name of the database.'}
- {name: query, description: 'The SQL query statements to be executed in Athena.'}
- {name: output, description: 'The path to the Amazon S3 location where the query results are stored.'}
outputs:
- {name: output_path, description: 'The path to the S3 bucket containing the query output in CSV format.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-athena:20190501
    command: ['python', 'query.py']
    args: [
      --region, {inputValue: region},
      --database, {inputValue: database},
      --query, {inputValue: query},
      --output, {inputValue: output}
    ]
    fileOutputs:
      output_path: /output.txt
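
For context, a component definition like the one above is typically loaded through the KFP SDK. The snippet below is a minimal sketch rather than a file in this PR; the local path and the S3 results bucket are placeholder assumptions.

import kfp
from kfp import components, dsl

# Assumed local checkout path to the component definition added in this PR.
athena_query_op = components.load_component_from_file(
    'components/aws/athena/query/component.yaml')

@dsl.pipeline(name='Athena query example',
              description='Submit one Athena query and expose its S3 output path.')
def athena_pipeline(region='us-west-2',
                    database='default',
                    query='SELECT 1',
                    output='s3://my-athena-results/'):  # placeholder bucket
  # The step's output_path fileOutput is available downstream as athena_task.output.
  athena_task = athena_query_op(region=region, database=database, query=query, output=output)

if __name__ == '__main__':
  kfp.compiler.Compiler().compile(athena_pipeline, 'athena_pipeline.zip')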
92 changes: 92 additions & 0 deletions components/aws/athena/query/src/query.py
@@ -0,0 +1,92 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import logging
import time
import re

import boto3


def get_client(region=None):
  """Builds a client to the AWS Athena API."""
  client = boto3.client('athena', region_name=region)
  return client

def query(client, query, database, output):
  response = client.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
      'Database': database
    },
    ResultConfiguration={
      'OutputLocation': output,
    }
  )

  execution_id = response['QueryExecutionId']
  logging.info('Execution ID: %s', execution_id)

  # Athena queries are asynchronous, so poll the execution state until it finishes.
  state = 'RUNNING'
  max_execution = 5  # TODO: this should be an optional parameter from users, or use a timeout.
Contributor: Yes - this can be a default param in the input.

Member Author: Thanks. I will share this with our users and see how they like it. The change will be made and exposed to users later, either as max_execution or max_waiting_time.
  while (max_execution > 0 and state in ['RUNNING']):
    max_execution = max_execution - 1
    response = client.get_query_execution(QueryExecutionId=execution_id)

    if 'QueryExecution' in response and \
        'Status' in response['QueryExecution'] and \
        'State' in response['QueryExecution']['Status']:
      state = response['QueryExecution']['Status']['State']
      if state == 'FAILED':
        raise Exception('Athena Query Failed')
      elif state == 'SUCCEEDED':
        s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
        # could be multiple files?
        filename = re.findall(r'.*\/(.*)', s3_path)[0]
        logging.info("S3 output file name %s", filename)
        break
    time.sleep(5)

  if state != 'SUCCEEDED':
    raise Exception('Athena query did not succeed within the polling budget; last state: %s' % state)

  # TODO:(@Jeffwan) Add more details.
  result = {
    'total_bytes_processed': response['QueryExecution']['Statistics']['DataScannedInBytes'],
    'filename': filename
  }

  return result

def main():
  logging.getLogger().setLevel(logging.INFO)
  parser = argparse.ArgumentParser()
  parser.add_argument('--region', type=str, help='Athena region.')
  parser.add_argument('--database', type=str, required=True, help='The name of the database.')
  parser.add_argument('--query', type=str, required=True, help='The SQL query statements to be executed in Athena.')
  parser.add_argument('--output', type=str, required=False,
                      help='The location in Amazon S3 where your query results are stored, such as s3://path/to/query/bucket/')

  args = parser.parse_args()

  client = get_client(args.region)
  results = query(client, args.query, args.database, args.output)

  results['output'] = args.output
  logging.info("Athena results: %s", results)
  with open('/output.txt', 'w+') as f:
    json.dump(results, f)


if __name__ == '__main__':
  main()
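
Following up on the review thread above: the polling budget in query.py is hard-coded to five 5-second polls. A hedged sketch of how it could later be surfaced as an optional flag; the name max_execution comes from the discussion and is not implemented in this PR.

import argparse

def add_polling_budget(parser):
  """Illustrative only: register an optional polling-budget flag on an existing parser."""
  parser.add_argument('--max_execution', type=int, default=5,
                      help='Maximum number of 5-second polls before the query is treated as timed out.')
  return parser

# query() would then take the budget as a keyword argument, e.g.
# query(client, args.query, args.database, args.output, max_execution=args.max_execution),
# keeping 5 as the default so existing callers are unaffected.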
30 changes: 30 additions & 0 deletions components/aws/emr/Dockerfile
@@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


FROM ubuntu:16.04

RUN apt-get update -y && apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip

RUN easy_install pip

RUN pip install boto3==1.9.130 pathlib2

COPY create_cluster/src/create_cluster.py .
COPY delete_cluster/src/delete_cluster.py .
COPY submit_pyspark_job/src/submit_pyspark_job.py .
COPY submit_spark_job/src/submit_spark_job.py .
COPY common /app/common/

ENV PYTHONPATH /app

ENTRYPOINT [ "bash" ]
Empty file.
147 changes: 147 additions & 0 deletions components/aws/emr/common/_utils.py
@@ -0,0 +1,147 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import datetime
import os
import subprocess
import time

import boto3
from botocore.exceptions import ClientError
import json


def get_client(region=None):
  """Builds a client to the AWS EMR API."""
  client = boto3.client('emr', region_name=region)
  return client

def create_cluster(client, cluster_name, log_s3_uri, release_label, instance_type, instance_count):
  """Creates an EMR cluster."""
  response = client.run_job_flow(
    Name=cluster_name,
    LogUri=log_s3_uri,
    ReleaseLabel=release_label,
    Applications=[
      {
        'Name': 'Spark'
      }
    ],
    BootstrapActions=[
      {
        'Name': 'Maximize Spark Default Config',
        'ScriptBootstrapAction': {
          'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
        }
      },
    ],
    Instances={
      'MasterInstanceType': instance_type,
      'SlaveInstanceType': instance_type,
      'InstanceCount': instance_count,
      'KeepJobFlowAliveWhenNoSteps': True,
      'TerminationProtected': False,
    },
    VisibleToAllUsers=True,
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
  )
  return response


def delete_cluster(client, jobflow_id):
  """Deletes an EMR cluster; the cluster shuts down in the background."""
  client.terminate_job_flows(JobFlowIds=[jobflow_id])

def wait_for_cluster(client, jobflow_id):
  """Waits for a new cluster to be ready."""
  while True:
    response = client.describe_cluster(ClusterId=jobflow_id)
    cluster_status = response['Cluster']['Status']
    state = cluster_status['State']

    if 'Message' in cluster_status['StateChangeReason']:
      message = cluster_status['StateChangeReason']['Message']

      if state in ['TERMINATED', 'TERMINATED_WITH_ERRORS']:
        raise Exception(message)

    if state == 'WAITING':
      print('EMR cluster create completed')
      break

    print("Cluster state: {}, wait 15s for cluster to start up.".format(state))
    time.sleep(15)

# Check the following documentation to add other job-type steps. It seems the Python SDK only has 'HadoopJarStep' here.
# https://docs.aws.amazon.com/cli/latest/reference/emr/add-steps.html
def submit_spark_job(client, jobflow_id, job_name, jar_path, main_class, extra_args):
  """Submits a single Spark job to a running cluster."""

  spark_job = {
    'Name': job_name,
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
      'Jar': 'command-runner.jar'
    }
  }

  spark_args = ['spark-submit', "--deploy-mode", "cluster"]
  if main_class:
    spark_args.extend(['--class', main_class])
  spark_args.extend([jar_path])
  spark_args.extend(extra_args)

  spark_job['HadoopJarStep']['Args'] = spark_args

  try:
    response = client.add_job_flow_steps(
      JobFlowId=jobflow_id,
      Steps=[spark_job],
    )
  except ClientError as e:
    print(e.response['Error']['Message'])
    exit(1)

  step_id = response['StepIds'][0]
  print("Step Id {} has been submitted".format(step_id))
  return step_id

def wait_for_job(client, jobflow_id, step_id):
  """Waits for a cluster step to finish by polling it."""
  while True:
    result = client.describe_step(ClusterId=jobflow_id, StepId=step_id)
    step_status = result['Step']['Status']
    state = step_status['State']

    if state in ('CANCELLED', 'FAILED', 'INTERRUPTED'):
      err_msg = 'UNKNOWN'
      if 'FailureDetails' in step_status:
        err_msg = step_status['FailureDetails']

      raise Exception(err_msg)
    elif state == 'COMPLETED':
      print('EMR Step finished')
      break

    print("Step state: {}, wait 10s for step status update.".format(state))
    time.sleep(10)

def submit_pyspark_job(client, jobflow_id, job_name, py_file, extra_args):
  """Submits a single PySpark job to a running cluster."""

  # submit_spark_job already prepends 'spark-submit --deploy-mode cluster',
  # so pass the Python file in place of the application jar and no main class.
  return submit_spark_job(client, jobflow_id, job_name, py_file, '', extra_args)
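
For context, the helpers above cover the whole cluster lifecycle and compose roughly as follows. This is an illustrative sketch rather than code in this PR; the cluster name, log bucket, and always-delete behaviour are assumptions, not something _utils.py enforces.

from common import _utils

def run_pyspark_once(region, py_file, extra_args):
  """Illustrative flow: create a cluster, run one PySpark step, then tear the cluster down."""
  client = _utils.get_client(region)
  response = _utils.create_cluster(client, 'kfp-emr-example',    # placeholder cluster name
                                   's3://my-emr-logs/',          # placeholder log bucket
                                   'emr-5.23.0', 'm4.xlarge', 3)
  jobflow_id = response['JobFlowId']
  try:
    _utils.wait_for_cluster(client, jobflow_id)
    step_id = _utils.submit_pyspark_job(client, jobflow_id, 'example-step', py_file, extra_args)
    _utils.wait_for_job(client, jobflow_id, step_id)
  finally:
    # Tear down the cluster even if the step fails; delete_cluster returns immediately.
    _utils.delete_cluster(client, jobflow_id)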
38 changes: 38 additions & 0 deletions components/aws/emr/create_cluster/component.yaml
@@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: emr_create_cluster
description: |
  Creates an Elastic MapReduce (EMR) cluster in a specific region.
inputs:
- {name: region, description: 'The EMR region in which to handle the request.'}
- {name: name, description: 'The EMR cluster name. Cluster names within a region must be unique. Names of deleted clusters can be reused.'}
- {name: release_label, description: 'The EMR version.', default: 'emr-5.23.0'}
- {name: log_s3_uri, description: 'The path to the Amazon S3 location where logs for this cluster are stored.'}
- {name: instance_type, description: 'The EC2 instance type of the master, core, and task nodes.', default: 'm4.xlarge'}
- {name: instance_count, description: 'The number of EC2 instances in the cluster.', default: '3'}
outputs:
- {name: cluster_name, description: 'The cluster name of the created cluster.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-emr:20190507
    command: ['python', 'create_cluster.py']
    args: [
      --region, {inputValue: region},
      --name, {inputValue: name},
      --release_label, {inputValue: release_label},
      --log_s3_uri, {inputValue: log_s3_uri},
      --instance_type, {inputValue: instance_type},
      --instance_count, {inputValue: instance_count}
    ]
    fileOutputs:
      cluster_name: /output.txt
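
For context, the fileOutputs entry above means the created cluster's name is written to /output.txt and surfaced as the step's output. A hedged sketch of consuming it from the KFP SDK; the checkout path and log bucket are placeholder assumptions, not part of this PR.

from kfp import components, dsl

# Assumed local checkout path to the definition above.
emr_create_cluster_op = components.load_component_from_file(
    'components/aws/emr/create_cluster/component.yaml')

@dsl.pipeline(name='EMR create-cluster example')
def emr_pipeline(region='us-west-2', log_s3_uri='s3://my-emr-logs/'):  # placeholder bucket
  create = emr_create_cluster_op(
      region=region,
      name='kfp-emr-example',
      release_label='emr-5.23.0',
      log_s3_uri=log_s3_uri,
      instance_type='m4.xlarge',
      instance_count='3')
  # create.output now carries the cluster name and can be wired into the
  # submit/delete scripts added elsewhere in this PR.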