Skip to content

FLIP 27 integration

Brian Zhou edited this page Sep 7, 2021 · 7 revisions

Background

Flink 1.11 introduced new Data Source API as part of FLIP-27. Here is the API introduction documents. https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/sources.html

Concept mapping

Source

The Source is a factory class to create the instances of the below concepts. It manages a reader group with a builder style constructor with user provided ReaderGroupConfig. We can reuse the current AbstractStreamingReaderBuilder to build such a source.

public class FlinkPravegaSource<T> implements Source<T, PravegaSplit, Checkpoint>, ResultTypeQueryable<T> {}

Split

A Split represents an EventStreamReader, but with no operations and keep stateless. The splitId will be the Pravega reader Id. To keep it serializable, we will not keep the reader inside.

public class PravegaSplit implements SourceSplit, Serializable {
    private int subtaskId;
    private String readerGroupName;

    @Override
    public String splitId() {
        return readerGroupName + "-" + subtaskId;
    }
}

SplitEnumerator

The SplitEnumerator is a single instance on Flink jobmanager. It connects to a Pravega reader group with the pravega stream. It is the "brain" of the source to initialize the reader group when it starts, then discover and assign the subtasks.

public class PravegaSplitEnumerator implements SplitEnumerator<PravegaSplit, Checkpoint> {}

start() method will help to initiate reader group manager, reader group and reset the group to the checkpoint position if checkpoint isn't null.

snapshotState() moethod will deal with the data recovery, it will call readerGroup::initiateCheckpoint to get a checkpoint for recovery. This call will be handled in another thread pool instead of the split enumerator thread.

addSplitsBack(List<PravegaSplit> splits, int subtaskId) method is called when it tries to "regionly" recover from a failure to assign unassigned splits. We will throw an intentional exception here to trigger a global recovery. This will shutdown the reader group, recreate it and recover it from the latest checkpoint.

addReader(int subtaskId) function will check with the current parallelism and update the assignment (always one-to-one mapping).

SourceReader

The SourceReader has a default recommended Flink implementation SourceReaderBase. This one constructs with three major components, SplitReader, SplitFetcherManager and RecordEmitter. With this recommended API, it allows us to just provide a SplitReader abstraction to implement this as a whole.

public class PravegaSourceReader<T>
        extends SourceReaderBase<EventRead<T>, T, PravegaSplit, PravegaSplit> {}

We can will use PravegaFetcherManager to handle the split fetch. It is based on Flink SingleThreadFetcherManager, each reader instance will have this fetcher which is single threaded and this fetcher will supply the split reader and assign all the splits assigned by the enumerator to it. Because Flink does not provide a method to close the split reader so far, when supply the split reader, we also need to record the splitReader and override its close method to close all the readers.

SplitReader

The SplitReader is actually an instance of a EventStreamReader. It has a fetch() interface to read one or more EventRead<T> from an EventStreamReader.

public class PravegaSplitReader<T> implements SplitReader<EventRead<T>, PravegaSplit>, AutoCloseable {}

handleSplitsChanges is the method the reader gets the assigned split. That's when we know the split(subtask) ID, so we will create the EventStreamReader instance in this method rather than the constructor of the class.

TODO: RecordEmitter is actually a function to turn EventRead<T> into T. We should offer a default implementation for just get the event, but we can also let user to DIY, especially when they want to index with the Pravega metadata. We have similar implementation in https://github.com/pravega/flink-connectors/issues/180