[SPARK-26447][SQL]Allow OrcColumnarBatchReader to return less partition columns #23387


Closed

Conversation

gengliangwang
Member

What changes were proposed in this pull request?

Currently OrcColumnarBatchReader returns all the partition column values in the batch read.
In data source V2, we can improve it by returning the required partition column values only.

This PR is part of #23383 . As @cloud-fan suggested, create a new PR to make review easier.

Also, this PR doesn't improve OrcFileFormat, since in the method buildReaderWithPartitionValues, the requiredSchema filters out all the partition columns, so we can't know which partition columns are required.
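For illustration, a minimal Scala sketch of the schemas involved (field names here are made up, not from the PR): the reader receives a requiredSchema holding data columns only, and its output batch follows a result schema that appends the partition columns after them.

import org.apache.spark.sql.types._

// Data columns requested by the query; FileSourceStrategy has already
// stripped the partition columns out of this schema.
val requiredSchema = StructType(Seq(
  StructField("col0", IntegerType),
  StructField("col2", StringType)))
// Partition columns, whose values come from directory names rather than
// from the ORC files themselves.
val partitionSchema = StructType(Seq(StructField("p0", IntegerType)))
// The combined layout of the batch returned by the reader.
val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)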

How was this patch tested?

Unit test

@@ -58,10 +58,16 @@

   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
-   * -1 means this required column doesn't exist in the ORC file.
+   * -1 means this required column is partition column, or it doesn't exist in the ORC file.
@cloud-fan cloud-fan (Contributor) Dec 26, 2018

I think we need more comments here.

Ideally a partition column should never appear in the physical file and should only appear in the directory name. However, Spark is OK with partition columns inside the physical file; Spark will discard the values from the file and use the partition values derived from the directory name. The column order will be preserved, though.
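A hypothetical example of that layout (paths and values made up): for a table partitioned by column p, the partition value is encoded in the directory name, so even if a column named p also exists inside the ORC files, Spark ignores those stored values.

/table/p=1/part-00000.orc   -- rows from this file get p = 1, taken from the path
/table/p=2/part-00001.orc   -- rows from this file get p = 2, taken from the path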

*/
private int[] requestedColIds;

/**
* The column IDs of the ORC file partition schema which are required by this reader.
* -1 means this required column doesn't exist in the ORC partition columns.
Contributor

-1 means this required column is not a partition column

*/
private int[] requestedColIds;

/**
* The column IDs of the ORC file partition schema which are required by this reader.
Contributor

it's not a column ID, it's the index of the partition column


@SparkQA

SparkQA commented Dec 26, 2018

Test build #100457 has finished for PR 23387 at commit 799f429.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

adding @dongjoon-hyun

@dongjoon-hyun
Member

Thank you for pinging me, @HyukjinKwon .

-      val requestedColIds = requestedColIdsOrEmptyFile.get
-      assert(requestedColIds.length == requiredSchema.length,
+      val requestedColIds =
+        requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
Member

This semantic change also affects the non-vectorized code path. Can we isolate the scope of this PR?

-      assert(requestedColIds.length == requiredSchema.length,
+      val requestedColIds =
+        requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+      assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
Member

Could you adjust the error message to match your PR? The required schema is not used in the new assertion.

@dongjoon-hyun dongjoon-hyun (Member) left a comment

Hi, @gengliangwang.
It seems that we missed new test cases for the new feature suggested here: "Allow OrcColumnarBatchReader to return less partition columns". Could you add some?

@gengliangwang
Member Author

gengliangwang commented Dec 27, 2018

@cloud-fan @dongjoon-hyun Thanks a lot for the review.

@dongjoon-hyun I tried adding a test case for the improvement, but the implementation seems too low-level for constructing and initializing OrcColumnarBatchReader.
This PR is mainly for the ORC V2 migration. It is an independent PR since the code context is a bit complex. I prefer to add a test case checking that the output rows of the ORC V2 reader are pruned.
Is that OK with you?

@cloud-fan
Contributor

LGTM. How would you add a test? IIUC this is just a code refactor now, nothing will be changed. It will become a real optimization when migrating ORC to the V2 API.

@SparkQA

SparkQA commented Dec 27, 2018

Test build #100473 has finished for PR 23387 at commit 49ae28b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  if (requestedPartitionColIds[i] != -1) {
    requestedDataColIds[i] = -1;
  }
}
Member

Does this loop work as expected? The intention seems clear, but here we initialized the arrays like the following:

val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)

So, logically, in this for loop, the range of i satisfying requestedPartitionColIds[i] != -1 seems to be exactly the range that was filled with Array.fill(partitionSchema.length)(-1)? Did I understand correctly?

gengliangwang (Member Author)

Yes. This is because in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L191
the required schema always filters out all the partition columns.

Now it can be easily fixed in ORC V2, but fixing FileFormat may affect the Parquet reader as well.
In this PR, I check requestedPartitionColIds against requiredSchema, so that it will be easier if the improvement is someday made for FileFormat.
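For illustration, a runnable Scala sketch of the two arrays being discussed; the column IDs and schema lengths are made up, and only the merge loop mirrors the code under review.

// Suppose the required schema holds two data columns that map to physical
// ORC columns 0 and 2, and there is one partition column.
val requestedColIds = Array(0, 2)
val requiredSchemaLength = requestedColIds.length
val partitionSchemaLength = 1

// Layout over the result schema: data columns first, then partition columns.
val requestedDataColIds =
  requestedColIds ++ Array.fill(partitionSchemaLength)(-1)                // Array(0, 2, -1)
val requestedPartitionColIds =
  Array.fill(requiredSchemaLength)(-1) ++ Range(0, partitionSchemaLength) // Array(-1, -1, 0)

// The loop under discussion: a slot served by a partition column must not
// also be read as a data column. Because FileSourceStrategy strips partition
// columns from requiredSchema today, the two arrays never overlap and the
// loop is a no-op; it is a safety net in case requiredSchema ever contains
// partition columns.
for (i <- requestedDataColIds.indices) {
  if (requestedPartitionColIds(i) != -1) {
    requestedDataColIds(i) = -1
  }
}
assert(requestedDataColIds.sameElements(Array(0, 2, -1)))
assert(requestedPartitionColIds.sameElements(Array(-1, -1, 0)))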

Member

Thanks.

Member

The suggested test suite also covers this logic.

@dongjoon-hyun
Member

dongjoon-hyun commented Dec 27, 2018

@cloud-fan and @gengliangwang.
Could you review and merge gengliangwang#3 into this PR?
This PR claims an improvement by returning only the required partition column values. We had better add test coverage for the newly added logic here.

> IIUC this is just a code refactor now, nothing will be changed

@gengliangwang
Member Author

@dongjoon-hyun Thanks for the test suite. I have merged it and updated the test case. Please review it again.

val reader = getReader(requestedDataColIds, requestedPartitionColIds,
Array(dataSchema.fields(0), partitionSchema.fields(0)))
val batch = reader.columnarBatch
assert(batch.numCols() === 2)
gengliangwang (Member Author)

Here we can see that the result columns are pruned.

@SparkQA

SparkQA commented Dec 28, 2018

Test build #100489 has finished for PR 23387 at commit 1b09dae.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 28, 2018

Test build #100491 has finished for PR 23387 at commit b87ea1e.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Member

Retest this please

@SparkQA

SparkQA commented Dec 28, 2018

Test build #100493 has finished for PR 23387 at commit b87ea1e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Dec 28, 2018

Test build #100498 has finished for PR 23387 at commit 1b58df8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -539,6 +539,25 @@ object PartitioningUtils {
}).asNullable
}

def requestedPartitionColumnIds(
Member

Can we have a more intuitive name? This function name looks weird to me because requestedPartitionColumnIds returns the full schema.

gengliangwang (Member Author)

The returned value depends on the parameter requiredSchema, which can be the full schema or the requested schema.
Do you have a suggestion for the method name?

Member

If you don't mind, I prefer to revert the last commit, 1b58df8.

gengliangwang (Member Author)

Sure, I have reverted it.

@SparkQA

SparkQA commented Jan 1, 2019

Test build #100613 has finished for PR 23387 at commit 5ed34d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan cloud-fan closed this in e2dbafd Jan 3, 2019
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
…ion columns

## What changes were proposed in this pull request?

Currently OrcColumnarBatchReader returns all the partition column values in the batch read.
In data source V2, we can improve it by returning the required partition column values only.

This PR is part of apache#23383 . As cloud-fan suggested, create a new PR to make review easier.

Also, this PR doesn't improve `OrcFileFormat`, since in the method `buildReaderWithPartitionValues`, the `requiredSchema` filters out all the partition columns, so we can't know which partition columns are required.

## How was this patch tested?

Unit test

Closes apache#23387 from gengliangwang/refactorOrcColumnarBatch.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>