Add support for driver pool, instance flexibility policy, and min_num_instances for Dataproc #34172
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |||||||||||
| import time | ||||||||||||
| import uuid | ||||||||||||
| import warnings | ||||||||||||
| from dataclasses import dataclass | ||||||||||||
| from datetime import datetime, timedelta | ||||||||||||
| from enum import Enum | ||||||||||||
| from typing import TYPE_CHECKING, Any, Sequence | ||||||||||||
|
|
@@ -76,6 +77,38 @@ class PreemptibilityType(Enum): | |||||||||||
| PREEMPTIBILITY_UNSPECIFIED = "PREEMPTIBILITY_UNSPECIFIED" | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| @dataclass | ||||||||||||
| class InstanceSelection: | ||||||||||||
| """Defines machine types and a rank to which the machine types belong. | ||||||||||||
|
|
||||||||||||
| Representation for | ||||||||||||
| google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceFlexibilityPolicy.InstanceSelection. | ||||||||||||
|
|
||||||||||||
| :param machine_types: Full machine-type names, e.g. "n1-standard-16". | ||||||||||||
| :param rank: Preference of this instance selection. Lower number means higher preference. | ||||||||||||
| Dataproc will first try to create a VM based on the machine-type with priority rank and fall back | ||||||||||||
| to the next rank based on availability. Machine types and instance selections with the same priority have | ||||||||||||
| the same preference. | ||||||||||||
| """ | ||||||||||||
|
|
||||||||||||
| machine_types: list[str] | ||||||||||||
| rank: int = 0 | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| @dataclass | ||||||||||||
| class InstanceFlexibilityPolicy: | ||||||||||||
| """ | ||||||||||||
| Instance flexibility Policy allowing a mixture of VM shapes and provisioning models. | ||||||||||||
|
|
||||||||||||
| Representation for google.cloud.dataproc.v1#google.cloud.dataproc.v1.InstanceFlexibilityPolicy. | ||||||||||||
|
|
||||||||||||
| :param instance_selection_list: List of instance selection options that the group will use when | ||||||||||||
| creating new VMs. | ||||||||||||
| """ | ||||||||||||
|
|
||||||||||||
| instance_selection_list: list[InstanceSelection] | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class ClusterGenerator: | ||||||||||||
| """Create a new Dataproc Cluster. | ||||||||||||
|
|
||||||||||||
|
|
@@ -84,6 +117,11 @@ class ClusterGenerator: | |||||||||||
| to create the cluster. (templated) | ||||||||||||
| :param num_workers: The # of workers to spin up. If set to zero will | ||||||||||||
| spin up cluster in a single node mode | ||||||||||||
| :param min_num_workers: The minimum number of primary worker instances to create. | ||||||||||||
|
||||||||||||
| If more than ``min_num_workers`` VMs are created out of ``num_workers``, the failed VMs are | ||||||||||||
| deleted, and the cluster is resized to the available VMs and set to RUNNING. | ||||||||||||
| If fewer than ``min_num_workers`` VMs are created, the cluster is placed in ERROR state. The failed | ||||||||||||
| VMs are not deleted. | ||||||||||||
| :param storage_bucket: The storage bucket to use, setting to None lets dataproc | ||||||||||||
| generate a custom one for you | ||||||||||||
| :param init_actions_uris: List of GCS uri's containing | ||||||||||||
|
|
@@ -152,12 +190,18 @@ class ClusterGenerator: | |||||||||||
| ``projects/[PROJECT_STORING_KEYS]/locations/[LOCATION]/keyRings/[KEY_RING_NAME]/cryptoKeys/[KEY_NAME]`` # noqa | ||||||||||||
| :param enable_component_gateway: Provides access to the web interfaces of default and selected optional | ||||||||||||
| components on the cluster. | ||||||||||||
| :param driver_pool_size: The number of driver nodes in the node group. | ||||||||||||
| :param driver_pool_id: The ID for the driver pool. Must be unique within the cluster. Use this ID to | ||||||||||||
| identify the driver group in future operations, such as resizing the node group. | ||||||||||||
| :param secondary_worker_instance_flexibility_policy: Instance flexibility Policy allowing a mixture of VM | ||||||||||||
| shapes and provisioning models. | ||||||||||||
| """ | ||||||||||||
|
|
||||||||||||
| def __init__( | ||||||||||||
| self, | ||||||||||||
| project_id: str, | ||||||||||||
| num_workers: int | None = None, | ||||||||||||
| min_num_workers: int | None = None, | ||||||||||||
| zone: str | None = None, | ||||||||||||
| network_uri: str | None = None, | ||||||||||||
| subnetwork_uri: str | None = None, | ||||||||||||
|
|
@@ -190,11 +234,15 @@ def __init__( | |||||||||||
| auto_delete_ttl: int | None = None, | ||||||||||||
| customer_managed_key: str | None = None, | ||||||||||||
| enable_component_gateway: bool | None = False, | ||||||||||||
| driver_pool_size: int = 0, | ||||||||||||
| driver_pool_id: str | None = None, | ||||||||||||
| secondary_worker_instance_flexibility_policy: InstanceFlexibilityPolicy | None = None, | ||||||||||||
| **kwargs, | ||||||||||||
| ) -> None: | ||||||||||||
| self.project_id = project_id | ||||||||||||
| self.num_masters = num_masters | ||||||||||||
| self.num_workers = num_workers | ||||||||||||
| self.min_num_workers = min_num_workers | ||||||||||||
| self.num_preemptible_workers = num_preemptible_workers | ||||||||||||
| self.preemptibility = self._set_preemptibility_type(preemptibility) | ||||||||||||
| self.storage_bucket = storage_bucket | ||||||||||||
|
|
@@ -227,6 +275,9 @@ def __init__( | |||||||||||
| self.customer_managed_key = customer_managed_key | ||||||||||||
| self.enable_component_gateway = enable_component_gateway | ||||||||||||
| self.single_node = num_workers == 0 | ||||||||||||
| self.driver_pool_size = driver_pool_size | ||||||||||||
| self.driver_pool_id = driver_pool_id | ||||||||||||
| self.secondary_worker_instance_flexibility_policy = secondary_worker_instance_flexibility_policy | ||||||||||||
|
|
||||||||||||
| if self.custom_image and self.image_version: | ||||||||||||
| raise ValueError("The custom_image and image_version can't be both set") | ||||||||||||
|
|
@@ -240,6 +291,15 @@ def __init__( | |||||||||||
| if self.single_node and self.num_preemptible_workers > 0: | ||||||||||||
| raise ValueError("Single node cannot have preemptible workers.") | ||||||||||||
|
|
||||||||||||
| if self.min_num_workers: | ||||||||||||
| if not self.num_workers: | ||||||||||||
| raise ValueError("Must specify num_workers when min_num_workers are provided.") | ||||||||||||
| if self.min_num_workers > self.num_workers: | ||||||||||||
| raise ValueError( | ||||||||||||
| "The value of min_num_workers must be less than or equal to num_workers. " | ||||||||||||
| f"Provided {self.min_num_workers}(min_num_workers) and {self.num_workers}(num_workers)." | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| def _set_preemptibility_type(self, preemptibility: str): | ||||||||||||
| return PreemptibilityType(preemptibility.upper()) | ||||||||||||
|
|
||||||||||||
|
|
@@ -306,6 +366,17 @@ def _build_lifecycle_config(self, cluster_data): | |||||||||||
|
|
||||||||||||
| return cluster_data | ||||||||||||
|
|
||||||||||||
| def _build_driver_pool(self): | ||||||||||||
| driver_pool = { | ||||||||||||
| "node_group": { | ||||||||||||
| "roles": ["DRIVER"], | ||||||||||||
| "node_group_config": {"num_instances": self.driver_pool_size}, | ||||||||||||
|
||||||||||||
| "node_group_config": {"num_instances": self.driver_pool_size}, | |
| "node_group_config": { | |
| "num_instances": self.driver_pool_size, | |
| "machine_type_uri": self.driver_pool_machine_type, | |
| }, |
Maybe this is too much config though? I'm fine with leaving this for another day.
Let's keep it to these two for now. Later on, we can add a driver pool config (like InstanceFlexibilityPolicy) to cover the rest of the config options.
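For illustration only, here is a rough sketch of what such a driver pool config might look like if it followed the dataclass pattern of `InstanceFlexibilityPolicy`. Everything named here is hypothetical and not part of this PR: the `DriverPoolConfig` class, its `to_node_group` method, and the `machine_type_uri` field floated in the suggestion above. The `node_group_id` key is an assumption about how the pool ID is attached to the node group, since the diff shown here is cut off before that point.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class DriverPoolConfig:
    """Hypothetical container for driver pool options (not part of this PR)."""

    size: int = 0
    pool_id: str | None = None
    # Extra option floated in review; the attribute name is an assumption.
    machine_type_uri: str | None = None

    def to_node_group(self) -> dict[str, Any]:
        """Build a dict shaped like the one started in ``_build_driver_pool``."""
        node_group_config: dict[str, Any] = {"num_instances": self.size}
        if self.machine_type_uri:
            node_group_config["machine_type_uri"] = self.machine_type_uri

        driver_pool: dict[str, Any] = {
            "node_group": {
                "roles": ["DRIVER"],
                "node_group_config": node_group_config,
            },
        }
        if self.pool_id:
            # Assumption: the pool ID sits next to "node_group" under this key.
            driver_pool["node_group_id"] = self.pool_id
        return driver_pool
```

Keeping the options in one object would let `ClusterGenerator` accept a single argument instead of growing one keyword per driver pool setting, which is the trade-off raised in this thread.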
Should we raise an Exception at initialization time if min_num_workers > num_workers?
It's usually best to surface issues as early as possible.
Done
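As a standalone sketch of the early check requested above, the validation added to `__init__` in the diff can be pulled out into a small function. The function name `validate_worker_counts` is hypothetical; the conditions and messages mirror the ones shown in the diff.

```python
from __future__ import annotations


def validate_worker_counts(num_workers: int | None, min_num_workers: int | None) -> None:
    """Raise ValueError at construction time if the worker counts are inconsistent."""
    if not min_num_workers:
        # Nothing to validate when no minimum is requested.
        return
    if not num_workers:
        raise ValueError("Must specify num_workers when min_num_workers are provided.")
    if min_num_workers > num_workers:
        raise ValueError(
            "The value of min_num_workers must be less than or equal to num_workers. "
            f"Provided {min_num_workers}(min_num_workers) and {num_workers}(num_workers)."
        )
```

Calling `validate_worker_counts(2, 4)` raises immediately, so a misconfigured DAG fails when the generator is built rather than when the cluster create request is sent.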