Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -124,7 +127,7 @@ public class ITBigQueryStorageWriteClientTest {

// Arrow is a bit special in that timestamps are limited to nanoseconds precision.
// The data will be padded to fit into the higher precision columns.
public static final Object[][] INPUT_ARROW_WRITE_TIMESTAMPS =
private static final Object[][] INPUT_ARROW_WRITE_TIMESTAMPS =
new Object[][] {
{1735734896123456L /* 2025-01-01T12:34:56.123456Z */, 1735734896123456789L},
{1580646896123456L /* 2020-02-02T12:34:56.123456Z */, 1580646896123456789L},
Expand All @@ -134,14 +137,35 @@ public class ITBigQueryStorageWriteClientTest {

// Arrow's higher precision column is padded with extra 0's if configured to return
// ISO as output for any picosecond enabled column.
public static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT =
private static final Object[][] EXPECTED_ARROW_WRITE_TIMESTAMPS_ISO_OUTPUT =
new Object[][] {
{1735734896123456L /* 2025-01-01T12:34:56.123456Z */, "2025-01-01T12:34:56.123456789000Z"},
{1580646896123456L /* 2020-02-02T12:34:56.123456Z */, "2020-02-02T12:34:56.123456789000Z"},
{636467696123456L /* 1990-03-03T12:34:56.123456Z */, "1990-03-03T12:34:56.123456789000Z"},
{165846896123456L /* 1975-04-04T12:34:56.123456Z */, "1975-04-04T12:34:56.123456789000Z"}
};

// Special case: users can write timestamps through the Write API using Protobuf messages.
// Each row below is a pair of values: 1. whole seconds since the epoch and 2. the sub-second
// fraction (millis, micros, nanos, or picos). This test data uses a picosecond fraction; the
// trailing comment on each row shows the ISO8601 instant the pair represents.
private static final Long[][] INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS =
new Long[][] {
{1735734896L, 123456789123L}, /* 2025-01-01T12:34:56.123456789123Z */
{1580646896L, 123456789123L}, /* 2020-02-02T12:34:56.123456789123Z */
{636467696L, 123456789123L}, /* 1990-03-03T12:34:56.123456789123Z */
{165846896L, 123456789123L} /* 1975-04-04T12:34:56.123456789123Z */
};

// Expected ISO8601 output (12 fractional digits, i.e. picosecond precision) when proto
// descriptors are used to write to BQ. Entries are listed in the same order as the rows of
// INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS.
private static final String[]
EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT =
new String[] {
"2025-01-01T12:34:56.123456789123Z",
"2020-02-02T12:34:56.123456789123Z",
"1990-03-03T12:34:56.123456789123Z",
"1975-04-04T12:34:56.123456789123Z"
};

public static class StringWithSecondsNanos {
public String foo;
public long seconds;
Expand Down Expand Up @@ -2368,7 +2392,7 @@ public void timestamp_arrowWrite() throws IOException {
@Test
public void timestamp_protobufWrite()
throws IOException, DescriptorValidationException, InterruptedException {
String tableName = "bqstorage_timestamp_write_protobuf";
String tableName = "bqstorage_timestamp_write_protobuf_schema_aware";
// Opt to create a new table to write to instead of re-using table to prevent
// the test from failing due to any issues with deleting data after test.
// Increases the test time duration, but would be more resilient to transient
Expand Down Expand Up @@ -2417,6 +2441,130 @@ public void timestamp_protobufWrite()
assertTimestamps(tableName, EXPECTED_TIMESTAMPS_HIGHER_PRECISION_ISO_OUTPUT);
}

// Tests that users can write to BQ with a hand-built Protobuf message whose timestamp column is
// a nested message made of whole seconds plus a sub-second fractional part (picoseconds).
@Test
public void timestamp_protobufWrite_customMessage_higherPrecision()
    throws IOException, DescriptorValidationException {
  // A fresh table is created for this test rather than re-using one, so that problems deleting
  // data after an earlier run cannot make this test fail. This lengthens the test, but makes it
  // more resilient to transient failures.
  String tableName = "bqstorage_timestamp_write_protobuf_custom_descriptor";
  createTimestampTable(tableName);

  /*
  The descriptors assembled below are equivalent to this proto definition:

  message Wrapper {
    message TimestampPicos {
      int64 seconds = 1;
      int64 picoseconds = 2;
    }
    TimestampPicos <TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME> = 3;
  }
  */
  String wrapperProtoName = "Wrapper";
  String timestampPicosProtoName = "TimestampPicos";
  String secondsProtoName = "seconds";
  String picosProtoName = "picoseconds";

  // Nested message: the two int64 parts of the timestamp.
  DescriptorProtos.FieldDescriptorProto secondsFieldProto =
      DescriptorProtos.FieldDescriptorProto.newBuilder()
          .setName(secondsProtoName)
          .setNumber(1)
          .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
          .build();
  DescriptorProtos.FieldDescriptorProto picosFieldProto =
      DescriptorProtos.FieldDescriptorProto.newBuilder()
          .setName(picosProtoName)
          .setNumber(2)
          .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_INT64)
          .build();
  DescriptorProto timestampPicosDescriptor =
      DescriptorProto.newBuilder()
          .setName(timestampPicosProtoName)
          .addField(secondsFieldProto)
          .addField(picosFieldProto)
          .build();

  // Outer message: a single message-typed field named after the table column, pointing at the
  // nested type declared above.
  DescriptorProtos.FieldDescriptorProto columnFieldProto =
      DescriptorProtos.FieldDescriptorProto.newBuilder()
          .setName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME)
          .setNumber(3)
          .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_MESSAGE)
          .setTypeName(timestampPicosDescriptor.getName())
          .build();
  DescriptorProto wrapperDescriptor =
      DescriptorProto.newBuilder()
          .setName(wrapperProtoName) // arbitrary message name
          .addField(columnFieldProto)
          .addNestedType(timestampPicosDescriptor)
          .build();
  ProtoSchema protoSchema =
      ProtoSchema.newBuilder().setProtoDescriptor(wrapperDescriptor).build();

  TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
  try (StreamWriter streamWriter =
      StreamWriter.newBuilder(parent.toString() + "/_default", writeClient)
          .setWriterSchema(protoSchema)
          .build()) {
    // Wrap the message in a placeholder .proto file so a runtime descriptor can be built.
    DescriptorProtos.FileDescriptorProto fileProto =
        DescriptorProtos.FileDescriptorProto.newBuilder()
            .setName("test.proto") // dummy proto file
            .addMessageType(wrapperDescriptor)
            .build();

    // Build the runtime descriptor (resolves types and names).
    Descriptors.FileDescriptor file =
        Descriptors.FileDescriptor.buildFrom(fileProto, new Descriptors.FileDescriptor[] {});

    // Resolve every descriptor handle once, up front, instead of on each row.
    Descriptors.Descriptor wrapperType = file.findMessageTypeByName(wrapperProtoName);
    Descriptors.Descriptor picosType = wrapperType.findNestedTypeByName(timestampPicosProtoName);
    Descriptors.FieldDescriptor columnFieldDescriptor =
        wrapperType.findFieldByName(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME);
    Descriptors.FieldDescriptor secondsFieldDescriptor =
        picosType.findFieldByName(secondsProtoName);
    Descriptors.FieldDescriptor picosFieldDescriptor = picosType.findFieldByName(picosProtoName);

    ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
    for (Long[] secondsAndPicos : INPUT_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS) {
      // Index 0 holds the seconds from epoch, index 1 the picosecond fraction.
      DynamicMessage timestampValue =
          DynamicMessage.newBuilder(picosType)
              .setField(secondsFieldDescriptor, secondsAndPicos[0])
              .setField(picosFieldDescriptor, secondsAndPicos[1])
              .build();
      Message row =
          DynamicMessage.newBuilder(wrapperType)
              .setField(columnFieldDescriptor, timestampValue)
              .build();
      rowsBuilder.addSerializedRows(row.toByteString());
    }

    // NOTE(review): the append future is only given a callback and is never awaited here;
    // presumably closing the writer at the end of this try block flushes the in-flight
    // append before the read below -- confirm.
    ApiFuture<AppendRowsResponse> appendResult = streamWriter.append(rowsBuilder.build());
    ApiFutures.addCallback(
        appendResult, new Helper.AppendCompleteCallback(), MoreExecutors.directExecutor());
  }

  String table =
      BigQueryResource.formatTableResource(
          ServiceOptions.getDefaultProjectId(), DATASET, tableName);

  // Read all the data as Avro GenericRecords and keep only the higher precision column.
  List<GenericData.Record> records =
      Helper.readAllRows(readClient, parentProjectId, table, null);
  List<String> actualTimestamps =
      records.stream()
          .map(record -> record.get(TIMESTAMP_HIGHER_PRECISION_COLUMN_NAME).toString())
          .collect(Collectors.toList());

  // NOTE(review): the position-by-position comparison assumes rows come back in the order
  // they were appended -- confirm that the read path guarantees this.
  String[] expectedTimestamps =
      EXPECTED_PROTO_DESCRIPTOR_WRITE_TIMESTAMPS_HIGH_PRECISION_ISO_OUTPUT;
  assertEquals(expectedTimestamps.length, actualTimestamps.size());
  for (int index = 0; index < expectedTimestamps.length; index++) {
    assertEquals(expectedTimestamps[index], actualTimestamps.get(index));
  }
}

private void createTimestampTable(String tableName) {
Schema bqTableSchema =
Schema.of(
Expand Down
Loading