Skip to content

Commit 76794d4

Browse files
feat: upgrade Pub/Sub Lite version (#187)
* feat: upgrade Pub/Sub Lite version * feat: upgrade Pub/Sub Lite version
1 parent e142808 commit 76794d4

File tree

6 files changed

+23
-28
lines changed

6 files changed

+23
-28
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>com.google.cloud</groupId>
77
<artifactId>google-cloud-pubsublite-parent</artifactId>
8-
<version>1.9.4</version>
8+
<version>1.11.1</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>

src/main/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisher.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.google.api.core.ApiFuture;
2121
import com.google.api.core.ApiFutures;
22-
import com.google.cloud.pubsublite.Message;
2322
import com.google.cloud.pubsublite.MessageMetadata;
2423
import com.google.cloud.pubsublite.internal.ExtractStatus;
2524
import com.google.cloud.pubsublite.internal.Publisher;
@@ -62,7 +61,7 @@ public void publish(PubSubMessage message) throws InterruptedException {
6261
"Publisher flow controlled due to too many bytes (>{}) outstanding", maxBytesOutstanding);
6362
bytesOutstanding.acquire(size);
6463
}
65-
ApiFuture<MessageMetadata> future = publisher.publish(Message.fromProto(message));
64+
ApiFuture<MessageMetadata> future = publisher.publish(message);
6665
future.addListener(() -> bytesOutstanding.release(size), directExecutor());
6766
publishes.add(future);
6867
}

src/main/java/com/google/cloud/pubsublite/flink/internal/source/reader/MessageSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private Multimap<String, SequencedMessage> getMessages() throws CheckedApiExcept
5959
for (Map.Entry<String, BlockingPullSubscriber> entry : subscribers.entrySet()) {
6060
String splitId = entry.getKey();
6161
BlockingPullSubscriber sub = entry.getValue();
62-
sub.messageIfAvailable().ifPresent(m -> messages.put(splitId, m.toProto()));
62+
sub.messageIfAvailable().ifPresent(m -> messages.put(splitId, m));
6363
}
6464
return messages.build();
6565
}

src/test/java/com/google/cloud/pubsublite/flink/ITSourceAndSinkTest.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.cloud.pubsublite.AdminClient;
2424
import com.google.cloud.pubsublite.CloudZone;
25-
import com.google.cloud.pubsublite.Message;
2625
import com.google.cloud.pubsublite.MessageMetadata;
2726
import com.google.cloud.pubsublite.ProjectId;
2827
import com.google.cloud.pubsublite.SubscriptionName;
@@ -33,6 +32,7 @@
3332
import com.google.cloud.pubsublite.flink.internal.sink.PerServerPublisherCache;
3433
import com.google.cloud.pubsublite.flink.internal.source.SourceAssembler;
3534
import com.google.cloud.pubsublite.internal.Publisher;
35+
import com.google.cloud.pubsublite.proto.PubSubMessage;
3636
import com.google.cloud.pubsublite.proto.Subscription;
3737
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig;
3838
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
@@ -106,15 +106,17 @@ private PubsubLiteSinkSettings<String> sinkSettings() {
106106
}
107107

108108
private Publisher<MessageMetadata> getPublisher() {
109-
return PerServerPublisherCache.getOrCreate(sinkSettings());
109+
Publisher<MessageMetadata> publisher = PerServerPublisherCache.getOrCreate(sinkSettings());
110+
publisher.awaitRunning();
111+
return publisher;
110112
}
111113

112114
private AdminClient getAdminClient() {
113115
return new SourceAssembler<>(sourceSettings()).getUnownedAdminClient();
114116
}
115117

116-
private static Message messageFromString(String i) {
117-
return Message.builder().setData(ByteString.copyFrom(SCHEMA.serialize(i))).build();
118+
private static PubSubMessage messageFromString(String i) {
119+
return PubSubMessage.newBuilder().setData(ByteString.copyFrom(SCHEMA.serialize(i))).build();
118120
}
119121

120122
@Before
@@ -265,8 +267,8 @@ public void testSinkWithFailure() throws Exception {
265267
Publisher<MessageMetadata> publisher = spy(PerServerPublisherCache.getOrCreate(sinkSettings()));
266268
Mockito.doAnswer(
267269
inv -> {
268-
Message m = inv.getArgument(0);
269-
if (m.data().toStringUtf8().equals(INTEGER_STRINGS.get(37))
270+
PubSubMessage m = inv.getArgument(0);
271+
if (m.getData().toStringUtf8().equals(INTEGER_STRINGS.get(37))
270272
&& staticSet.add("publishFailOnce")) {
271273
return ApiFutures.immediateFailedFuture(new RuntimeException("failure"));
272274
}

src/test/java/com/google/cloud/pubsublite/flink/internal/sink/MessagePublisherTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.google.api.core.SettableApiFuture;
2727
import com.google.api.gax.rpc.ApiException;
2828
import com.google.api.gax.rpc.StatusCode.Code;
29-
import com.google.cloud.pubsublite.Message;
3029
import com.google.cloud.pubsublite.MessageMetadata;
3130
import com.google.cloud.pubsublite.internal.CheckedApiException;
3231
import com.google.cloud.pubsublite.internal.Publisher;
@@ -58,25 +57,25 @@ public void setUp() {
5857
@Test
5958
public void testPublish() throws Exception {
6059
PubSubMessage message1 = PubSubMessage.newBuilder().build();
61-
when(fakeInnerPublisher.publish(Message.fromProto(message1)))
60+
when(fakeInnerPublisher.publish(message1))
6261
.thenReturn(
6362
ApiFutures.immediateFuture(MessageMetadata.of(examplePartition(), exampleOffset())));
6463

6564
messagePublisher.publish(message1);
6665

6766
messagePublisher.flush();
6867

69-
verify(fakeInnerPublisher).publish(Message.fromProto(message1));
68+
verify(fakeInnerPublisher).publish(message1);
7069
}
7170

7271
@Test
7372
public void testSinglePublishFailure() throws Exception {
7473
PubSubMessage message1 = PubSubMessage.newBuilder().build();
75-
when(fakeInnerPublisher.publish(Message.fromProto(message1)))
74+
when(fakeInnerPublisher.publish(message1))
7675
.thenReturn(
7776
ApiFutures.immediateFailedFuture(new CheckedApiException(Code.INTERNAL).underlying));
7877
messagePublisher.publish(message1);
79-
verify(fakeInnerPublisher).publish(Message.fromProto(message1));
78+
verify(fakeInnerPublisher).publish(message1);
8079

8180
assertThrows(ApiException.class, () -> messagePublisher.flush());
8281
}
@@ -85,9 +84,9 @@ public void testSinglePublishFailure() throws Exception {
8584
public void testCheckpointWithOutstandingPublish() throws Exception {
8685
PubSubMessage message1 = PubSubMessage.newBuilder().build();
8786
SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
88-
when(fakeInnerPublisher.publish(Message.fromProto(message1))).thenReturn(future);
87+
when(fakeInnerPublisher.publish(message1)).thenReturn(future);
8988
messagePublisher.publish(message1);
90-
verify(fakeInnerPublisher).publish(Message.fromProto(message1));
89+
verify(fakeInnerPublisher).publish(message1);
9190

9291
Future<?> checkpointFuture =
9392
Executors.newSingleThreadExecutor()
@@ -113,12 +112,12 @@ public void testPublishesOfMaximumSizeSerialized() throws Exception {
113112
PubSubMessage message2 =
114113
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("two")).build();
115114
SettableApiFuture<MessageMetadata> firstPublish = SettableApiFuture.create();
116-
when(fakeInnerPublisher.publish(Message.fromProto(message1))).thenReturn(firstPublish);
117-
when(fakeInnerPublisher.publish(Message.fromProto(message2)))
115+
when(fakeInnerPublisher.publish(message1)).thenReturn(firstPublish);
116+
when(fakeInnerPublisher.publish(message2))
118117
.thenReturn(
119118
ApiFutures.immediateFuture(MessageMetadata.of(examplePartition(), exampleOffset())));
120119
messagePublisher.publish(message1);
121-
verify(fakeInnerPublisher).publish(Message.fromProto(message1));
120+
verify(fakeInnerPublisher).publish(message1);
122121

123122
Future<?> secondPublish =
124123
Executors.newSingleThreadExecutor()

src/test/java/com/google/cloud/pubsublite/flink/internal/source/reader/FakeSubscriber.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,11 @@
1515
*/
1616
package com.google.cloud.pubsublite.flink.internal.source.reader;
1717

18-
import static java.util.stream.Collectors.toList;
19-
2018
import com.google.api.core.ApiFuture;
2119
import com.google.api.core.ApiFutures;
2220
import com.google.api.core.SettableApiFuture;
23-
import com.google.cloud.pubsublite.SequencedMessage;
2421
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
22+
import com.google.cloud.pubsublite.proto.SequencedMessage;
2523
import java.util.ArrayDeque;
2624
import java.util.Collection;
2725
import java.util.Optional;
@@ -30,11 +28,8 @@
3028
public class FakeSubscriber implements BlockingPullSubscriber {
3129
private final Queue<Optional<SequencedMessage>> messages;
3230

33-
public FakeSubscriber(
34-
Collection<Optional<com.google.cloud.pubsublite.proto.SequencedMessage>> messages) {
35-
this.messages =
36-
new ArrayDeque<>(
37-
messages.stream().map(x -> x.map(SequencedMessage::fromProto)).collect(toList()));
31+
public FakeSubscriber(Collection<Optional<SequencedMessage>> messages) {
32+
this.messages = new ArrayDeque<>(messages);
3833
}
3934

4035
@Override

0 commit comments

Comments
 (0)