Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,19 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.composer.PipelineComposer;
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.cdc.composer.definition.PipelineDef;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.coordination.OperatorIDGenerator;
import org.apache.flink.cdc.composer.flink.translator.DataSinkTranslator;
import org.apache.flink.cdc.composer.flink.translator.DataSourceTranslator;
import org.apache.flink.cdc.composer.flink.translator.PartitioningTranslator;
import org.apache.flink.cdc.composer.flink.translator.SchemaOperatorTranslator;
import org.apache.flink.cdc.composer.flink.translator.TransformTranslator;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
Expand Down Expand Up @@ -128,7 +123,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));

// Build DataSink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);

stream =
schemaOperatorTranslator.translate(
Expand All @@ -150,7 +147,6 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
dataSink.getDataChangeEventHashFunctionProvider());

// Build Sink Operator
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
sinkTranslator.translate(
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

Expand All @@ -161,24 +157,6 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
}

/**
 * Creates the {@link DataSink} described by the given sink definition.
 *
 * <p>The sink factory is discovered by the sink type. When a JAR path is reported for that type,
 * the JAR is added to {@code env} before the sink itself is created.
 *
 * @param sinkDef definition supplying the sink type and the sink configuration
 * @param pipelineConfig pipeline-level configuration passed into the factory context
 * @return the data sink produced by the discovered factory
 */
private DataSink createDataSink(SinkDef sinkDef, Configuration pipelineConfig) {
    final String sinkType = sinkDef.getType();

    // Locate the factory registered under this sink type
    final DataSinkFactory factory =
            FactoryDiscoveryUtils.getFactoryByIdentifier(sinkType, DataSinkFactory.class);

    // Register the connector JAR with the environment when one is found
    FactoryDiscoveryUtils.getJarPathByIdentifier(sinkType, DataSinkFactory.class)
            .ifPresent(connectorJar -> FlinkEnvironmentUtils.addJar(env, connectorJar));

    // Assemble the factory context: sink config, pipeline config and the context class loader
    // of the current thread
    final FactoryHelper.DefaultContext factoryContext =
            new FactoryHelper.DefaultContext(
                    sinkDef.getConfig(),
                    pipelineConfig,
                    Thread.currentThread().getContextClassLoader());

    return factory.createDataSink(factoryContext);
}

private void addFrameworkJars() {
try {
Set<URI> frameworkJars = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.sink.DataSink;
import org.apache.flink.cdc.common.sink.EventSinkProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkFunctionProvider;
import org.apache.flink.cdc.common.sink.FlinkSinkProvider;
import org.apache.flink.cdc.composer.definition.SinkDef;
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator;
import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory;
import org.apache.flink.core.io.SimpleVersionedSerializer;
Expand All @@ -53,6 +58,25 @@ public class DataSinkTranslator {
private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";

/**
 * Creates the {@link DataSink} described by the given sink definition.
 *
 * <p>The sink factory is discovered by the sink type. When a JAR path is reported for that
 * factory, the JAR is added to the given environment before the sink itself is created.
 *
 * @param sinkDef definition supplying the sink type and the sink configuration
 * @param pipelineConfig pipeline-level configuration passed into the factory context
 * @param env execution environment that receives the sink connector JAR, if any
 * @return the data sink produced by the discovered factory
 */
public DataSink createDataSink(
        SinkDef sinkDef, Configuration pipelineConfig, StreamExecutionEnvironment env) {
    // Locate the factory registered under this sink type
    final DataSinkFactory factory =
            FactoryDiscoveryUtils.getFactoryByIdentifier(
                    sinkDef.getType(), DataSinkFactory.class);

    // Register the connector JAR with the environment when one is found
    FactoryDiscoveryUtils.getJarPathByIdentifier(factory)
            .ifPresent(connectorJar -> FlinkEnvironmentUtils.addJar(env, connectorJar));

    // Assemble the factory context: sink config, pipeline config and the context class loader
    // of the current thread
    final FactoryHelper.DefaultContext factoryContext =
            new FactoryHelper.DefaultContext(
                    sinkDef.getConfig(),
                    pipelineConfig,
                    Thread.currentThread().getContextClassLoader());

    return factory.createDataSink(factoryContext);
}

public void translate(
SinkDef sinkDef,
DataStream<Event> input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,8 @@ public class DataSourceTranslator {

public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
// Search the data source factory
DataSourceFactory sourceFactory =
FactoryDiscoveryUtils.getFactoryByIdentifier(
sourceDef.getType(), DataSourceFactory.class);

// Create data source
DataSource dataSource =
sourceFactory.createDataSource(
new FactoryHelper.DefaultContext(
sourceDef.getConfig(),
pipelineConfig,
Thread.currentThread().getContextClassLoader()));

// Add source JAR to environment
FactoryDiscoveryUtils.getJarPathByIdentifier(sourceDef.getType(), DataSourceFactory.class)
.ifPresent(jar -> FlinkEnvironmentUtils.addJar(env, jar));
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);

// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
Expand Down Expand Up @@ -91,6 +77,24 @@ public DataStreamSource<Event> translate(
}
}

/**
 * Creates the {@link DataSource} described by the given source definition.
 *
 * <p>The source factory is discovered by the source type. When a JAR path is reported for that
 * factory, the JAR is added to the given environment before the source itself is created.
 *
 * @param sourceDef definition supplying the source type and the source configuration
 * @param env execution environment that receives the source connector JAR, if any
 * @param pipelineConfig pipeline-level configuration passed into the factory context
 * @return the data source produced by the discovered factory
 */
private DataSource createDataSource(
        SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
    // Locate the factory registered under this source type
    final DataSourceFactory factory =
            FactoryDiscoveryUtils.getFactoryByIdentifier(
                    sourceDef.getType(), DataSourceFactory.class);

    // Register the connector JAR with the environment when one is found
    FactoryDiscoveryUtils.getJarPathByIdentifier(factory)
            .ifPresent(connectorJar -> FlinkEnvironmentUtils.addJar(env, connectorJar));

    // Assemble the factory context: source config, pipeline config and the context class
    // loader of the current thread
    final FactoryHelper.DefaultContext factoryContext =
            new FactoryHelper.DefaultContext(
                    sourceDef.getConfig(),
                    pipelineConfig,
                    Thread.currentThread().getContextClassLoader());

    return factory.createDataSource(factoryContext);
}

/**
 * Builds the default display name of a source from the type of its definition.
 *
 * @param sourceDef definition whose type is embedded in the name
 * @return the text {@code "Flink CDC Event Source: "} followed by the source type
 */
private String generateDefaultSourceName(SourceDef sourceDef) {
    final String sourceType = sourceDef.getType();
    return String.format("Flink CDC Event Source: %s", sourceType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand All @@ -42,6 +44,7 @@ public class FactoryDiscoveryUtils {
private FactoryDiscoveryUtils() {}

/** Returns the {@link Factory} for the given identifier. */
@Nonnull
@SuppressWarnings("unchecked")
public static <T extends Factory> T getFactoryByIdentifier(
String identifier, Class<T> factoryClass) {
Expand Down Expand Up @@ -88,10 +91,8 @@ public static <T extends Factory> T getFactoryByIdentifier(
/**
* Returns the path of the jar file that contains the {@link Factory} for the given identifier.
*/
public static <T extends Factory> Optional<URL> getJarPathByIdentifier(
String identifier, Class<T> factoryClass) {
public static <T extends Factory> Optional<URL> getJarPathByIdentifier(T factory) {
try {
T factory = getFactoryByIdentifier(identifier, factoryClass);
URL url = factory.getClass().getProtectionDomain().getCodeSource().getLocation();
String urlString = url.toString();
if (urlString.contains("usrlib")) {
Expand All @@ -110,7 +111,9 @@ public static <T extends Factory> Optional<URL> getJarPathByIdentifier(
return Optional.of(url);
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to search JAR by factory identifier \"%s\"", identifier),
String.format(
"Failed to search JAR by factory identifier \"%s\"",
factory.identifier()),
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,4 @@ void getFactoryByIdentifier() {
.hasMessageStartingWith(
"Cannot find factory with identifier \"data-sink-factory-3\" in the classpath");
}

// Checks that looking up the JAR path for "data-source-factory-1" yields an empty Optional.
// NOTE(review): presumably empty because the test factory is not loaded from a connector JAR
// — confirm against FactoryDiscoveryUtils.
@Test
void getJarPathByIdentifier() {
assertThat(
FactoryDiscoveryUtils.getJarPathByIdentifier(
"data-source-factory-1", Factory.class))
.isNotPresent();
}
}