Skip to content

Commit 5e1bb41

Browse files
feat: Add the pubsub lite source and settings (#17)
* PubsubLiteSource * adjustments * respond to comments * fix warranty * formatting * address comments
1 parent 8a79086 commit 5e1bb41

File tree

8 files changed

+370
-6
lines changed

8 files changed

+370
-6
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,11 @@
9696
<artifactId>flink-connector-base</artifactId>
9797
<version>1.13.0</version>
9898
</dependency>
99+
<dependency>
100+
<groupId>org.apache.flink</groupId>
101+
<artifactId>flink-metrics-core</artifactId>
102+
<version>1.13.0</version>
103+
</dependency>
99104
<dependency>
100105
<groupId>junit</groupId>
101106
<artifactId>junit</artifactId>

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteDeserializationSchema.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.io.Serializable;
2020
import javax.annotation.Nullable;
2121
import org.apache.flink.api.common.serialization.DeserializationSchema;
22+
import org.apache.flink.api.common.serialization.DeserializationSchema.InitializationContext;
2223
import org.apache.flink.api.common.typeinfo.TypeInformation;
2324

2425
public interface PubsubLiteDeserializationSchema<T> extends Serializable {
@@ -42,6 +43,23 @@ public TypeInformation<T> getProducedType() {
4243
};
4344
}
4445

46+
static PubsubLiteDeserializationSchema<SequencedMessage> sequencedMessageSchema() {
47+
return new PubsubLiteDeserializationSchema<SequencedMessage>() {
48+
@Override
49+
public void open(InitializationContext context) {}
50+
51+
@Override
52+
public SequencedMessage deserialize(com.google.cloud.pubsublite.SequencedMessage message) {
53+
return message;
54+
}
55+
56+
@Override
57+
public TypeInformation<SequencedMessage> getProducedType() {
58+
return TypeInformation.of(SequencedMessage.class);
59+
}
60+
};
61+
}
62+
4563
void open(DeserializationSchema.InitializationContext context) throws Exception;
4664

4765
/**
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink;
17+
18+
import com.google.cloud.pubsublite.AdminClient;
19+
import com.google.cloud.pubsublite.TopicPath;
20+
import com.google.cloud.pubsublite.flink.enumerator.PartitionAssigner;
21+
import com.google.cloud.pubsublite.flink.enumerator.PubsubLiteSplitEnumerator;
22+
import com.google.cloud.pubsublite.flink.enumerator.SingleSubscriptionSplitDiscovery;
23+
import com.google.cloud.pubsublite.flink.enumerator.SplitDiscovery;
24+
import com.google.cloud.pubsublite.flink.enumerator.SplitEnumeratorCheckpointSerializer;
25+
import com.google.cloud.pubsublite.flink.enumerator.UniformPartitionAssigner;
26+
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
27+
import com.google.cloud.pubsublite.flink.reader.PubsubLiteRecordEmitter;
28+
import com.google.cloud.pubsublite.flink.reader.PubsubLiteSourceReader;
29+
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
30+
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplitSerializer;
31+
import com.google.cloud.pubsublite.internal.ExtractStatus;
32+
import org.apache.flink.api.common.serialization.DeserializationSchema;
33+
import org.apache.flink.api.common.typeinfo.TypeInformation;
34+
import org.apache.flink.api.connector.source.*;
35+
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
36+
import org.apache.flink.configuration.Configuration;
37+
import org.apache.flink.core.io.SimpleVersionedSerializer;
38+
import org.apache.flink.metrics.MetricGroup;
39+
import org.apache.flink.util.UserCodeClassLoader;
40+
41+
public class PubsubLiteSource<OutputT>
42+
implements Source<OutputT, SubscriptionPartitionSplit, SplitEnumeratorCheckpoint>,
43+
ResultTypeQueryable<OutputT> {
44+
private final PubsubLiteSourceSettings<OutputT> settings;
45+
46+
public PubsubLiteSource(PubsubLiteSourceSettings<OutputT> settings) {
47+
this.settings = settings;
48+
}
49+
50+
@Override
51+
public Boundedness getBoundedness() {
52+
return settings.boundedness();
53+
}
54+
55+
@Override
56+
public SourceReader<OutputT, SubscriptionPartitionSplit> createReader(
57+
SourceReaderContext readerContext) throws Exception {
58+
PubsubLiteDeserializationSchema<OutputT> schema = settings.deserializationSchema();
59+
schema.open(
60+
new DeserializationSchema.InitializationContext() {
61+
@Override
62+
public MetricGroup getMetricGroup() {
63+
return readerContext.metricGroup();
64+
}
65+
66+
@Override
67+
public UserCodeClassLoader getUserCodeClassLoader() {
68+
return readerContext.getUserCodeClassLoader();
69+
}
70+
});
71+
return new PubsubLiteSourceReader<>(
72+
new PubsubLiteRecordEmitter<>(),
73+
settings.getCursorCommitter(),
74+
settings.getSplitReaderSupplier(),
75+
new Configuration(),
76+
readerContext);
77+
}
78+
79+
@Override
80+
public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> createEnumerator(
81+
SplitEnumeratorContext<SubscriptionPartitionSplit> enumContext) {
82+
TopicPath topic;
83+
try (AdminClient adminClient = settings.getAdminClient()) {
84+
topic =
85+
TopicPath.parse(
86+
adminClient.getSubscription(settings.subscriptionPath()).get().getTopic());
87+
} catch (Throwable t) {
88+
throw ExtractStatus.toCanonical(t).underlying;
89+
}
90+
return new PubsubLiteSplitEnumerator(
91+
enumContext,
92+
UniformPartitionAssigner.create(),
93+
SingleSubscriptionSplitDiscovery.create(
94+
settings.getAdminClient(),
95+
settings.getCursorClient(),
96+
topic,
97+
settings.subscriptionPath()),
98+
settings.boundedness());
99+
}
100+
101+
@Override
102+
public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> restoreEnumerator(
103+
SplitEnumeratorContext<SubscriptionPartitionSplit> enumContext,
104+
SplitEnumeratorCheckpoint checkpoint) {
105+
PartitionAssigner assigner =
106+
UniformPartitionAssigner.fromCheckpoint(checkpoint.getAssignmentsList());
107+
SplitDiscovery discovery =
108+
SingleSubscriptionSplitDiscovery.fromCheckpoint(
109+
checkpoint.getDiscovery(),
110+
assigner.listSplits(),
111+
settings.getAdminClient(),
112+
settings.getCursorClient());
113+
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery, settings.boundedness());
114+
}
115+
116+
@Override
117+
public SimpleVersionedSerializer<SubscriptionPartitionSplit> getSplitSerializer() {
118+
return new SubscriptionPartitionSplitSerializer();
119+
}
120+
121+
@Override
122+
public SimpleVersionedSerializer<SplitEnumeratorCheckpoint> getEnumeratorCheckpointSerializer() {
123+
return new SplitEnumeratorCheckpointSerializer();
124+
}
125+
126+
@Override
127+
public TypeInformation<OutputT> getProducedType() {
128+
return settings.deserializationSchema().getProducedType();
129+
}
130+
}
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.pubsublite.flink;
17+
18+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
19+
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
20+
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
21+
22+
import com.google.api.core.ApiFutureCallback;
23+
import com.google.api.core.ApiFutures;
24+
import com.google.api.gax.rpc.ApiException;
25+
import com.google.auto.value.AutoValue;
26+
import com.google.cloud.pubsublite.AdminClient;
27+
import com.google.cloud.pubsublite.AdminClientSettings;
28+
import com.google.cloud.pubsublite.Partition;
29+
import com.google.cloud.pubsublite.SequencedMessage;
30+
import com.google.cloud.pubsublite.SubscriptionPath;
31+
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
32+
import com.google.cloud.pubsublite.flink.reader.*;
33+
import com.google.cloud.pubsublite.flink.split.SubscriptionPartitionSplit;
34+
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
35+
import com.google.cloud.pubsublite.internal.BlockingPullSubscriberImpl;
36+
import com.google.cloud.pubsublite.internal.CursorClient;
37+
import com.google.cloud.pubsublite.internal.CursorClientSettings;
38+
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
39+
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
40+
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
41+
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
42+
import com.google.cloud.pubsublite.proto.Cursor;
43+
import com.google.cloud.pubsublite.proto.SeekRequest;
44+
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
45+
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
46+
import com.google.common.util.concurrent.MoreExecutors;
47+
import java.io.Serializable;
48+
import java.util.function.Consumer;
49+
import java.util.function.Supplier;
50+
import javax.annotation.Nullable;
51+
import org.apache.flink.api.connector.source.Boundedness;
52+
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
53+
import org.apache.flink.util.function.SerializableSupplier;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
56+
57+
@AutoValue
58+
public abstract class PubsubLiteSourceSettings<OutputT> implements Serializable {
59+
private static final Logger LOG = LoggerFactory.getLogger(PubsubLiteSourceSettings.class);
60+
private static final long serialVersionUID = 3206181560865850636L;
61+
62+
public static <OutputT> Builder<OutputT> builder(
63+
PubsubLiteDeserializationSchema<OutputT> schema) {
64+
return new AutoValue_PubsubLiteSourceSettings.Builder<OutputT>()
65+
.setDeserializationSchema(schema)
66+
.setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
67+
.setTimestampSelector(MessageTimestampExtractor.publishTimeExtractor())
68+
.setPartitionFinishedCondition(PartitionFinishedCondition.continueIndefinitely());
69+
}
70+
71+
public static Builder<SequencedMessage> messagesBuilder() {
72+
return builder(PubsubLiteDeserializationSchema.sequencedMessageSchema());
73+
}
74+
75+
// Required
76+
public abstract SubscriptionPath subscriptionPath();
77+
78+
// Required
79+
public abstract FlowControlSettings flowControlSettings();
80+
81+
// Optional
82+
public abstract Boundedness boundedness();
83+
84+
// Optional
85+
public abstract MessageTimestampExtractor timestampSelector();
86+
87+
// Optional
88+
public abstract PartitionFinishedCondition.Factory partitionFinishedCondition();
89+
90+
// Internal
91+
abstract PubsubLiteDeserializationSchema<OutputT> deserializationSchema();
92+
93+
abstract @Nullable SerializableSupplier<AdminClient> adminClientSupplier();
94+
95+
abstract @Nullable SerializableSupplier<CursorClient> cursorClientSupplier();
96+
97+
private static SubscriberServiceClient newSubscriberServiceClient(
98+
SubscriptionPath path, Partition partition) throws ApiException {
99+
try {
100+
SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder();
101+
settingsBuilder =
102+
addDefaultMetadata(
103+
PubsubContext.of(PubsubContext.Framework.of("FLINK")),
104+
RoutingMetadata.of(path, partition),
105+
settingsBuilder);
106+
return SubscriberServiceClient.create(
107+
addDefaultSettings(path.location().region(), settingsBuilder));
108+
} catch (Throwable t) {
109+
throw toCanonical(t).underlying;
110+
}
111+
}
112+
113+
private static SubscriberFactory getSubscriberFactory(
114+
SubscriptionPath path, Partition partition, SeekRequest seek) {
115+
return (consumer) ->
116+
SubscriberBuilder.newBuilder()
117+
.setSubscriptionPath(path)
118+
.setPartition(partition)
119+
.setServiceClient(newSubscriberServiceClient(path, partition))
120+
.setMessageConsumer(consumer)
121+
.setInitialLocation(seek)
122+
.build();
123+
}
124+
125+
AdminClient getAdminClient() {
126+
if (adminClientSupplier() != null) {
127+
return adminClientSupplier().get();
128+
}
129+
return AdminClient.create(
130+
AdminClientSettings.newBuilder().setRegion(subscriptionPath().location().region()).build());
131+
}
132+
133+
CursorClient getCursorClient() {
134+
if (cursorClientSupplier() != null) {
135+
return cursorClientSupplier().get();
136+
}
137+
return CursorClient.create(
138+
CursorClientSettings.newBuilder()
139+
.setRegion(subscriptionPath().location().region())
140+
.build());
141+
}
142+
143+
CompletablePullSubscriber.Factory getSplitStateFactory() {
144+
return split -> {
145+
SeekRequest seek =
146+
SeekRequest.newBuilder()
147+
.setCursor(Cursor.newBuilder().setOffset(split.start().value()).build())
148+
.build();
149+
SubscriberFactory factory =
150+
getSubscriberFactory(split.subscriptionPath(), split.partition(), seek);
151+
152+
BlockingPullSubscriber b = new BlockingPullSubscriberImpl(factory, flowControlSettings());
153+
return new CompletablePullSubscriberImpl(
154+
b, partitionFinishedCondition().New(split.subscriptionPath(), split.partition()));
155+
};
156+
}
157+
158+
Supplier<SplitReader<Record<OutputT>, SubscriptionPartitionSplit>> getSplitReaderSupplier() {
159+
return () ->
160+
new DeserializingSplitReader<>(
161+
new MessageSplitReader(getSplitStateFactory()),
162+
deserializationSchema(),
163+
timestampSelector());
164+
}
165+
166+
Consumer<SubscriptionPartitionSplit> getCursorCommitter() {
167+
CursorClient client = getCursorClient();
168+
return (SubscriptionPartitionSplit split) -> {
169+
ApiFutures.addCallback(
170+
client.commitCursor(split.subscriptionPath(), split.partition(), split.start()),
171+
new ApiFutureCallback<Void>() {
172+
@Override
173+
public void onFailure(Throwable throwable) {
174+
LOG.error("Failed to commit cursor to Pub/Sub Lite ", throwable);
175+
}
176+
177+
@Override
178+
public void onSuccess(Void unused) {}
179+
},
180+
MoreExecutors.directExecutor());
181+
};
182+
}
183+
184+
@AutoValue.Builder
185+
abstract static class Builder<OutputT> {
186+
// Required
187+
public abstract Builder<OutputT> setSubscriptionPath(SubscriptionPath path);
188+
189+
// Required
190+
public abstract Builder<OutputT> setFlowControlSettings(FlowControlSettings settings);
191+
192+
// Optional
193+
public abstract Builder<OutputT> setBoundedness(Boundedness value);
194+
195+
// Optional
196+
public abstract Builder<OutputT> setTimestampSelector(MessageTimestampExtractor value);
197+
198+
// Optional
199+
public abstract Builder<OutputT> setPartitionFinishedCondition(
200+
PartitionFinishedCondition.Factory value);
201+
202+
abstract Builder<OutputT> setDeserializationSchema(
203+
PubsubLiteDeserializationSchema<OutputT> schema);
204+
205+
abstract Builder<OutputT> setAdminClientSupplier(SerializableSupplier<AdminClient> value);
206+
207+
abstract Builder<OutputT> setCursorClientSupplier(SerializableSupplier<CursorClient> value);
208+
209+
abstract PubsubLiteSourceSettings<OutputT> build();
210+
}
211+
}

src/main/java/com/google/cloud/pubsublite/flink/enumerator/PartitionAssigner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323

24-
interface PartitionAssigner {
24+
public interface PartitionAssigner {
2525

2626
List<SplitEnumeratorCheckpoint.Assignment> checkpoint();
2727

0 commit comments

Comments
 (0)