Flink: Fix range distribution npe when value is null #11662
Conversation
Thanks for the contributions. Should we better handle the null value instead of skipping it? Could you also add a UT to cover these changes?
@ConeyLiu Yes, I have added a UT to cover the relevant changes.
Seems like a hacky solution to me. We will lose all of the advantages of the range distribution mode for the affected records. Could we just serialize the
@pvary Yes, you are right. We changed our approach to handle null values instead of filtering them out. Before serializing each field we now write a flag, where true indicates that the field is null. During deserialization we read the flag first, which allows us to correctly identify nulls. The related code has been updated; please take a look to see if it is feasible.
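The flag-then-payload encoding described in this comment can be sketched with plain java.io streams (the helper names below are made up for illustration; the actual PR operates on Flink's SortKey fields and DataOutputView):

```java
import java.io.*;

// Sketch of null-flag field encoding: write a boolean flag before every
// field so that null values survive the round trip instead of throwing an NPE.
public class NullFlagFieldCodec {

  // Write a possibly-null long field: flag first, then the payload if present.
  public static void writeNullableLong(DataOutputStream out, Long value) {
    try {
      out.writeBoolean(value == null); // true means "field is null"
      if (value != null) {
        out.writeLong(value);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Read the flag first; only read the payload when the flag says non-null.
  public static Long readNullableLong(DataInputStream in) {
    try {
      if (in.readBoolean()) {
        return null;
      }
      return in.readLong();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The cost is one extra byte per field, which is the overhead discussed later in this thread.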
@Guosmilesmile: Thanks for the changes. Left some comments, but started the tests to see if this change causes any other test failures. Please remove the 1.19 and 1.18 changes for now; we usually cherry-pick the changes to the other versions after the original PR for the main version has been merged. This is better for the reviewer (smaller number of files) and better for the contributor (if there is a change request during the review, they don't have to keep the different versions in sync). It would be nice to have an end-to-end test for null values too.
@Guosmilesmile: Please apply spotless to remove the formatting failures.
@pvary Thank you very much for your guidance. I have made the changes according to your suggestions. Please take a look and see if they are feasible.
Thanks for the changes @Guosmilesmile!
@pvary I have added test cases for DataStatisticsOperator and DataStatisticsCoordinator, specifically for the null scenario. I verified the processElement and event-handling paths, simulating both data being processed by the operator and receiving statistics that contain null values. Could you please take a look and see if this is feasible? Thank you!
@pvary I have also added a unit test for the scenario of building from the table to the sink with data containing null values. Could you please review it as well? Thank you very much!
dataStatisticsCoordinator.start();
tasksReady(dataStatisticsCoordinator);
SortKey NullSortKey = Fixtures.SORT_KEY.copy();
nit: the variable name should start with a lowercase letter
@pvary I have changed it, please check, thank you.
@pvary I'm sorry, there was a small mistake just now. Please try again. Thank you very much.
@pvary I'm sorry, I just re-uploaded the missing parts. Please help me trigger the tests again. I apologize for the inconvenience.
@@ -124,6 +124,14 @@ public void serialize(SortKey record, DataOutputView target) throws IOException
for (int i = 0; i < size; ++i) {
  int fieldId = transformedFields[i].fieldId();
  Type.TypeID typeId = transformedFields[i].type().typeId();
  Object value = record.get(i, Object.class);
This will add a boolean-flag overhead for every field, but I can't think of any better alternative, so this is fine to me. However, ideally we need to figure out how to evolve it in a compatible way, probably by leveraging TypeSerializerSnapshot.
@stevenzwu I have looked at the current implementation in Flink; its serialization of null values is based on NullableSerializer. The underlying approach is also to add a boolean flag at the beginning of the field to distinguish nulls, which is quite similar to the approach in this PR. Can we use the current method to fix this issue for now, or is there a better way to handle it?
class NullableSerializer<T> extends TypeSerializer<T> {

  @Override
  public void serialize(T record, DataOutputView target) throws IOException {
    if (record == null) {
      target.writeBoolean(true);
      target.write(padding);
    } else {
      target.writeBoolean(false);
      originalSerializer.serialize(record, target);
    }
  }

  @Override
  public T deserialize(DataInputView source) throws IOException {
    boolean isNull = deserializeNull(source);
    return isNull ? null : originalSerializer.deserialize(source);
  }
}
@stevenzwu: What do you think about a serializer version, something like what SimpleVersionedSerializer uses? Maybe don't store the version for every key, but inherit the version from the statistics object?
Nulls are not allowed everywhere in Flink. Primitive types generally don't support null. In other places, Flink uses boolean flags to encode nulls, e.g. in PojoSerializer: https://github.com/apache/flink/blob/b9c92371dba07b6bfe4368d7b6d7f7c575b4c603/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L392
KeyBy operations are not allowed on null values.
+1 to Steven's suggestion to version the serializer. We already have SortKeySerializerSnapshot. We just need to add a version check for the new boolean serializer because this is an incompatible change which requires serializer migration.
@pvary Because a new flag has been added, the testSerializationSize test case needed to be updated to reflect the corresponding size (+1 byte for the flag). It has been adjusted. I have run the tests locally, and all unit tests pass. I would appreciate it if you could start the tests again.
The approach here generally looks good to me once we address the remaining comments.
@mxm Thank you very much for your suggestions. I need to add a version check in SortKeySerializerSnapshot. If the state is restored from an old version, it will directly return TypeSerializerSchemaCompatibility.incompatible(). Additionally, if the readVersion in readSnapshot is an old version, it will throw an exception indicating that the version is not supported. Is this approach feasible?
@mxm @stevenzwu @pvary Thank you all for your suggestions. I have submitted a version that mainly modifies the SortKeySerializerSnapshot and implements version detection. If the version does not match, it will return the corresponding message to avoid parsing exceptions and data errors when restoring from an old state into a new state. I would appreciate it if you could take a look and see if this modification is feasible.
@Guosmilesmile Ideally, we want to return compatibleAfterMigration() and make sure the old and new serializer can be instantiated.
@mxm I'm sorry, I'm a bit confused and would like to ask for your advice. Which part do I need to modify? Please provide detailed advice. Thank you very much.
Hey @Guosmilesmile! Please see the suggestions inline. Let me know if you have questions.
- if (readVersion == 1) {
-   readV1(in);
- } else {
-   throw new IllegalArgumentException("Unknown read version: " + readVersion);
+ switch (readVersion) {
+   case 1:
+     throw new UnsupportedOperationException(
+         String.format(
+             "No longer supported version [%s]. Please upgrade first . ", readVersion));
+   case 2:
+     readV2(in);
+     break;
Reading the snapshot isn't any different for V1/V2. The readV1() and readV2() methods are identical, so we can just keep the current one. However, we need to save the serializer version as an int field which we can read in restoreSerializer() to pass it into the serializer, just like the schema and the sort order. We can then serialize/deserialize depending on the version used.
For example, in the serializer we would do:
if (version == 1) {
  // Don't read/write null indicator
} else {
  // Read/write null indicator
}
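That version gate can be sketched as a toy, self-contained serializer (hypothetical class using plain java.io instead of Flink's DataInputView/DataOutputView): only the v2 format reads and writes the null indicator, while v1 keeps the legacy layout.

```java
import java.io.*;

// Toy sketch of a version-gated serializer: the version chosen at restore time
// decides whether the null flag exists in the byte stream.
public class VersionedNullableLongSerializer {
  private final int version; // 1 = legacy format (no null flag), 2 = null-aware

  public VersionedNullableLongSerializer(int version) {
    this.version = version;
  }

  public void serialize(Long value, DataOutputStream out) {
    try {
      if (version >= 2) {
        out.writeBoolean(value == null); // v2 only: null indicator first
        if (value == null) {
          return;
        }
      }
      out.writeLong(value); // a v1 serializer would NPE here on null, the original bug
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public Long deserialize(DataInputStream in) {
    try {
      if (version >= 2 && in.readBoolean()) {
        return null;
      }
      return in.readLong();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```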
- if (oldSerializerSnapshot.getCurrentVersion() != this.getCurrentVersion()) {
-   return TypeSerializerSchemaCompatibility.incompatible();
- }
+ if (oldSerializerSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) {
+   return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+ }
Hi @mxm! Thank you very much for your suggestions. Following your advice, I added a version number to SortKeySerializer through restoreSerializer, implementing different serialization methods for v1 and v2. Based on the behavior of the test job, the TaskManager achieves state compatibility, but the JobManager still fails to recover.

My approach is to save the version number of the SortKeySerializer in the checkpointData. This allows the CompletedStatisticsSerializer to be linked with the version of SortKeySerializer. CheckpointData is now checkpointId + StatisticsType + SortKey; we can change it to checkpointId + SortKeySerializerVersion + StatisticsType + SortKey. Because the SortKey serialization method changed, it is not possible to append the version number at the end of the checkpointData, as the last position is occupied by the SortKey; the version number can only be added at the beginning. I will modify the serialize and deserialize methods of CompletedStatisticsSerializer to provide both v1 and v2. During restore, we first attempt to parse with v2; if that fails, we try v1 to retrieve the version number of the state. After restoring the state with that version of SortKeySerializer, we switch SortKeySerializer to the latest version. Running the test job confirms this resolves the issue. The related code has been submitted, and unit tests have been added. Could you please take a look at whether this solution is feasible? Thank you very much! Here is the error log:
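The try-v2-then-v1 restore described above can be sketched with plain java.io (the layout and method names are illustrative, not the PR's actual code; a UTF string stands in for the real statistics payload):

```java
import java.io.*;

// Sketch of best-effort versioned restore: attempt the new v2 checkpoint
// layout first, and retry with the old v1 layout when parsing fails.
public class BestEffortRestore {

  // v1 layout: checkpointId (long) + payload
  public static byte[] writeV1(long checkpointId, String payload) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeLong(checkpointId);
      out.writeUTF(payload);
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // v2 layout: checkpointId (long) + serializer version (int) + payload
  public static byte[] writeV2(long checkpointId, String payload) {
    try {
      ByteArrayOutputStream buf = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buf);
      out.writeLong(checkpointId);
      out.writeInt(2); // version sits right after the checkpoint id
      out.writeUTF(payload);
      return buf.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static String deserializeV2(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    in.readLong(); // checkpointId
    int version = in.readInt();
    if (version != 2) {
      throw new IOException("Unexpected serializer version: " + version);
    }
    return in.readUTF();
  }

  static String deserializeV1(byte[] bytes) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    in.readLong(); // checkpointId
    return in.readUTF();
  }

  // Best-effort restore: v2 first, fall back to v1 on any parse failure.
  public static String restore(byte[] bytes) {
    try {
      return deserializeV2(bytes);
    } catch (IOException v2Failure) {
      try {
        return deserializeV1(bytes);
      } catch (IOException v1Failure) {
        throw new UncheckedIOException("Fail to deserialize aggregated statistics", v1Failure);
      }
    }
  }
}
```

As noted later in the thread, a v1 byte stream can occasionally parse "successfully" under the v2 layout with garbage values, which is why the final PR adds a content validation step on top of this retry.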
…rsion and null sort key, and add UT for the change (force-pushed from cd3d93d to a49b87b)
Thanks for the update @Guosmilesmile! Unfortunately, we don't have a way to encode the serializer version for all serializers, so a best-effort approach to retry with a different serializer version on restore failure may be the best we can do.
  this.version = 1;
  break;
case 2:
  readV1(in);
Can we remove the version suffix in the method name? It handles both versions.
@@ -93,15 +126,35 @@ public void serialize(CompletedStatistics record, DataOutputView target) throws

@Override
public CompletedStatistics deserialize(DataInputView source) throws IOException {
  long checkpointId = source.readLong();
  changeSortKeySerializerVersion(source.readInt());
AFAIK We were not writing the serializer version before this change. So not sure this works when restoring the serializer to read old data.
try {
  DataInputDeserializer input = new DataInputDeserializer(bytes);
  return statisticsSerializer.deserializeV1(input);
} catch (IOException ioException) {
  throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException);
}
I see, we're retrying here in case the restore fails.
@mxm Thank you very much for your suggestions. I have made the necessary modifications. I encountered an occasional scenario where deserializing v1 checkpointData with the v2 path succeeded but produced wrong values, causing another exception downstream, so I added validation of the contents of CompletedStatistics. I appreciate you taking the time out of your busy schedule to review it again. I am very grateful for your guidance.
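That extra validation step can be sketched as a simple plausibility predicate (hypothetical fields; the real CompletedStatistics content checks in the PR differ):

```java
// Hypothetical sketch: after a speculative v2 parse "succeeds", sanity-check the
// decoded header fields before trusting them; fall back to the v1 path otherwise.
public class StatisticsSanityCheck {

  // statisticsTypeOrdinal stands in for the StatisticsType enum ordinal.
  public static boolean looksValid(long checkpointId, int statisticsTypeOrdinal, int keyCount) {
    return checkpointId > 0
        && statisticsTypeOrdinal >= 0
        && statisticsTypeOrdinal < 2 // only two statistics types assumed here
        && keyCount >= 0;
  }
}
```

A v1 stream misread as v2 typically yields an out-of-range type ordinal or a negative count, so a check like this catches the "parsed but wrong" case before it propagates.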
@mxm @pvary @stevenzwu Thank you all very much for your guidance. I would appreciate it if you could take another look and let me know what needs to be done next to keep things moving forward.
…rsion,and add UT for the change (force-pushed from c412954 to fb226bb)
I think this looks good, but I'll defer to @stevenzwu and @pvary for the final approval.
@mxm Thank you very much for taking the time to review my code. The relevant comments have been added. @pvary @stevenzwu I would appreciate it if both of you could take a look for the final approval.
@stevenzwu: any last minute comments?
Thanks @Guosmilesmile for the PR, and @mxm and @stevenzwu for the review! @Guosmilesmile, please prepare the backport commits to the other Flink versions. Thanks, Peter
Thank you all for your reviews @mxm @stevenzwu @pvary. @pvary The related backport has been submitted, see #11745. Thank you!
…to Flink 1.18 and 1.19 (#11745)
When configuring the distribution mode to RANGE, if the partition field in the data contains null values, it will cause the SortKey serialization to fail, resulting in the job continuously restarting.
If we set the partition as bucket(5, name), and some data has a null value for name, then the DataStatisticsOperator will throw an error when serializing the statistics and sending them to the coordinator, in SortKeySerializer.serialize.
Error log:
We think that if a field is used as a partition field and a transformation function is applied to it, this scenario will inevitably occur whenever the field contains null values. We believe that during statistics collection we can ignore rows with null values in the relevant partition field. If such rows are few, they should not affect the job; if they are many, falling back to the default round-robin strategy for them would also be appropriate.
This PR mainly traverses the contents of the partition field and skips statistics collection for any parts that contain null values.
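The originally proposed workaround in this description (later replaced during review by null-aware serialization) can be sketched as a null-skip guard in front of the frequency map (names here are illustrative, not Iceberg's actual API):

```java
import java.util.*;

// Sketch of the "skip null keys" approach: sort keys with any null component
// are excluded from statistics collection so they never reach the serializer.
public class NullSkippingStats {
  private final Map<String, Long> keyFrequency = new HashMap<>();

  // Record a sort-key value unless any component is null.
  public void add(Object... keyFields) {
    for (Object f : keyFields) {
      if (f == null) {
        return; // skip: serializing this key would fail with an NPE
      }
    }
    keyFrequency.merge(Arrays.toString(keyFields), 1L, Long::sum);
  }

  public long count(Object... keyFields) {
    return keyFrequency.getOrDefault(Arrays.toString(keyFields), 0L);
  }
}
```

The trade-off, raised by the reviewers above, is that records with null keys lose the benefits of range distribution, which is why the merged fix serializes nulls instead.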