Skip to content

Commit dc5ed73

Browse files
GaoleMengrelease-please[bot]gcf-owl-bot[bot]
authored
feat: expose settings to configure default missing value interpretation. (#2230)
* chore(main): release 2.41.1 (#2222) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> * chore(main): release 2.41.1 (#2222) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: expose configuration to config the default missing value interpretation --------- Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent df686d6 commit dc5ed73

File tree

6 files changed

+213
-0
lines changed

6 files changed

+213
-0
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.gax.batching.FlowController;
2121
import com.google.api.gax.rpc.FixedHeaderProvider;
2222
import com.google.auto.value.AutoValue;
23+
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
2324
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.ProtoData;
2425
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
2526
import com.google.cloud.bigquery.storage.v1.StreamConnection.DoneCallback;
@@ -388,6 +389,11 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
388389
requestBuilder.setWriteStream(streamWriter.getStreamName());
389390
requestBuilder.putAllMissingValueInterpretations(
390391
streamWriter.getMissingValueInterpretationMap());
392+
if (streamWriter.getDefaultValueInterpretation()
393+
!= MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED) {
394+
requestBuilder.setDefaultMissingValueInterpretation(
395+
streamWriter.getDefaultValueInterpretation());
396+
}
391397
return appendInternal(streamWriter, requestBuilder.build());
392398
}
393399

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,22 @@ public Builder setCompressorName(String compressorName) {
346346
return this;
347347
}
348348

349+
/**
350+
* Sets the default missing value interpretation value if the column is not presented in the
351+
* missing_value_interpretations map.
352+
*
353+
* <p>If this value is set to `DEFAULT_VALUE`, we will always populate default value if the
354+
* field is missing from json and default value is defined in the column.
355+
*
356+
* <p>If this value is set to `NULL_VALUE`, we will always not populate default value.
357+
*/
358+
public Builder setDefaultMissingValueInterpretation(
359+
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
360+
this.schemaAwareStreamWriterBuilder.setDefaultMissingValueInterpretation(
361+
defaultMissingValueInterpretation);
362+
return this;
363+
}
364+
349365
/**
350366
* Builds JsonStreamWriter
351367
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/SchemaAwareStreamWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.gax.core.CredentialsProvider;
2121
import com.google.api.gax.core.ExecutorProvider;
2222
import com.google.api.gax.rpc.TransportChannelProvider;
23+
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
2324
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
2425
import com.google.cloud.bigquery.storage.v1.Exceptions.RowIndexToErrorException;
2526
import com.google.common.base.Preconditions;
@@ -97,6 +98,8 @@ private SchemaAwareStreamWriter(Builder<T> builder)
9798
builder.compressorName);
9899
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
99100
streamWriterBuilder.setLocation(builder.location);
101+
streamWriterBuilder.setDefaultMissingValueInterpretation(
102+
builder.defaultMissingValueInterpretation);
100103
this.streamWriter = streamWriterBuilder.build();
101104
this.streamName = builder.streamName;
102105
this.tableSchema = builder.tableSchema;
@@ -433,6 +436,9 @@ public static final class Builder<T> {
433436
private String location;
434437
private String compressorName;
435438

439+
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
440+
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
441+
436442
private static final String streamPatternString =
437443
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
438444
private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
@@ -627,6 +633,16 @@ public Builder<T> setCompressorName(String compressorName) {
627633
return this;
628634
}
629635

636+
/**
637+
* Sets the default missing value interpretation value if the column is not presented in the
638+
* missing_value_interpretations map.
639+
*/
640+
public Builder setDefaultMissingValueInterpretation(
641+
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
642+
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
643+
return this;
644+
}
645+
630646
/**
631647
* Builds SchemaAwareStreamWriter
632648
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.gax.rpc.TransportChannelProvider;
2323
import com.google.auto.value.AutoOneOf;
2424
import com.google.auto.value.AutoValue;
25+
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
2526
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
2627
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
2728
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
@@ -90,6 +91,13 @@ public class StreamWriter implements AutoCloseable {
9091
*/
9192
private final String writerId = UUID.randomUUID().toString();
9293

94+
/**
95+
* The default missing value interpretation if the column has default value defined but not
96+
* presented in the missing value map.
97+
*/
98+
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
99+
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
100+
93101
/**
94102
* Stream can access a single connection or a pool of connection depending on whether multiplexing
95103
* is enabled.
@@ -201,6 +209,7 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
201209
private StreamWriter(Builder builder) throws IOException {
202210
this.streamName = builder.streamName;
203211
this.writerSchema = builder.writerSchema;
212+
this.defaultMissingValueInterpretation = builder.defaultMissingValueInterpretation;
204213
BigQueryWriteSettings clientSettings = getBigQueryWriteSettings(builder);
205214
if (!builder.enableConnectionPool) {
206215
this.location = builder.location;
@@ -312,6 +321,10 @@ static boolean isDefaultStream(String streamName) {
312321
return streamMatcher.find();
313322
}
314323

324+
AppendRowsRequest.MissingValueInterpretation getDefaultValueInterpretation() {
325+
return defaultMissingValueInterpretation;
326+
}
327+
315328
static BigQueryWriteSettings getBigQueryWriteSettings(Builder builder) throws IOException {
316329
BigQueryWriteSettings.Builder settingsBuilder = null;
317330
if (builder.client != null) {
@@ -602,6 +615,10 @@ public static final class Builder {
602615

603616
private String compressorName = null;
604617

618+
// Default missing value interpretation value.
619+
private AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation =
620+
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED;
621+
605622
private Builder(String streamName) {
606623
this.streamName = Preconditions.checkNotNull(streamName);
607624
this.client = null;
@@ -729,6 +746,16 @@ public Builder setCompressorName(String compressorName) {
729746
return this;
730747
}
731748

749+
/**
750+
* Sets the default missing value interpretation value if the column is not presented in the
751+
* missing_value_interpretations map.
752+
*/
753+
public Builder setDefaultMissingValueInterpretation(
754+
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
755+
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
756+
return this;
757+
}
758+
732759
/** Builds the {@code StreamWriterV2}. */
733760
public StreamWriter build() throws IOException {
734761
return new StreamWriter(this);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.cloud.bigquery.storage.test.Test.FooType;
3434
import com.google.cloud.bigquery.storage.test.Test.RepetitionType;
3535
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
36+
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
3637
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
3738
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
3839
import com.google.cloud.bigquery.storage.v1.TableFieldSchema.Mode;
@@ -45,8 +46,10 @@
4546
import java.io.IOException;
4647
import java.math.BigDecimal;
4748
import java.math.RoundingMode;
49+
import java.util.ArrayList;
4850
import java.util.Arrays;
4951
import java.util.HashMap;
52+
import java.util.List;
5053
import java.util.Map;
5154
import java.util.UUID;
5255
import java.util.concurrent.ExecutionException;
@@ -64,6 +67,7 @@
6467

6568
@RunWith(JUnit4.class)
6669
public class JsonStreamWriterTest {
70+
6771
private static final int NUMERIC_SCALE = 9;
6872
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
6973
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
@@ -514,6 +518,9 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
514518
.getSerializedRows(i),
515519
expectedProto.toByteString());
516520
}
521+
assertEquals(
522+
testBigQueryWrite.getAppendRequests().get(0).getDefaultMissingValueInterpretation(),
523+
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED);
517524
}
518525
}
519526

@@ -1015,6 +1022,79 @@ public void testSchemaUpdateInMultiplexing_singleConnection() throws Exception {
10151022
writer2.close();
10161023
}
10171024

1025+
@Test
1026+
public void testMissingValueInterpretation_multiplexingCase() throws Exception {
1027+
// Set min connection count to be 1 to force sharing connection.
1028+
ConnectionWorkerPool.setOptions(
1029+
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
1030+
testBigQueryWrite.addResponse(
1031+
WriteStream.newBuilder()
1032+
.setName(TEST_STREAM)
1033+
.setTableSchema(TABLE_SCHEMA)
1034+
.setLocation("us")
1035+
.build());
1036+
testBigQueryWrite.addResponse(
1037+
WriteStream.newBuilder()
1038+
.setName(TEST_STREAM)
1039+
.setTableSchema(TABLE_SCHEMA)
1040+
.setLocation("us")
1041+
.build());
1042+
// The following two writers have different stream name and schema, but will share the same
1043+
// connection .
1044+
JsonStreamWriter writer1 =
1045+
getTestJsonStreamWriterBuilder(TEST_STREAM)
1046+
.setEnableConnectionPool(true)
1047+
.setLocation("us")
1048+
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
1049+
.build();
1050+
JsonStreamWriter writer2 =
1051+
getTestJsonStreamWriterBuilder(TEST_STREAM_2)
1052+
.setEnableConnectionPool(true)
1053+
.setLocation("us")
1054+
.setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
1055+
.build();
1056+
1057+
long appendCountPerStream = 5;
1058+
for (int i = 0; i < appendCountPerStream * 4; i++) {
1059+
testBigQueryWrite.addResponse(createAppendResponse(i));
1060+
}
1061+
1062+
JSONObject foo = new JSONObject();
1063+
foo.put("foo", "aaa");
1064+
JSONArray jsonArr = new JSONArray();
1065+
jsonArr.put(foo);
1066+
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
1067+
// In total insert append `appendCountPerStream` * 4 requests.
1068+
// We insert using the pattern of
1069+
// jsonStreamWriter1, jsonStreamWriter1, jsonStreamWriter2, jsonStreamWriter2
1070+
for (int i = 0; i < appendCountPerStream; i++) {
1071+
ApiFuture<AppendRowsResponse> appendFuture1 = writer1.append(jsonArr);
1072+
ApiFuture<AppendRowsResponse> appendFuture2 = writer1.append(jsonArr);
1073+
ApiFuture<AppendRowsResponse> appendFuture3 = writer2.append(jsonArr);
1074+
ApiFuture<AppendRowsResponse> appendFuture4 = writer2.append(jsonArr);
1075+
appendFuture1.get();
1076+
appendFuture2.get();
1077+
appendFuture3.get();
1078+
appendFuture4.get();
1079+
}
1080+
1081+
for (int i = 0; i < appendCountPerStream * 4; i++) {
1082+
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
1083+
if (i % 4 <= 1) {
1084+
assertEquals(
1085+
appendRowsRequest.getDefaultMissingValueInterpretation(),
1086+
MissingValueInterpretation.DEFAULT_VALUE);
1087+
} else {
1088+
assertEquals(
1089+
appendRowsRequest.getDefaultMissingValueInterpretation(),
1090+
MissingValueInterpretation.NULL_VALUE);
1091+
}
1092+
}
1093+
1094+
writer1.close();
1095+
writer2.close();
1096+
}
1097+
10181098
@Test
10191099
public void testSchemaUpdateInMultiplexing_multipleWriterForSameStreamName() throws Exception {
10201100
// Set min connection count to be 1 to force sharing connection.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.api.gax.rpc.StatusCode.Code;
3939
import com.google.api.gax.rpc.UnknownException;
4040
import com.google.cloud.bigquery.storage.test.Test.FooType;
41+
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest.MissingValueInterpretation;
4142
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
4243
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamWriterClosedException;
4344
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
@@ -849,6 +850,73 @@ public void testProtoSchemaPiping_multiplexingCase() throws Exception {
849850
appendRowsRequest.getProtoRows().getWriterSchema(), ProtoSchema.getDefaultInstance());
850851
assertEquals(appendRowsRequest.getWriteStream(), TEST_STREAM_2);
851852
}
853+
assertEquals(
854+
appendRowsRequest.getDefaultMissingValueInterpretation(),
855+
MissingValueInterpretation.MISSING_VALUE_INTERPRETATION_UNSPECIFIED);
856+
}
857+
858+
writer1.close();
859+
writer2.close();
860+
}
861+
862+
@Test
863+
public void testDefaultValueInterpretation_multiplexingCase() throws Exception {
864+
// Use the shared connection mode.
865+
ConnectionWorkerPool.setOptions(
866+
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(1).build());
867+
ProtoSchema schema1 = createProtoSchema("Schema1");
868+
ProtoSchema schema2 = createProtoSchema("Schema2");
869+
StreamWriter writer1 =
870+
StreamWriter.newBuilder(TEST_STREAM_1, client)
871+
.setWriterSchema(schema1)
872+
.setLocation("US")
873+
.setEnableConnectionPool(true)
874+
.setMaxInflightRequests(1)
875+
.setDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
876+
.build();
877+
StreamWriter writer2 =
878+
StreamWriter.newBuilder(TEST_STREAM_2, client)
879+
.setWriterSchema(schema2)
880+
.setMaxInflightRequests(1)
881+
.setEnableConnectionPool(true)
882+
.setLocation("US")
883+
.setDefaultMissingValueInterpretation(MissingValueInterpretation.NULL_VALUE)
884+
.build();
885+
886+
long appendCountPerStream = 5;
887+
for (int i = 0; i < appendCountPerStream * 4; i++) {
888+
testBigQueryWrite.addResponse(createAppendResponse(i));
889+
}
890+
891+
// In total insert append `appendCountPerStream` * 4 requests.
892+
// We insert using the pattern of streamWriter1, streamWriter1, streamWriter2, streamWriter2
893+
for (int i = 0; i < appendCountPerStream; i++) {
894+
ApiFuture<AppendRowsResponse> appendFuture1 =
895+
writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4);
896+
ApiFuture<AppendRowsResponse> appendFuture2 =
897+
writer1.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 1);
898+
ApiFuture<AppendRowsResponse> appendFuture3 =
899+
writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 2);
900+
ApiFuture<AppendRowsResponse> appendFuture4 =
901+
writer2.append(createProtoRows(new String[] {String.valueOf(i)}), i * 4 + 3);
902+
appendFuture1.get();
903+
appendFuture2.get();
904+
appendFuture3.get();
905+
appendFuture4.get();
906+
}
907+
908+
for (int i = 0; i < appendCountPerStream * 4; i++) {
909+
AppendRowsRequest appendRowsRequest = testBigQueryWrite.getAppendRequests().get(i);
910+
assertEquals(i, appendRowsRequest.getOffset().getValue());
911+
if (i % 4 <= 1) {
912+
assertEquals(
913+
appendRowsRequest.getDefaultMissingValueInterpretation(),
914+
MissingValueInterpretation.DEFAULT_VALUE);
915+
} else {
916+
assertEquals(
917+
appendRowsRequest.getDefaultMissingValueInterpretation(),
918+
MissingValueInterpretation.NULL_VALUE);
919+
}
852920
}
853921

854922
writer1.close();

0 commit comments

Comments
 (0)