
Flink: Fix range distribution NPE when value is null #11662

Merged: 9 commits into apache:main on Dec 10, 2024

Conversation

Guosmilesmile
Contributor

When the distribution mode is configured to RANGE and a partition field in the data contains null values, SortKey serialization fails, leaving the job continuously restarting.

If we set the partition as bucket(5, name), and some data has a null value for name, then the DataStatisticsOperator will throw an error when serializing the statistics and sending them to the coordinator.

In SortKeySerializer.serialize

  public void serialize(SortKey record, DataOutputView target) throws IOException {
    for (int i = 0; i < size; ++i) {
      int fieldId = transformedFields[i].fieldId();
      Type.TypeID typeId = transformedFields[i].type().typeId();
      switch (typeId) {
        case BOOLEAN:
          // record.get(i, Boolean.class) returns null for a null sort field,
          // so the auto-unboxing here throws the NullPointerException
          target.writeBoolean(record.get(i, Boolean.class));
          break;
        case INTEGER:
        case DATE:
          target.writeInt(record.get(i, Integer.class));
          break;
        ...
      }
    }
  }


Error log:

Caused by: org.apache.flink.util.SerializedThrowable: java.lang.NullPointerException
    at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:133) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.serialize(SortKeySerializer.java:49) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.serialize(MapSerializer.java:136) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:113) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.DataStatisticsSerializer.serialize(DataStatisticsSerializer.java:38) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.StatisticsUtil.serializeDataStatistics(StatisticsUtil.java:46) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.StatisticsEvent.createTaskStatisticsEvent(StatisticsEvent.java:51) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.DataStatisticsOperator.snapshotState(DataStatisticsOperator.java:227) ~[flink-onejar-4.0.0.jar:?]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:234) ~[flink-dist-1.19.1.jar:1.19.1]

We think that if a field is used as a partition source and a transform function is applied to it, this scenario will inevitably occur once the field contains null values. During statistics collection, we can ignore rows with null values in the relevant partition field. If such rows are few, skipping them should not affect the job's execution; if they are many, falling back to the default round-robin strategy would also be appropriate.

This PR traverses the partition field values and skips statistics collection for any record that contains a null.
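
For illustration, a minimal sketch of the skip approach, using a hypothetical helper (the name and placement are assumptions, not the PR's actual code):

  // Hypothetical helper: drop a row from range-statistics collection when any
  // of its sort fields is null, so the SortKey never reaches the serializer.
  private static boolean containsNullSortField(SortKey key, int fieldCount) {
    for (int i = 0; i < fieldCount; i++) {
      if (key.get(i, Object.class) == null) {
        return true;
      }
    }
    return false;
  }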

The github-actions bot added the flink label on Nov 27, 2024
@ConeyLiu
Contributor

Thanks for the contribution. Should we handle the null value properly instead of skipping it? Could you also add UTs to cover these changes?
cc @stevenzwu @pvary

@Guosmilesmile
Contributor Author

@ConeyLiu Yes, I have added UTs to cover the relevant changes.

@pvary
Contributor

pvary commented Nov 27, 2024

Seems like a hacky solution to me. We would lose all of the advantages of range distribution mode for the affected records. Could we just serialize the null values correctly instead?

@Guosmilesmile
Contributor Author

@pvary Yes, you are right. We changed our approach to handle null values instead of filtering them out: before serializing each field, we now write a flag, where true indicates that the field is null. During serialization and deserialization, the flag is read first, which allows null to be identified correctly. The related code has been updated; please take a look and see whether it is feasible.
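
A minimal sketch of the flag-per-field encoding described above, where writeNonNullField/readNonNullField are assumed helper names standing in for the existing per-type switch (see the diff below for the real change):

  public void serialize(SortKey record, DataOutputView target) throws IOException {
    for (int i = 0; i < size; ++i) {
      Object value = record.get(i, Object.class);
      target.writeBoolean(value == null);   // flag first: true means null
      if (value == null) {
        continue;                           // no payload for a null field
      }
      writeNonNullField(i, value, target);  // assumed helper: existing per-type switch
    }
  }

  public SortKey deserialize(SortKey reuse, DataInputView source) throws IOException {
    for (int i = 0; i < size; ++i) {
      if (source.readBoolean()) {           // flag read before any payload
        reuse.set(i, null);
      } else {
        reuse.set(i, readNonNullField(i, source));  // assumed helper
      }
    }
    return reuse;
  }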

@pvary
Contributor

pvary commented Nov 27, 2024

@Guosmilesmile: Thanks for the changes. Left some comments, but started the tests to see if this change causes any other test failures.

Please remove the 1.19 and 1.18 changes for now - we usually cherry-pick the changes to the other versions after the original PR for the main version has been merged. This is better for the reviewer (smaller number of files) and better for the contributor (if there is a change request during the review, they don't have to keep the different versions in sync).

It would be nice to have an end-to-end test for null values too.
Currently we only test that the statistics are collected correctly, but there might be some issues when applying the stats. It would be nice to have a test for that case too.

@pvary
Contributor

pvary commented Nov 27, 2024

@Guosmilesmile: Please apply spotless to remove the formatting failures:

./gradlew spotlessApply

@Guosmilesmile
Contributor Author

@pvary Thank you very much for your guidance. I have made the changes according to your suggestions. Please take a look and see if they are feasible.

@pvary
Contributor

pvary commented Nov 27, 2024

Thanks for the changes @Guosmilesmile!
One question remains from my side:

It would be nice to have an end-to-end test for null values too.
Currently we only test that the statistics are collected correctly, but there might be some issues when applying the stats. It would be nice to have a test for that case too.

@Guosmilesmile
Contributor Author

@pvary I have added test cases for DataStatisticsOperator and DataStatisticsCoordinator, specifically for the null scenario. I verified the processElement and event-handling paths, simulating data being processed by the operator and the coordinator receiving statistics that contain null values. Could you please take a look and see if this is feasible? Thank you!

@Guosmilesmile
Contributor Author

@pvary I have also added a unit test for the full table-to-sink path with data containing null values. Could you please review it as well? Thank you very much!

dataStatisticsCoordinator.start();
tasksReady(dataStatisticsCoordinator);

SortKey NullSortKey = Fixtures.SORT_KEY.copy();
Contributor

@pvary pvary Nov 27, 2024


nit: The variable name should start with a lowercase letter.

Contributor Author


@pvary I have changed it, please check it. Thank you!

Contributor Author


@pvary I'm sorry, there was a little mistake just now. Please try again. Thank you very much.

@Guosmilesmile
Contributor Author

@pvary I'm sorry, I just re-uploaded the missing parts. Please help me trigger the test again. I apologize for the inconvenience.

@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
     for (int i = 0; i < size; ++i) {
       int fieldId = transformedFields[i].fieldId();
       Type.TypeID typeId = transformedFields[i].type().typeId();
+      Object value = record.get(i, Object.class);
Contributor


This will add a boolean flag overhead for every field, but I can't think of any better alternative, so this is fine to me.

However, ideally we need to figure out how to evolve it in a compatible way - probably by leveraging TypeSerializerSnapshot.

Contributor Author


@stevenzwu I have looked at the current implementation in Flink; its serialization of null values is handled by NullableSerializer. The underlying approach there is also to prefix the field with a boolean flag, which is quite similar to the approach in this PR. Can we use the current method to fix this issue for now, or is there a better way to handle it?

public class NullableSerializer<T> extends TypeSerializer<T> {

    @Override
    public void serialize(T record, DataOutputView target) throws IOException {
        if (record == null) {
            target.writeBoolean(true);
            target.write(padding);
        } else {
            target.writeBoolean(false);
            originalSerializer.serialize(record, target);
        }
    }

    @Override
    public T deserialize(DataInputView source) throws IOException {
        boolean isNull = deserializeNull(source);
        return isNull ? null : originalSerializer.deserialize(source);
    }
}

Contributor


@stevenzwu: What do you think about a serializer version, something like what SimpleVersionedSerializer uses? Maybe don't store the version for every key, but inherit the version from the statistics object?

Contributor


Nulls are not allowed everywhere in Flink. Primitive types generally don't support null. In other places, Flink uses boolean flags to encode nulls, e.g. in PojoSerializer: https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L392

KeyBy operations are not allowed on null values.

+1 to Steven's suggestion to version the serializer. We already have SortKeySerializerSnapshot. We just need to add a version check for the new boolean serializer because this is an incompatible change which requires serializer migration.

@Guosmilesmile
Contributor Author

Guosmilesmile commented Nov 28, 2024

@pvary Because a new flag has been added, the testSerializationSize test case needed to be modified to reflect the corresponding size value plus 1 byte for the flag. It has been adjusted. I have run the tests locally, and all unit tests pass. I would appreciate it if you could start the tests again.

Contributor

@mxm mxm left a comment


The approach here generally looks good to me once we address the remaining comments.


@Guosmilesmile
Contributor Author

Guosmilesmile commented Nov 28, 2024

@mxm Thank you very much for your suggestions. I plan to add a version check in SortKeySerializerSnapshot: if the state is restored from an old version, it will directly return TypeSerializerSchemaCompatibility.incompatible(); additionally, if the readVersion in readSnapshot is an old version, it will throw an exception indicating that the version is not supported. Is this approach feasible?

@Guosmilesmile
Contributor Author

@mxm @stevenzwu @pvary Thank you all for your suggestions. I have submitted a version that mainly modifies SortKeySerializerSnapshot and implements version detection. If the version does not match, it returns the corresponding result to avoid parsing exceptions and data errors when restoring from an old state. I would appreciate it if you could take a look and see if this modification is feasible.

@mxm
Contributor

mxm commented Nov 29, 2024

@Guosmilesmile Ideally, we want to return compatibleAfterMigration() and make sure the old and new serializer can be instantiated.

@Guosmilesmile
Contributor Author

Guosmilesmile commented Nov 29, 2024

@mxm I'm sorry, I'm a bit confused and would like to ask for your advice. Which part do I need to modify?

  1. Based on the current modifications, simply change TypeSerializerSchemaCompatibility.incompatible() to compatibleAfterMigration()?
  2. Or copy a class called OldSortKeySerializer from SortKeySerializer, keeping the old-version serialize and deserialize?

Please provide detailed advice. Thank you very much.

Contributor

@mxm mxm left a comment


Hey @Guosmilesmile! Please see the suggestions inline. Let me know if you have questions.

Comment on lines 313 to 376
-    if (readVersion == 1) {
-      readV1(in);
-    } else {
-      throw new IllegalArgumentException("Unknown read version: " + readVersion);
+    switch (readVersion) {
+      case 1:
+        throw new UnsupportedOperationException(
+            String.format(
+                "No longer supported version [%s]. Please upgrade first . ", readVersion));
+      case 2:
+        readV2(in);
+        break;
Contributor

@mxm mxm Nov 29, 2024


Reading the snapshot isn't any different for V1/V2. The readV1() and readV2() methods are identical, so we can just keep the current one. However, we need to save the serializer version as an int field which we can read in restoreSerializer() to pass into the serializer, just like the schema and the sort order. We can then serialize/deserialize depending on the version used.

For example, in the serializer we would do:

if (version == 1) {
  // Don't read/write null indicator
} else {
  // Read/write null indicator
}
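
Concretely, that version gate might look like this in the per-field serialize loop (a sketch, assuming a `version` field carried on the serializer after restore, not the exact PR code):

  if (version > 1) {
    target.writeBoolean(value == null);  // v2+: null indicator before the payload
    if (value == null) {
      continue;                          // v2+: skip the per-type write for nulls
    }
  }
  // v1 state (and non-null v2 values) continue to the existing per-type switch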

Comment on lines 347 to 391
    if (oldSerializerSnapshot.getCurrentVersion() != this.getCurrentVersion()) {
      return TypeSerializerSchemaCompatibility.incompatible();
    }
Contributor


Suggested change:

-    if (oldSerializerSnapshot.getCurrentVersion() != this.getCurrentVersion()) {
-      return TypeSerializerSchemaCompatibility.incompatible();
-    }
+    if (oldSerializerSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) {
+      return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+    }

@Guosmilesmile
Contributor Author

Guosmilesmile commented Nov 29, 2024

Hi @mxm! Thank you very much for your suggestions. Following your advice, I added a version number to SortKeySerializer through restoreSerializer, implementing different serialization methods for v1 and v2.

Based on the test job's behavior, the TaskManager achieves state compatibility. However, the JobManager fails to recover because of the DataStatisticsCoordinator: the SortKeySerializer used in CompletedStatisticsSerializer is instantiated directly, which leads to a failure to start.

In resetToCheckpoint, I can only get byte[] checkpointData, and the coordinator directly uses SortKeySerializer to deserialize the checkpoint data. How can I obtain the version number in resetToCheckpoint and switch back to the current version number at the end? This is the problem I am currently facing.

My approach is to save the version number of the SortKeySerializer in the checkpointData. This allows the CompletedStatisticsSerializer to be linked with the version of the SortKeySerializer.

The checkpointData is currently checkpointId + StatisticsType + SortKey; we can change it to checkpointId + SortKeySerializerVersion + StatisticsType + SortKey. Because this PR changes the SortKey serialization format, the version number cannot be appended at the end of the checkpointData (the last position is occupied by the SortKey), so it can only be added near the beginning.
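
A sketch of that layout change, assuming `record.checkpointId()`, `record.type()`, and a `statisticsTypeSerializer` field as illustrative names (not the exact PR code):

  @Override
  public void serialize(CompletedStatistics record, DataOutputView target) throws IOException {
    target.writeLong(record.checkpointId());                    // unchanged: checkpoint id first
    target.writeInt(sortKeySerializerVersion);                  // new: SortKeySerializer version
    statisticsTypeSerializer.serialize(record.type(), target);  // then StatisticsType
    // ... then the SortKey-keyed statistics, as before
  }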

I will modify the serialize and deserialize methods of CompletedStatisticsSerializer to provide both v1 and v2. During restore, we first attempt to parse using v2; if that fails, we retry with v1 to retrieve the version of the state. After restoring the state with that version of the SortKeySerializer, the SortKeySerializer is switched back to the latest version.

Running a test job confirms this resolves the issue. The related code has been submitted, and unit tests have been added. Could you please take a look at whether this solution is feasible? Thank you very much!
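
A sketch of the described fallback, assuming the deserializeV1 method mentioned later in the review (the exact control flow in the PR may differ):

  import java.io.IOException;
  import java.io.UncheckedIOException;
  import org.apache.flink.core.memory.DataInputDeserializer;

  // Best-effort restore: try the v2 layout first (checkpointId +
  // SortKeySerializerVersion + StatisticsType + SortKey); fall back to the
  // legacy v1 layout if parsing fails.
  static CompletedStatistics restore(byte[] bytes, CompletedStatisticsSerializer serializer) {
    try {
      return serializer.deserialize(new DataInputDeserializer(bytes));      // v2 layout
    } catch (IOException | RuntimeException e) {
      try {
        return serializer.deserializeV1(new DataInputDeserializer(bytes));  // legacy layout
      } catch (IOException ioe) {
        throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioe);
      }
    }
  }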

Here is the error log:


2024-11-29 19:44:15,670 ERROR org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Failed to reset the coordinator to checkpoint and start.
java.io.UncheckedIOException: Fail to deserialize aggregated statistics
    at org.apache.iceberg.flink.sink.shuffle.StatisticsUtil.deserializeCompletedStatistics(StatisticsUtil.java:81) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.DataStatisticsCoordinator.resetToCheckpoint(DataStatisticsCoordinator.java:372) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:418) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$7(RecreateOnResetOperatorCoordinator.java:157) ~[flink-dist-1.19.1.jar:1.19.1]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883) ~[?:?]
    at java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2257) ~[?:?]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.resetToCheckpoint(RecreateOnResetOperatorCoordinator.java:144) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.resetToCheckpoint(OperatorCoordinatorHolder.java:303) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreStateToCoordinators(CheckpointCoordinator.java:2128) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1837) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1939) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:239) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:214) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:381) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:224) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:140) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:162) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:121) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.19.1.jar:1.19.1]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.io.UTFDataFormatException: malformed input: partial character at end
    at org.apache.flink.core.memory.DataInputDeserializer.readUTF(DataInputDeserializer.java:290) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.deserialize(SortKeySerializer.java:262) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.deserialize(SortKeySerializer.java:219) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.SortKeySerializer.deserialize(SortKeySerializer.java:49) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:153) ~[flink-dist-1.19.1.jar:1.19.1]
    at org.apache.iceberg.flink.sink.shuffle.CompletedStatisticsSerializer.deserialize(CompletedStatisticsSerializer.java:99) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.CompletedStatisticsSerializer.deserialize(CompletedStatisticsSerializer.java:38) ~[flink-onejar-4.0.0-test.jar:?]
    at org.apache.iceberg.flink.sink.shuffle.StatisticsUtil.deserializeCompletedStatistics(StatisticsUtil.java:79) ~[flink-onejar-4.0.0-test.jar:?]
    ... 27 more

…rsion and null sort key, and add UT for the change
@mxm
Contributor

mxm commented Dec 2, 2024

Thanks for the update @Guosmilesmile! Unfortunately, we don't have a way to encode the serializer version for all serializers, so a best-effort approach to retry with a different serializer version on restore failure may be the best we can do.

        this.version = 1;
        break;
      case 2:
        readV1(in);
Contributor


Can we remove the version suffix in the method name? It handles both versions.

@@ -93,15 +126,35 @@ public void serialize(CompletedStatistics record, DataOutputView target) throws

   @Override
   public CompletedStatistics deserialize(DataInputView source) throws IOException {
     long checkpointId = source.readLong();
+    changeSortKeySerializerVersion(source.readInt());
Contributor


AFAIK we were not writing the serializer version before this change, so I'm not sure this works when restoring the serializer to read old data.

Comment on lines 81 to 86
    try {
      DataInputDeserializer input = new DataInputDeserializer(bytes);
      return statisticsSerializer.deserializeV1(input);
    } catch (IOException ioException) {
      throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
    }
Contributor


I see, we're retrying here in case the restore fails.

@Guosmilesmile
Contributor Author

Guosmilesmile commented Dec 2, 2024

@mxm Thank you very much for your suggestions. I have made the necessary modifications. I also encountered an occasional scenario where deserializing v1 checkpointData with the v2 serializer succeeded, but the resulting values were wrong and caused another exception elsewhere, so I added validation of the contents of CompletedStatistics. I appreciate you taking the time out of your busy schedule to review it again; I am very grateful for your guidance.
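
That validation might look roughly like the following, assuming CompletedStatistics exposes checkpointId() and type() accessors (the specific invariants are illustrative, not the PR's exact checks):

  // Assumed sanity checks: a v1 payload mis-read with the v2 layout tends to
  // produce nonsense values, so reject them and fall back to the v1 path.
  static boolean looksValid(CompletedStatistics stats) {
    return stats != null
        && stats.checkpointId() > 0
        && stats.type() != null;
  }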

@Guosmilesmile
Contributor Author

@mxm @pvary @stevenzwu Thank you all very much for your guidance. I would appreciate it if you could take another look and let me know what needs to be done next to keep things moving forward.

Contributor

@mxm mxm left a comment


I think this looks good, but I'll defer to @stevenzwu and @pvary for the final approval.

@Guosmilesmile
Contributor Author

I think this looks good, but I'll defer to @stevenzwu and @pvary for the final approval.

@mxm Thank you very much for taking the time to review my code. The relevant comments have been added.

@pvary @stevenzwu I would appreciate it if both of you could take a look for the final approval.

Guosmilesmile requested a review from pvary on December 9, 2024
@pvary
Contributor

pvary commented Dec 10, 2024

@stevenzwu: any last minute comments?

pvary merged commit ac6509a into apache:main on Dec 10, 2024
20 checks passed
@pvary
Contributor

pvary commented Dec 10, 2024

Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review!

@Guosmilesmile please prepare the backport commits to the other Flink versions.

Thanks, Peter

@Guosmilesmile
Contributor Author

Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review!

@Guosmilesmile please prepare the backport commits to the other Flink versions.

Thanks, Peter

Thank you all for your reviews @mxm @stevenzwu @pvary .

@pvary The related backport has been submitted, see #11745. Thank you!

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
pvary pushed a commit that referenced this pull request Jan 3, 2025