[flink] Implement FLIP-314 LineageVertexProvider for source and sink connectors#7311
jsingh-yelp wants to merge 1 commit into apache:master
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
This dependency is required because we now have to import DataStreamScanProvider in flink2-common. Code ref: https://github.com/apache/flink/blob/c0479c74f2b736aaed00b4d9fb1b14ad296e562d/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/connector/source/DataStreamScanProvider.java#L40
@JingsongLi can I please have a review on this?
Cool, @yunfengzhou-hub can you take a look?
yunfengzhou-hub left a comment
Thanks for the PR. Generally LGTM. Left one comment as below.
    }

    private static class LineageAwarePaimonDataStreamScanProvider
            implements DataStreamScanProvider, LineageVertexProvider {
Instead of the delegate pattern, would it be better to have PaimonDataStreamScanProvider implement LineageVertexProvider directly?
Same for LineageAwarePaimonDataStreamSinkProvider.
Thanks for the review @yunfengzhou-hub. The reason for the delegate pattern is backward compatibility. PaimonDataStreamScanProvider lives in paimon-flink-common, which is built against multiple Flink versions, including ones older than 1.20. Since LineageVertexProvider only exists in Flink 1.20+, having PaimonDataStreamScanProvider directly implement it would cause a ClassNotFoundException at build/load time for older Flink versions.
By using the delegate pattern with a version-specific DataStreamProviderFactory, the lineage-aware wrapper is only created in the paimon-flink2-common module (which targets Flink 2.x), and the older Flink modules never reference LineageVertexProvider at all.
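The arrangement described in this reply can be sketched roughly as follows. This is a minimal illustration, not the code in the PR: `ScanProvider`, `LineageProvider`, `PlainScanProvider`, `ProviderFactory`, and the `dataset:` naming are hypothetical stand-ins for the real Flink and Paimon types, and the factory is passed in explicitly here, whereas the PR selects it through the version-specific module.

```java
import java.util.List;

public class LineageDelegateSketch {

    // Hypothetical stand-in for DataStreamScanProvider (heavily simplified).
    interface ScanProvider {
        String produce();
    }

    // Hypothetical stand-in for LineageVertexProvider, which only newer Flink versions ship.
    interface LineageProvider {
        List<String> lineageDatasets();
    }

    // Version-agnostic provider: never references the lineage interface.
    static class PlainScanProvider implements ScanProvider {
        private final String table;

        PlainScanProvider(String table) {
            this.table = table;
        }

        @Override
        public String produce() {
            return "scan:" + table;
        }
    }

    // Wrapper that would live only in the module built against the newer Flink.
    static class LineageAwareScanProvider implements ScanProvider, LineageProvider {
        private final ScanProvider delegate;
        private final String table;

        LineageAwareScanProvider(ScanProvider delegate, String table) {
            this.delegate = delegate;
            this.table = table;
        }

        @Override
        public String produce() {
            // All scan behavior is forwarded to the wrapped provider.
            return delegate.produce();
        }

        @Override
        public List<String> lineageDatasets() {
            // Illustrative dataset name only; the real naming scheme is not shown in the PR text.
            return List.of("dataset:" + table);
        }
    }

    // Version-specific hook: each module supplies its own implementation.
    interface ProviderFactory {
        ScanProvider wrap(ScanProvider base, String table);
    }

    // Older-Flink stub: returns the provider unchanged.
    static final ProviderFactory LEGACY = (base, table) -> base;

    // Newer-Flink variant: wraps the provider with lineage support.
    static final ProviderFactory LINEAGE_AWARE = LineageAwareScanProvider::new;

    // Callers go through a factory method instead of a constructor.
    static ScanProvider createProvider(ProviderFactory factory, String table) {
        return factory.wrap(new PlainScanProvider(table), table);
    }

    public static void main(String[] args) {
        ScanProvider legacy = createProvider(LEGACY, "db.orders");
        ScanProvider aware = createProvider(LINEAGE_AWARE, "db.orders");
        System.out.println(legacy instanceof LineageProvider); // false
        System.out.println(aware instanceof LineageProvider);  // true
        System.out.println(aware.produce());                   // scan:db.orders
    }
}
```

Because only the wrapper class names the lineage interface, code compiled for the older versions never needs that type on its classpath, which is the property the delegate pattern is meant to preserve.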
Purpose
Implements LineageVertexProvider, which helps Flink (FLIP-314) generate lineage events with additional details that Flink does not otherwise have access to.
Tests
API and Format
The way PaimonDataStreamScanProvider and PaimonDataStreamSinkProvider are constructed changes: callers now use createProvider() factory methods instead of direct constructors. This is necessary because the FLIP-314 implementation requires the returned ScanRuntimeProvider/SinkRuntimeProvider to also implement LineageVertexProvider, an interface that only exists in Flink 1.20+. To maintain backward compatibility with older Flink versions, the factory methods delegate to a version-specific DataStreamProviderFactory: the Flink 2.x variant wraps the provider with lineage support, while the Flink 1.x stub returns it unchanged.
Documentation
Note: This change also depends on two other changes and one of them is already merged. Other two changes: