Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);

// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.factories.DataSourceFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider;
Expand All @@ -41,12 +40,14 @@
public class DataSourceTranslator {

public DataStreamSource<Event> translate(
SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) {
SourceDef sourceDef,
StreamExecutionEnvironment env,
Configuration pipelineConfig,
int sourceParallelism) {
// Create data source
DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig);

// Get source provider
final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider();
if (eventSourceProvider instanceof FlinkSourceProvider) {
// Source
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable {
private final Set<Integer> flushedSinkWriters;

/** Status of the execution of current schema change request. */
private boolean isSchemaChangeApplying;
private volatile boolean isSchemaChangeApplying;
/** Executor service to execute schema change. */
private final ExecutorService schemaChangeThreadPool;

Expand Down