Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,12 @@ public Job translate(List<DataflowPackage> packages) {
if (options.getDiskSizeGb() > 0) {
workerPool.setDiskSizeGb(options.getDiskSizeGb());
}
if (options.getDiskProvisionedIops() != null) {
workerPool.setDiskProvisionedIops(options.getDiskProvisionedIops());
}
if (options.getDiskProvisionedThroughputMibps() != null) {
workerPool.setDiskProvisionedThroughputMibps(options.getDiskProvisionedThroughputMibps());
}
AutoscalingSettings settings = new AutoscalingSettings();
if (options.getAutoscalingAlgorithm() != null) {
settings.setAlgorithm(options.getAutoscalingAlgorithm().getAlgorithm());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,17 @@ public String getAlgorithm() {

void setWorkerDiskType(String value);

// Maps to the Dataflow WorkerPool.diskProvisionedIops field. A null value
// means "unset"; the translator only copies it onto the worker pool when
// non-null, letting the service choose its own default.
@Description("IOPS provisioned for the root disk for VMs. If zero or " +
"unspecified, the service will attempt to choose a reasonable default.")
Long getDiskProvisionedIops();
void setDiskProvisionedIops(Long diskProvisionedIops);

// Maps to the Dataflow WorkerPool.diskProvisionedThroughputMibps field.
// Same null-means-unset contract as getDiskProvisionedIops above.
@Description("Throughput provisioned in MiB/s for the root disk for VMs. If zero or " +
"unspecified, the service will attempt to choose a reasonable default.")
Long getDiskProvisionedThroughputMibps();
void setDiskProvisionedThroughputMibps(Long diskProvisionedThroughputMibps);


/**
* Specifies whether worker pools should be started with public IP addresses.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,30 @@ public void testDiskSizeGbConfig() throws IOException {
assertEquals(diskSizeGb, job.getEnvironment().getWorkerPools().get(0).getDiskSizeGb());
}

@Test
public void testDiskProvisioningTranslation() {
  // Verifies that explicitly-set disk provisioning options are copied
  // verbatim onto the translated WorkerPool.
  DataflowPipelineOptions options =
      PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
  options.setProject("test-project"); // The translator requires a project.
  options.setDiskProvisionedIops(7000L);
  options.setDiskProvisionedThroughputMibps(250L);

  WorkerPool workerPool = translateWorkerPool(options);

  assertEquals(Long.valueOf(7000L), workerPool.getDiskProvisionedIops());
  assertEquals(Long.valueOf(250L), workerPool.getDiskProvisionedThroughputMibps());
}

@Test
public void testDiskProvisioningTranslationDefaults() {
  // Verifies that when neither provisioning option is set, the translated
  // WorkerPool leaves both fields null so the service picks its defaults.
  DataflowPipelineOptions options =
      PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
  options.setProject("test-project"); // The translator requires a project.

  WorkerPool workerPool = translateWorkerPool(options);

  assertNull(workerPool.getDiskProvisionedIops());
  assertNull(workerPool.getDiskProvisionedThroughputMibps());
}

/** A composite transform that returns an output that is unrelated to the input. */
private static class UnrelatedOutputCreator
extends PTransform<PCollection<Integer>, PCollection<Integer>> {
Expand Down
6 changes: 6 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ var (
maxNumWorkers = flag.Int64("max_num_workers", 0, "Maximum number of workers during scaling (optional).")
diskSizeGb = flag.Int64("disk_size_gb", 0, "Size of root disk for VMs, in GB (optional).")
diskType = flag.String("disk_type", "", "Type of root disk for VMs (optional).")
diskProvisionedIOPS = flag.Int64("disk_provisioned_iops", 0, "Provisioned IOPS for the root disk for VMs (optional).")
diskProvisionedThroughputMibps = flag.Int64("disk_provisioned_throughput_mibps", 0, "Provisioned throughput for the root disk for VMs (optional).")
autoscalingAlgorithm = flag.String("autoscaling_algorithm", "", "Autoscaling mode to use (optional).")
zone = flag.String("zone", "", "GCP zone (optional)")
kmsKey = flag.String("dataflow_kms_key", "", "The Cloud KMS key identifier used to encrypt data at rest (optional).")
Expand Down Expand Up @@ -115,6 +117,8 @@ var flagFilter = map[string]bool{
"max_num_workers": true,
"disk_size_gb": true,
"disk_type": true,
"disk_provisioned_iops": true,
"disk_provisioned_throughput_mibps": true,
"autoscaling_algorithm": true,
"zone": true,
"network": true,
Expand Down Expand Up @@ -396,6 +400,8 @@ func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions
WorkerHarnessThreads: *workerHarnessThreads,
DiskSizeGb: *diskSizeGb,
DiskType: *diskType,
DiskProvisionedIOPS: *diskProvisionedIOPS,
DiskProvisionedThroughputMibps: *diskProvisionedThroughputMibps,
Algorithm: *autoscalingAlgorithm,
FlexRSGoal: *flexRSGoal,
MachineType: *firstNonEmpty(workerMachineType, machineType),
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type JobOptions struct {
NumWorkers int64
DiskSizeGb int64
DiskType string
DiskProvisionedIOPS int64
DiskProvisionedThroughputMibps int64
MachineType string
Labels map[string]string
ServiceAccountEmail string
Expand Down Expand Up @@ -191,6 +193,8 @@ func Translate(ctx context.Context, p *pipepb.Pipeline, opts *JobOptions, worker
},
DiskSizeGb: opts.DiskSizeGb,
DiskType: opts.DiskType,
DiskProvisionedIops: opts.DiskProvisionedIOPS,
DiskProvisionedThroughputMibps: opts.DiskProvisionedThroughputMibps,
IpConfiguration: ipConfiguration,
Kind: "harness",
Packages: packages,
Expand Down
20 changes: 20 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1405,6 +1405,26 @@ def _add_argparse_args(cls, parser):
dest='disk_type',
default=None,
help=('Specifies what type of persistent disk should be used.'))
parser.add_argument(
'--disk_provisioned_iops',
type=int,
default=None,
dest='disk_provisioned_iops',
help=(
'The provisioned IOPS of the disk. If not set, the Dataflow service'
' will choose a reasonable default.'
),
)
parser.add_argument(
'--disk_provisioned_throughput_mibps',
type=int,
default=None,
dest='disk_provisioned_throughput_mibps',
help=(
'The provisioned throughput of the disk in MiB/s. If not set, the'
' Dataflow service will choose a reasonable default.'
),
)
parser.add_argument(
'--worker_region',
default=None,
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,18 @@ def test_worker_options(self):
'abc',
'--disk_type',
'def',
'--disk_provisioned_iops',
'4000',
'--disk_provisioned_throughput_mibps',
'200',
'--element_processing_timeout_minutes',
'10',
])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
self.assertEqual(worker_options.disk_provisioned_iops, 4000)
self.assertEqual(worker_options.disk_provisioned_throughput_mibps, 200)
self.assertEqual(worker_options.element_processing_timeout_minutes, 10)

options = PipelineOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ def __init__(
pool.diskSizeGb = self.worker_options.disk_size_gb
if self.worker_options.disk_type:
pool.diskType = self.worker_options.disk_type
if self.worker_options.disk_provisioned_iops:
pool.diskProvisionedIops = self.worker_options.disk_provisioned_iops
if self.worker_options.disk_provisioned_throughput_mibps:
pool.diskProvisionedThroughputMibps = (
self.worker_options.disk_provisioned_throughput_mibps
)
if self.worker_options.zone:
pool.zone = self.worker_options.zone
if self.worker_options.network:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,26 @@ def test_set_subnetwork(self):
env.proto.workerPools[0].subnetwork,
'/regions/MY/subnetworks/SUBNETWORK')

def test_set_disk_provisioning_options(self):
  # The disk provisioning flags should be copied verbatim onto the
  # translated worker pool proto.
  flags = [
      '--disk_provisioned_iops',
      '4000',
      '--disk_provisioned_throughput_mibps',
      '200',
      '--temp_location',
      'gs://any-location/temp',
  ]
  env = apiclient.Environment(
      [],  # packages
      PipelineOptions(flags),
      '2.0.0',  # any environment version
      FAKE_PIPELINE_URL)
  pool = env.proto.workerPools[0]
  self.assertEqual(pool.diskProvisionedIops, 4000)
  self.assertEqual(pool.diskProvisionedThroughputMibps, 200)

def test_flexrs_blank(self):
pipeline_options = PipelineOptions(
['--temp_location', 'gs://any-location/temp'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2685,6 +2685,11 @@ class FlexTemplateRuntimeEnvironment(_messages.Message):
zone](https://cloud.google.com/compute/docs/regions-zones/regions-zones)
for launching worker instances to run your pipeline. In the future,
worker_zone will take precedence.
diskProvisionedIops: Provisioned IOPS for the root disk for VMs. If zero or
unspecified, the service will attempt to choose a reasonable default.
diskProvisionedThroughputMibps: Provisioned throughput for the root disk
for VMs. If zero or unspecified, the service will attempt to choose a
reasonable default.
"""

class AutoscalingAlgorithmValueValuesEnum(_messages.Enum):
Expand Down Expand Up @@ -2804,6 +2809,10 @@ class AdditionalProperty(_messages.Message):
workerRegion = _messages.StringField(24)
workerZone = _messages.StringField(25)
zone = _messages.StringField(26)
diskProvisionedIops = _messages.IntegerField(
27, variant=_messages.Variant.INT64)
diskProvisionedThroughputMibps = _messages.IntegerField(
28, variant=_messages.Variant.INT64)


class FloatingPointList(_messages.Message):
Expand Down Expand Up @@ -7957,6 +7966,10 @@ class AdditionalProperty(_messages.Message):
teardownPolicy = _messages.EnumField('TeardownPolicyValueValuesEnum', 20)
workerHarnessContainerImage = _messages.StringField(21)
zone = _messages.StringField(22)
diskProvisionedIops = _messages.IntegerField(
23, variant=_messages.Variant.INT64)
diskProvisionedThroughputMibps = _messages.IntegerField(
24, variant=_messages.Variant.INT64)


class WorkerSettings(_messages.Message):
Expand Down
Loading