Parquet: Fix getting null values for the nested field partition column #4627
Conversation
@@ -126,6 +131,48 @@ public void testBasicProjection() throws IOException {
    TestHelpers.assertRows(result, expected);
  }

  @Test
  public void testReadPartitionColumn() throws Exception {
    Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);
Temporarily skipping the ORC test because #4599 is working on fixing it.
  @Test
  public void testReadPartitionColumn() throws Exception {
    Assume.assumeTrue("Temporary skip ORC", !"orc".equals(format));
Same here: temporarily skipping the ORC test because #4599 is working on fixing it.
Resolved review threads (outdated):
flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetReaders.java (three threads)
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/StructRecord.java
spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/InnerRecord.java
Thanks, @kbendick, for the detailed code review. The comments have been addressed. Please take another look when you are free.
Hey @ConeyLiu! Thanks for the PR. Sorry for the delay in reviewing it; this seems important.
I tested this locally and you are absolutely correct, this is an issue.
Could you rebase this off of the latest master, @ConeyLiu? There have been changes in the TestFlinkInputFormat class, it seems (and we're technically on 1.15 now, but that's not too urgent).
cc @rdblue
@@ -112,11 +116,14 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
    List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
        expectedFields.size());
    List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
    // Inferring MaxDefinitionLevel from parent field
    int inferredMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
Nit: Now that we're putting the field depth into the map maxDefinitionLevelsById, and we have the same check for idToConstant.containsKey(id), do we need to have this fallback?
@kbendick, we could not update maxDefinitionLevelsById if fieldType.getId() is null; you can see this at L101.
Resolved review thread (outdated): spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
Hi @kbendick, is there any more advice on this?
Nice observation and fix. It looks good to me. Maybe worth having some others look at it as well, @RussellSpitzer @aokolnychyi?
    for (Types.NestedField field : expectedFields) {
      int id = field.fieldId();
      if (idToConstant.containsKey(id)) {
        // containsKey is used because the constant may be null
        reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
        int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, inferredMaxDefinitionLevel);
I'm kind of new to this code, so curious: why do we take it from the parent? Is it because the field is indeed null here, so we just take the parent's definition level?
Should we call it parentMaxDefinitionLevel?
The max definition level of the current column path is type.getMaxDefinitionLevel(currentPath()) - 1, and the children's should be type.getMaxDefinitionLevel(currentPath()). So fieldMaxDefinitionLevel is the inferred max definition level of the children; I have updated the comments. It is used only when we cannot find the value in maxDefinitionLevelsById.
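To make the relationship concrete, here is a minimal, standalone sketch of the fallback being discussed. The class and method names are hypothetical; the actual reader derives the default from type.getMaxDefinitionLevel(currentPath()) rather than from an explicit parent level, but the arithmetic (children sit one definition level below the parent) is the same.

import java.util.HashMap;
import java.util.Map;

// Sketch: a constant (e.g. partition) field that never appears in the data file
// has no entry in maxDefinitionLevelsById, so the reader falls back to the level
// inferred from the parent struct, i.e. the parent's level plus one.
public class DefinitionLevelFallback {

  static int resolveLevel(Map<Integer, Integer> maxDefinitionLevelsById,
                          int fieldId,
                          int parentMaxDefinitionLevel) {
    int defaultMaxDefinitionLevel = parentMaxDefinitionLevel + 1;  // children are one level deeper
    return maxDefinitionLevelsById.getOrDefault(fieldId, defaultMaxDefinitionLevel);
  }

  public static void main(String[] args) {
    Map<Integer, Integer> levels = new HashMap<>();
    levels.put(1, 2);  // field 1 exists in the file schema at definition level 2

    System.out.println(resolveLevel(levels, 1, 1));    // 2, taken from the map
    System.out.println(resolveLevel(levels, 100, 1));  // 2, inferred from the parent
  }
}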
Nit: I still think the name 'inferred' is a bit confusing; it suggests to me that it's the one that will be chosen. Would it be better to call it 'defaultMaxDefinitionLevel' or 'parentMaxDefinitionLevel'?
Also, just curious: what is an example of a case where we need this default value? I tried to walk through one example and found that the expected field is always in the actual struct.
Changed to defaultMaxDefinitionLevel.
Resolved review thread: parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java
@ConeyLiu: Thanks for finding and fixing this. Do we have the same issue for ORC and Avro too?
Force-pushed from ccd0ca7 to 43bb36b.
Thanks, @szehon-ho and @pvary, for the review.
Updated to Spark 3.3 and Flink 1.16; the Pig part is kept.
Avro is OK, while ORC has a similar problem; see #4604.
@@ -148,6 +149,9 @@ public ParquetValueReader<?> struct(
      int id = fieldType.getId().intValue();
@szehon-ho, here fieldType.getId() could be null; I guess this is for compatibility purposes. So I added int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); in the following code, in case we get a null value from maxDefinitionLevelsById.
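A small standalone sketch of why the map can have gaps. The ParquetField record and the field names here are hypothetical stand-ins, not the real Iceberg or Parquet types; the point is only that fields without an id are never recorded, so lookups need a default.

import java.util.HashMap;
import java.util.Map;

// Fields whose field id is null never make it into the map, so a later lookup
// needs a sensible default such as the level derived from the parent path.
public class MaxDefinitionLevelsSketch {

  record ParquetField(Integer id, String name, int maxDefinitionLevel) {}

  public static void main(String[] args) {
    ParquetField[] fileFields = {
        new ParquetField(1, "id", 1),
        new ParquetField(null, "legacy_col", 1)  // no field id, e.g. written by an older writer
    };

    Map<Integer, Integer> maxDefinitionLevelsById = new HashMap<>();
    for (ParquetField field : fileFields) {
      if (field.id() != null) {  // mirrors the null check discussed above
        maxDefinitionLevelsById.put(field.id(), field.maxDefinitionLevel());
      }
    }

    int defaultMaxDefinitionLevel = 1;  // stand-in for type.getMaxDefinitionLevel(currentPath())
    // A partition column (id 100) never appears in the data file, so the default is used.
    System.out.println(maxDefinitionLevelsById.getOrDefault(100, defaultMaxDefinitionLevel));  // 1
  }
}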
Thanks! Yeah, that's OK with me; in my understanding I'm just not entirely sure when it is ever null.
Sorry, would you be able to change the comment as well to "Defaulting to parent max definition level" or something like that? Otherwise the patch looks good to me.
Updated
Thanks, looks good to me.
It's a small nit, so I'll just approve it.
Thanks, @szehon-ho, for merging this, and everyone for the review. I will submit a backport PR to the other Spark/Flink versions.
We use ConstantReader for the partition column, and the ConstantReader field column is NullReader.NULL_COLUMN. When the ConstantReader, or its parent (when the parent has only constant children), is wrapped into an OptionReader, the OptionReader will always return null values because of the following code.

Closes #4626
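The referenced snippet is not reproduced above, so here is a standalone, simplified sketch of the kind of check being described. The interface, constant, and method names are assumed rather than the actual Iceberg classes; the idea is that an option reader compares the column's current definition level against the field's definition level, which a null-backed constant column can never exceed.

// Simplified sketch of why wrapping a constant reader in an option reader drops the value.
public class OptionReaderIssueSketch {

  interface ColumnIterator {
    int currentDefinitionLevel();
  }

  // Stand-in for NullReader.NULL_COLUMN: it carries no data, so its current
  // definition level is always 0.
  static final ColumnIterator NULL_COLUMN = () -> 0;

  static Object readOptional(ColumnIterator column, int definitionLevel, Object constantValue) {
    // Only read the underlying value when the column reports the field as present.
    if (column.currentDefinitionLevel() > definitionLevel) {
      return constantValue;
    }
    return null;  // the constant partition value is lost here before the fix
  }

  public static void main(String[] args) {
    // The constant partition value is dropped because NULL_COLUMN can never
    // exceed the expected definition level.
    System.out.println(readOptional(NULL_COLUMN, 1, "2022-04-01"));  // null
  }
}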