Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<parent>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite-parent</artifactId>
<version>1.9.4</version>
<version>1.15.10</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>pubsublite-spark-sql-streaming</artifactId>
Expand Down Expand Up @@ -144,6 +144,15 @@
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax-grpc</artifactId>
</dependency>
<dependency>
<groupId>com.google.flogger</groupId>
<artifactId>flogger</artifactId>
<version>0.8</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion samples/snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>1.9.4</version>
<version>1.15.10</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsublite</artifactId>
<version>1.9.4</version>
<version>1.15.10</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ public boolean next() {
subscriber.onData().get();
// since next() will not be called concurrently, we are sure that the message
// is available to this thread.
Optional<SequencedMessage> msg = subscriber.messageIfAvailable();
checkState(msg.isPresent());
currentMsg = msg.get();
Optional<com.google.cloud.pubsublite.proto.SequencedMessage> proto_msg =
subscriber.messageIfAvailable();
checkState(proto_msg.isPresent());
currentMsg = SequencedMessage.fromProto(proto_msg.get());
currentOffset =
SparkPartitionOffset.builder()
.partition(currentOffset.partition())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ public boolean next() {
if (batchFulfilled) {
return false;
}
Optional<SequencedMessage> msg;
Optional<com.google.cloud.pubsublite.proto.SequencedMessage> proto_msg;
while (true) {
try {
subscriber.onData().get(SUBSCRIBER_PULL_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
msg = subscriber.messageIfAvailable();
proto_msg = subscriber.messageIfAvailable();
break;
} catch (TimeoutException e) {
log.atWarning().log(
Expand All @@ -76,8 +76,8 @@ public boolean next() {
}
// since next() is only called on one thread at a time, we are sure that the message is
// available to this thread.
checkState(msg.isPresent());
currentMsg = msg.get();
checkState(proto_msg.isPresent());
currentMsg = SequencedMessage.fromProto(proto_msg.get());
if (currentMsg.offset().value() == endOffset.offset()) {
// this is the last msg for the batch.
batchFulfilled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
Expand Down Expand Up @@ -116,7 +117,7 @@ private static <T> void extractVal(
}

@SuppressWarnings("CheckReturnValue")
public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
public static PubSubMessage toPubSubMessage(StructType inputSchema, InternalRow row) {
Message.Builder builder = Message.builder();
extractVal(
inputSchema,
Expand Down Expand Up @@ -159,7 +160,7 @@ public static Message toPubSubMessage(StructType inputSchema, InternalRow row) {
}));
builder.setAttributes(attributeMapBuilder.build());
});
return builder.build();
return builder.build().toProto();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.io.Serializable;
import java.util.List;
import java.util.function.Consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ public void testPartitionReader() throws Exception {

// Multiple get w/o next will return same msg.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1.toProto()));
assertThat(reader.next()).isTrue();
verifyInternalRow(reader.get(), 10L);
verifyInternalRow(reader.get(), 10L);
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(10L);

// Next will advance to next msg.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2.toProto()));
assertThat(reader.next()).isTrue();
verifyInternalRow(reader.get(), 13L);
assertThat(((SparkPartitionOffset) reader.getOffset()).offset()).isEqualTo(13L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ public void testPartitionReader() throws Exception {

// Multiple get w/o next will return same msg.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1.toProto()));
assertThat(reader.next()).isTrue();
verifyInternalRow(reader.get(), 10L);
verifyInternalRow(reader.get(), 10L);

// Next will advance to next msg which is also the last msg in the batch.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2.toProto()));
assertThat(reader.next()).isTrue();
verifyInternalRow(reader.get(), 14L);

Expand All @@ -96,14 +96,14 @@ public void testPartitionReaderNewMessageExceedsRange() throws Exception {

// Multiple get w/o next will return same msg.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message1.toProto()));
assertThat(reader.next()).isTrue();
verifyInternalRow(reader.get(), 10L);
verifyInternalRow(reader.get(), 10L);

// Next will advance to next msg, and recognize it's out of the batch range.
when(subscriber.onData()).thenReturn(ApiFutures.immediateFuture(null));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2));
when(subscriber.messageIfAvailable()).thenReturn(Optional.of(message2.toProto()));
assertThat(reader.next()).isFalse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.testing.UnitTestExamples;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -151,12 +152,13 @@ public void testToPubSubMessage() {
new StructField("random_extra_field", DataTypes.BinaryType, false, Metadata.empty())
});

assertThat(message).isEqualTo(PslSparkUtils.toPubSubMessage(structType, row));
assertThat(message.toProto()).isEqualTo(PslSparkUtils.toPubSubMessage(structType, row));
}

@Test
public void testToPubSubMessageLongForEventTimestamp() {
Message expectedMsg = Message.builder().setEventTime(Timestamps.fromMicros(100000L)).build();
PubSubMessage expectedMsg =
Message.builder().setEventTime(Timestamps.fromMicros(100000L)).build().toProto();

StructType structType =
new StructType(
Expand All @@ -166,7 +168,7 @@ public void testToPubSubMessageLongForEventTimestamp() {
List<Object> list = Collections.singletonList(100000L);
InternalRow row = InternalRow.apply(asScalaBufferConverter(list).asScala());

Message message = PslSparkUtils.toPubSubMessage(structType, row);
PubSubMessage message = PslSparkUtils.toPubSubMessage(structType, row);
assertThat(message).isEqualTo(expectedMsg);
}

Expand Down