Skip to content

Commit 2d287cb

Browse files
feat: Restructure flink connector for new internal APIs and reduce API surface (#173)
* Restructure flink connector for new internal APIs and reduce API surface.
* Restructure flink connector for new internal APIs and reduce API surface.
* Restructure flink connector for new internal APIs and reduce API surface.
* Restructure flink connector for new internal APIs and reduce API surface.
* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 0cfef6a commit 2d287cb

File tree

53 files changed

+516
-1222
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

53 files changed

+516
-1222
lines changed

pom.xml

Lines changed: 7 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -5,15 +5,15 @@
55
<parent>
66
<groupId>com.google.cloud</groupId>
77
<artifactId>google-cloud-pubsublite-parent</artifactId>
8-
<version>1.6.3</version>
8+
<version>1.9.2</version>
99
</parent>
1010
<modelVersion>4.0.0</modelVersion>
1111
<packaging>jar</packaging>
1212
<artifactId>google-cloud-pubsublite-flink</artifactId>
1313
<version>0.1.0-SNAPSHOT</version>
1414
<properties>
15-
<flink.version>1.13.2</flink.version>
16-
<pubsublite.version>1.2.0</pubsublite.version>
15+
<flink.version>1.12.5</flink.version>
16+
<pubsublite.version>${project.parent.version}</pubsublite.version>
1717
</properties>
1818
<build>
1919
<extensions>
@@ -107,12 +107,12 @@
107107
</dependency>
108108
<dependency>
109109
<groupId>org.apache.flink</groupId>
110-
<artifactId>flink-streaming-java_2.11</artifactId>
110+
<artifactId>flink-streaming-java_2.12</artifactId>
111111
<version>${flink.version}</version>
112112
</dependency>
113113
<dependency>
114114
<groupId>org.apache.flink</groupId>
115-
<artifactId>flink-runtime_2.11</artifactId>
115+
<artifactId>flink-runtime_2.12</artifactId>
116116
<version>${flink.version}</version>
117117
</dependency>
118118
<dependency>
@@ -144,13 +144,13 @@
144144
</dependency>
145145
<dependency>
146146
<groupId>org.apache.flink</groupId>
147-
<artifactId>flink-test-utils_2.11</artifactId>
147+
<artifactId>flink-test-utils_2.12</artifactId>
148148
<version>${flink.version}</version>
149149
<scope>test</scope>
150150
</dependency>
151151
<dependency>
152152
<groupId>org.apache.flink</groupId>
153-
<artifactId>flink-runtime_2.11</artifactId>
153+
<artifactId>flink-runtime_2.12</artifactId>
154154
<version>${flink.version}</version>
155155
<type>test-jar</type>
156156
<scope>test</scope>

src/main/java/com/google/cloud/pubsublite/flink/MessageTimestampExtractor.java

Lines changed: 8 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -22,19 +22,17 @@
2222

2323
public interface MessageTimestampExtractor extends Serializable {
2424
static MessageTimestampExtractor publishTimeExtractor() {
25-
return (MessageTimestampExtractor)
26-
m -> Timestamp.fromProto(m.publishTime()).toDate().toInstant();
25+
return m -> Timestamp.fromProto(m.publishTime()).toDate().toInstant();
2726
}
2827

2928
static MessageTimestampExtractor eventTimeExtractor() {
30-
return (MessageTimestampExtractor)
31-
m -> {
32-
if (m.message().eventTime().isPresent()) {
33-
return Timestamp.fromProto(m.message().eventTime().get()).toDate().toInstant();
34-
}
35-
return Timestamp.fromProto(m.publishTime()).toDate().toInstant();
36-
};
29+
return m -> {
30+
if (m.message().eventTime().isPresent()) {
31+
return Timestamp.fromProto(m.message().eventTime().get()).toDate().toInstant();
32+
}
33+
return Timestamp.fromProto(m.publishTime()).toDate().toInstant();
34+
};
3735
}
3836

39-
Instant timestamp(SequencedMessage m) throws Exception;
37+
Instant timestamp(SequencedMessage m);
4038
}

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSink.java

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -22,13 +22,16 @@
2222
import com.google.cloud.pubsublite.flink.internal.sink.SerializingPublisher;
2323
import com.google.errorprone.annotations.concurrent.GuardedBy;
2424
import java.time.Instant;
25+
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
2526
import org.apache.flink.configuration.Configuration;
2627
import org.apache.flink.runtime.state.FunctionInitializationContext;
2728
import org.apache.flink.runtime.state.FunctionSnapshotContext;
2829
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
2930
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
3031

3132
public class PubsubLiteSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {
33+
private static final long serialVersionUID = 849752028745098L;
34+
3235
private final PubsubLiteSinkSettings<T> settings;
3336

3437
@GuardedBy("this")
@@ -39,8 +42,7 @@ public PubsubLiteSink(PubsubLiteSinkSettings<T> settings) {
3942
}
4043

4144
@Override
42-
public void initializeState(FunctionInitializationContext functionInitializationContext)
43-
throws Exception {}
45+
public void initializeState(FunctionInitializationContext functionInitializationContext) {}
4446

4547
@Override
4648
public synchronized void snapshotState(FunctionSnapshotContext functionSnapshotContext)
@@ -60,11 +62,15 @@ public synchronized void invoke(T value, Context context) throws Exception {
6062
@Override
6163
public synchronized void open(Configuration parameters) throws Exception {
6264
super.open(parameters);
65+
settings
66+
.serializationSchema()
67+
.open(
68+
RuntimeContextInitializationContextAdapters.serializationAdapter(
69+
getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));
6370
publisher =
6471
new SerializingPublisher<>(
6572
new MessagePublisher(
66-
PerServerPublisherCache.getOrCreate(settings.getPublisherConfig()),
67-
settings.maxBytesOutstanding()),
73+
PerServerPublisherCache.getOrCreate(settings), settings.maxBytesOutstanding()),
6874
settings.serializationSchema());
6975
}
7076

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSinkSettings.java

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -18,11 +18,11 @@
1818
import com.google.auto.value.AutoValue;
1919
import com.google.cloud.pubsublite.Message;
2020
import com.google.cloud.pubsublite.TopicPath;
21-
import com.google.cloud.pubsublite.flink.internal.sink.PublisherOptions;
2221
import java.io.Serializable;
2322

2423
@AutoValue
2524
public abstract class PubsubLiteSinkSettings<InputT> implements Serializable {
25+
private static final long serialVersionUID = 24356890238740987L;
2626
public static final int DEFAULT_MAX_BYTES_OUTSTANDING = 100 * 1024 * 1024;
2727
// Create a builder which will accept messages of type InputT and serialize them using the
2828
// provided serialization schema.
@@ -46,10 +46,6 @@ public static Builder<Message> messagesBuilder() {
4646
// Internal.
4747
abstract PubsubLiteSerializationSchema<InputT> serializationSchema();
4848

49-
PublisherOptions getPublisherConfig() {
50-
return PublisherOptions.create(topicPath());
51-
}
52-
5349
@AutoValue.Builder
5450
public abstract static class Builder<InputT> {
5551
// Required.

src/main/java/com/google/cloud/pubsublite/flink/PubsubLiteSource.java

Lines changed: 31 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -15,23 +15,26 @@
1515
*/
1616
package com.google.cloud.pubsublite.flink;
1717

18-
import com.google.cloud.pubsublite.AdminClient;
19-
import com.google.cloud.pubsublite.TopicPath;
20-
import com.google.cloud.pubsublite.flink.internal.enumerator.PartitionAssigner;
21-
import com.google.cloud.pubsublite.flink.internal.enumerator.PubsubLiteSplitEnumerator;
22-
import com.google.cloud.pubsublite.flink.internal.enumerator.SingleSubscriptionSplitDiscovery;
23-
import com.google.cloud.pubsublite.flink.internal.enumerator.SplitDiscovery;
24-
import com.google.cloud.pubsublite.flink.internal.enumerator.SplitEnumeratorCheckpointSerializer;
25-
import com.google.cloud.pubsublite.flink.internal.enumerator.UniformPartitionAssigner;
26-
import com.google.cloud.pubsublite.flink.internal.reader.PubsubLiteRecordEmitter;
27-
import com.google.cloud.pubsublite.flink.internal.reader.PubsubLiteSourceReader;
28-
import com.google.cloud.pubsublite.flink.internal.split.SubscriptionPartitionSplit;
29-
import com.google.cloud.pubsublite.flink.internal.split.SubscriptionPartitionSplitSerializer;
18+
import com.google.cloud.pubsublite.flink.internal.source.SourceAssembler;
19+
import com.google.cloud.pubsublite.flink.internal.source.enumerator.PartitionAssigner;
20+
import com.google.cloud.pubsublite.flink.internal.source.enumerator.PubsubLiteSplitEnumerator;
21+
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SingleSubscriptionSplitDiscovery;
22+
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SplitDiscovery;
23+
import com.google.cloud.pubsublite.flink.internal.source.enumerator.SplitEnumeratorCheckpointSerializer;
24+
import com.google.cloud.pubsublite.flink.internal.source.enumerator.UniformPartitionAssigner;
25+
import com.google.cloud.pubsublite.flink.internal.source.reader.PubsubLiteRecordEmitter;
26+
import com.google.cloud.pubsublite.flink.internal.source.reader.PubsubLiteSourceReader;
27+
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplit;
28+
import com.google.cloud.pubsublite.flink.internal.source.split.SubscriptionPartitionSplitSerializer;
3029
import com.google.cloud.pubsublite.flink.proto.SplitEnumeratorCheckpoint;
31-
import com.google.cloud.pubsublite.internal.ExtractStatus;
3230
import org.apache.flink.api.common.serialization.DeserializationSchema;
3331
import org.apache.flink.api.common.typeinfo.TypeInformation;
34-
import org.apache.flink.api.connector.source.*;
32+
import org.apache.flink.api.connector.source.Boundedness;
33+
import org.apache.flink.api.connector.source.Source;
34+
import org.apache.flink.api.connector.source.SourceReader;
35+
import org.apache.flink.api.connector.source.SourceReaderContext;
36+
import org.apache.flink.api.connector.source.SplitEnumerator;
37+
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
3538
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
3639
import org.apache.flink.configuration.Configuration;
3740
import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -49,7 +52,7 @@ public PubsubLiteSource(PubsubLiteSourceSettings<OutputT> settings) {
4952

5053
@Override
5154
public Boundedness getBoundedness() {
52-
return settings.boundedness();
55+
return Boundedness.CONTINUOUS_UNBOUNDED;
5356
}
5457

5558
@Override
@@ -65,37 +68,30 @@ public MetricGroup getMetricGroup() {
6568

6669
@Override
6770
public UserCodeClassLoader getUserCodeClassLoader() {
68-
return readerContext.getUserCodeClassLoader();
71+
return null;
6972
}
7073
});
74+
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
7175
return new PubsubLiteSourceReader<>(
7276
new PubsubLiteRecordEmitter<>(),
73-
settings.getCursorClient(),
74-
settings.getSplitReaderSupplier(),
77+
assembler.getCursorClientRemoveThis(),
78+
assembler.getSplitReaderSupplier(),
7579
new Configuration(),
7680
readerContext);
7781
}
7882

7983
@Override
8084
public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> createEnumerator(
8185
SplitEnumeratorContext<SubscriptionPartitionSplit> enumContext) {
82-
TopicPath topic;
83-
try (AdminClient adminClient = settings.getAdminClient()) {
84-
topic =
85-
TopicPath.parse(
86-
adminClient.getSubscription(settings.subscriptionPath()).get().getTopic());
87-
} catch (Throwable t) {
88-
throw ExtractStatus.toCanonical(t).underlying;
89-
}
86+
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
9087
return new PubsubLiteSplitEnumerator(
9188
enumContext,
9289
UniformPartitionAssigner.create(),
9390
SingleSubscriptionSplitDiscovery.create(
94-
settings.getAdminClient(),
95-
settings.getCursorClient(),
96-
topic,
97-
settings.subscriptionPath()),
98-
settings.boundedness());
91+
assembler.newAdminClient(),
92+
assembler.getCursorClientRemoveThis(),
93+
assembler.getTopicPath(),
94+
settings.subscriptionPath()));
9995
}
10096

10197
@Override
@@ -104,13 +100,14 @@ public SplitEnumerator<SubscriptionPartitionSplit, SplitEnumeratorCheckpoint> re
104100
SplitEnumeratorCheckpoint checkpoint) {
105101
PartitionAssigner assigner =
106102
UniformPartitionAssigner.fromCheckpoint(checkpoint.getAssignmentsList());
103+
SourceAssembler<OutputT> assembler = new SourceAssembler<>(settings);
107104
SplitDiscovery discovery =
108105
SingleSubscriptionSplitDiscovery.fromCheckpoint(
109106
checkpoint.getDiscovery(),
110107
assigner.listSplits(),
111-
settings.getAdminClient(),
112-
settings.getCursorClient());
113-
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery, settings.boundedness());
108+
assembler.newAdminClient(),
109+
assembler.getCursorClientRemoveThis());
110+
return new PubsubLiteSplitEnumerator(enumContext, assigner, discovery);
114111
}
115112

116113
@Override

0 commit comments

Comments
 (0)