Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ protected Transformation<Object> createSinkTransformation(
outputObject = ((SinkFunctionProvider) runtimeProvider).createSinkFunction();
} else if (runtimeProvider instanceof SinkV2Provider) {
outputObject = ((SinkV2Provider) runtimeProvider).createSink();
} else if (runtimeProvider instanceof DataStreamSinkProvider) {
outputObject = runtimeProvider;
}

Optional<LineageVertex> lineageVertexOpt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ protected Transformation<RowData> translateToPlanInternal(
.getTransformation();
metadata.fill(sourceTransform);
} else if (provider instanceof DataStreamScanProvider) {
lineageVertex = TableLineageUtils.extractLineageDataset(provider);
sourceTransform =
((DataStreamScanProvider) provider)
.produceDataStream(createProviderContext(metadata, config), env)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

package org.apache.flink.table.planner.lineage;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.lineage.DefaultLineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Catalog;
Expand All @@ -29,12 +35,17 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -114,4 +125,126 @@ void testCreateTableLineageDatasetWithoutCatalog() {
assertThat(tableLineageDataset.catalogContext().getCatalogName()).isEmpty();
assertThat(tableLineageDataset.name()).isEqualTo(objectIdentifier.asSummaryString());
}

@Test
void testExtractLineageDatasetFromDataStreamScanProvider() {
DataStreamScanProvider provider =
new DataStreamScanProvider() {
@Override
public DataStream<RowData> produceDataStream(
ProviderContext providerContext,
StreamExecutionEnvironment execEnv) {
return null;
}

@Override
public boolean isBounded() {
return false;
}
};

// A plain DataStreamScanProvider without LineageVertexProvider should return empty
Optional<LineageVertex> result = TableLineageUtils.extractLineageDataset(provider);
assertThat(result).isEmpty();
}

@Test
void testExtractLineageDatasetFromDataStreamScanProviderWithLineage() {
DataStreamScanProvider provider =
new TestDataStreamScanProviderWithLineage("test-source", "test://namespace");

Optional<LineageVertex> result = TableLineageUtils.extractLineageDataset(provider);
assertThat(result).isPresent();
assertThat(result.get().datasets()).hasSize(1);
assertThat(result.get().datasets().get(0).name()).isEqualTo("test-source");
assertThat(result.get().datasets().get(0).namespace()).isEqualTo("test://namespace");
}

@Test
void testExtractLineageDatasetFromDataStreamSinkProvider() {
DataStreamSinkProvider provider =
new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext,
DataStream<RowData> dataStream) {
return null;
}
};

// A plain DataStreamSinkProvider without LineageVertexProvider should return empty
Optional<LineageVertex> result = TableLineageUtils.extractLineageDataset(provider);
assertThat(result).isEmpty();
}

@Test
void testExtractLineageDatasetFromDataStreamSinkProviderWithLineage() {
DataStreamSinkProvider provider =
new TestDataStreamSinkProviderWithLineage("test-sink", "test://namespace");

Optional<LineageVertex> result = TableLineageUtils.extractLineageDataset(provider);
assertThat(result).isPresent();
assertThat(result.get().datasets()).hasSize(1);
assertThat(result.get().datasets().get(0).name()).isEqualTo("test-sink");
assertThat(result.get().datasets().get(0).namespace()).isEqualTo("test://namespace");
}

/**
* A {@link DataStreamScanProvider} that also implements {@link LineageVertexProvider}, similar
* to how Apache Paimon's PaimonDataStreamScanProvider works.
*/
private static class TestDataStreamScanProviderWithLineage
implements DataStreamScanProvider, LineageVertexProvider {
private final String name;
private final String namespace;

TestDataStreamScanProviderWithLineage(String name, String namespace) {
this.name = name;
this.namespace = namespace;
}

@Override
public DataStream<RowData> produceDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
return null;
}

@Override
public boolean isBounded() {
return false;
}

@Override
public LineageVertex getLineageVertex() {
return () ->
List.of(new DefaultLineageDataset(name, namespace, Collections.emptyMap()));
}
}

/**
* A {@link DataStreamSinkProvider} that also implements {@link LineageVertexProvider}, similar
* to how Apache Paimon's PaimonDataStreamSinkProvider works.
*/
private static class TestDataStreamSinkProviderWithLineage
implements DataStreamSinkProvider, LineageVertexProvider {
private final String name;
private final String namespace;

TestDataStreamSinkProviderWithLineage(String name, String namespace) {
this.name = name;
this.namespace = namespace;
}

@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return null;
}

@Override
public LineageVertex getLineageVertex() {
return () ->
List.of(new DefaultLineageDataset(name, namespace, Collections.emptyMap()));
}
}
}