Flink: Support inferring parallelism for batch read. #1936
Conversation
@openinx could you help review this PR when you have time? Thanks.
I like the idea of using inferred parallelism; just left a few comments.
@@ -195,7 +205,29 @@ public FlinkInputFormat buildFormat() {
     Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
     FlinkInputFormat format = buildFormat();
     if (isBounded(context)) {
-      return env.createInput(format, rowTypeInfo);
+      int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
How about moving those lines into a separate method?
  FlinkInputSplit[] splits = format.createInputSplits(0);
  splitNum = splits.length;
} catch (IOException e) {
  throw new RuntimeException("get input split error.", e);
Nit: use UncheckedIOException here.
throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
if (max < 1) {
  throw new IllegalConfigurationException(
      FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
}
nit: Preconditions.checkState ?
  parallelism = Math.min(splitNum, max);
}

parallelism = limit > 0 ? Math.min(parallelism, (int) limit) : parallelism;
It may overflow when casting the long limit to an integer? I'd like to use (int) Math.min(parallelism, limit).
The parallelism is int type and the limit is long type, so Math.min(parallelism, limit) will throw an error; I added a check to prevent overflow:
int limitInt = limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
parallelism = limitInt > 0 ? Math.min(parallelism, limitInt) : parallelism;
  testParallelismSettingTranslateAndAssert(1, tableLimit, tenv);
}

private void testParallelismSettingTranslateAndAssert(int expected, Table table, TableEnvironment tEnv) {
Is there another way to assert that the parallelism is the expected value? Here we're using flink's planner to get the ExecNode; I'm concerned that we're using flink's internal code, which would be a big trouble when upgrading the flink version. Pls see this PR #1956.
Flink 1.12 does refactor ExecNode. I found an easier way to assert parallelism; I will update it later.
@@ -103,4 +124,45 @@ public void testLimitPushDown() {
    Assert.assertEquals("should have 1 record", 1, mixedResult.size());
    Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
  }

  @Test
  public void testParallelismOptimize() {
nit: testParallelismOptimize -> testInferedParallelism
TableEnvironment tenv = getTableEnv();

// empty table, parallelism at least 1
Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s", TABLE_NAME));
nit: how about introducing a small method:
private Table sqlQuery(String sql, Object... args) {
return getTableEnv().sqlQuery(String.format(sql, args));
}
@openinx, is this something we should try to get into the 0.11.0 release?
According to @rdblue's suggestion, I refactored this class, using only …
  }

  @Override
  public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
    return FlinkSource.forRowData().env(execEnv).tableLoader(loader).project(getProjectedSchema()).limit(limit)
-        .filters(filters).properties(properties).build();
+        .filters(filters).flinkConf(readableConfig).properties(properties).build();
Nit: I'd like to change this builder chain like the following (that makes the change easier to read):
@Override
public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
return FlinkSource.forRowData()
.env(execEnv)
.tableLoader(loader)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConf(readableConfig)
.properties(properties)
.build();
}
}

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
    ConfigOptions.key("table.exec.iceberg.infer-source-parallelism").booleanType().defaultValue(true)
Nit: it's clearer to put each part of the option definition on a separate line:
ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
.booleanType()
.defaultValue(true)
.withDescription("If is false, parallelism of source are set by config.\n" +
"If is true, source parallelism is inferred according to splits number.\n");
"If is true, source parallelism is inferred according to splits number.\n"); | ||
|
||
public static final ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX = | ||
ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max").intType().defaultValue(100) |
ditto
@@ -197,7 +206,7 @@ public FlinkInputFormat buildFormat() {
     TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));

     if (!context.isStreaming()) {
-      return env.createInput(format, typeInfo);
+      return createInputDataStream(format, context, typeInfo);
In this comment, I think I did not describe things clearly. I mean we could move the inferParallelism into a separate method; it doesn't have to contain the DataStream construction or chaining methods.
private int inferParallelism(FlinkInputFormat format, ScanContext context) {
// ....
}
int limitInt = context.limit() > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
parallelism = limitInt > 0 ? Math.min(parallelism, limitInt) : parallelism;
Nit: I'd like to make this code more readable:
if (context.limit() > 0) {
int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
parallelism = Math.min(parallelism, limit);
}
// parallelism must be positive.
parallelism = Math.max(1, parallelism);
@@ -70,6 +73,7 @@ public static Builder forRowData() {
    private Table table;
    private TableLoader tableLoader;
    private TableSchema projectedSchema;
+   private ReadableConfig flinkConf;
For those users that write flink batch jobs in the Java API, they will always pass a flink Configuration, right? So how about defining this as org.apache.flink.configuration.Configuration?
When we construct the IcebergTableSource, we use the TableSourceFactory.Context#getConfiguration method to get the configuration. This method returns a ReadableConfig, so we use ReadableConfig instead of Configuration. In addition, Configuration is the implementation class of the ReadableConfig interface, so I think ReadableConfig should not be a problem.
Got your point. I'd prefer to use flink's Configuration because it will be exposed to flink developers as an API in FlinkSource; using the unified Configuration will be more straightforward for them. But as you said, TableSourceFactory#Context exposes the ReadableConfig, and I also did not find a correct way to convert ReadableConfig to Configuration. OK, I think we could use ReadableConfig here, thanks.
    TypeInformation<RowData> typeInfo) {
  int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
  if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
    int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Nit: use maxInterParallelism pls.
int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Preconditions.checkState(max >= 1,
    FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
int splitNum = 0;
Nit: this assignment is redundant (per IntelliJ).
 */
@Test
public void testInferedParallelism() throws TableNotExistException {
  Assume.assumeTrue("The execute mode should be streaming mode", isStreamingJob);
Shouldn't the inferParallelism only affect the batch job (see FlinkSource#Builder#build)? So there's no reason to provide a unit test in streaming mode?
In my mind, providing unit tests to check whether inferParallelism() returns the expected parallelism value is enough for this change. It seems the ITCase is validating the behavior of DataStreamSource#setParallelism, which we can assume is always correct because it's a basic API in flink.
In this way, we don't have to change so much code in this class. Maybe we could just add unit tests in TestFlinkScan.java.
I found that in this test method I use the flink streaming mode, but it still enters batch mode (here). I checked the code and found that the FlinkSource.Builder#build method decides streaming mode or batch mode by the conf of ScanContext instead of the flink conf. Will this confuse users?
Okay, that's a great point. I think it will confuse users. The correct way is: if someone provides a flink configuration, set the ScanContext's properties first (using the following fromFlinkConf), similar to ScanContext#fromProperties:
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 2896efb3..c56e3311 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -292,10 +292,7 @@ class ScanContext implements Serializable {
return this;
}
- Builder fromProperties(Map<String, String> properties) {
- Configuration config = new Configuration();
- properties.forEach(config::setString);
-
+ Builder fromFlinkConf(Configuration config) {
return this.useSnapshotId(config.get(SNAPSHOT_ID))
.caseSensitive(config.get(CASE_SENSITIVE))
.asOfTimestamp(config.get(AS_OF_TIMESTAMP))
@@ -305,7 +302,14 @@ class ScanContext implements Serializable {
.splitLookback(config.get(SPLIT_LOOKBACK))
.splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
.streaming(config.get(STREAMING))
- .monitorInterval(config.get(MONITOR_INTERVAL))
+ .monitorInterval(config.get(MONITOR_INTERVAL));
+ }
+
+ Builder fromProperties(Map<String, String> properties) {
+ Configuration config = new Configuration();
+ properties.forEach(config::setString);
+
+ return fromFlinkConf(config)
.nameMapping(properties.get(DEFAULT_NAME_MAPPING));
}
If someone provides both a flink Configuration and iceberg's properties, then we should use the flink Configuration values to overwrite the iceberg properties, because properties is a table-level setting while the flink Configuration is a job-level setting. It is reasonable for fine-grained configuration to overwrite coarse-grained configuration.
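For illustration only, one way that precedence could be expressed with the builder methods proposed above (fromFlinkConf is the hypothetical method from the diff, and the properties/flinkConf variables are assumed): the later call overwrites the earlier one.

// Hypothetical sketch: apply table-level properties first, then let the
// job-level flink Configuration overwrite them (the later call wins).
ScanContext context = ScanContext.builder()
    .fromProperties(properties)   // table-level settings
    .fromFlinkConf(flinkConf)     // job-level settings overwrite the above
    .build();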
In my mind, providing unit tests to check whether inferParallelism() returns the expected parallelism value is enough for this change. It seems the ITCase is validating the behavior of DataStreamSource#setParallelism, which we can assume is always correct because it's a basic API in flink.
I think it's better not to use the inferParallelism method to get the parallelism for the assertion, because the inferParallelism method is private and is an internal method of iceberg. Just as you commented that it is best not to use the internal code of flink, I think we should try to use public APIs to get information.
The current TestFlinkTableSource class uses batch mode for unit tests. In order not to modify too much code, we can move the testInferedParallelism method to other test classes, such as TestFlinkScan.java.
So I think we can use DataStream.getTransformation().getParallelism() to get the parallelism of the flink operator. This method is a public API of flink; even if flink is upgraded in the future, it should not be modified. What do you think?
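For illustration, a rough sketch of that assertion style; the env, tableLoader, and the expected value 2 are placeholders, and the builder calls follow the ones quoted elsewhere in this PR:

// Hypothetical sketch: read the operator parallelism back through the public
// DataStream API instead of Flink's internal planner classes.
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .flinkConf(new Configuration())
    .build();
Assert.assertEquals("Source parallelism should match the inferred value",
    2, stream.getTransformation().getParallelism());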
The isStreaming flag indicates whether the flink source is a streaming source (in our mind), not whether it's a streaming job or batch job. The hive table source also has a similar configure key:
public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
key("streaming-source.enable")
.booleanType()
.defaultValue(false)
.withDescription(
"Enable streaming source or not.\n"
+ " NOTES: Please make sure that each partition/file should be written"
+ " atomically, otherwise the reader may get incomplete data.");
If we think this iceberg configure key is not very clear, I think we could propose another separate PR to align with the hive configure key. Let's focus on the parallelism issue here, what do you think?
Yes, it should be a streaming source, like kafka. If necessary, we can open a separate PR to discuss this.
Just as you commented that it is best not to use the internal code of flink, I think we should try to use public APIs to get information.
That comment was saying we'd better not use flink's internal API because that would introduce extra upgrade complexity (a new flink version may break those internal APIs, so we in iceberg would have to adjust the code; in the end, maintaining different versions of flink would bring us a lot of burden).
Writing iceberg unit tests based on iceberg's own non-public (we usually use package-access) methods is OK because there's no extra burden.
I updated the PR: moved the testInferedParallelism method to TestFlinkScanSql and used the FlinkSource.Builder#inferParallelism method to do the assertion.
The production files look great to me overall; just left a few comments about unit tests & the default value of readableConfig in FlinkSource#Builder. I also expect this PR could be merged before the iceberg 0.11.0 release. Thanks @zhangjun0x01 for the great work!
int inferParallelism(FlinkInputFormat format, ScanContext context) {
  int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
  if (readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
    int maxInterParallelism = readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Nit: maxInterParallelism -> maxInferParallelism, seems like it's a typo?
@@ -51,25 +52,29 @@
    private final TableLoader loader;
    private final TableSchema schema;
    private final Map<String, String> properties;
+   private final ReadableConfig readableConfig;
Nit: Let's move this line to line 60, so that the assignment order in the IcebergTableSource constructor aligns with these definitions.
// Because we added infer parallelism, all data files will be scanned first.
// Flink will call the FlinkInputFormat#createInputSplits method to scan the data files,
// plus the operation to get the execution plan, so there are three scan events.
Assert.assertEquals("Should create 3 scans", 3, scanEventCount);
We can disable table.exec.iceberg.infer-source-parallelism for all the batch tests by default, then we don't have to change all cases in this file. Actually, we have written many unit tests which depend on the parallelism, for example this PR #2064. Using the inferred parallelism for batch unit tests will introduce extra complexity and instability, so I recommend disabling the infer parallelism in our batch unit tests by default:
diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 5b8e58cf..ab3d56ea 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -62,10 +62,17 @@ public abstract class FlinkTestBase extends AbstractTestBase {
if (tEnv == null) {
synchronized (this) {
if (tEnv == null) {
- this.tEnv = TableEnvironment.create(EnvironmentSettings
+ EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
- .inBatchMode().build());
+ .inBatchMode()
+ .build();
+
+ TableEnvironment env = TableEnvironment.create(settings);
+ env.getConfig().getConfiguration()
+ .set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+
+ tEnv = env;
}
}
}
Yes, I updated it.
FlinkInputFormat flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
ScanContext scanContext = ScanContext.builder().build();

// Empty table, parallelism at least 1
Nit: infer parallelism should be at least 1.
@@ -70,6 +73,7 @@ public static Builder forRowData() {
    private Table table;
    private TableLoader tableLoader;
    private TableSchema projectedSchema;
+   private ReadableConfig readableConfig;
Should we provide a new Configuration() for this variable? Otherwise, it will just throw an NPE if people forget to provide a flinkConf in FlinkSource#Builder, because we don't check for null in inferParallelism.
Yes, I added the new Configuration() as the default.
DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords);
DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
    RandomGenericData.generate(SCHEMA, 2, 0L));
Those randomly generated records will be located in partition 2020-03-21? I guess not.
At first I copied the code from the TestFlinkScanSql#testResiduals method to generate 2 datafiles.
I think there should be no problem with the partition: writeRecords will write to the partition 2020-03-20, and two randomly generated records go into the partition 2020-03-21.
But for simplicity, I modified the code to randomly generate two records for each partition.
.format("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen)); | ||
|
||
// 2 splits ,the parallelism is 2 | ||
parallelism = FlinkSource.forRowData() |
I think there are other test cases that we don't cover; it would be good to cover those tests (see the sketch after this list for the first case):
- table.exec.iceberg.infer-source-parallelism=false;
- table.exec.iceberg.infer-source-parallelism.max <= numberOfSplits;
- table.exec.iceberg.infer-source-parallelism.max > numberOfSplits;
- table.exec.iceberg.infer-source-parallelism.max > limit;
- table.exec.iceberg.infer-source-parallelism.max <= limit;
Divide those cases into small methods if necessary.
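As a rough illustration of the first case (inference disabled), assuming the flinkInputFormat and scanContext fixtures from the surrounding test; this is a sketch, not the exact test code:

// Hypothetical sketch: when inference is disabled, the source should fall back
// to the configured default parallelism instead of the split count.
Configuration config = new Configuration();
config.set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
config.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

int parallelism = FlinkSource.forRowData()
    .flinkConf(config)
    .inferParallelism(flinkInputFormat, scanContext);
Assert.assertEquals("Should use the default parallelism when inference is disabled", 2, parallelism);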
I added the test cases, but I did not split them into different methods because they share a lot of code. If they were split, there would be a lot of duplicate code.
LGTM
Got this merged, thanks @zhangjun0x01 for contributing!
When using flink to query an iceberg table, the parallelism is flink's default parallelism, but the number of datafiles in an iceberg table varies. The user does not know how much parallelism should be used: setting the parallelism too large wastes resources, while setting it too small makes the query slow, so we can add parallelism inference.
The feature is enabled by default, and the parallelism equals the number of read splits. Of course, the user can manually turn the inference off. To prevent too many datafiles from causing excessive parallelism, we also set a max infer parallelism; when the inferred parallelism exceeds that setting, the max is used.
In addition, we also need to compare with the limit in the select query statement to get a more appropriate parallelism in the case of limit pushdown. For example, for the SQL select * from table limit 1, we might infer a parallelism of 10, but only one parallel task is needed, because we only need one row.
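For reference, a hedged sketch of the inference logic described above, assembled from the snippets quoted in this review; readableConfig is the builder field discussed earlier, and names such as maxInferParallelism follow the review discussion, so the merged code may differ slightly.

int inferParallelism(FlinkInputFormat format, ScanContext context) {
  // Start from flink's configured default parallelism for batch sources.
  int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);

  if (readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
    int maxInferParallelism = readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
    Preconditions.checkState(maxInferParallelism >= 1,
        FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");

    // Infer from the number of splits, capped by the configured maximum.
    int splitNum;
    try {
      FlinkInputSplit[] splits = format.createInputSplits(0);
      splitNum = splits.length;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create iceberg input splits.", e);
    }
    parallelism = Math.min(splitNum, maxInferParallelism);
  }

  // Cap by the pushed-down limit, guarding against long-to-int overflow.
  if (context.limit() > 0) {
    int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
    parallelism = Math.min(parallelism, limit);
  }

  // Parallelism must be positive, even for an empty table.
  return Math.max(1, parallelism);
}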