fix: added cli functionality to dataproc quickstart example #2734

Merged: 7 commits, Jan 28, 2020
Changes from 5 commits
57 changes: 44 additions & 13 deletions dataproc/quickstart/quickstart.py
@@ -15,25 +15,25 @@
# limitations under the License.

# [START dataproc_quickstart]
"""
This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.

Usage:
python quickstart.py --project_id <PROJECT_ID> --region <REGION> \
--cluster_name <CLUSTER_NAME> --job_file_path <GCS_JOB_FILE_PATH>
"""

import argparse
import time

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage


def quickstart(project_id, region, cluster_name, job_file_path):
"""This quickstart sample walks a user through creating a Cloud Dataproc
cluster, submitting a PySpark job from Google Cloud Storage to the
cluster, reading the output of the job and deleting the cluster, all
using the Python client library.

Args:
project_id (string): Project to use for creating resources.
region (string): Region where the resources should live.
cluster_name (string): Name to use for creating a cluster.
job_file_path (string): Job in GCS to execute against the cluster.
"""

# Create the cluster client.
cluster_client = dataproc.ClusterControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(region)
@@ -125,4 +125,35 @@ def quickstart(project_id, region, cluster_name, job_file_path):
operation.result()

print('Cluster {} successfully deleted.'.format(cluster_name))
-# [END dataproc_quickstart]
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description=__doc__,
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument(
+        '--project_id',
+        type=str,
+        required=True,
+        help='Project to use for creating resources.')
+    parser.add_argument(
+        '--region',
+        type=str,
+        required=True,
+        help='Region where the resources should live.')
+    parser.add_argument(
+        '--cluster_name',
+        type=str,
+        required=True,
+        help='Name to use for creating a cluster.')
+    parser.add_argument(
+        '--job_file_path',
+        type=str,
+        required=True,
+        help='Job in GCS to execute against the cluster.')
+
+    args = parser.parse_args()
+    quickstart(args.project_id, args.region,
+               args.cluster_name, args.job_file_path)
+# [END dataproc_quickstart]

Review comment (Contributor), on the new argparse block:

    This isn't tested at all. If we are going to add CLI functionality, we
    should have some tests verifying that it still works.

    Alternatively, I'd rather avoid CLI and instead give the user somewhere at
    the top of the code to set the variables instead.

Reply (Collaborator, Author):

    I can add a test for this, but the motivation for CLI is to make this a
    runnable tool as-is, without needing to modify code.
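For illustration, the reviewer's alternative would amount to replacing the
argparse block with module-level settings the user edits in place. A minimal
sketch of that idea (not what was merged; all values below are placeholders):

    PROJECT_ID = 'your-project-id'             # placeholder, set before running
    REGION = 'us-central1'                     # placeholder region
    CLUSTER_NAME = 'your-cluster-name'         # placeholder cluster name
    JOB_FILE_PATH = 'gs://your-bucket/job.py'  # placeholder GCS path to a PySpark job

    if __name__ == "__main__":
        # Call the quickstart() function defined above with the hardcoded values.
        quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)

The merged CLI version instead keeps the sample runnable without code edits,
via the command shown in the module docstring's Usage section.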
23 changes: 14 additions & 9 deletions dataproc/quickstart/quickstart_test.py
@@ -15,12 +15,11 @@
import os
import uuid
import pytest
+import subprocess

from google.cloud import dataproc_v1 as dataproc
from google.cloud import storage

-import quickstart


PROJECT_ID = os.environ['GCLOUD_PROJECT']
REGION = 'us-central1'
@@ -29,10 +28,10 @@
JOB_FILE_NAME = 'sum.py'
JOB_FILE_PATH = 'gs://{}/{}'.format(STAGING_BUCKET, JOB_FILE_NAME)
SORT_CODE = (
    "import pyspark\n"
    "sc = pyspark.SparkContext()\n"
    "rdd = sc.parallelize((1,2,3,4,5))\n"
    "sum = rdd.reduce(lambda x, y: x + y)\n"
)


@@ -60,10 +59,16 @@ def setup_teardown():
blob.delete()


-def test_quickstart(capsys):
-    quickstart.quickstart(PROJECT_ID, REGION, CLUSTER_NAME, JOB_FILE_PATH)
+def test_quickstart():
+    command = [
+        'python', 'quickstart/quickstart.py',
+        '--project_id', PROJECT_ID,
+        '--region', REGION,
+        '--cluster_name', CLUSTER_NAME,
+        '--job_file_path', JOB_FILE_PATH
+    ]
+    out = subprocess.check_output(command).decode("utf-8")

-    out, _ = capsys.readouterr()
assert 'Cluster created successfully' in out
assert 'Submitted job' in out
assert 'finished with state DONE:' in out
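In the same spirit as the review request, the flag handling itself could also
be verified without creating any cloud resources. A minimal sketch, reusing
the subprocess import above (not part of this PR; the test name and the
missing-flag scenario are hypothetical):

    def test_missing_required_flags():
        # argparse exits with status 2 and lists missing required flags on stderr.
        result = subprocess.run(
            ['python', 'quickstart/quickstart.py', '--region', 'us-central1'],
            stderr=subprocess.PIPE,
        )
        assert result.returncode == 2
        assert b'--project_id' in result.stderr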