@@ -64,7 +64,7 @@ def download_output(project_id, cluster_id, output_bucket, job_id):
     return bucket.blob(output_blob).download_as_string()
 
 
-# [START create_cluster]
+# [START dataproc_create_cluster]
 def create_cluster(dataproc, project, zone, region, cluster_name):
     print('Creating cluster...')
     zone_uri = \
@@ -92,7 +92,7 @@ def create_cluster(dataproc, project, zone, region, cluster_name):
         region=region,
         body=cluster_data).execute()
     return result
-# [END create_cluster]
+# [END dataproc_create_cluster]
 
 
 def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
@@ -113,7 +113,7 @@ def wait_for_cluster_creation(dataproc, project_id, region, cluster_name):
             break
 
 
-# [START list_clusters_with_detail]
+# [START dataproc_list_clusters_with_detail]
 def list_clusters_with_details(dataproc, project, region):
     result = dataproc.projects().regions().clusters().list(
         projectId=project,
@@ -123,7 +123,7 @@ def list_clusters_with_details(dataproc, project, region):
         print("{} - {}"
               .format(cluster['clusterName'], cluster['status']['state']))
     return result
-# [END list_clusters_with_detail]
+# [END dataproc_list_clusters_with_detail]
 
 
 def get_cluster_id_by_name(cluster_list, cluster_name):
@@ -133,7 +133,7 @@ def get_cluster_id_by_name(cluster_list, cluster_name):
     return cluster['clusterUuid'], cluster['config']['configBucket']
 
 
-# [START submit_pyspark_job]
+# [START dataproc_submit_pyspark_job]
 def submit_pyspark_job(dataproc, project, region,
                        cluster_name, bucket_name, filename):
     """Submits the Pyspark job to the cluster, assuming `filename` has
@@ -156,21 +156,21 @@ def submit_pyspark_job(dataproc, project, region,
     job_id = result['reference']['jobId']
     print('Submitted job ID {}'.format(job_id))
     return job_id
-# [END submit_pyspark_job]
+# [END dataproc_submit_pyspark_job]
 
 
-# [START delete]
+# [START dataproc_delete]
 def delete_cluster(dataproc, project, region, cluster):
     print('Tearing down cluster')
     result = dataproc.projects().regions().clusters().delete(
         projectId=project,
         region=region,
         clusterName=cluster).execute()
     return result
-# [END delete]
+# [END dataproc_delete]
 
 
-# [START wait]
+# [START dataproc_wait]
 def wait_for_job(dataproc, project, region, job_id):
     print('Waiting for job to finish...')
     while True:
@@ -184,16 +184,16 @@ def wait_for_job(dataproc, project, region, job_id):
         elif result['status']['state'] == 'DONE':
             print('Job finished.')
             return result
-# [END wait]
+# [END dataproc_wait]
 
 
-# [START get_client]
+# [START dataproc_get_client]
 def get_client():
     """Builds an http client authenticated with the service account
     credentials."""
     dataproc = googleapiclient.discovery.build('dataproc', 'v1')
     return dataproc
-# [END get_client]
+# [END dataproc_get_client]
 
 
 def main(project_id, zone, cluster_name, bucket_name,
@@ -221,11 +221,11 @@ def main(project_id, zone, cluster_name, bucket_name,
         (cluster_id, output_bucket) = (
             get_cluster_id_by_name(cluster_list, cluster_name))
 
-        # [START call_submit_pyspark_job]
+        # [START dataproc_call_submit_pyspark_job]
         job_id = submit_pyspark_job(
             dataproc, project_id, region,
             cluster_name, bucket_name, spark_filename)
-        # [END call_submit_pyspark_job]
+        # [END dataproc_call_submit_pyspark_job]
         wait_for_job(dataproc, project_id, region, job_id)
 
         output = download_output(project_id, cluster_id, output_bucket, job_id)
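For context, a minimal sketch of how the functions touched above fit together end to end, mirroring the flow in main(). The project, zone, bucket, cluster name, and PySpark filename are placeholder values, and it assumes application default credentials are configured:

# Placeholder values; assumes application default credentials are set up
# and that 'pyspark_sort.py' has already been uploaded to the bucket.
dataproc = get_client()
project, zone, region = 'my-project', 'us-central1-a', 'us-central1'
create_cluster(dataproc, project, zone, region, 'demo-cluster')
wait_for_cluster_creation(dataproc, project, region, 'demo-cluster')
job_id = submit_pyspark_job(dataproc, project, region,
                            'demo-cluster', 'my-bucket', 'pyspark_sort.py')
wait_for_job(dataproc, project, region, job_id)
delete_cluster(dataproc, project, region, 'demo-cluster')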