Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1283,6 +1283,22 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true);
}

// Add use_gbek to dataflow_service_options if gbek is set.
List<String> dataflowServiceOptions = options.getDataflowServiceOptions();
if (dataflowServiceOptions == null) {
dataflowServiceOptions = new ArrayList<>();
}
if (!Strings.isNullOrEmpty(options.as(DataflowPipelineDebugOptions.class).getGbek())) {
if (!dataflowServiceOptions.contains("use_gbek")) {
dataflowServiceOptions.add("use_gbek");
}
} else if (dataflowServiceOptions.contains("use_gbek")) {
throw new IllegalArgumentException(
"Do not set use_gbek directly, pass in the --gbek pipeline option "
+ "with a valid secret instead.");
}
options.setDataflowServiceOptions(dataflowServiceOptions);

logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
logWarningIfBigqueryDLQUnused(pipeline);
if (shouldActAsStreaming(pipeline)) {
Expand Down
14 changes: 12 additions & 2 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,15 @@ def _check_and_add_missing_options(options):
debug_options = options.view_as(DebugOptions)
dataflow_service_options = options.view_as(
GoogleCloudOptions).dataflow_service_options or []
options.view_as(
GoogleCloudOptions).dataflow_service_options = dataflow_service_options

# Add use_gbek to dataflow_service_options if gbek is set.
if options.view_as(SetupOptions).gbek:
if 'use_gbek' not in dataflow_service_options:
dataflow_service_options.append('use_gbek')
elif 'use_gbek' in dataflow_service_options:
raise ValueError(
'Do not set use_gbek directly, pass in the --gbek pipeline option '
'with a valid secret instead.')

_add_runner_v2_missing_options(options)

Expand All @@ -614,6 +621,9 @@ def _check_and_add_missing_options(options):
elif debug_options.lookup_experiment('enable_prime'):
dataflow_service_options.append('enable_prime')

options.view_as(
GoogleCloudOptions).dataflow_service_options = dataflow_service_options

sdk_location = options.view_as(SetupOptions).sdk_location
if 'dev' in beam.version.__version__ and sdk_location == 'default':
raise ValueError(
Expand Down
Loading