Update Dataproc region tags to standard format #1826

Merged
merged 1 commit on Nov 8, 2018
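Note for reviewers: region tags are the # [START ...] / # [END ...] comment pairs that the Google Cloud documentation tooling uses to extract snippets from these samples. This change renames each tag to the standard product-prefixed format (dataproc_<snippet_name>) without modifying the code between the markers. A minimal sketch of the convention, using a hypothetical snippet name:

# Hypothetical snippet name; the prefix matches the product (dataproc_).
# [START dataproc_example_snippet]
def example_snippet():
    # Everything between the START and END markers is what the docs page embeds.
    return 'hello from a tagged snippet'
# [END dataproc_example_snippet]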
8 changes: 4 additions & 4 deletions dataproc/list_clusters.py
@@ -19,21 +19,21 @@
import googleapiclient.discovery


-# [START list_clusters]
+# [START dataproc_list_clusters]
def list_clusters(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    return result
-# [END list_clusters]
+# [END dataproc_list_clusters]


-# [START get_client]
+# [START dataproc_get_client]
def get_client():
    """Builds a client to the dataproc API."""
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
    return dataproc
-# [END get_client]
+# [END dataproc_get_client]


def main(project_id, region):
4 changes: 2 additions & 2 deletions dataproc/pyspark_sort.py
@@ -18,11 +18,11 @@
environment.
"""

-# [START pyspark]
+# [START dataproc_pyspark_sort]
import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)
-# [END pyspark]
+# [END dataproc_pyspark_sort]
4 changes: 2 additions & 2 deletions dataproc/pyspark_sort_gcs.py
@@ -21,10 +21,10 @@
information.
"""

-# [START pyspark]
+# [START dataproc_pyspark_sort_gcs]
import pyspark

sc = pyspark.SparkContext()
rdd = sc.textFile('gs://path-to-your-GCS-file')
print(sorted(rdd.collect()))
-# [END pyspark]
+# [END dataproc_pyspark_sort_gcs]
28 changes: 14 additions & 14 deletions dataproc/submit_job_to_cluster.py
@@ -64,7 +64,7 @@ def download_output(project_id, cluster_id, output_bucket, job_id):
    return bucket.blob(output_blob).download_as_string()


-# [START create_cluster]
+# [START dataproc_create_cluster]
def create_cluster(dataproc, project, zone, region, cluster_name):
    print('Creating cluster...')
    zone_uri = \
@@ -92,7 +92,7 @@ def create_cluster(dataproc, project, zone, region, cluster_name):
        region=region,
        body=cluster_data).execute()
    return result
-# [END create_cluster]
+# [END dataproc_create_cluster]


def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
@@ -113,7 +113,7 @@ def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
            break


-# [START list_clusters_with_detail]
+# [START dataproc_list_clusters_with_detail]
def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
@@ -123,7 +123,7 @@ def list_clusters_with_details(dataproc, project, region):
        print("{} - {}"
              .format(cluster['clusterName'], cluster['status']['state']))
    return result
-# [END list_clusters_with_detail]
+# [END dataproc_list_clusters_with_detail]


def get_cluster_id_by_name(cluster_list, cluster_name):
@@ -133,7 +133,7 @@ def get_cluster_id_by_name(cluster_list, cluster_name):
    return cluster['clusterUuid'], cluster['config']['configBucket']


-# [START submit_pyspark_job]
+# [START dataproc_submit_pyspark_job]
def submit_pyspark_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
@@ -156,21 +156,21 @@ def submit_pyspark_job(dataproc, project, region,
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id
-# [END submit_pyspark_job]
+# [END dataproc_submit_pyspark_job]


-# [START delete]
+# [START dataproc_delete]
def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=region,
        clusterName=cluster).execute()
    return result
-# [END delete]
+# [END dataproc_delete]


-# [START wait]
+# [START dataproc_wait]
def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
@@ -184,16 +184,16 @@ def wait_for_job(dataproc, project, region, job_id):
        elif result['status']['state'] == 'DONE':
            print('Job finished.')
            return result
-# [END wait]
+# [END dataproc_wait]


-# [START get_client]
+# [START dataproc_get_client]
def get_client():
    """Builds an http client authenticated with the service account
    credentials."""
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
    return dataproc
-# [END get_client]
+# [END dataproc_get_client]


def main(project_id, zone, cluster_name, bucket_name,
@@ -221,11 +221,11 @@ def main(project_id, zone, cluster_name, bucket_name,
        (cluster_id, output_bucket) = (
            get_cluster_id_by_name(cluster_list, cluster_name))

-        # [START call_submit_pyspark_job]
+        # [START dataproc_call_submit_pyspark_job]
        job_id = submit_pyspark_job(
            dataproc, project_id, region,
            cluster_name, bucket_name, spark_filename)
-        # [END call_submit_pyspark_job]
+        # [END dataproc_call_submit_pyspark_job]
        wait_for_job(dataproc, project_id, region, job_id)

        output = download_output(project_id, cluster_id, output_bucket, job_id)
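For context on how the renamed snippets in submit_job_to_cluster.py fit together, a condensed driver is sketched below. It assumes the functions shown in this diff are in scope; the project, zone, region, cluster, and bucket values are hypothetical placeholders, and the sample's real main() additionally handles argument parsing, cluster reuse, and output download.

def run_example():
    # Uses the functions defined in submit_job_to_cluster.py (see diff above).
    dataproc = get_client()                  # dataproc_get_client
    project = 'my-project'                   # hypothetical project ID
    zone = 'us-central1-a'                   # hypothetical zone
    region = 'us-central1'                   # hypothetical region
    cluster = 'example-cluster'              # hypothetical cluster name
    bucket = 'my-bucket'                     # hypothetical staging bucket

    create_cluster(dataproc, project, zone, region, cluster)            # dataproc_create_cluster
    wait_for_cluster_creation(dataproc, project, region, cluster)
    job_id = submit_pyspark_job(dataproc, project, region,
                                cluster, bucket, 'pyspark_sort.py')      # dataproc_submit_pyspark_job
    wait_for_job(dataproc, project, region, job_id)                      # dataproc_wait
    delete_cluster(dataproc, project, region, cluster)                   # dataproc_delete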