Merged
@@ -348,7 +348,9 @@ private void cluster() throws Exception {
         .addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .uid("uid_clustering_commit")
-        .setParallelism(1);
+        .setParallelism(1)
+        .getTransformation()
+        .setMaxParallelism(1);
Contributor:
Nice catch, just need to fix the compatibility issues: https://github.com/apache/hudi/actions/runs/6864369897/job/18665977391?pr=10090

Contributor:
It seems that only Flink 1.16+ has this setMaxParallelism API?

Contributor Author (@hehuiyuan, Nov 15, 2023):
> It seems that only Flink 1.16+ has this setMaxParallelism API?

Maybe 1.18+; the setMaxParallelism method was added in 2023-04:
https://issues.apache.org/jira/browse/FLINK-31873


env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp());
}
@@ -298,7 +298,9 @@ private void compact() throws Exception {
         .addSink(new CompactionCommitSink(conf))
         .name("compaction_commit")
         .uid("uid_compaction_commit")
-        .setParallelism(1);
+        .setParallelism(1)
+        .getTransformation()
+        .setMaxParallelism(1);

     env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes));
 }
@@ -410,10 +410,11 @@ public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream
  * @return the compaction pipeline
  */
 public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) {
-    return dataStream.transform("compact_plan_generate",
+    DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate",
         TypeInformation.of(CompactionPlanEvent.class),
         new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        .setMaxParallelism(1)
Contributor:
Is this line compatible with Flink releases before 1.18?

Contributor Author:
It is compatible.

Only the DataStreamSink class lacks the setMaxParallelism method before 1.18:
https://issues.apache.org/jira/browse/FLINK-31873

Contributor Author:
DataStream.transform returns a SingleOutputStreamOperator, and the SingleOutputStreamOperator class does have the setMaxParallelism method.

For example, Flink 1.14:
https://github.com/apache/flink/blob/08dbb6d5eff46e72af412643f80ac353636821de/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L155
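The compatibility argument above can be modeled with a small self-contained sketch. The types below are simplified stand-ins that only mirror the relevant shape of Flink's API, not the real Flink classes: a pre-1.18 DataStreamSink exposes no setMaxParallelism setter, but the underlying Transformation object always carries one, which is why `.getTransformation().setMaxParallelism(1)` is portable across releases.

```java
// Hypothetical stand-in for Flink's Transformation: it has always carried
// a maxParallelism field with a public setter.
class Transformation {
    private int maxParallelism = -1; // -1 = not set, as in Flink's default
    void setMaxParallelism(int n) { this.maxParallelism = n; }
    int getMaxParallelism() { return maxParallelism; }
}

// Hypothetical stand-in for a pre-1.18 DataStreamSink: no setMaxParallelism
// method of its own, but the transformation is reachable via getTransformation().
class DataStreamSink {
    private final Transformation transformation = new Transformation();
    DataStreamSink setParallelism(int n) { return this; }
    Transformation getTransformation() { return transformation; }
}

public class MaxParallelismSketch {
    public static void main(String[] args) {
        DataStreamSink sink = new DataStreamSink().setParallelism(1);
        // The version-portable pattern used throughout this PR:
        sink.getTransformation().setMaxParallelism(1);
        System.out.println(sink.getTransformation().getMaxParallelism()); // prints 1
    }
}
```

On 1.18+ the direct `DataStreamSink.setMaxParallelism(1)` call would do the same thing; routing through the transformation simply avoids depending on the newer method.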

// make the distribution strategy deterministic to avoid concurrent modifications
// on the same bucket files
.keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
@@ -424,6 +425,8 @@ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf,
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
         .setParallelism(1); // compaction commit should be singleton
+    compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
+    return compactionCommitEventDataStream;
 }

/**
@@ -452,6 +455,7 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
         TypeInformation.of(ClusteringPlanEvent.class),
         new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        .setMaxParallelism(1) // plan generate must be singleton
         .keyBy(plan ->
             // make the distribution strategy deterministic to avoid concurrent modifications
             // on the same bucket files
@@ -465,15 +469,19 @@ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf,
       ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
           conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
     }
-    return clusteringStream.addSink(new ClusteringCommitSink(conf))
+    DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .setParallelism(1); // clustering commit should be singleton
+    clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
+    return clusteringCommitEventDataStream;
 }

 public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) {
-    return dataStream.addSink(new CleanFunction<>(conf))
+    DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf))
         .setParallelism(1)
         .name("clean_commits");
+    cleanCommitDataStream.getTransformation().setMaxParallelism(1);
+    return cleanCommitDataStream;
 }

 public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) {
@@ -207,6 +207,7 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
     SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
         .uid(Pipelines.opUID("split_monitor", conf))
         .setParallelism(1)
+        .setMaxParallelism(1)
Contributor:
Does setMaxParallelism take effect with per-operator scope or global scope?

Contributor Author (@hehuiyuan, Nov 16, 2023):

Contributor:

Is this line compatible with Flink releases before 1.18?

.keyBy(MergeOnReadInputSplit::getFileId)
.transform("split_reader", typeInfo, factory)
.uid(Pipelines.opUID("split_reader", conf))