Ideal design of FLIP 27 integration

Brian Zhou edited this page Dec 16, 2022 · 6 revisions

Background

Flink 1.11 introduced a new Data Source API as part of FLIP-27. The API is introduced in the Flink documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/sources.html

POC Code for reference

https://github.com/crazyzhou/flink-connectors/tree/new-flip-27/src/main/java/io/pravega/connectors/flink/source

Concept mapping

Source

The Source is a factory class that creates instances of the concepts below.

public class PravegaSource<T> implements Source<T, PravegaSplit, PravegaSplitEnumState>, ResultTypeQueryable<T> {}

Split

A Split represents a Pravega segment together with the begin and end offsets of the range to read.

public class PravegaSplit implements SourceSplit, Serializable {
    private final Segment segment;
    private final long beginOffset;
    private final long endOffset;
}

SplitEnumerator

The SplitEnumerator is a single instance that runs on the Flink JobManager. It is the "brain" of the source: it discovers the splits and assigns them to readers.

public class PravegaSplitEnumerator implements SplitEnumerator<PravegaSplit, PravegaSplitEnumState> {}

The enumerator state is the checkpointed record of the assigned splits.

handleSourceEvent(int subtaskId, SourceEvent sourceEvent) handles custom events exchanged between the enumerator and the readers; it is not used for now.

addSplitsBack(List<PravegaSplit> splits, int subtaskId) is called during regional failure recovery, when the splits of a failed reader are handed back to the enumerator. We will reassign them to the reader task.

addReader(int subtaskId) is called when a reader registers; the enumerator then assigns the pending splits to it.

snapshotState() records the enumerator state, that is, all the assigned splits.
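The bookkeeping behind these callbacks can be sketched without any Flink or Pravega dependency. This is only an illustration under assumptions: EnumeratorSketch and its nested Split are hypothetical stand-ins, and addReader returns the assigned splits directly instead of handing them to Flink's enumerator context as a real SplitEnumerator would.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;

// Self-contained sketch: stand-in types only, not the POC implementation.
final class EnumeratorSketch {

    // Hypothetical stand-in for PravegaSplit: a segment name plus a read range.
    static final class Split {
        final String segment;
        final long beginOffset;
        final long endOffset;

        Split(String segment, long beginOffset, long endOffset) {
            this.segment = segment;
            this.beginOffset = beginOffset;
            this.endOffset = endOffset;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Split)) {
                return false;
            }
            Split s = (Split) o;
            return segment.equals(s.segment)
                    && beginOffset == s.beginOffset
                    && endOffset == s.endOffset;
        }

        @Override
        public int hashCode() {
            return Objects.hash(segment, beginOffset, endOffset);
        }
    }

    // Splits that were discovered but are not owned by any reader yet.
    private final Queue<Split> pending = new ArrayDeque<>();
    // Splits already handed out, keyed by reader subtask ID.
    private final Map<Integer, List<Split>> assigned = new HashMap<>();

    EnumeratorSketch(List<Split> discovered) {
        pending.addAll(discovered);
    }

    // Mirrors addReader(int subtaskId): the new reader gets every pending split.
    List<Split> addReader(int subtaskId) {
        List<Split> batch = new ArrayList<>(pending);
        pending.clear();
        assigned.computeIfAbsent(subtaskId, id -> new ArrayList<>()).addAll(batch);
        return batch;
    }

    // Mirrors addSplitsBack(splits, subtaskId): returned splits become pending again.
    void addSplitsBack(List<Split> splits, int subtaskId) {
        List<Split> owned = assigned.get(subtaskId);
        if (owned != null) {
            owned.removeAll(splits);
        }
        pending.addAll(splits);
    }

    // Mirrors snapshotState(): the checkpoint content is all assigned splits.
    List<Split> snapshotState() {
        List<Split> state = new ArrayList<>();
        assigned.values().forEach(state::addAll);
        return state;
    }
}
```

In this sketch a split handed back through addSplitsBack drops out of the snapshot until the next addReader call assigns it again, which matches the idea that the enumerator state holds only the assigned splits.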

SourceReader

Flink provides a recommended default implementation of the SourceReader for multiplexed readers, SingleThreadMultiplexSourceReaderBase. It reads splits with one thread using one SplitReader. With this base class we only need to provide a SplitReader and a RecordEmitter, without having to deal with the complex synchronization with Flink's internal threads.

public class PravegaSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<EventRead<T>, T, PravegaSplit, PravegaSplitState> {}

PravegaSplitState is the mutable counterpart of PravegaSplit. It has an additional currentOffset field on top of the PravegaSplit fields, which is updated with the latest read offset on every event.
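A minimal sketch of such a mutable state, written against stand-in types so that it compiles on its own. The names SplitStateSketch, Split, SplitState and toSplit are hypothetical; in Flink's reader base the conversion between split and split state goes through initializedState and toSplitType, which the real PravegaSourceReader would implement.

```java
// Self-contained sketch: stand-in types only, not the POC implementation.
final class SplitStateSketch {

    // Hypothetical stand-in for the immutable PravegaSplit.
    static final class Split {
        final String segment;
        final long beginOffset;
        final long endOffset;

        Split(String segment, long beginOffset, long endOffset) {
            this.segment = segment;
            this.beginOffset = beginOffset;
            this.endOffset = endOffset;
        }
    }

    // Hypothetical stand-in for PravegaSplitState: the split plus a moving offset.
    static final class SplitState {
        private final Split split;
        private long currentOffset;

        // Reading starts at the split's begin offset.
        SplitState(Split split) {
            this.split = split;
            this.currentOffset = split.beginOffset;
        }

        // Called once per event; the offset is only allowed to move forward.
        void setCurrentOffset(long offset) {
            if (offset < currentOffset) {
                throw new IllegalArgumentException("offset must not move backwards");
            }
            this.currentOffset = offset;
        }

        long getCurrentOffset() {
            return currentOffset;
        }

        // Converts back to an immutable split that resumes at the current offset.
        Split toSplit() {
            return new Split(split.segment, currentOffset, split.endOffset);
        }
    }
}
```

Converting the state back to a split whose begin offset is the current offset is what would let a restored reader continue from the last recorded position rather than from the start of the segment.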

SplitReader

The SplitReader is where the read call actually happens. It contains a reader instance for one or more Pravega segments. Its fetch() method reads one or more PravegaEvents per call.

public class PravegaSplitReader<T> implements SplitReader<PravegaEvent, PravegaSplit> {}

PravegaEvent is a wrapper class of the event, similar to Pravega's EventRead, which contains the segmentID, offset and the raw bytes being read.

The RecordEmitter is a function that turns a PravegaEvent into T. It takes the event and updates the offset in PravegaSplitState, so that the latest read position is persisted with the state.
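The emitter step can be sketched as follows, again with stand-in types instead of the Flink and Pravega classes. EmitterSketch, Event, SplitState and Emitter are hypothetical names, the Consumer stands in for Flink's SourceOutput, and the deserializer function is an assumption about how the raw bytes become T. It is also assumed that the offset carried by the event is the position to resume from.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Self-contained sketch: stand-in types only, not the POC implementation.
final class EmitterSketch {

    // Hypothetical stand-in for PravegaEvent: segment ID, offset and raw bytes.
    static final class Event {
        final long segmentId;
        final long offset;
        final byte[] payload;

        Event(long segmentId, long offset, byte[] payload) {
            this.segmentId = segmentId;
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Hypothetical stand-in for PravegaSplitState; only the offset matters here.
    static final class SplitState {
        long currentOffset;
    }

    // Follows the shape of RecordEmitter#emitRecord(element, output, splitState).
    static final class Emitter<T> {
        private final Function<byte[], T> deserializer;

        Emitter(Function<byte[], T> deserializer) {
            this.deserializer = deserializer;
        }

        void emitRecord(Event event, Consumer<T> output, SplitState state) {
            // Turn the raw bytes into T and pass the record downstream.
            output.accept(deserializer.apply(event.payload));
            // Record the read position so the next checkpoint can persist it.
            // Assumption: event.offset is the position to resume from.
            state.currentOffset = event.offset;
        }
    }
}
```

A caller would create the emitter once with its deserializer and invoke emitRecord for every fetched event, passing the state of the split the event came from.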

Current bottleneck

Split reader implementation

The Pravega client does not offer such a reader directly, so we are currently trying to read through the batch reader API. We would initiate a number of SegmentIterators to perform the read. The current problem is that the batch client requires an end offset, while a streaming reader normally does not have one.