feat!: migrate to use microgen (#71)
* feat!: migrate to use microgen

* update

* update

* update

* update
arithmetic1728 authored Aug 10, 2020
1 parent 4c4563b commit 162b852
Showing 12 changed files with 350 additions and 299 deletions.
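The pattern repeated in every file below is the move to the microgenerator ("microgen") client surface: RPC arguments that were previously passed positionally are now wrapped in a single request dict, and the samples are reformatted with double-quoted strings. A minimal sketch of the before/after calling convention, using placeholder project, region, and cluster values rather than anything from this commit:

from google.cloud import dataproc_v1 as dataproc

# Placeholder values for illustration only.
project_id = "my-project"
region = "us-central1"
cluster = {
    "project_id": project_id,
    "cluster_name": "example-cluster",
    "config": {},  # minimal config for the sketch
}

client = dataproc.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

# Old (pre-microgen) surface: positional arguments.
#   operation = client.create_cluster(project_id, region, cluster)
# New (microgen) surface: one request dict keyed by field name.
operation = client.create_cluster(
    request={"project_id": project_id, "region": region, "cluster": cluster}
)
result = operation.result()  # wait for the long-running operation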
dataproc/snippets/create_cluster.py (32 changes: 14 additions & 18 deletions)
@@ -38,38 +38,34 @@ def create_cluster(project_id, region, cluster_name):
     """
 
     # Create a client with the endpoint set to the desired cluster region.
-    cluster_client = dataproc.ClusterControllerClient(client_options={
-        'api_endpoint': f'{region}-dataproc.googleapis.com:443',
-    })
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
+    )
 
     # Create the cluster config.
     cluster = {
-        'project_id': project_id,
-        'cluster_name': cluster_name,
-        'config': {
-            'master_config': {
-                'num_instances': 1,
-                'machine_type_uri': 'n1-standard-1'
-            },
-            'worker_config': {
-                'num_instances': 2,
-                'machine_type_uri': 'n1-standard-1'
-            }
-        }
+        "project_id": project_id,
+        "cluster_name": cluster_name,
+        "config": {
+            "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-1"},
+            "worker_config": {"num_instances": 2, "machine_type_uri": "n1-standard-1"},
+        },
     }
 
     # Create the cluster.
-    operation = cluster_client.create_cluster(project_id, region, cluster)
+    operation = cluster_client.create_cluster(
+        request={"project_id": project_id, "region": region, "cluster": cluster}
+    )
     result = operation.result()
 
     # Output a success message.
-    print(f'Cluster created successfully: {result.cluster_name}')
+    print(f"Cluster created successfully: {result.cluster_name}")
     # [END dataproc_create_cluster]
 
 
 if __name__ == "__main__":
     if len(sys.argv) < 4:
-        sys.exit('python create_cluster.py project_id region cluster_name')
+        sys.exit("python create_cluster.py project_id region cluster_name")
 
     project_id = sys.argv[1]
     region = sys.argv[2]
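For reference, the updated snippet keeps the same entry points, so it can be driven from the command line as the usage message above shows, or by importing it; the argument values here are placeholders:

# python create_cluster.py <project_id> <region> <cluster_name>
import create_cluster

# Placeholder arguments for illustration.
create_cluster.create_cluster("my-project", "us-central1", "example-cluster")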
dataproc/snippets/create_cluster_test.py (20 changes: 13 additions & 7 deletions)
@@ -21,20 +21,26 @@
 import create_cluster
 
 
-PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
-REGION = 'us-central1'
-CLUSTER_NAME = 'py-cc-test-{}'.format(str(uuid.uuid4()))
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
+CLUSTER_NAME = "py-cc-test-{}".format(str(uuid.uuid4()))
 
 
 @pytest.fixture(autouse=True)
 def teardown():
     yield
 
-    cluster_client = dataproc.ClusterControllerClient(client_options={
-        'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'
-    })
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
+    )
     # Client library function
-    operation = cluster_client.delete_cluster(PROJECT_ID, REGION, CLUSTER_NAME)
+    operation = cluster_client.delete_cluster(
+        request={
+            "project_id": PROJECT_ID,
+            "region": REGION,
+            "cluster_name": CLUSTER_NAME,
+        }
+    )
     # Wait for cluster to delete
     operation.result()

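The test body itself is truncated in this view. A hypothetical test in this style would call the sample and rely on the autouse fixture above to delete the cluster afterwards; the function below is an illustration, not the file's hidden content:

def test_cluster_create(capsys):
    # Hypothetical body: run the sample, then check what it printed.
    # Uses the module-level PROJECT_ID, REGION and CLUSTER_NAME defined above.
    create_cluster.create_cluster(PROJECT_ID, REGION, CLUSTER_NAME)

    out, _ = capsys.readouterr()
    assert CLUSTER_NAME in out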
dataproc/snippets/dataproc_e2e_donttest.py (11 changes: 5 additions & 6 deletions)
@@ -20,13 +20,12 @@
 
 import submit_job_to_cluster
 
-PROJECT = os.environ['GOOGLE_CLOUD_PROJECT']
-BUCKET = os.environ['CLOUD_STORAGE_BUCKET']
-CLUSTER_NAME = 'testcluster3'
-ZONE = 'us-central1-b'
+PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]
+BUCKET = os.environ["CLOUD_STORAGE_BUCKET"]
+CLUSTER_NAME = "testcluster3"
+ZONE = "us-central1-b"
 
 
 def test_e2e():
-    output = submit_job_to_cluster.main(
-        PROJECT, ZONE, CLUSTER_NAME, BUCKET)
+    output = submit_job_to_cluster.main(PROJECT, ZONE, CLUSTER_NAME, BUCKET)
     assert b"['Hello,', 'dog', 'elephant', 'panther', 'world!']" in output
dataproc/snippets/instantiate_inline_workflow_template.py (68 changes: 29 additions & 39 deletions)
@@ -36,71 +36,61 @@ def instantiate_inline_workflow_template(project_id, region):
 
     # Create a client with the endpoint set to the desired region.
     workflow_template_client = dataproc.WorkflowTemplateServiceClient(
-        client_options={
-            'api_endpoint': f'{region}-dataproc.googleapis.com:443'
-        }
+        client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
     )
 
-    parent = workflow_template_client.region_path(project_id, region)
+    parent = "projects/{}/regions/{}".format(project_id, region)
 
     template = {
-        'jobs': [
+        "jobs": [
             {
-                'hadoop_job': {
-                    'main_jar_file_uri': 'file:///usr/lib/hadoop-mapreduce/'
-                                         'hadoop-mapreduce-examples.jar',
-                    'args': [
-                        'teragen',
-                        '1000',
-                        'hdfs:///gen/'
-                    ]
+                "hadoop_job": {
+                    "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
+                    "hadoop-mapreduce-examples.jar",
+                    "args": ["teragen", "1000", "hdfs:///gen/"],
                 },
-                'step_id': 'teragen'
+                "step_id": "teragen",
             },
             {
-                'hadoop_job': {
-                    'main_jar_file_uri': 'file:///usr/lib/hadoop-mapreduce/'
-                                         'hadoop-mapreduce-examples.jar',
-                    'args': [
-                        'terasort',
-                        'hdfs:///gen/',
-                        'hdfs:///sort/'
-                    ]
+                "hadoop_job": {
+                    "main_jar_file_uri": "file:///usr/lib/hadoop-mapreduce/"
+                    "hadoop-mapreduce-examples.jar",
+                    "args": ["terasort", "hdfs:///gen/", "hdfs:///sort/"],
                 },
-                'step_id': 'terasort',
-                'prerequisite_step_ids': [
-                    'teragen'
-                ]
-            }],
-        'placement': {
-            'managed_cluster': {
-                'cluster_name': 'my-managed-cluster',
-                'config': {
-                    'gce_cluster_config': {
+                "step_id": "terasort",
+                "prerequisite_step_ids": ["teragen"],
+            },
+        ],
+        "placement": {
+            "managed_cluster": {
+                "cluster_name": "my-managed-cluster",
+                "config": {
+                    "gce_cluster_config": {
                         # Leave 'zone_uri' empty for 'Auto Zone Placement'
                         # 'zone_uri': ''
-                        'zone_uri': 'us-central1-a'
+                        "zone_uri": "us-central1-a"
                     }
-                }
+                },
             }
-        }
+        },
     }
 
     # Submit the request to instantiate the workflow from an inline template.
     operation = workflow_template_client.instantiate_inline_workflow_template(
-        parent, template
+        request={"parent": parent, "template": template}
     )
     operation.result()
 
     # Output a success message.
-    print('Workflow ran successfully.')
+    print("Workflow ran successfully.")
     # [END dataproc_instantiate_inline_workflow_template]
 
 
 if __name__ == "__main__":
     if len(sys.argv) < 3:
-        sys.exit('python instantiate_inline_workflow_template.py '
-                 + 'project_id region')
+        sys.exit(
+            "python instantiate_inline_workflow_template.py " + "project_id region"
+        )
 
     project_id = sys.argv[1]
     region = sys.argv[2]
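One behavioral detail in the hunk above: the migrated sample stops using the region_path() helper and builds the parent resource name by hand. A quick illustration with placeholder values:

project_id = "my-project"  # placeholder
region = "us-central1"     # placeholder

parent = "projects/{}/regions/{}".format(project_id, region)
print(parent)  # projects/my-project/regions/us-central1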
dataproc/snippets/instantiate_inline_workflow_template_test.py
@@ -17,8 +17,8 @@
 import instantiate_inline_workflow_template
 
 
-PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT']
-REGION = 'us-central1'
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+REGION = "us-central1"
 
 
 def test_workflows(capsys):
dataproc/snippets/list_clusters.py (43 changes: 24 additions & 19 deletions)
@@ -19,45 +19,50 @@
 import argparse
 
 from google.cloud import dataproc_v1
-from google.cloud.dataproc_v1.gapic.transports import (
-    cluster_controller_grpc_transport)
+from google.cloud.dataproc_v1.gapic.transports import cluster_controller_grpc_transport
 
 
 # [START dataproc_list_clusters]
 def list_clusters(dataproc, project, region):
     """List the details of clusters in the region."""
-    for cluster in dataproc.list_clusters(project, region):
-        print(('{} - {}'.format(cluster.cluster_name,
-                                cluster.status.State.Name(
-                                    cluster.status.state))))
+    for cluster in dataproc.list_clusters(
+        request={"project_id": project, "region": region}
+    ):
+        print(
+            (
+                "{} - {}".format(
+                    cluster.cluster_name,
+                    cluster.status.State.Name(cluster.status.state),
+                )
+            )
+        )
+
+
 # [END dataproc_list_clusters]
 
 
 def main(project_id, region):
 
-    if region == 'global':
+    if region == "global":
         # Use the default gRPC global endpoints.
         dataproc_cluster_client = dataproc_v1.ClusterControllerClient()
     else:
         # Use a regional gRPC endpoint. See:
         # https://cloud.google.com/dataproc/docs/concepts/regional-endpoints
-        client_transport = (
-            cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
-                address='{}-dataproc.googleapis.com:443'.format(region)))
-        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
-            client_transport)
+        client_transport = cluster_controller_grpc_transport.ClusterControllerGrpcTransport(
+            address="{}-dataproc.googleapis.com:443".format(region)
+        )
+        dataproc_cluster_client = dataproc_v1.ClusterControllerClient(client_transport)
 
     list_clusters(dataproc_cluster_client, project_id, region)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     parser = argparse.ArgumentParser(
-        description=__doc__, formatter_class=(
-            argparse.RawDescriptionHelpFormatter))
-    parser.add_argument(
-        '--project_id', help='Project ID to access.', required=True)
-    parser.add_argument(
-        '--region', help='Region of clusters to list.', required=True)
+        description=__doc__, formatter_class=(argparse.RawDescriptionHelpFormatter)
+    )
+    parser.add_argument("--project_id", help="Project ID to access.", required=True)
+    parser.add_argument("--region", help="Region of clusters to list.", required=True)
 
     args = parser.parse_args()
     main(args.project_id, args.region)
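Note that list_clusters.py keeps the transport-based way of selecting a regional endpoint, unlike the other samples in this commit, which pass client_options. For comparison, the same regional endpoint expressed in the client_options style used by create_cluster.py above; the region value is a placeholder:

from google.cloud import dataproc_v1

region = "us-central1"  # placeholder

# Regional endpoint via client_options, as in the other migrated samples.
dataproc_cluster_client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)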
dataproc/snippets/pyspark_sort.py (2 changes: 1 addition & 1 deletion)
@@ -22,7 +22,7 @@
 import pyspark
 
 sc = pyspark.SparkContext()
-rdd = sc.parallelize(['Hello,', 'world!', 'dog', 'elephant', 'panther'])
+rdd = sc.parallelize(["Hello,", "world!", "dog", "elephant", "panther"])
 words = sorted(rdd.collect())
 print(words)
 # [END dataproc_pyspark_sort]
dataproc/snippets/pyspark_sort_gcs.py (2 changes: 1 addition & 1 deletion)
@@ -25,6 +25,6 @@
 import pyspark
 
 sc = pyspark.SparkContext()
-rdd = sc.textFile('gs://path-to-your-GCS-file')
+rdd = sc.textFile("gs://path-to-your-GCS-file")
 print(sorted(rdd.collect()))
 # [END dataproc_pyspark_sort_gcs]
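As a related illustration of the same request-dict convention, a PySpark file like the one above could be submitted through the job surface with JobControllerClient. Everything below (project, region, cluster name, GCS URI) is a placeholder, and this call is not part of the commit:

from google.cloud import dataproc_v1 as dataproc

# Placeholder values for illustration only.
project_id = "my-project"
region = "us-central1"
cluster_name = "example-cluster"

job_client = dataproc.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {"main_python_file_uri": "gs://my-bucket/pyspark_sort.py"},
}

# Microgen-style call: a single request dict instead of positional arguments.
submitted = job_client.submit_job(
    request={"project_id": project_id, "region": region, "job": job}
)
print(submitted.reference.job_id)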