
Flink: Support inferring parallelism for batch read. #1936

Merged (9 commits, Jan 22, 2021)

Conversation

zhangjun0x01 (Contributor) commented Dec 15, 2020

When Flink queries an Iceberg table, it uses Flink's default parallelism, but the number of data files differs from table to table. Users do not know how much parallelism to use: setting it too high wastes resources, while setting it too low makes the query slow. So we can add parallelism inference.

The feature is enabled by default, and the parallelism is set equal to the number of read splits; users can turn the inference off manually. To prevent a large number of data files from causing excessive parallelism, we also add a maximum inferred parallelism: when the inferred parallelism exceeds this setting, the maximum is used instead.

In addition, when a limit is pushed down we also compare against the limit in the SELECT statement to get a more appropriate parallelism. For example, for SELECT * FROM table LIMIT 1, even if we infer a parallelism of 10, only one parallel task is needed because only one row is returned.
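A rough sketch of the described behavior (the method and parameter names here are illustrative, not the merged implementation):

  static int inferredParallelism(int defaultParallelism, int splitNum, long limit,
                                 boolean inferEnabled, int maxInferParallelism) {
    int parallelism = defaultParallelism;
    if (inferEnabled) {
      // one parallel task per read split, capped by the configured maximum
      parallelism = Math.min(splitNum, maxInferParallelism);
    }
    if (limit > 0) {
      // e.g. SELECT * FROM table LIMIT 1 never needs more than one task
      int limitInt = limit >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
      parallelism = Math.min(parallelism, limitInt);
    }
    // parallelism must be positive, even for an empty table
    return Math.max(1, parallelism);
  }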

@github-actions github-actions bot added the flink label Dec 15, 2020
zhangjun0x01 (Contributor, Author) commented:

@openinx could you help review this PR when you have time? Thanks.

@zhangjun0x01 zhangjun0x01 changed the title Flink : add parallelism optimize for IcebergTableSource [WIP]Flink : add parallelism optimize for IcebergTableSource Dec 18, 2020
@zhangjun0x01 zhangjun0x01 force-pushed the parallelism_optimize branch 2 times, most recently from 5efeef4 to adb96ec Compare December 18, 2020 03:10
@zhangjun0x01 zhangjun0x01 changed the title [WIP]Flink : add parallelism optimize for IcebergTableSource Flink : add parallelism optimize for IcebergTableSource Dec 18, 2020
@rdblue rdblue changed the title Flink : add parallelism optimize for IcebergTableSource Flink: Default read parallelism to number of splits Dec 18, 2020
@rdblue rdblue requested a review from openinx December 18, 2020 23:14
openinx (Member) left a comment

I like the idea of using inferred parallelism; just left a few comments.

@@ -195,7 +205,29 @@ public FlinkInputFormat buildFormat() {
Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
FlinkInputFormat format = buildFormat();
if (isBounded(context)) {
return env.createInput(format, rowTypeInfo);
int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
Member:

How about moving those lines into a separate method?

FlinkInputSplit[] splits = format.createInputSplits(0);
splitNum = splits.length;
} catch (IOException e) {
throw new RuntimeException("get input split error.", e);
Member:

Nit: use UncheckedIOException here.

            throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);

Comment on lines 211 to 214
if (max < 1) {
throw new IllegalConfigurationException(
FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
}
Member:

Nit: Preconditions.checkState?

parallelism = Math.min(splitNum, max);
}

parallelism = limit > 0 ? Math.min(parallelism, (int) limit) : parallelism;
Member:

It may overflow when casting the long limit to an integer. I'd like to use (int) Math.min(parallelism, limit).

Contributor Author:

The parallelism is an int and the limit is a long, so Math.min(parallelism, limit) cannot be assigned back to an int directly; I added a check to prevent overflow.

      int limitInt = limit > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) limit;
      parallelism = limitInt > 0 ? Math.min(parallelism, limitInt) : parallelism;

testParallelismSettingTranslateAndAssert(1, tableLimit, tenv);
}

private void testParallelismSettingTranslateAndAssert(int expected, Table table, TableEnvironment tEnv) {
Member:

Is there another way to assert that the parallelism matches the expected value? Here we're using Flink's planner to get the ExecNode, and I'm concerned that relying on Flink's internal code will be a big problem when upgrading the Flink version. Please see PR #1956.

Contributor Author:

Flink 1.12 does refactor ExecNode. I found an easier way to assert the parallelism; I will update it later.

@@ -103,4 +124,45 @@ public void testLimitPushDown() {
Assert.assertEquals("should have 1 record", 1, mixedResult.size());
Assert.assertArrayEquals("Should produce the expected records", mixedResult.get(0), new Object[] {1, "a"});
}

@Test
public void testParallelismOptimize() {
Member:

nit: testParallelismOptimize -> testInferedParallelism

TableEnvironment tenv = getTableEnv();

// empty table ,parallelism at least 1
Table tableEmpty = tenv.sqlQuery(String.format("SELECT * FROM %s", TABLE_NAME));
Member:

nit: how about introducing a small method:

 private Table sqlQuery(String sql, Object... args) {
    return getTableEnv().sqlQuery(String.format(sql, args));
  }

@zhangjun0x01 zhangjun0x01 force-pushed the parallelism_optimize branch 2 times, most recently from 21ba46c to 5d3ca11 Compare January 14, 2021 08:25
rdblue (Contributor) commented Jan 16, 2021

@openinx, is this something we should try to get into the 0.11.0 release?

@zhangjun0x01 zhangjun0x01 force-pushed the parallelism_optimize branch 2 times, most recently from 4769b00 to b646fb3 Compare January 16, 2021 14:12
@zhangjun0x01 zhangjun0x01 marked this pull request as draft January 18, 2021 02:30
@zhangjun0x01 zhangjun0x01 marked this pull request as ready for review January 18, 2021 03:11
zhangjun0x01 (Contributor, Author) commented:
Following @rdblue's suggestion, I refactored this class to use only HadoopCatalog and the Avro format.

}

@Override
public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
return FlinkSource.forRowData().env(execEnv).tableLoader(loader).project(getProjectedSchema()).limit(limit)
.filters(filters).properties(properties).build();
.filters(filters).flinkConf(readableConfig).properties(properties).build();
Member:

Nit: I'd like to format this builder chain like the following (it makes the change easier to read):

  @Override
  public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
    return FlinkSource.forRowData()
        .env(execEnv)
        .tableLoader(loader)
        .project(getProjectedSchema())
        .limit(limit)
        .filters(filters)
        .flinkConf(readableConfig)
        .properties(properties)
        .build();
  }

}

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM =
ConfigOptions.key("table.exec.iceberg.infer-source-parallelism").booleanType().defaultValue(true)
Member:

Nit: it's clearer to put each part of the option definition on a separate line:

      ConfigOptions.key("table.exec.iceberg.infer-source-parallelism")
          .booleanType()
          .defaultValue(true)
          .withDescription("If is false, parallelism of source are set by config.\n" +
              "If is true, source parallelism is inferred according to splits number.\n");

"If is true, source parallelism is inferred according to splits number.\n");

public static final ConfigOption<Integer> TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX =
ConfigOptions.key("table.exec.iceberg.infer-source-parallelism.max").intType().defaultValue(100)
Member:

ditto

@@ -197,7 +206,7 @@ public FlinkInputFormat buildFormat() {
TypeInformation<RowData> typeInfo = RowDataTypeInfo.of(FlinkSchemaUtil.convert(context.project()));

if (!context.isStreaming()) {
return env.createInput(format, typeInfo);
return createInputDataStream(format, context, typeInfo);
Member:

I don't think I described things clearly in that comment. I meant that we could move the inferParallelism logic into a separate method; it doesn't have to contain the DataStream construction or chaining calls.

private int inferParallelism(FlinkInputFormat format, ScanContext context) {
   // ....
}

Comment on lines 239 to 240
int limitInt = context.limit() > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
parallelism = limitInt > 0 ? Math.min(parallelism, limitInt) : parallelism;
Member:

Nit: I'd like to make this code more readable:

      if (context.limit() > 0) {
        int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
        parallelism = Math.min(parallelism, limit);
      }

     // parallelism must be positive.
      parallelism = Math.max(1, parallelism); 

@@ -70,6 +73,7 @@ public static Builder forRowData() {
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
private ReadableConfig flinkConf;
Member:

Users who write Flink batch jobs with the Java API will always pass a Flink Configuration, right? So how about defining this as org.apache.flink.configuration.Configuration?

Contributor Author:

When we construct the IcebergTableSource, we use the TableSourceFactory.Context#getConfiguration method to get the configuration. This method returns a ReadableConfig, so we use ReadableConfig instead of Configuration. In addition, Configuration is an implementation of the ReadableConfig interface, so I think ReadableConfig should not be a problem.

Member:

Got your point. I'd prefer Flink's Configuration because it will be exposed to Flink developers as an API in FlinkSource, and the unified Configuration would be more straightforward for them. But as you said, TableSourceFactory#Context exposes a ReadableConfig, and I also did not find a clean way to convert a ReadableConfig to a Configuration. OK, I think we can use ReadableConfig here, thanks.

TypeInformation<RowData> typeInfo) {
int parallelism = flinkConf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
if (flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Member:

Nit: use maxInterParallelism pls.

int max = flinkConf.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Preconditions.checkState(max >= 1,
FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
int splitNum = 0;
Member:

Nit: this assignment is redundant (per IntelliJ).

*/
@Test
public void testInferedParallelism() throws TableNotExistException {
Assume.assumeTrue("The execute mode should be streaming mode", isStreamingJob);
Member:

Shouldn't inferParallelism only affect batch jobs (see FlinkSource#Builder#build)? If so, there's no reason to provide a unit test in streaming mode.

In my mind, providing unit tests that check whether inferParallelism() returns the expected parallelism value is enough for this change. The ITCase seems to be validating the behavior of DataStreamSource#setParallelism, which we can assume is always correct because it's a basic Flink API.

Member:

That way we don't have to change so much code in this class. Maybe we could just add unit tests in TestFlinkScan.java.

Contributor Author:

I found that in this test method I use Flink's streaming mode, but it still enters batch mode (here). I checked the code and found that the FlinkSource.Builder#build method decides between streaming and batch mode based on the ScanContext configuration instead of the Flink configuration. Will this confuse users?

Member:

Okay, that's a great point. I think it will confuse users. The correct way is: if someone provides a Flink configuration, first set the ScanContext's properties from it (using the fromFlinkConf below), similar to ScanContext#fromProperties:

diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 2896efb3..c56e3311 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -292,10 +292,7 @@ class ScanContext implements Serializable {
       return this;
     }
 
-    Builder fromProperties(Map<String, String> properties) {
-      Configuration config = new Configuration();
-      properties.forEach(config::setString);
-
+    Builder fromFlinkConf(Configuration config) {
       return this.useSnapshotId(config.get(SNAPSHOT_ID))
           .caseSensitive(config.get(CASE_SENSITIVE))
           .asOfTimestamp(config.get(AS_OF_TIMESTAMP))
@@ -305,7 +302,14 @@ class ScanContext implements Serializable {
           .splitLookback(config.get(SPLIT_LOOKBACK))
           .splitOpenFileCost(config.get(SPLIT_FILE_OPEN_COST))
           .streaming(config.get(STREAMING))
-          .monitorInterval(config.get(MONITOR_INTERVAL))
+          .monitorInterval(config.get(MONITOR_INTERVAL));
+    }
+
+    Builder fromProperties(Map<String, String> properties) {
+      Configuration config = new Configuration();
+      properties.forEach(config::setString);
+
+      return fromFlinkConf(config)
           .nameMapping(properties.get(DEFAULT_NAME_MAPPING));
     }
 

Member:

If someone provides both a Flink Configuration and Iceberg properties, the Flink Configuration values should overwrite the Iceberg properties, because the properties are table-level settings while the Flink Configuration is a job-level setting. It is reasonable for fine-grained configuration to overwrite coarse-grained configuration.
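A minimal sketch of that precedence, assuming the fromFlinkConf builder method proposed in the diff above and a job-level Configuration named flinkConfiguration (the job-level settings are applied last so that they win):

  // sketch only: table-level properties first, job-level Flink Configuration last,
  // so the fine-grained job settings overwrite the coarse-grained table settings
  ScanContext context = ScanContext.builder()
      .fromProperties(table.properties())   // table level (coarse-grained)
      .fromFlinkConf(flinkConfiguration)    // job level (fine-grained), applied last
      .build();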

Contributor Author:

> In my mind, providing unit tests that check whether inferParallelism() returns the expected parallelism value is enough for this change. The ITCase seems to be validating the behavior of DataStreamSource#setParallelism, which we can assume is always correct because it's a basic Flink API.

I think it's better not to use the inferParallelism method to get the parallelism for the assertion, because inferParallelism is private and an internal method of Iceberg. Just as you commented that it is best not to use Flink's internal code, I think we should try to use public APIs to get this information.

The current TestFlinkTableSource class uses batch mode for its unit tests. To avoid modifying too much code, we can move the testInferedParallelism method to another test class, such as TestFlinkScan.java.

So I think we can use DataStream.getTransformation().getParallelism() to get the parallelism of the Flink operator. This is a public Flink API, so even if Flink is upgraded in the future it should not change. What do you think?
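For illustration, a hedged sketch of what such an assertion could look like (it assumes a StreamExecutionEnvironment env and a tableLoader from the test setup; the expected value of 2 simply stands for a table that produces two splits):

  DataStream<RowData> dataStream = FlinkSource.forRowData()
      .env(env)
      .tableLoader(tableLoader)
      .build();

  // getTransformation() is a public DataStream API, so this does not
  // depend on Flink planner internals
  int parallelism = dataStream.getTransformation().getParallelism();
  Assert.assertEquals("Should have one task per split", 2, parallelism);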

Member:

The isStreaming flag indicates whether the Flink source is a streaming source (in our mind), not whether it's a streaming job or a batch job. The Hive table source has a similar config key:

    public static final ConfigOption<Boolean> STREAMING_SOURCE_ENABLE =
            key("streaming-source.enable")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Enable streaming source or not.\n"
                                    + " NOTES: Please make sure that each partition/file should be written"
                                    + " atomically, otherwise the reader may get incomplete data.");

If we think this Iceberg config key is not very clear, we could propose a separate PR to align it with the Hive config key. Let's focus on the parallelism issue here, what do you think?

Contributor Author:

Yes, it should be a streaming source, like Kafka. If necessary, we can open a separate PR to discuss this.

Member:

> Just as you commented that it is best not to use Flink's internal code, I think we should try to use public APIs to get this information.

My comment was saying that we'd better not use Flink's internal API because that would introduce extra upgrade complexity (a new Flink version may break those internal APIs, so we in Iceberg would have to adjust the code, and maintaining different Flink versions would become a heavy burden).

Writing Iceberg unit tests against our own non-public methods (we usually use package access) is OK because there's no extra burden.

Contributor Author:

I updated the PR: I moved the testInferedParallelism method to TestFlinkScanSql and used the FlinkSource.Builder#inferParallelism method for the assertions.
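For reference, a hedged sketch of that assertion style (the exact setup in TestFlinkScanSql may differ; the empty-table expectation follows the "parallelism at least 1" comment quoted later in this review):

  FlinkSource.Builder builder = FlinkSource.forRowData()
      .tableLoader(tableLoader)
      .table(table)
      .flinkConf(new Configuration());
  FlinkInputFormat flinkInputFormat = builder.buildFormat();
  ScanContext scanContext = ScanContext.builder().build();

  // inferParallelism is package-private in Iceberg, so the test can call it
  // directly instead of going through Flink planner internals
  int parallelism = builder.inferParallelism(flinkInputFormat, scanContext);
  Assert.assertEquals("Empty table should fall back to parallelism 1", 1, parallelism);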

@zhangjun0x01 zhangjun0x01 force-pushed the parallelism_optimize branch 2 times, most recently from cba8621 to 47ae11a Compare January 21, 2021 07:19
@openinx openinx added this to the Java 0.11.0 Release milestone Jan 22, 2021
openinx (Member) left a comment

The production files look great to me overall; I just left a few comments about the unit tests and the default value of readableConfig in FlinkSource#Builder. I also expect this PR can be merged before the Iceberg 0.11.0 release. Thanks @zhangjun0x01 for the great work!

int inferParallelism(FlinkInputFormat format, ScanContext context) {
int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
if (readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) {
int maxInterParallelism = readableConfig.get(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);
Member:

Nit: maxInterParallelism -> maxInferParallelism, seems like a typo?

@@ -51,25 +52,29 @@
private final TableLoader loader;
private final TableSchema schema;
private final Map<String, String> properties;
private final ReadableConfig readableConfig;
Member:

Nit: let's move this line to line 60, so the assignment order in the IcebergTableSource constructor aligns with these field definitions.

// Because we add infer parallelism, all data files will be scanned first.
// Flink will call FlinkInputFormat#createInputSplits method to scan the data files,
// plus the operation to get the execution plan, so there are three scan event.
Assert.assertEquals("Should create 3 scans", 3, scanEventCount);
Member:

We can disable table.exec.iceberg.infer-source-parallelism for all the batch tests by default, so we don't have to change every case in this file. We have actually written many unit tests that depend on the parallelism, for example PR #2064. Using the inferred parallelism in batch unit tests would introduce extra complexity and instability, so I recommend disabling parallelism inference in our batch unit tests by default:

diff --git a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
index 5b8e58cf..ab3d56ea 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
@@ -62,10 +62,17 @@ public abstract class FlinkTestBase extends AbstractTestBase {
     if (tEnv == null) {
       synchronized (this) {
         if (tEnv == null) {
-          this.tEnv = TableEnvironment.create(EnvironmentSettings
+          EnvironmentSettings settings = EnvironmentSettings
               .newInstance()
               .useBlinkPlanner()
-              .inBatchMode().build());
+              .inBatchMode()
+              .build();
+
+          TableEnvironment env = TableEnvironment.create(settings);
+          env.getConfig().getConfiguration()
+              .set(FlinkTableOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
+
+          tEnv = env;
         }
       }
     }

Contributor Author:

Yes, I updated it.

FlinkInputFormat flinkInputFormat = FlinkSource.forRowData().tableLoader(tableLoader).table(table).buildFormat();
ScanContext scanContext = ScanContext.builder().build();

// Empty table ,parallelism at least 1
Member:

Nit: the inferred parallelism should be at least 1.

@@ -70,6 +73,7 @@ public static Builder forRowData() {
private Table table;
private TableLoader tableLoader;
private TableSchema projectedSchema;
private ReadableConfig readableConfig;
Member:

Should we provide a new Configuration() as the default for this variable? Otherwise it will throw an NPE if people forget to provide a flinkConf in FlinkSource#Builder, because we don't null-check it in inferParallelism.

Contributor Author:

Yes, I added new Configuration() as the default.
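Roughly, a minimal sketch of that default against the field declared above:

  // default to an empty Configuration so inferParallelism() does not hit an NPE
  // when a caller never sets flinkConf on the builder
  private ReadableConfig readableConfig = new Configuration();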


DataFile dataFile1 = helper.writeFile(TestHelpers.Row.of("2020-03-20", 0), writeRecords);
DataFile dataFile2 = helper.writeFile(TestHelpers.Row.of("2020-03-21", 0),
RandomGenericData.generate(SCHEMA, 2, 0L));
Member:

Will those randomly generated records be located in partition 2020-03-21? I guess not.

Contributor Author:

At first I copied the code from the TestFlinkScanSql#testResiduals method to generate two data files.

I think the partition should not be a problem: writeRecords is written to partition 2020-03-20, and the two randomly generated records go into partition 2020-03-21.

But for simplicity, I modified the code to randomly generate two records for each partition.

.format("ALTER TABLE t SET ('read.split.open-file-cost'='1', 'read.split.target-size'='%s')", maxFileLen));

// 2 splits ,the parallelism is 2
parallelism = FlinkSource.forRowData()
Member:

I think there are other test cases that we don't cover; it would be good to cover them:

  1. table.exec.iceberg.infer-source-parallelism=false;
  2. table.exec.iceberg.infer-source-parallelism.max <= numberOfSplits;
  3. table.exec.iceberg.infer-source-parallelism.max > numberOfSplits;
  4. table.exec.iceberg.infer-source-parallelism.max > limit;
  5. table.exec.iceberg.infer-source-parallelism.max <= limit;

Divide those cases into small methods if necessary.

Contributor Author:

I added the test cases, but I did not split them into different methods because they share a lot of code; splitting them would produce a lot of duplication.

openinx (Member) left a comment

LGTM

@openinx openinx changed the title Flink: Default read parallelism to number of splits Flink: Support inferring parallelism for batch read. Jan 22, 2021
@openinx openinx merged commit 543a6cd into apache:master Jan 22, 2021
openinx (Member) commented Jan 22, 2021

Got this merged, thanks @zhangjun0x01 for contributing!
