Flink: Upgrade version from 1.11.0 to 1.12.1 #1956
Conversation
I think we should add support for both Flink 1.11 and Flink 1.12, similar to spark2 and spark3. Similar to the Hive support in Flink, where Flink supports all Hive versions. What do you think?
Are you suggesting that we have a module for each Flink minor version release (like 1.11 and 1.12)? I am worried that it can quickly grow out of control, as Flink does a minor version release typically every 4 months. The Flink Kafka connector used to follow this model (like kafka08, kafka09, kafka10, ...). Now it has moved to the universal connector and just tracks the latest release version of kafka-clients. Or are you suggesting that the Flink connector code can handle all the 1.x Flink versions? I am not sure how the Flink Hive connector handles the small API changes among all the versions. As for spark2 and spark3, that is a major version upgrade. For upgrading Spark from 2.3 to 2.4, I assume Iceberg just upgrades the version in the spark2 module. If in the future Flink introduces a major breaking API change and goes up to 2.x, we probably should have a flink2 module in Iceberg. Since the Flink Iceberg connector lives in the Iceberg project, I was thinking that the latest connector can just pick a Flink minor version as the paved path. That is why I was asking if we should wait for the Flink 1.12.1 patch release for some bug fixes.
@JingsongLi saw the CI failure in the Flink tests with
flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java (outdated review thread, resolved)
@@ -83,8 +83,8 @@ public void testGetTable() {
        Types.NestedField.optional(1, "strV", Types.StringType.get())));
    Assert.assertEquals(
        Arrays.asList(
            TableColumn.of("id", DataTypes.BIGINT()),
            TableColumn.of("strV", DataTypes.STRING())),
            TableColumn.physical("id", DataTypes.BIGINT()),
It's strange here, because I saw that TableColumn is marked as PublicEvolving, but after the Flink 1.12.0 release it did not keep any interface compatibility guarantee. At the least, the old method should have been marked as deprecated and kept for a major release.
@stevenzwu I think we could change to use the loaded Iceberg schema to accomplish the schema validation, so that we don't have to use Flink's TableSchema.
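As a rough sketch of that idea (the helper name and the field ids are assumptions for illustration, not the actual PR code):

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
import org.junit.Assert;

// Sketch: validate against the loaded Iceberg schema instead of Flink's TableSchema,
// so the test no longer depends on the TableColumn factory methods that changed in 1.12.
private static void assertIcebergSchema(Table table) {
  Schema expected = new Schema(
      Types.NestedField.optional(1, "id", Types.LongType.get()),
      Types.NestedField.optional(2, "strV", Types.StringType.get()));
  Assert.assertEquals(expected.asStruct(), table.schema().asStruct());
}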
thx a lot for the suggestion. will update
sorry, this API break was a mistake and will be fixed in the next bugfix release
https://issues.apache.org/jira/browse/FLINK-21226
It's OK, we've planned to just upgrade to Flink 1.12 because we need the newly introduced APIs from 1.12.0 to develop our unified Iceberg Flink source/sink. Thanks @twalthr for the work.
I am going to close this PR, as it seems that we probably need more discussion on how to upgrade Flink from 1.11 to 1.12.
@stevenzwu, I think it's a good time to upgrade Flink from 1.11 to 1.12 now that we've released Iceberg 0.11.0. About how to accomplish the upgrade, I'm considering the same way you proposed, because of the Flink compatibility issues. That means if someone just wants the Flink sink feature, then it's better to choose Iceberg 0.11.0 or 0.10.0, while if they want more advanced features such as Flink CDC or UPSERT streams, or the unified batch/stream Iceberg source/sink (the work you're working on), then it's better to upgrade to Flink 1.12.
@openinx I totally agree that we shouldn't upgrade Flink to 1.12.0 until Iceberg 0.11.0 is released. Regarding this PR, there is still some problem with the test failures. It might be something related to the Avro class cache, as pointed out by the Flink docs.
Force-pushed from 61279cc to 2c721b1.
@ClassRule
public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
        .setNumberTaskManagers(1)
Would it make sense to use 2 here (or any value greater than 1)? At my work, we've found that some people's code runs into issues once it's distributed. I highly doubt that's likely to be the case here, given that typically those users were relatively new end users of Flink, but it still might be helpful to run the tests with non-local shuffles (not in the same JVM). We could use 2 and then halve the number of slots per task manager.
Your call, but since this has been helpful for some people in my organization I thought I'd mention it.
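For reference, something like this is what I mean (a sketch only; DEFAULT_PARALLELISM is assumed to be an existing constant in the test base):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;

// Sketch: two task managers with the slots split between them, so records are
// shuffled between task managers instead of staying on a single one.
public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
        .setNumberTaskManagers(2)
        .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM / 2)
        .build());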
It seems that most Flink unit tests with MiniCluster use one taskmanager. I would stick with the convention.
On the other hand, this code is added to disable CHECK_LEAKED_CLASSLOADER. If we can fix that problem, we can stick with the AbstractTestBase base class.
Force-pushed from ebbf155 to 70235e3.
As the Flink Iceberg connector is evolving, it would probably be better to support multiple versions of Flink. Otherwise users will miss out on Iceberg connector improvements if they are not able to follow a forced Flink upgrade. Apache Beam can serve as an example of how this can be accomplished with a relatively small shim layer: https://github.com/apache/beam/tree/master/runners/flink
Force-pushed from 70235e3 to 0f2c797.
@tweise yeah. A few other people also asked for the same thing in issue-1951. I looked at the shim layer approach in
Force-pushed from 0f2c797 to 558e3f5.
@@ -76,7 +99,7 @@ protected TableEnvironment getTableEnv() {
    TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
    tableResult.getJobClient().ifPresent(c -> {
      try {
        c.getJobExecutionResult(Thread.currentThread().getContextClassLoader()).get();
        c.getJobExecutionResult().get();
I read the javadoc about TableResult again. The correct way to execute SQL and get the results is:

TableResult result = tEnv.executeSql("select ...");
// using a try-with-resources statement
try (CloseableIterator<Row> it = result.collect()) {
  // consume the rows from `it` here
}

Then I think we don't have to call c.getJobExecutionResult().get() here. How about removing line 101 ~ line 106 and using a try-with-resources statement in the following part?
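For concreteness, a sketch of a helper following that pattern (the method name and getTableEnv() are assumptions based on the surrounding discussion, not the exact PR code):

import java.util.List;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Sketch: execute a SQL statement and collect the rows via try-with-resources,
// without touching the JobClient at all.
protected List<Row> sql(String query, Object... args) {
  TableResult tableResult = getTableEnv().executeSql(String.format(query, args));
  try (CloseableIterator<Row> iter = tableResult.collect()) {
    return Lists.newArrayList(iter);
  } catch (Exception e) {
    throw new RuntimeException("Failed to collect table result", e);
  }
}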
I was wondering the same thing regarding c.getJobExecutionResult().get(). Updated the PR per your suggestion.
BTW, removing the JobClient#getJobExecutionResult().get() call works well for Flink 1.12. It seems that it doesn't work with Flink 1.11 when I tried it in the Flink test refactor PR.
@@ -107,7 +121,14 @@ public void testResiduals() throws Exception {
  }

  private List<Row> executeSQL(String sql) {
    return Lists.newArrayList(tEnv.executeSql(sql).collect());
    CloseableIterator<Row> iter = getTableEnv().executeSql(sql).collect();
Similar issue here (see https://github.com/apache/iceberg/pull/1956/files#r551679707).
updated
@@ -52,12 +53,25 @@ public TestFlinkScanSql(String fileFormat) {
  @Override
  public void before() throws IOException {
    super.before();
    tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
    tEnv.executeSql(String.format(
    getTableEnv().executeSql(String.format(
Why do we need this change here?
This is to avoid creating the table environment for each test method. It is the same pattern used by FlinkTestBase.
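Roughly, the shared-environment pattern looks like this (a sketch only; the class name and exact settings are assumptions, the actual FlinkTestBase may differ):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Sketch: create the TableEnvironment lazily once and share it across test methods,
// instead of rebuilding it in every before().
private static volatile TableEnvironment tEnv = null;

protected static TableEnvironment getTableEnv() {
  if (tEnv == null) {
    synchronized (TestFlinkScanSql.class) {  // class name assumed for illustration
      if (tEnv == null) {
        tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build());
      }
    }
  }
  return tEnv;
}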
OK, that sounds good to me.
Nit: here we could just use the executeSQL method now. Maybe we could rename executeSQL(String sql) to sql(String query, Object... args), so that we have the same usage as FlinkTestBase#sql.
really like the suggestion. will update
Force-pushed from 558e3f5 to 4f1f1d4.
Right now, the CI test failed. I tried the same branch against the master branch in my own fork: stevenzwu#11. It also failed with a connection error. I have the same code in a different branch, and that PR is fine with the CI build: stevenzwu#8.
I have also run into it several times.
Force-pushed from 4f1f1d4 to f33e94c.
@zhangjun0x01 thx a lot for confirming you ran into the problem too. I also hit the same problem locally when running Flink tests, when my laptop became super slow. Maybe some tests are heavy and cause long GC pauses or something. I just rebased the branch onto the latest master. Now all CI tests pass.
private static final Configuration config = new Configuration()
    // disable classloader check as Avro may cache class/object in the serializers.
    .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
I think we need to disable CoreOptions.CHECK_LEAKED_CLASSLOADER here? It seems like we usually validate the Iceberg table results after we've terminated the Flink job.
It is already disabled here. We need to disable this check for the other tests that are affected by this. CI tests were passing after disabling it here; they started to fail again after I rebased onto the latest master.
This is getting really tricky to debug. I can't reproduce it locally, hence I can't set up a breakpoint or add debug logging to a custom-built Flink.
I tried disabling CoreOptions.CHECK_LEAKED_CLASSLOADER for all MiniCluster tests. That doesn't help.
OK, I'll see how to reproduce those classloader issues.
There are still some ITCases which did not disable CHECK_LEAKED_CLASSLOADER in the current patch, so I made this commit: https://github.com/openinx/incubator-iceberg/commit/c8c2c8e8e72fd1b2051f1f946bb54c968e86c81d to re-trigger the Travis CI again; let's see what it says: https://github.com/openinx/incubator-iceberg/runs/1806279011
If the Travis CI passes, then @stevenzwu, you can apply this commit to this PR to retry the Travis CI:
curl -s https://github.com/openinx/incubator-iceberg/commit/c8c2c8e8e72fd1b2051f1f946bb54c968e86c81d.patch | git am
Seems all unit tests passed: https://github.com/openinx/incubator-iceberg/runs/1806279011 ! @stevenzwu, you could add this commit and then retry the Travis CI.
@openinx thx a lot for identifying the missing integration tests that require disabling the classloader check. I didn't realize the need to search for StreamExecutionEnvironment.getExecutionEnvironment.
BTW, I also added MiniClusterWithClientResource to TestFlinkIcebergSinkV2, which might help speed things up a tiny bit.
@stevenzwu, one more thing: we will need to enable this unit test
Force-pushed from 8fd95cb to ae99bc3.
…anged: (1) like '%' or '%%' now matches null (2) NOT IN filter push down may not maintain the same order
…because Flink tests can still use Flink's classloader to read and validate table result after the Flink job is terminated.
Force-pushed from 393f970 to a7c983b.
@openinx I enabled
… Flink 1.12. Have to adjust the test in a couple of places.
Force-pushed from a7c983b to b05c00e.
@@ -119,6 +123,12 @@ protected String warehouseRoot() {
    }
  }

  protected String getFullQualifiedTableName(String tableName) {
    final List<String> levels = new ArrayList<>(Arrays.asList(icebergNamespace.levels()));
Nit: in Apache Iceberg, we usually use the unified Lists.newArrayList to create a new ArrayList.
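For example, a sketch of the suggested style (assuming the relocated Guava Lists used elsewhere in the project):

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Sketch: the project-preferred factory instead of new ArrayList<>(Arrays.asList(...)).
List<String> levels = Lists.newArrayList(icebergNamespace.levels());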
will fix
@@ -107,7 +107,7 @@ public void testRenameTable() {
        () -> getTableEnv().from("tl")
    );
    Assert.assertEquals(
        Collections.singletonList(TableColumn.of("id", DataTypes.BIGINT())),
        Collections.singletonList(TableColumn.physical("id", DataTypes.BIGINT())),
@stevenzwu, do you think this issue needs to be fixed in this PR? I think we'd better to...
Assert.assertEquals(Maps.newHashMap(), table("tl").properties());

sql("CREATE TABLE IF NOT EXISTS tl(id BIGINT) WITH ('location'='/tmp/location')");
Assert.assertEquals("Should still be the old table.", Maps.newHashMap(), table("tl").properties());
final String uuid = UUID.randomUUID().toString();
Thanks a lot for the unit test improvements!
    .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

@ClassRule
public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(
I saw that TestFlinkIcebergSinkV2 also defines a similar mini cluster resource, so how about making it a small method so that TestFlinkIcebergSinkV2 can reuse it? For any mini cluster resource defined in the future, we'd better also reuse this one, because it will be easy for developers to forget to disable the CHECK_LEAKED_CLASSLOADER switch.
@ClassRule
public static MiniClusterWithClientResource miniClusterResource = createMiniClusterResource();

@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

public static MiniClusterWithClientResource createMiniClusterResource() {
  return new MiniClusterWithClientResource(
      new MiniClusterResourceConfiguration.Builder()
          .setNumberTaskManagers(1)
          .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
          .setConfiguration(CONFIG)
          .build());
}
The TestFlinkIcebergSinkV2 could just use:
@ClassRule
public static MiniClusterWithClientResource miniClusterResource = MiniClusterBase.createMiniClusterResource();
That is a great idea. will update
@@ -37,7 +43,7 @@ public static Actions forTable(StreamExecutionEnvironment env, Table table) {
  }

  public static Actions forTable(Table table) {
    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(), table);
    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
Thanks a lot for this.
@openinx I addressed your last round of comments. Thanks a lot for the great feedback. thx for catching the case about still using
LGTM
@openinx and @stevenzwu, what is the compatibility impact of this change? Does master no longer work with 1.11?
@rdblue that is correct. Master won't work with Flink 1.11 anymore. The main reason is that the new FLIP-27 Flink source implementation will depend on APIs only available from Flink 1.12. There were some discussions on how to make the Flink connector work with both Flink 1.11 and 1.12, like adopting the Gradle magic from Beam. In the end, it seems that most people are fine with having master only work with Flink 1.12.
No description provided.