Skip to content

Commit a1d06f0

Browse files
fix: Change commit logic to use streams instead of single RPCs (#177)
* fix: Change commit logic to use streams instead of single RPCs * fix: Change commit logic to use streams instead of single RPCs
1 parent fef6285 commit a1d06f0

File tree

8 files changed

+160
-39
lines changed

8 files changed

+160
-39
lines changed

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSource.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SplitDiscovery;
2323
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SplitEnumeratorCheckpointSerializer;
2424
import com.google.cloud.pubsublite.flink.internal.source.enumerator.UniformPartitionAssigner;
25+
import com.google.cloud.pubsublite.flink.internal.source.reader.CommitterCache;
2526
import com.google.cloud.pubsublite.flink.internal.source.reader.PubsubLiteRecordEmitter;
2627
import com.google.cloud.pubsublite.flink.internal.source.reader.PubsubLiteSourceReader;
2728
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
@@ -74,10 +75,10 @@ public UserCodeClassLoader getUserCodeClassLoader() {
7475
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
7576
return new PubsubLiteSourceReader<>(
7677
new PubsubLiteRecordEmitter<>(),
77-
assembler.getCursorClientRemoveThis(),
7878
assembler.getSplitReaderSupplier(),
7979
new Configuration(),
80-
readerContext);
80+
readerContext,
81+
new CommitterCache(assembler::getCommitter));
8182
}
8283

8384
@Override
@@ -89,7 +90,7 @@ public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> cr
8990
UniformPartitionAssigner.create(),
9091
SingleSubscriptionSplitDiscovery.create(
9192
assembler.newAdminClient(),
92-
assembler.getCursorClientRemoveThis(),
93+
assembler.getCursorClient(),
9394
assembler.getTopicPath(),
9495
settings.subscriptionPath()));
9596
}
@@ -106,7 +107,7 @@ public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> re
106107
checkpoint.getDiscovery(),
107108
assigner.listSplits(),
108109
assembler.newAdminClient(),
109-
assembler.getCursorClientRemoveThis());
110+
assembler.getCursorClient());
110111
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery);
111112
}
112113

src/main/java/com/google/cloud/pubsublite/flink/internal/source/SourceAssembler.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.google.api.gax.rpc.ApiException;
2525
import com.google.cloud.pubsublite.AdminClient;
2626
import com.google.cloud.pubsublite.AdminClientSettings;
27+
import com.google.cloud.pubsublite.Partition;
2728
import com.google.cloud.pubsublite.SubscriptionPath;
2829
import com.google.cloud.pubsublite.TopicPath;
2930
import com.google.cloud.pubsublite.flink.PubsubLiteSourceSettings;
@@ -35,6 +36,8 @@
3536
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
3637
import com.google.cloud.pubsublite.internal.CursorClient;
3738
import com.google.cloud.pubsublite.internal.CursorClientSettings;
39+
import com.google.cloud.pubsublite.internal.wire.Committer;
40+
import com.google.cloud.pubsublite.internal.wire.CommitterSettings;
3841
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
3942
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
4043
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
@@ -135,16 +138,26 @@ private CursorServiceClient newCursorClient() throws ApiException {
135138
}
136139
}
137140

138-
private CursorServiceClient getCursorClient() {
141+
private CursorServiceClient getCursorServiceClient() {
139142
return CURSOR_CLIENTS.computeIfAbsent(settings.subscriptionPath(), path -> newCursorClient());
140143
}
141144

142-
/** TODO(dpcollins): Remove this */
143-
public CursorClient getCursorClientRemoveThis() {
145+
public Committer getCommitter(Partition partition) {
146+
CursorServiceClient client = getCursorServiceClient();
147+
return CommitterSettings.newBuilder()
148+
.setSubscriptionPath(settings.subscriptionPath())
149+
.setPartition(partition)
150+
.setStreamFactory(
151+
responseObserver -> client.streamingCommitCursorCallable().splitCall(responseObserver))
152+
.build()
153+
.instantiate();
154+
}
155+
156+
public CursorClient getCursorClient() {
144157
return CursorClient.create(
145158
CursorClientSettings.newBuilder()
146159
.setRegion(settings.subscriptionPath().location().extractRegion())
147-
.setServiceClient(getCursorClient())
160+
.setServiceClient(getCursorServiceClient())
148161
.build());
149162
}
150163

src/main/java/com/google/cloud/pubsublite/flink/internal/source/reader/CheckpointCursorCommitter.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.google.api.core.ApiFutureCallback;
1919
import com.google.api.core.ApiFutures;
2020
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
21-
import com.google.cloud.pubsublite.internal.CursorClient;
2221
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
2322
import com.google.common.collect.ImmutableList;
2423
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -42,10 +41,10 @@ public class CheckpointCursorCommitter implements AutoCloseable {
4241
private final LinkedHashMap<Long, List<SubscriptionPartitionSplit>> checkpoints =
4342
new LinkedHashMap<>();
4443

45-
private final CursorClient cursorCommitter;
44+
private final CommitterFactory factory;
4645

47-
public CheckpointCursorCommitter(CursorClient cursorCommitter) {
48-
this.cursorCommitter = cursorCommitter;
46+
public CheckpointCursorCommitter(CommitterFactory factory) {
47+
this.factory = factory;
4948
}
5049

5150
public synchronized void addCheckpoint(
@@ -60,7 +59,7 @@ public synchronized void addCheckpoint(
6059

6160
private void commitCursor(SubscriptionPartitionSplit split) {
6261
ApiFutures.addCallback(
63-
cursorCommitter.commitCursor(split.subscriptionPath(), split.partition(), split.start()),
62+
factory.getCommitter(split.partition()).commitOffset(split.start()),
6463
new ApiFutureCallback<Void>() {
6564
@Override
6665
public void onFailure(Throwable throwable) {
@@ -98,6 +97,6 @@ public synchronized void notifySplitFinished(Collection<SubscriptionPartitionSpl
9897

9998
@Override
10099
public void close() {
101-
cursorCommitter.close();
100+
factory.close();
102101
}
103102
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.flink.internal.source.reader;
18+
19+
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown;
20+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21+
22+
import com.google.api.core.ApiService.Listener;
23+
import com.google.api.core.ApiService.State;
24+
import com.google.cloud.pubsublite.Partition;
25+
import com.google.cloud.pubsublite.internal.wire.Committer;
26+
import com.google.errorprone.annotations.concurrent.GuardedBy;
27+
import java.util.HashMap;
28+
import java.util.function.Function;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
public class CommitterCache implements CommitterFactory {
33+
private static final Logger LOG = LoggerFactory.getLogger(CommitterCache.class);
34+
private final Function<Partition, Committer> underlying;
35+
36+
@GuardedBy("this")
37+
private final HashMap<Partition, Committer> partitionCommitters = new HashMap<>();
38+
39+
public CommitterCache(Function<Partition, Committer> underlying) {
40+
this.underlying = underlying;
41+
}
42+
43+
@Override
44+
public synchronized Committer getCommitter(Partition partition) {
45+
return partitionCommitters.computeIfAbsent(
46+
partition,
47+
p -> {
48+
Committer newCommitter = underlying.apply(p);
49+
newCommitter.addListener(
50+
new Listener() {
51+
@Override
52+
public void failed(State from, Throwable failure) {
53+
LOG.info("Committer failed.", failure);
54+
remove(p);
55+
}
56+
57+
@Override
58+
public void terminated(State from) {
59+
remove(p);
60+
}
61+
},
62+
directExecutor());
63+
newCommitter.startAsync().awaitRunning();
64+
return newCommitter;
65+
});
66+
}
67+
68+
@Override
69+
public synchronized void close() {
70+
blockingShutdown(partitionCommitters.values());
71+
}
72+
73+
private synchronized void remove(Partition partition) {
74+
partitionCommitters.remove(partition);
75+
}
76+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.flink.internal.source.reader;
18+
19+
import com.google.cloud.pubsublite.Partition;
20+
import com.google.cloud.pubsublite.internal.wire.Committer;
21+
import java.io.Closeable;
22+
23+
public interface CommitterFactory extends Closeable {
24+
Committer getCommitter(Partition partition);
25+
26+
@Override
27+
void close();
28+
}

src/main/java/com/google/cloud/pubsublite/flink/internal/source/reader/PubsubLiteSourceReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
1919
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplitState;
20-
import com.google.cloud.pubsublite.internal.CursorClient;
2120
import java.util.List;
2221
import java.util.Map;
2322
import java.util.function.Supplier;
@@ -34,10 +33,10 @@ public class PubsubLiteSourceReader<T>
3433

3534
public PubsubLiteSourceReader(
3635
RecordEmitter<Record<T>, T, SubscriptionPartitionSplitState> recordEmitter,
37-
CursorClient cursorCommitter,
3836
Supplier<SplitReader<Record<T>, SubscriptionPartitionSplit>> splitReaderSupplier,
3937
Configuration config,
40-
SourceReaderContext context) {
38+
SourceReaderContext context,
39+
CommitterFactory cursorCommitter) {
4140
super(splitReaderSupplier, recordEmitter, config, context);
4241
this.checkpointCursorCommitter = new CheckpointCursorCommitter(cursorCommitter);
4342
}

src/test/java/com/google/cloud/pubsublite/flink/internal/source/reader/CheckpointCursorCommitterTest.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import com.google.cloud.pubsublite.Offset;
2828
import com.google.cloud.pubsublite.Partition;
2929
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
30-
import com.google.cloud.pubsublite.internal.CursorClient;
30+
import com.google.cloud.pubsublite.internal.wire.Committer;
3131
import com.google.common.collect.ImmutableList;
3232
import org.junit.Before;
3333
import org.junit.Test;
@@ -37,14 +37,15 @@
3737

3838
@RunWith(MockitoJUnitRunner.class)
3939
public class CheckpointCursorCommitterTest {
40-
@Mock CursorClient mockCursorClient;
40+
@Mock CommitterFactory mockCommitterFactory;
41+
@Mock Committer mockCommitter;
4142
CheckpointCursorCommitter cursorCommitter;
4243

4344
@Before
4445
public void setUp() {
45-
cursorCommitter = new CheckpointCursorCommitter(mockCursorClient);
46-
when(mockCursorClient.commitCursor(any(), any(), any()))
47-
.thenReturn(ApiFutures.immediateFuture(null));
46+
cursorCommitter = new CheckpointCursorCommitter(mockCommitterFactory);
47+
when(mockCommitterFactory.getCommitter(any())).thenReturn(mockCommitter);
48+
when(mockCommitter.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
4849
}
4950

5051
public static SubscriptionPartitionSplit splitFromPartition(Partition partition) {
@@ -57,25 +58,25 @@ public void testFinishedSplits() {
5758
cursorCommitter.notifySplitFinished(ImmutableList.of(split));
5859
cursorCommitter.addCheckpoint(1, ImmutableList.of());
5960
cursorCommitter.notifyCheckpointComplete(1);
60-
verify(mockCursorClient)
61-
.commitCursor(split.subscriptionPath(), split.partition(), split.start());
61+
verify(mockCommitterFactory).getCommitter(split.partition());
62+
verify(mockCommitter).commitOffset(split.start());
6263
}
6364

6465
@Test
6566
public void testCheckpointCommitted() {
6667
SubscriptionPartitionSplit split = splitFromPartition(examplePartition());
6768
cursorCommitter.addCheckpoint(1, ImmutableList.of(split));
6869
cursorCommitter.notifyCheckpointComplete(1);
69-
verify(mockCursorClient)
70-
.commitCursor(split.subscriptionPath(), split.partition(), split.start());
70+
verify(mockCommitterFactory).getCommitter(split.partition());
71+
verify(mockCommitter).commitOffset(split.start());
7172
}
7273

7374
@Test
7475
public void testUnknownCheckpoint() {
7576
SubscriptionPartitionSplit split = splitFromPartition(examplePartition());
7677
cursorCommitter.addCheckpoint(1, ImmutableList.of(split));
7778
cursorCommitter.notifyCheckpointComplete(4);
78-
verifyNoInteractions(mockCursorClient);
79+
verifyNoInteractions(mockCommitter);
7980
}
8081

8182
@Test
@@ -88,15 +89,15 @@ public void testIntermediateCheckpointSkipped() {
8889

8990
// Checkpoint 1 is committed, removing checkpoint 2
9091
cursorCommitter.notifyCheckpointComplete(1);
91-
verify(mockCursorClient)
92-
.commitCursor(split1.subscriptionPath(), split1.partition(), split1.start());
92+
verify(mockCommitterFactory).getCommitter(split1.partition());
93+
verify(mockCommitter).commitOffset(split1.start());
9394
cursorCommitter.notifyCheckpointComplete(2);
94-
verifyNoMoreInteractions(mockCursorClient);
95+
verifyNoMoreInteractions(mockCommitter);
9596
}
9697

9798
@Test
9899
public void testClose() {
99100
cursorCommitter.close();
100-
verify(mockCursorClient).close();
101+
verify(mockCommitterFactory).close();
101102
}
102103
}

src/test/java/com/google/cloud/pubsublite/flink/internal/source/reader/PubsubLiteSourceReaderTest.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import com.google.cloud.pubsublite.flink.PubsubLiteDeserializationSchema;
3131
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
3232
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
33-
import com.google.cloud.pubsublite.internal.CursorClient;
33+
import com.google.cloud.pubsublite.internal.wire.Committer;
3434
import com.google.cloud.pubsublite.proto.Cursor;
3535
import com.google.cloud.pubsublite.proto.PubSubMessage;
3636
import com.google.common.collect.ImmutableList;
@@ -51,7 +51,9 @@
5151
@RunWith(MockitoJUnitRunner.class)
5252
public class PubsubLiteSourceReaderTest {
5353
@Mock PullSubscriberFactory mockFactory;
54-
@Mock CursorClient mockCursorClient;
54+
@Mock CommitterFactory mockCommitterFactory;
55+
@Mock Committer mockCommitter0;
56+
@Mock Committer mockCommitter1;
5557

5658
@Mock(answer = RETURNS_DEEP_STUBS)
5759
SourceReaderContext mockContext;
@@ -83,16 +85,18 @@ public void setUp() {
8385
reader =
8486
new PubsubLiteSourceReader<>(
8587
new PubsubLiteRecordEmitter<>(),
86-
mockCursorClient,
8788
() ->
8889
new DeserializingSplitReader<>(
8990
new MessageSplitReader(mockFactory),
9091
PubsubLiteDeserializationSchema.dataOnly(new SimpleStringSchema()),
9192
MessageTimestampExtractor.publishTimeExtractor()),
9293
new Configuration(),
93-
mockContext);
94-
when(mockCursorClient.commitCursor(any(), any(), any()))
95-
.thenReturn(ApiFutures.immediateFuture(null));
94+
mockContext,
95+
mockCommitterFactory);
96+
when(mockCommitterFactory.getCommitter(Partition.of(0))).thenReturn(mockCommitter0);
97+
when(mockCommitterFactory.getCommitter(Partition.of(1))).thenReturn(mockCommitter1);
98+
when(mockCommitter0.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
99+
when(mockCommitter1.commitOffset(any())).thenReturn(ApiFutures.immediateFuture(null));
96100
}
97101

98102
@Test(timeout = 1000)
@@ -110,8 +114,8 @@ public void testReader() throws Exception {
110114

111115
reader.snapshotState(1);
112116
reader.notifyCheckpointComplete(1);
113-
verify(mockCursorClient).commitCursor(exampleSubscriptionPath(), Partition.of(0), Offset.of(2));
114-
verify(mockCursorClient).commitCursor(exampleSubscriptionPath(), Partition.of(1), Offset.of(3));
117+
verify(mockCommitter0).commitOffset(Offset.of(2));
118+
verify(mockCommitter1).commitOffset(Offset.of(3));
115119

116120
while (output.getEmittedRecords().size() < 6) {
117121
reader.pollNext(output);

0 commit comments

Comments
 (0)