
Parquet: Fixes get null values for the nested field partition column #4627

Merged (8 commits into apache:master, Nov 18, 2022)

Conversation

@ConeyLiu (Contributor) commented Apr 25, 2022:

We use ConstantReader for the partition column, and the ConstantReader's field column is NullReader.NULL_COLUMN. When the ConstantReader, or its parent (when the parent has only constant children), is wrapped into an OptionReader, the OptionReader always returns null values because of the following code:

@Override
public T read(T reuse) {
  if (column.currentDefinitionLevel() > definitionLevel) {  // the `ConstantReader.currentDefinitionLevel` is always 0
    return reader.read(reuse);
  }

  for (TripleIterator<?> child : children) {
    child.nextNull();
  }

  return null;
}
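
To make the failure mode concrete, here is a minimal, self-contained sketch (simplified names for illustration, not the actual Iceberg classes): the constant's backing null column always reports definition level 0, so for any nested optional field (definitionLevel >= 1) the value branch is never taken.

// Minimal sketch of the bug (hypothetical class, not Iceberg code).
public class OptionReaderBugSketch {
  // What the constant reader's NullReader.NULL_COLUMN reports.
  static final int COLUMN_DEFINITION_LEVEL = 0;

  static Object read(Object constant, int definitionLevel) {
    if (COLUMN_DEFINITION_LEVEL > definitionLevel) {
      return constant;  // never reached for nested fields, where definitionLevel >= 1
    }
    return null;  // the partition constant is silently replaced by null
  }

  public static void main(String[] args) {
    System.out.println(read("partition-value", 1));  // prints: null
  }
}

The fix in the diffs below makes the constant reader report the field's actual definition level (tracked via maxDefinitionLevelsById) instead of 0, so the wrapper takes the value branch.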

Closes #4626

@@ -126,6 +131,48 @@ public void testBasicProjection() throws IOException {
TestHelpers.assertRows(result, expected);
}

@Test
public void testReadPartitionColumn() throws Exception {
Assume.assumeTrue("Temporary skip ORC", FileFormat.ORC != fileFormat);

@ConeyLiu (Contributor, Author):

Temporarily skipping the ORC test because #4599 is working on fixing it.


@Test
public void testReadPartitionColumn() throws Exception {
Assume.assumeTrue("Temporary skip ORC", !"orc".equals(format));

@ConeyLiu (Contributor, Author):

Same as above: temporarily skipping the ORC test because #4599 is working on fixing it.
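
For context, a rough sketch of the scenario both tests exercise (the schema and field names here are assumptions for illustration, not the actual test fixtures): a table identity-partitioned by a nested struct field, where projecting the partition source column from Parquet previously came back null.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class NestedPartitionSketch {
  public static void main(String[] args) {
    // Schema with a nested field that serves as the partition source.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "struct", Types.StructType.of(
            Types.NestedField.optional(3, "inner_name", Types.StringType.get()))));

    // Identity-partition by the nested field; before this fix, reading
    // "struct.inner_name" back from Parquet data files returned null.
    PartitionSpec spec = PartitionSpec.builderFor(schema)
        .identity("struct.inner_name")
        .build();

    System.out.println(spec);
  }
}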

@ConeyLiu changed the title from "Parquet: Fixes got null values for partition column which paritioned by nested filed" to "Parquet: Fixes get null values for the nested field partition column" on Apr 25, 2022
@ConeyLiu (Contributor, Author) commented:

Thanks, @kbendick, for the detailed code review. The comments have been addressed. Please take another look when you are free.

@kbendick (Contributor) left a comment:

Hey @ConeyLiu! Thanks for the PR. Sorry for the delay in reviewing it; this seems important.

I tested this locally and you are absolutely correct, this is an issue.

Could you rebase this off of the latest master, @ConeyLiu? There have been changes in the TestFlinkInputFormat class, it seems (and we're on 1.15 now, technically, but that's not too urgent).

cc @rdblue

@@ -112,11 +116,14 @@ public ParquetValueReader<RowData> struct(Types.StructType expected, GroupType s
List<ParquetValueReader<?>> reorderedFields = Lists.newArrayListWithExpectedSize(
expectedFields.size());
List<Type> types = Lists.newArrayListWithExpectedSize(expectedFields.size());
// Inferring MaxDefinitionLevel from parent field
int inferredMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
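
For readers unfamiliar with definition levels, a small parquet-mr sketch (with an assumed two-level schema, not one from this PR) of how getMaxDefinitionLevel grows by one for each optional level along a path; the value inferred above is the level a direct child of the current struct would have.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

public class DefinitionLevelDemo {
  public static void main(String[] args) {
    // message m { optional group location { optional int32 lat; } }
    MessageType schema = Types.buildMessage()
        .optionalGroup()
            .optional(PrimitiveTypeName.INT32).named("lat")
            .named("location")
        .named("m");

    System.out.println(schema.getMaxDefinitionLevel("location"));         // 1
    System.out.println(schema.getMaxDefinitionLevel("location", "lat"));  // 2
  }
}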

@kbendick (Contributor):

Nit: Now that we're putting the field depth into the map maxDefinitionLevelsById, and we have the same check for idToConstant.containsKey(id), do we need to have this fallback?

@ConeyLiu (Contributor, Author):

@kbendick, we cannot update maxDefinitionLevelsById if fieldType.getId() is null; you can see it at L101.

@ConeyLiu (Contributor, Author) commented:

Thanks, @kbendick and @rdblue, for the review. The comments have been addressed.

@ConeyLiu (Contributor, Author) commented:

Hi @kbendick, is there any further advice on this?

@szehon-ho (Collaborator) left a comment:

Nice observation and fix. It looks good to me. Maybe worth having some others look at it as well: @RussellSpitzer, @aokolnychyi?

for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
reorderedFields.add(ParquetValueReaders.constant(idToConstant.get(id)));
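// Fallback: use the level inferred from the parent path when the field id was
// not recorded in maxDefinitionLevelsById (fieldType.getId() can be null).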
int fieldMaxDefinitionLevel = maxDefinitionLevelsById.getOrDefault(id, inferredMaxDefinitionLevel);

@szehon-ho (Collaborator):

I'm kind of new to this code, so curious: why do we take it from the parent? Is it because the field is indeed null here, and we thus just take the parent's definition level?

Should we call it parentMaxDefinitionLevel?

@ConeyLiu (Contributor, Author):

The max definition level of the current column path is type.getMaxDefinitionLevel(currentPath()) - 1; the children's should be type.getMaxDefinitionLevel(currentPath()). So fieldMaxDefinitionLevel is the inferred max definition level of the children; I have updated the comments. It is used only when we cannot find the value in maxDefinitionLevelsById.

@szehon-ho (Collaborator) commented Nov 17, 2022:

Nit: I still think the name 'inferred' is a bit confusing; it indicates to me that it's the one that will be chosen. Would it be better to call it 'defaultMaxDefinitionLevel' or 'parentMaxDefinitionLevel'?

Also, just curious: what is an example of a case where we need this default value? I tried to walk through one example and found the expected field is always in the actual struct.

@ConeyLiu (Contributor, Author):

Changed to defaultMaxDefinitionLevel

@pvary (Contributor) commented Nov 16, 2022:

@ConeyLiu: Thanks for finding and fixing this.
May I ask you to put the fix in the main branches for Spark (3.3) and Flink (1.16) first? Then with another PR we can backport it to all of the relevant branches.

Do we have the same issue for ORC and Avro too?

@ConeyLiu (Contributor, Author) commented:

Thanks, @szehon-ho and @pvary, for the review.

> May I ask you to put the fix in the main branches for Spark (3.3) and Flink (1.16) first? Then with another PR we can backport it to all of the relevant branches.

Updated to Spark 3.3 and Flink 1.16; the Pig part is kept.

> Do we have the same issue for ORC and Avro too?

Avro is OK, while ORC has a similar problem here: #4604.

@@ -148,6 +149,9 @@ public ParquetValueReader<?> struct(
int id = fieldType.getId().intValue();

@ConeyLiu (Contributor, Author):

@szehon-ho, here fieldType.getId() could be null; I guess this is for compatibility purposes. So I added int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); in the following code in case we get a null value from maxDefinitionLevelsById.

@szehon-ho (Collaborator):

Thanks! Yeah, that's OK to me; it's just that in my understanding I'm not entirely sure when it is ever null.

Sorry, would you be able to change the comment as well to "Defaulting to parent max definition level" or something like that? Otherwise the patch looks good to me.

@ConeyLiu (Contributor, Author):

Updated

@szehon-ho (Collaborator):

Thanks, looks good to me.

@szehon-ho (Collaborator) left a comment:

It's a small nit, so I'll just approve it.

@szehon-ho merged commit 25335a0 into apache:master on Nov 18, 2022
@szehon-ho (Collaborator) commented:
Thanks @ConeyLiu for the fix, and @pvary and @kbendick for the additional review.

@ConeyLiu (Contributor, Author) commented:
Thanks, @szehon-ho, for merging this, and everyone for the review. I will submit a backport PR for the other Spark/Flink versions.

@ConeyLiu deleted the partition_null branch on Nov 21, 2022
pvary pushed a commit that referenced this pull request on Dec 2, 2022