Skip to content

Commit 4d3f41d

Browse files
feat: Add the stop condition to allow users to stop based on offsets (#59)
* stop condition * fix test * update pom * address comments * revert version change * comments
1 parent 045450a commit 4d3f41d

File tree

10 files changed

+466
-80
lines changed

10 files changed

+466
-80
lines changed

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSinkSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ PublisherOptions getPublisherConfig() {
5151
}
5252

5353
@AutoValue.Builder
54-
abstract static class Builder<InputT> {
54+
public abstract static class Builder<InputT> {
5555
// Required.
5656
public abstract Builder<InputT> setTopicPath(TopicPath value);
5757

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSourceSettings.java

Lines changed: 70 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@
3131
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
3232
import com.google.cloud.pubsublite.flink.internal.reader.*;
3333
import com.google.cloud.pubsublite.flink.internal.split.SubscriptionPartitionSplit;
34-
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
3534
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
3635
import com.google.cloud.pubsublite.internal.CursorClient;
3736
import com.google.cloud.pubsublite.internal.CursorClientSettings;
37+
import com.google.cloud.pubsublite.internal.TopicStatsClient;
38+
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
3839
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
3940
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
4041
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
@@ -65,7 +66,7 @@ public static <OutputT> Builder<OutputT> builder(
6566
.setDeserializationSchema(schema)
6667
.setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
6768
.setTimestampSelector(MessageTimestampExtractor.publishTimeExtractor())
68-
.setPartitionFinishedCondition(PartitionFinishedCondition.continueIndefinitely());
69+
.setStopCondition(StopCondition.continueIndefinitely());
6970
}
7071

7172
public static Builder<SequencedMessage> messagesBuilder() {
@@ -85,7 +86,7 @@ public static Builder<SequencedMessage> messagesBuilder() {
8586
public abstract MessageTimestampExtractor timestampSelector();
8687

8788
// Optional
88-
public abstract PartitionFinishedCondition.Factory partitionFinishedCondition();
89+
public abstract StopCondition stopCondition();
8990

9091
// Internal
9192
abstract PubsubLiteDeserializationSchema<OutputT> deserializationSchema();
@@ -94,6 +95,38 @@ public static Builder<SequencedMessage> messagesBuilder() {
9495

9596
abstract @Nullable SerializableSupplier<CursorClient> cursorClientSupplier();
9697

98+
abstract @Nullable SerializableSupplier<TopicStatsClient> topicStatsClientSupplier();
99+
100+
AdminClient getAdminClient() {
101+
if (adminClientSupplier() != null) {
102+
return adminClientSupplier().get();
103+
}
104+
return AdminClient.create(
105+
AdminClientSettings.newBuilder()
106+
.setRegion(subscriptionPath().location().extractRegion())
107+
.build());
108+
}
109+
110+
TopicStatsClient getTopicStatsClient() {
111+
if (topicStatsClientSupplier() != null) {
112+
return topicStatsClientSupplier().get();
113+
}
114+
return TopicStatsClient.create(
115+
TopicStatsClientSettings.newBuilder()
116+
.setRegion(subscriptionPath().location().extractRegion())
117+
.build());
118+
}
119+
120+
CursorClient getCursorClient() {
121+
if (cursorClientSupplier() != null) {
122+
return cursorClientSupplier().get();
123+
}
124+
return CursorClient.create(
125+
CursorClientSettings.newBuilder()
126+
.setRegion(subscriptionPath().location().extractRegion())
127+
.build());
128+
}
129+
97130
private static SubscriberServiceClient newSubscriberServiceClient(
98131
SubscriptionPath path, Partition partition) throws ApiException {
99132
try {
@@ -110,51 +143,29 @@ private static SubscriberServiceClient newSubscriberServiceClient(
110143
}
111144
}
112145

113-
private static SubscriberFactory getSubscriberFactory(
114-
SubscriptionPath path, Partition partition, SeekRequest seek) {
146+
private static SubscriberFactory getSubscriberFactory(SubscriptionPartitionSplit split) {
115147
return (consumer) ->
116148
SubscriberBuilder.newBuilder()
117-
.setSubscriptionPath(path)
118-
.setPartition(partition)
119-
.setServiceClient(newSubscriberServiceClient(path, partition))
149+
.setSubscriptionPath(split.subscriptionPath())
150+
.setPartition(split.partition())
151+
.setServiceClient(
152+
newSubscriberServiceClient(split.subscriptionPath(), split.partition()))
120153
.setMessageConsumer(consumer)
121-
.setInitialLocation(seek)
154+
.setInitialLocation(
155+
SeekRequest.newBuilder()
156+
.setCursor(Cursor.newBuilder().setOffset(split.start().value()).build())
157+
.build())
122158
.build();
123159
}
124160

125-
AdminClient getAdminClient() {
126-
if (adminClientSupplier() != null) {
127-
return adminClientSupplier().get();
128-
}
129-
return AdminClient.create(
130-
AdminClientSettings.newBuilder()
131-
.setRegion(subscriptionPath().location().extractRegion())
132-
.build());
133-
}
134-
135-
CursorClient getCursorClient() {
136-
if (cursorClientSupplier() != null) {
137-
return cursorClientSupplier().get();
138-
}
139-
return CursorClient.create(
140-
CursorClientSettings.newBuilder()
141-
.setRegion(subscriptionPath().location().extractRegion())
142-
.build());
143-
}
144-
145161
CompletablePullSubscriber.Factory getSplitStateFactory() {
146-
return split -> {
147-
SeekRequest seek =
148-
SeekRequest.newBuilder()
149-
.setCursor(Cursor.newBuilder().setOffset(split.start().value()).build())
150-
.build();
151-
SubscriberFactory factory =
152-
getSubscriberFactory(split.subscriptionPath(), split.partition(), seek);
153-
154-
BlockingPullSubscriber b = new BlockingPullSubscriberImpl(factory, flowControlSettings());
155-
return new CompletablePullSubscriberImpl(
156-
b, partitionFinishedCondition().New(split.subscriptionPath(), split.partition()));
157-
};
162+
PartitionFinishedCondition.Factory conditionFactory = stopCondition().toFinishCondition();
163+
return new ConditionallyCompleteSubscriberFactory(
164+
(split) ->
165+
new CompletablePullSubscriberImpl(
166+
new BlockingPullSubscriberImpl(getSubscriberFactory(split), flowControlSettings()),
167+
conditionFactory.New(split.subscriptionPath(), split.partition())),
168+
conditionFactory);
158169
}
159170

160171
Supplier<SplitReader<Record<OutputT>, SubscriptionPartitionSplit>> getSplitReaderSupplier() {
@@ -184,7 +195,7 @@ public void onSuccess(Void unused) {}
184195
}
185196

186197
@AutoValue.Builder
187-
abstract static class Builder<OutputT> {
198+
public abstract static class Builder<OutputT> {
188199
// Required
189200
public abstract Builder<OutputT> setSubscriptionPath(SubscriptionPath path);
190201

@@ -198,8 +209,7 @@ abstract static class Builder<OutputT> {
198209
public abstract Builder<OutputT> setTimestampSelector(MessageTimestampExtractor value);
199210

200211
// Optional
201-
public abstract Builder<OutputT> setPartitionFinishedCondition(
202-
PartitionFinishedCondition.Factory value);
212+
public abstract Builder<OutputT> setStopCondition(StopCondition value);
203213

204214
abstract Builder<OutputT> setDeserializationSchema(
205215
PubsubLiteDeserializationSchema<OutputT> schema);
@@ -208,6 +218,21 @@ abstract Builder<OutputT> setDeserializationSchema(
208218

209219
abstract Builder<OutputT> setCursorClientSupplier(SerializableSupplier<CursorClient> value);
210220

211-
abstract PubsubLiteSourceSettings<OutputT> build();
221+
abstract Builder<OutputT> setTopicStatsClientSupplier(
222+
SerializableSupplier<TopicStatsClient> value);
223+
224+
public abstract PubsubLiteSourceSettings<OutputT> autoBuild();
225+
226+
public PubsubLiteSourceSettings<OutputT> build() {
227+
PubsubLiteSourceSettings<OutputT> settings = autoBuild();
228+
setStopCondition(
229+
settings
230+
.stopCondition()
231+
.canonicalize(
232+
settings.subscriptionPath(),
233+
settings::getAdminClient,
234+
settings::getTopicStatsClient));
235+
return autoBuild();
236+
}
212237
}
213238
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink;
17+
18+
import com.google.api.core.ApiFutures;
19+
import com.google.cloud.pubsublite.AdminClient;
20+
import com.google.cloud.pubsublite.Offset;
21+
import com.google.cloud.pubsublite.Partition;
22+
import com.google.cloud.pubsublite.SubscriptionPath;
23+
import com.google.cloud.pubsublite.TopicPath;
24+
import com.google.cloud.pubsublite.flink.internal.reader.PartitionFinishedCondition;
25+
import com.google.cloud.pubsublite.flink.internal.reader.PartitionFinishedCondition.Factory;
26+
import com.google.cloud.pubsublite.flink.internal.reader.PartitionFinishedCondition.Result;
27+
import com.google.cloud.pubsublite.internal.ExtractStatus;
28+
import com.google.cloud.pubsublite.internal.TopicStatsClient;
29+
import com.google.cloud.pubsublite.proto.Cursor;
30+
import com.google.common.collect.ImmutableMap;
31+
import java.io.Serializable;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.function.Supplier;
35+
import java.util.stream.Collectors;
36+
import java.util.stream.LongStream;
37+
38+
public abstract class StopCondition implements Serializable {
39+
// Called to convert conditions with placeholder values like "HEAD" to an offset based condition.
40+
StopCondition canonicalize(
41+
SubscriptionPath path,
42+
Supplier<AdminClient> adminClient,
43+
Supplier<TopicStatsClient> topicStatsClient) {
44+
return this;
45+
}
46+
47+
abstract PartitionFinishedCondition.Factory toFinishCondition();
48+
49+
private static class ContinueIndefinitely extends StopCondition {
50+
@Override
51+
public Factory toFinishCondition() {
52+
return (subscription, partition) -> offset -> Result.CONTINUE;
53+
}
54+
}
55+
56+
private static class ReadToOffsets extends StopCondition {
57+
private final Map<Partition, Offset> offsets;
58+
59+
private ReadToOffsets(Map<Partition, Offset> offsets) {
60+
this.offsets = offsets;
61+
}
62+
63+
@Override
64+
public Factory toFinishCondition() {
65+
return (subscription, partition) ->
66+
offset -> {
67+
Offset stopOffset = offsets.getOrDefault(partition, Offset.of(0L));
68+
if (offset.value() >= stopOffset.value()) return Result.FINISH_BEFORE;
69+
if (offset.value() == stopOffset.value() - 1) return Result.FINISH_AFTER;
70+
return Result.CONTINUE;
71+
};
72+
}
73+
}
74+
75+
private static class ReadToHead extends StopCondition {
76+
private static Map<Partition, Offset> getHeadOffsets(
77+
SubscriptionPath path, AdminClient admin, TopicStatsClient topicStats) {
78+
long partitionCount;
79+
List<Cursor> cursors;
80+
try {
81+
TopicPath topicPath = TopicPath.parse(admin.getSubscription(path).get().getTopic());
82+
partitionCount = admin.getTopicPartitionCount(topicPath).get();
83+
cursors =
84+
ApiFutures.allAsList(
85+
LongStream.range(0, partitionCount)
86+
.mapToObj(
87+
partition ->
88+
topicStats.computeHeadCursor(topicPath, Partition.of(partition)))
89+
.collect(Collectors.toList()))
90+
.get();
91+
} catch (Throwable t) {
92+
throw ExtractStatus.toCanonical(t).underlying;
93+
}
94+
ImmutableMap.Builder<Partition, Offset> offsets = ImmutableMap.builder();
95+
for (int i = 0; i < partitionCount; i++) {
96+
offsets.put(Partition.of(i), Offset.of(cursors.get(i).getOffset()));
97+
}
98+
return offsets.build();
99+
}
100+
101+
@Override
102+
public StopCondition canonicalize(
103+
SubscriptionPath path,
104+
Supplier<AdminClient> adminClient,
105+
Supplier<TopicStatsClient> topicStatsClient) {
106+
return readToOffsets(getHeadOffsets(path, adminClient.get(), topicStatsClient.get()));
107+
}
108+
109+
@Override
110+
public Factory toFinishCondition() {
111+
throw new IllegalStateException("Cannot translate to stop condition before canonicalizing");
112+
}
113+
}
114+
115+
// The flink source will continue reading messages indefinitely.
116+
public static StopCondition continueIndefinitely() {
117+
return new ContinueIndefinitely();
118+
}
119+
120+
// The flink source will read to the specified offset for each partition. If no offset is supplied
121+
// for a partition, the source will not read any messages from that partition.
122+
public static StopCondition readToOffsets(Map<Partition, Offset> offsets) {
123+
return new ReadToOffsets(offsets);
124+
}
125+
126+
// Read to the head offset for every partition. Head is evaluated when the source settings are
127+
// built.
128+
public static StopCondition readToHead() {
129+
return new ReadToHead();
130+
}
131+
}

src/main/java/com/google/cloud/pubsublite/flink/internal/reader/CompletablePullSubscriberImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import com.google.api.core.ApiFuture;
1919
import com.google.cloud.pubsublite.SequencedMessage;
20-
import com.google.cloud.pubsublite.flink.PartitionFinishedCondition;
2120
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
2221
import com.google.cloud.pubsublite.internal.CheckedApiException;
2322
import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -49,7 +48,7 @@ public synchronized Optional<SequencedMessage> messageIfAvailable() throws Check
4948
if (!m.isPresent()) {
5049
return Optional.empty();
5150
}
52-
switch (condition.partitionFinished(m.get())) {
51+
switch (condition.partitionFinished(m.get().offset())) {
5352
case CONTINUE:
5453
return m;
5554
case FINISH_AFTER:

0 commit comments

Comments
 (0)