
Flink: Upgrade version from 1.11.0 to 1.12.1 #1956

Merged: 11 commits merged into apache:master on Feb 2, 2021

Conversation

stevenzwu (Contributor)

No description provided.

@zhangjun0x01 (Contributor) commented Dec 18, 2020

I think we should add support for both Flink 1.11 and Flink 1.12, similar to spark2 and spark3.
If we upgrade from Flink 1.11 to Flink 1.12 directly, users who are still on Flink 1.11 will not be able to use Iceberg.

Similar to Flink's support for Hive, where Flink supports all Hive versions. What do you think?

@stevenzwu (Contributor Author)

Are you suggesting that we have a module for each Flink minor version release (like 1.11 and 1.12)? I am worried that it can quickly grow out of control, as Flink does a minor version release typically every 4 months. The Flink Kafka connector used to follow this model (kafka08, kafka09, kafka10, ...). It has since moved to a universal connector that just tracks the latest release of kafka-clients.

Or are you suggesting that the Flink connector code can handle all the 1.x Flink versions? I am not sure how the Flink Hive connector handles the small API changes among all the versions.

As for spark2 and spark3, that is a major version upgrade. For upgrading Spark from 2.3 to 2.4, I assume Iceberg just upgrades the version in the spark2 module. If, in the future, Flink introduces major breaking API changes and goes up to 2.x, we probably should have a flink2 module in Iceberg.

Since the Flink Iceberg connector lives in the Iceberg project, I was thinking that the latest connector can just pick a Flink minor version as the paved path. That is why I was asking whether we should wait for the Flink 1.12.1 patch release for some bug fixes.

@stevenzwu (Contributor Author)

@JingsongLi I saw the CI failure in the Flink tests with "Trying to access closed classloader", which I couldn't reproduce locally with ./gradlew :iceberg-flink:build. It seems like the same problem as FLINK-19843 that you fixed. I tried to close the result iterator as you did, but that doesn't seem to fix it.

@@ -83,8 +83,8 @@ public void testGetTable() {
         Types.NestedField.optional(1, "strV", Types.StringType.get())));
     Assert.assertEquals(
         Arrays.asList(
-            TableColumn.of("id", DataTypes.BIGINT()),
-            TableColumn.of("strV", DataTypes.STRING())),
+            TableColumn.physical("id", DataTypes.BIGINT()),
@openinx (Member) commented Dec 21, 2020

It's strange here: TableColumn is marked as PublicEvolving, but the Flink 1.12.0 release did not keep any interface compatibility for it. At the very least, the old method should have been marked as deprecated and kept until a major release.

Member

@stevenzwu I think we could change to use the loaded Iceberg schema to accomplish the schema validation, so that we don't have to use Flink's TableSchema.

https://github.com/openinx/incubator-iceberg/commit/2a635719955f3eecd92685cb8f5486c46aa60095#diff-4c56ab08c19464dbe3351f71fc39345ee031a282b3e8dc1b107cbe9a1964d105R81
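
For illustration, a rough sketch of what validating against the loaded Iceberg schema could look like, assuming the test already holds the loaded table as `table` (the field IDs and column names below are hypothetical, not taken from the linked commit):

  // Hypothetical sketch: compare the expected Iceberg schema with the loaded
  // table's schema directly, avoiding Flink's TableColumn/TableSchema API.
  Schema expected = new Schema(
      Types.NestedField.optional(1, "id", Types.LongType.get()),
      Types.NestedField.optional(2, "strV", Types.StringType.get()));
  Assert.assertEquals(expected.asStruct(), table.schema().asStruct());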

Contributor Author

thx a lot for the suggestion. will update


sorry, this API break was a mistake and will be fixed in the next bugfix release
https://issues.apache.org/jira/browse/FLINK-21226

Member

It's OK, we've planned to just upgrade to Flink 1.12 because we need the newly introduced APIs from 1.12.0 to develop our unified Iceberg Flink source/sink. Thanks @twalthr for the work.

@stevenzwu (Contributor Author)

I am going to close this PR, as it seems that we probably need more discussion on how to upgrade Flink from 1.11 to 1.12

@stevenzwu stevenzwu closed this Dec 21, 2020
@openinx (Member)

openinx commented Dec 22, 2020

@stevenzwu, I think it's a good time to upgrade Flink from 1.11 to 1.12 after we've released Iceberg 0.11.0. As for how to accomplish the upgrade, I'm considering the same way as you proposed, because of the Flink compatibility issues. That means if someone just wants the Flink sink feature, then it's better to choose Iceberg 0.11.0 or 0.10.0, while if they want more advanced features such as Flink CDC or UPSERT streams, or the unified batch/stream Iceberg source/sink (the work you're doing), then it's better to upgrade to Flink 1.12.

@stevenzwu stevenzwu reopened this Dec 22, 2020
@stevenzwu (Contributor Author)

stevenzwu commented Dec 22, 2020

@openinx I totally agree that we shouldn't upgrade flink to 1.12.0 until Iceberg 0.11.0 is released.

Regarding this PR, there is still some problem with the test failures Trying to access closed classloader. I tried to follow the same approach that @JingsongLi used for FLINK-19843, but I haven't been able to fix it yet.

Might be something related to the Avro class cache, as pointed out by the Flink docs.

StdErr java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
StdErr  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:161)
StdErr  at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:168)
StdErr  at java.base/java.lang.Class.forName0(Native Method)
StdErr  at java.base/java.lang.Class.forName(Class.java:398)
StdErr  at org.apache.iceberg.common.DynClasses$Builder.impl(DynClasses.java:68)
StdErr  at org.apache.iceberg.avro.GenericAvroReader$ReadBuilder.record(GenericAvroReader.java:85)
StdErr  at org.apache.iceberg.avro.GenericAvroReader$ReadBuilder.record(GenericAvroReader.java:72)
StdErr  at org.apache.iceberg.avro.AvroSchemaVisitor.visit(AvroSchemaVisitor.java:50)
StdErr  at org.apache.iceberg.avro.GenericAvroReader.initReader(GenericAvroReader.java:47)
StdErr  at org.apache.iceberg.avro.GenericAvroReader.setSchema(GenericAvroReader.java:53)
StdErr  at org.apache.iceberg.avro.ProjectionDatumReader.newDatumReader(ProjectionDatumReader.java:80)
StdErr  at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:69)
StdErr  at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:132)
StdErr  at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
StdErr  at org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
StdErr  at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:66)
StdErr  at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
StdErr  at org.apache.iceberg.avro.AvroIterable.getMetadata(AvroIterable.java:66)
StdErr  at org.apache.iceberg.ManifestReader.<init>(ManifestReader.java:101)
StdErr  at org.apache.iceberg.ManifestFiles.read(ManifestFiles.java:87)
StdErr  at org.apache.iceberg.ManifestFiles.open(ManifestFiles.java:192)
StdErr  at org.apache.iceberg.ManifestFiles.open(ManifestFiles.java:185)
StdErr  at org.apache.iceberg.CatalogUtil.lambda$deleteFiles$4(CatalogUtil.java:106)
StdErr  at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
StdErr  at org.apache.iceberg.util.Tasks$Builder.access$300(Tasks.java:70)
StdErr  at org.apache.iceberg.util.Tasks$Builder$1.run(Tasks.java:310)
StdErr  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
StdErr  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
StdErr  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
StdErr  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
StdErr  at java.base/java.lang.Thread.run(Thread.java:834)

@ClassRule
public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
Contributor

Would it make sense to use 2 here (or any value greater than 1)? At my work, we've found that some people's code runs into issues once it's distributed. I doubt that's the case here, given that those users were typically relatively new to Flink, but it still might be helpful to run the tests with non-local shuffles (not in the same JVM). We could use 2 task managers and then halve the number of slots per task manager.

Your call, but since this has been helpful for some people in my organization I thought I'd mention it.

Contributor Author

It seems that most Flink unit tests with MiniCluster use one task manager. I would stick with that convention.

On the other hand, this code was added to disable CHECK_LEAKED_CLASSLOADER. If we can fix that problem, we can stick with the AbstractTestBase base class.
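
Putting those pieces together, a minimal sketch of the pattern this thread converges on (the slot count and constant name are illustrative):

  // Shared mini cluster for tests; the classloader leak check is disabled
  // because Avro may cache classes/objects in its serializers.
  private static final Configuration CONFIG = new Configuration()
      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

  @ClassRule
  public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
      new MiniClusterResourceConfiguration.Builder()
          .setNumberTaskManagers(1)
          .setNumberSlotsPerTaskManager(4)
          .setConfiguration(CONFIG)
          .build());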

@stevenzwu stevenzwu force-pushed the flink12Upgrade branch 3 times, most recently from ebbf155 to 70235e3 Compare January 2, 2021 19:24
@tweise

tweise commented Jan 3, 2021

As the Flink Iceberg connector is evolving, it would probably be better to support multiple versions of Flink. Otherwise users will miss out on Iceberg connector improvements if they are not able to follow a forced Flink upgrade. Apache Beam can serve as an example of how this can be accomplished with a relatively small shim layer: https://github.com/apache/beam/tree/master/runners/flink

@stevenzwu (Contributor Author)

stevenzwu commented Jan 3, 2021

@tweise yeah, a few other people also asked for the same thing in issue-1951.

I looked at the shim layer approach in the flink-connector-hive module. It would work with the existing code (this PR). But we will need to add an Iceberg source implementation based on the FLIP-27 source interface in 1.12, and it is unclear how that works with the shim layer. The new FLIP-27 Iceberg source code will extend interfaces/classes available only in 1.12. I am not sure how it can work unless we add a flink112 module in Iceberg.

@@ -76,7 +99,7 @@ protected TableEnvironment getTableEnv() {
     TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
     tableResult.getJobClient().ifPresent(c -> {
       try {
-        c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
+        c.getJobExecutionResult().get();
Member

I read the javadoc of TableResult again. The correct way to execute SQL and get the results is:

TableResult result = tEnv.execute("select ...");
// using try-with-resources statement
try (CloseableIterator<Row> it = result.collect()) {
        it... // collect same data
}

Then I think we don't have to call c.getJobExecutionResult().get() here. How about removing line 101 ~ line 106 and using a try-with-resources statement for the following part?

Contributor Author

I was wondering about the same thing regarding c.getJobExecutionResult().get(). Updated the PR per your suggestion.

@stevenzwu (Contributor Author) Jan 9, 2021

BTW, removing the JobClient.getJobExecutionResult().get() call works well for Flink 1.12. It seems that it doesn't work with Flink 1.11 when I tried it in the Flink test refactor PR.

@@ -107,7 +121,14 @@ public void testResiduals() throws Exception {
 }

 private List<Row> executeSQL(String sql) {
-  return Lists.newArrayList(tEnv.executeSql(sql).collect());
+  CloseableIterator<Row> iter = getTableEnv().executeSql(sql).collect();

Contributor Author

updated

@@ -52,12 +53,25 @@ public TestFlinkScanSql(String fileFormat) {
 @Override
 public void before() throws IOException {
   super.before();
-  tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
-  tEnv.executeSql(String.format(
+  getTableEnv().executeSql(String.format(
Member

Why do we need this change here?

Contributor Author

This is to avoid creating the table environment for each test method. It is the same pattern used by FlinkTestBase.
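
For reference, a rough sketch of that lazily created, shared environment pattern (not the exact FlinkTestBase code):

  // Create the TableEnvironment once and reuse it across test methods,
  // instead of building a new one in every before() call.
  private volatile TableEnvironment tEnv = null;

  protected TableEnvironment getTableEnv() {
    if (tEnv == null) {
      synchronized (this) {
        if (tEnv == null) {
          tEnv = TableEnvironment.create(
              EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
        }
      }
    }
    return tEnv;
  }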

Member

OK, that sounds good to me.

Nit: here we could just use the executeSQL method now. Maybe we could rename executeSQL(String sql) to sql(String query, Object... args), so that the usage is the same as FlinkTestBase#sql.
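
A minimal sketch of what the renamed helper could look like, assuming the existing getTableEnv() accessor and Guava's Lists; the exact signature here is illustrative:

  // Run a formatted SQL statement and eagerly collect the rows,
  // closing the iterator with try-with-resources as discussed above.
  protected List<Row> sql(String query, Object... args) {
    TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
    try (CloseableIterator<Row> iter = tableResult.collect()) {
      return Lists.newArrayList(iter);
    } catch (Exception e) {
      throw new RuntimeException("Failed to collect table result", e);
    }
  }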

Contributor Author

really like the suggestion. will update

@stevenzwu (Contributor Author)

stevenzwu commented Jan 5, 2021

Right now, the CI test fails in :iceberg-spark3-extensions:test for Java 8.

org.apache.iceberg.spark.extensions.TestCopyOnWriteDelete > testDeleteWithNotExistsSubquery[catalogName = spark_catalog, implementation = org.apache.iceberg.spark.SparkSessionCatalog, config = {type=hive, default-namespace=default, clients=1, parquet-enabled=false, cache-enabled=false}, format = avro, vectorized = false] FAILED
    java.lang.RuntimeException: Failed to get table info from metastore default.table

        Caused by:
        org.apache.thrift.transport.TTransportException: java.net.SocketException: Broken pipe (Write failed)

            Caused by:
            java.net.SocketException: Broken pipe (Write failed)

I tried the same branch against the master branch in my own fork: stevenzwu#11. It also failed, with a connection error in :iceberg-hive3:compileJava. I suspect the build may be taking a little longer and the CI system aborted it?

Execution failed for task ':iceberg-hive3:compileJava'.
> Could not resolve all files for configuration ':iceberg-hive3:compileClasspath'.
   > Could not resolve com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.7.8.
     Required by:
         project :iceberg-hive3 > org.apache.hive:hive-exec:3.1.2 > org.apache.hadoop:hadoop-yarn-registry:3.1.0 > org.apache.hadoop:hadoop-yarn-common:3.1.0
      > Could not resolve com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.7.8.
         > Could not parse POM https://repo.maven.apache.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-json-provider/2.7.8/jackson-jaxrs-json-provider-2.7.8.pom
            > Could not resolve com.fasterxml.jackson.jaxrs:jackson-jaxrs-providers:2.7.8.
               > Could not resolve com.fasterxml.jackson.jaxrs:jackson-jaxrs-providers:2.7.8.
                  > Could not get resource 'https://repo.maven.apache.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-providers/2.7.8/jackson-jaxrs-providers-2.7.8.pom'.
                     > Could not GET 'https://repo.maven.apache.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-providers/2.7.8/jackson-jaxrs-providers-2.7.8.pom'.
                        > Connection reset

I have the same code in a different branch, and that PR passes the CI build: stevenzwu#8.

@zhangjun0x01 (Contributor)

I have also seen the iceberg-spark3-extensions CI test fail several times.

@stevenzwu (Contributor Author)

@zhangjun0x01 thx a lot for confirming that you ran into the problem too. I also hit the same problem locally when running the Flink tests, at which point my laptop became super slow. Maybe some tests are very heavy and cause long GC pauses or something.

I just rebased the branch on the latest master. Now the CI tests all pass.


private static final Configuration config = new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
Member

I think we need to disable CoreOptions.CHECK_LEAKED_CLASSLOADER here? It seems we usually validate the Iceberg table results once we've terminated the Flink job.

@stevenzwu (Contributor Author) Jan 29, 2021

It is already disabled here. We need to disable this check for the other tests that are affected by it. CI tests were passing after disabling it here; they started to fail again after I rebased with the latest master.

Contributor Author

This is getting really tricky to debug. I can't reproduce it locally, so I can't set breakpoints or add debug logging to a custom-built Flink.

I tried to disable CoreOptions.CHECK_LEAKED_CLASSLOADER for all MiniCluster tests. That doesn't help.

Member

OK, I'll see how to reproduce those classloader issues.

Member

There are still some ITCases that did not disable CHECK_LEAKED_CLASSLOADER in the current patch, so I made this commit: https://github.com/openinx/incubator-iceberg/commit/c8c2c8e8e72fd1b2051f1f946bb54c968e86c81d to re-trigger the Travis CI. Let's see what it says: https://github.com/openinx/incubator-iceberg/runs/1806279011

@openinx (Member) Feb 1, 2021

If the Travis CI passes, then @stevenzwu, you can apply this commit to this PR to retry the Travis CI:

curl -s https://github.com/openinx/incubator-iceberg/commit/c8c2c8e8e72fd1b2051f1f946bb54c968e86c81d.patch | git am

Member

Seems all unit tests passed: https://github.com/openinx/incubator-iceberg/runs/1806279011 ! @stevenzwu, you could add this commit and then retry the Travis CI again.

@stevenzwu (Contributor Author) Feb 1, 2021

@openinx thx a lot for identifying the missing integration tests that require disabling the classloader check. I didn't realize I needed to search for StreamExecutionEnvironment.getExecutionEnvironment.

BTW, I also added MiniClusterWithClientResource to TestFlinkIcebergSinkV2, which might help speed things up a tiny bit.

@openinx (Member)

openinx commented Feb 1, 2021

@stevenzwu, one more thing: we will need to enable this unit test

@Ignore("Enable this after upgrade flink to 1.12.0, because it starts to support 'CREATE TABLE IF NOT EXISTS")
after we upgrade to Flink 1.12.

@stevenzwu stevenzwu force-pushed the flink12Upgrade branch 2 times, most recently from 8fd95cb to ae99bc3 Compare February 1, 2021 17:06
@stevenzwu (Contributor Author)

@openinx I enabled the TestFlinkCatalogTable#testCreateTableIfNotExists() test. I had to make two adjustments. In particular, I had to remove WITH ('location'='/tmp/location') from the SQL in the 2nd CREATE; otherwise HadoopCatalog fails the validation. Also, for the purpose of this test, it is probably not important for the 2nd CREATE to set a new location.

Test testCreateTableIfNotExists[catalogName = testhadoop baseNamespace = ](org.apache.iceberg.flink.TestFlinkCatalogTable) failed with:
org.apache.flink.table.api.TableException: Could not execute CreateTable in path `testhadoop`.`db`.`tl`
  at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:796)
  at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:632)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:776)
  at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
  at org.apache.iceberg.flink.FlinkTestBase.exec(FlinkTestBase.java:103)
  at org.apache.iceberg.flink.FlinkTestBase.exec(FlinkTestBase.java:107)
  at org.apache.iceberg.flink.FlinkTestBase.sql(FlinkTestBase.java:111)
  at org.apache.iceberg.flink.TestFlinkCatalogTable.testCreateTableIfNotExists(TestFlinkCatalogTable.java:139)
  ...
 Caused by: java.lang.IllegalArgumentException: Cannot set a custom location for a path-based table. Expected file:///var/folders/4d/3shff9g95pz81jmgrcn74zl00000gn/T/junit5877951167101792089/db/tl but got /tmp/location
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:142)
  at org.apache.iceberg.hadoop.HadoopCatalog$HadoopCatalogTableBuilder.withLocation(HadoopCatalog.java:441)
  at org.apache.iceberg.CachingCatalog$CachingTableBuilder.withLocation(CachingCatalog.java:191)
  at org.apache.iceberg.CachingCatalog.createTable(CachingCatalog.java:104)
  at org.apache.iceberg.flink.FlinkCatalog.createTable(FlinkCatalog.java:377)
  at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$10(CatalogManager.java:633)
  at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:790)

… Flink 1.12.

Have to adjust the test in a couple of places.
@@ -119,6 +123,12 @@ protected String warehouseRoot() {
   }
 }

+  protected String getFullQualifiedTableName(String tableName) {
+    final List<String> levels = new ArrayList<>(Arrays.asList(icebergNamespace.levels()));
Member

Nit: in Apache Iceberg, we usually use the unified Lists.newArrayList to create a new ArrayList.
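
For example (illustrative only):

  // Guava-style list creation used across the codebase:
  List<String> levels = Lists.newArrayList(icebergNamespace.levels());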

Contributor Author

will fix

@@ -107,7 +107,7 @@ public void testRenameTable() {
     () -> getTableEnv().from("tl")
 );
 Assert.assertEquals(
-    Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
+    Collections.singletonList(TableColumn.physical("id", DataTypes.BIGINT())),
Member

@stevenzwu, do you think this issue needs to be fixed in this PR? I think we'd better to...

Assert.assertEquals(Maps.newHashMap(), table("tl").properties());

sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
final String uuid = UUID.randomUUID().toString();
Member

Thanks a lot for the unit test improvements!

.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

@ClassRule
public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
Member

I saw that TestFlinkIcebergSinkV2 also defines similar mini cluster resources, so how about extracting this into a small method so that TestFlinkIcebergSinkV2 can reuse it? Any future mini cluster resources should also reuse this one, because it is easy for developers to forget to disable the CHECK_LEAKED_CLASSLOADER switch.

  @ClassRule
  public static MiniClusterWithClientResource miniClusterResource = createMiniClusterResource();

  @ClassRule
  public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

  public static MiniClusterWithClientResource createMiniClusterResource() {
    return new MiniClusterWithClientResource(
        new MiniClusterResourceConfiguration.Builder()
            .setNumberTaskManagers(1)
            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
            .setConfiguration(CONFIG)
            .build());
  }

The TestFlinkIcebergSinkV2 could just use:

@ClassRule
public static MiniClusterWithClientResource miniClusterResource = MiniClusterBase.createMiniClusterResource();

Contributor Author

That is a great idea. will update

@@ -37,7 +43,7 @@ public static Actions forTable(StreamExecutionEnvironment env, Table table) {
 }

 public static Actions forTable(Table table) {
-  return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
+  return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
Member

Thanks a lot for this.

@stevenzwu (Contributor Author)

@openinx I addressed your last round of comments. Thanks a lot for the great feedback.

thx for catching the case that was still using TableColumn.physical. I have fixed it. I also did a search and found no other usages.

@openinx (Member) left a comment

LGTM

@openinx openinx merged commit c5e6791 into apache:master Feb 2, 2021
@stevenzwu stevenzwu deleted the flink12Upgrade branch February 2, 2021 13:56
@rdblue (Contributor)

rdblue commented Feb 2, 2021

@openinx and @stevenzwu, what is the compatibility impact of this change? Does master no longer work with 1.11?

@stevenzwu (Contributor Author)

@rdblue that is correct. master won't work with Flink 1.11 anymore.

The main reason is that the new FLIP-27 Flink source implementation will depend on APIs only available from Flink 1.12. There were some discussions on how to make the Flink connector work with both Flink 1.11 and 1.12, like adopting the Gradle magic from Beam. In the end, it seems most people are fine with having master only work with Flink 1.12.
