Commit: update region tags to standard format

alixhami committed Nov 7, 2018
1 parent 31a1393 commit 1487fe5
Showing 4 changed files with 22 additions and 22 deletions.
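The renaming follows the standard region-tag format used across these samples: each START/END comment pair delimits a snippet that the documentation tooling extracts verbatim, and the tag name is prefixed with the product name (here, dataproc_) so it is unique across products. A minimal sketch of the convention, using a purely illustrative tag and function name that are not part of this commit:

# [START dataproc_example_snippet]
def example_snippet():
    # The lines between the START and END markers are extracted verbatim
    # into the published documentation; the marker comments themselves
    # never render.
    return 'hello'
# [END dataproc_example_snippet]

The names in the START and END markers must match exactly for the extractor to pick up the block, which is why each pair below is renamed together.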
dataproc/list_clusters.py: 8 changes (4 additions, 4 deletions)
@@ -19,21 +19,21 @@
import googleapiclient.discovery


-# [START list_clusters]
+# [START dataproc_list_clusters]
def list_clusters(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
        region=region).execute()
    return result
-# [END list_clusters]
+# [END dataproc_list_clusters]


-# [START get_client]
+# [START dataproc_get_client]
def get_client():
    """Builds a client to the dataproc API."""
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
    return dataproc
-# [END get_client]
+# [END dataproc_get_client]


def main(project_id, region):
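As a quick illustration of how the two renamed snippets above compose, a minimal caller might look like the following; the project ID and the 'global' region are placeholder values, not taken from this commit:

dataproc = get_client()
result = list_clusters(dataproc, 'my-project-id', 'global')
for cluster in result.get('clusters', []):
    print(cluster['clusterName'])

The list call returns a dict whose 'clusters' entry carries one record per cluster, which is the same structure the list_clusters_with_details helper further down relies on.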
dataproc/pyspark_sort.py: 4 changes (2 additions, 2 deletions)
@@ -18,11 +18,11 @@
environment.
"""

-# [START pyspark]
+# [START dataproc_pyspark_sort]
import pyspark

sc = pyspark.SparkContext()
rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
words = sorted(rdd.collect())
print(words)
-# [END pyspark]
+# [END dataproc_pyspark_sort]
dataproc/pyspark_sort_gcs.py: 4 changes (2 additions, 2 deletions)
@@ -21,10 +21,10 @@
information.
"""

-# [START pyspark]
+# [START dataproc_pyspark_sort_gcs]
import pyspark

sc = pyspark.SparkContext()
rdd = sc.textFile('gs://path-to-your-GCS-file')
print(sorted(rdd.collect()))
-# [END pyspark]
+# [END dataproc_pyspark_sort_gcs]
dataproc/submit_job_to_cluster.py: 28 changes (14 additions, 14 deletions)
@@ -64,7 +64,7 @@ def download_output(project_id, cluster_id, output_bucket, job_id):
    return bucket.blob(output_blob).download_as_string()


-# [START create_cluster]
+# [START dataproc_create_cluster]
def create_cluster(dataproc, project, zone, region, cluster_name):
    print('Creating cluster...')
    zone_uri = \
@@ -92,7 +92,7 @@ def create_cluster(dataproc, project, zone, region, cluster_name):
        region=region,
        body=cluster_data).execute()
    return result
-# [END create_cluster]
+# [END dataproc_create_cluster]


def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
@@ -113,7 +113,7 @@ def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
            break


-# [START list_clusters_with_detail]
+# [START dataproc_list_clusters_with_detail]
def list_clusters_with_details(dataproc, project, region):
    result = dataproc.projects().regions().clusters().list(
        projectId=project,
@@ -123,7 +123,7 @@ def list_clusters_with_details(dataproc, project, region):
print("{} - {}"
.format(cluster['clusterName'], cluster['status']['state']))
return result
# [END list_clusters_with_detail]
# [END dataproc_list_clusters_with_detail]


def get_cluster_id_by_name(cluster_list, cluster_name):
@@ -133,7 +133,7 @@ def get_cluster_id_by_name(cluster_list, cluster_name):
    return cluster['clusterUuid'], cluster['config']['configBucket']


-# [START submit_pyspark_job]
+# [START dataproc_submit_pyspark_job]
def submit_pyspark_job(dataproc, project, region,
                       cluster_name, bucket_name, filename):
    """Submits the Pyspark job to the cluster, assuming `filename` has
@@ -156,21 +156,21 @@ def submit_pyspark_job(dataproc, project, region,
    job_id = result['reference']['jobId']
    print('Submitted job ID {}'.format(job_id))
    return job_id
-# [END submit_pyspark_job]
+# [END dataproc_submit_pyspark_job]


-# [START delete]
+# [START dataproc_delete]
def delete_cluster(dataproc, project, region, cluster):
    print('Tearing down cluster')
    result = dataproc.projects().regions().clusters().delete(
        projectId=project,
        region=region,
        clusterName=cluster).execute()
    return result
-# [END delete]
+# [END dataproc_delete]


-# [START wait]
+# [START dataproc_wait]
def wait_for_job(dataproc, project, region, job_id):
    print('Waiting for job to finish...')
    while True:
@@ -184,16 +184,16 @@ def wait_for_job(dataproc, project, region, job_id):
        elif result['status']['state'] == 'DONE':
            print('Job finished.')
            return result
-# [END wait]
+# [END dataproc_wait]


-# [START get_client]
+# [START dataproc_get_client]
def get_client():
    """Builds an http client authenticated with the service account
    credentials."""
    dataproc = googleapiclient.discovery.build('dataproc', 'v1')
    return dataproc
-# [END get_client]
+# [END dataproc_get_client]


def main(project_id, zone, cluster_name, bucket_name,
@@ -221,11 +221,11 @@ def main(project_id, zone, cluster_name, bucket_name,
    (cluster_id, output_bucket) = (
        get_cluster_id_by_name(cluster_list, cluster_name))

-    # [START call_submit_pyspark_job]
+    # [START dataproc_call_submit_pyspark_job]
    job_id = submit_pyspark_job(
        dataproc, project_id, region,
        cluster_name, bucket_name, spark_filename)
-    # [END call_submit_pyspark_job]
+    # [END dataproc_call_submit_pyspark_job]
    wait_for_job(dataproc, project_id, region, job_id)

    output = download_output(project_id, cluster_id, output_bucket, job_id)
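Taken together, the renamed snippets in this file cover the whole cluster-and-job lifecycle. A rough end-to-end sketch of how they chain, assuming it lives alongside the helpers above: the wrapper name run_sample and its argument values are illustrative only, and the file's actual main() additionally downloads the job's driver output via download_output().

def run_sample(project_id, zone, region, cluster_name, bucket_name, filename):
    # Build the API client, stand up a cluster, run the PySpark job,
    # then tear the cluster back down.
    dataproc = get_client()
    create_cluster(dataproc, project_id, zone, region, cluster_name)
    wait_for_cluster_creation(dataproc, project_id, region, cluster_name)
    job_id = submit_pyspark_job(
        dataproc, project_id, region, cluster_name, bucket_name, filename)
    wait_for_job(dataproc, project_id, region, job_id)
    delete_cluster(dataproc, project_id, region, cluster_name)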
