[SPARK-26447][SQL]Allow OrcColumnarBatchReader to return less partition columns #23387
Conversation
@@ -58,10 +58,16 @@
   /**
    * The column IDs of the physical ORC file schema which are required by this reader.
-   * -1 means this required column doesn't exist in the ORC file.
+   * -1 means this required column is partition column, or it doesn't exist in the ORC file.
I think we need more comments here.
Ideally a partition column should never appear in the physical file, and should only appear in the directory name. However, Spark is OK with partition columns inside the physical file, but Spark will discard the values from the file and use the partition value obtained from the directory name. The column order will be preserved, though.
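As a rough illustration of the comment above, a partition column's value comes from the Hive-style directory name, not from the file contents. The helper name and path below are hypothetical; this is not Spark's actual parsing code:

```java
public class PartitionValueSketch {
    // Hypothetical helper: extract the value of a partition column from a
    // Hive-style partitioned path such as /table/p=1/part-00000.orc.
    static String partitionValue(String path, String column) {
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0 && segment.substring(0, eq).equals(column)) {
                return segment.substring(eq + 1);
            }
        }
        return null; // not a partition column of this path
    }

    public static void main(String[] args) {
        // Even if column p also exists inside the ORC file, Spark uses the
        // directory-derived value and discards the file's copy.
        System.out.println(partitionValue("/table/p=1/part-00000.orc", "p"));
    }
}
```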
   */
  private int[] requestedColIds;

  /**
   * The column IDs of the ORC file partition schema which are required by this reader.
   * -1 means this required column doesn't exist in the ORC partition columns.
-1 means this required column is not a partition column
   */
  private int[] requestedColIds;

  /**
   * The column IDs of the ORC file partition schema which are required by this reader.
It's not a column ID; it's the index of the partition column.
Test build #100457 has finished for PR 23387 at commit
adding @dongjoon-hyun
Thank you for pinging me, @HyukjinKwon.
-      val requestedColIds = requestedColIdsOrEmptyFile.get
-      assert(requestedColIds.length == requiredSchema.length,
+      val requestedColIds =
+        requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
This semantic change also affects non-vectorized code path. Can we isolate the scope of this PR?
-      assert(requestedColIds.length == requiredSchema.length,
+      val requestedColIds =
+        requestedColIdsOrEmptyFile.get ++ Array.fill(partitionSchema.length)(-1)
+      assert(requestedColIds.length == resultSchema.length,
         "[BUG] requested column IDs do not match required schema")
Could you adjust the error message according to your PR? `required schema` is not used in the new assertion.
Hi, @gengliangwang .
It seems that we missed new test cases for the new feature suggested here: "Allow OrcColumnarBatchReader to return less partition columns". Could you add some?
@cloud-fan @dongjoon-hyun Thanks a lot for the review. @dongjoon-hyun I tried adding a test case for the improvement. But the implementation seems too low level for constructing and initializing
LGTM. How would you add a test? IIUC this is just a code refactor now; nothing will be changed. It will become a real optimization when migrating ORC to the v2 API.
Test build #100473 has finished for PR 23387 at commit
      if (requestedPartitionColIds[i] != -1) {
        requestedDataColIds[i] = -1;
      }
    }
Does this loop work as expected? The intention seems to be clear, but here we initialized like the following:

val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)

So, logically, in this `for` loop, the range of `i` satisfying `requestedPartitionColIds[i] != -1` seems to be already filled with `Array.fill(partitionSchema.length)(-1)`? Did I understand correctly?
Yes. This is because in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L191 the required schema always filters out all the partition columns.
It can be easily fixed in ORC V2, but fixing `FileFormat` may affect the Parquet reader as well.
In this PR, I check the `requestedPartitionColIds` in `requiredSchema`, so that it will be easier if the improvement is someday made for `FileFormat`.
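To make the question above concrete, here is a minimal standalone sketch. The array contents are hypothetical (two data columns, one partition column); the names mirror the PR, but this is not the actual Spark code:

```java
public class ColIdLoopSketch {
    public static void main(String[] args) {
        // requestedColIds ++ Array.fill(partitionSchema.length)(-1)
        int[] requestedDataColIds = {0, 1, -1};
        // Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)
        int[] requestedPartitionColIds = {-1, -1, 0};

        // The loop from the PR: a slot served from a partition value must
        // not also be read from the data file.
        for (int i = 0; i < requestedDataColIds.length; i++) {
            if (requestedPartitionColIds[i] != -1) {
                requestedDataColIds[i] = -1;
            }
        }

        // With the initialization above, every slot where
        // requestedPartitionColIds[i] != -1 was already -1, so the loop
        // changes nothing here; it only matters if a partition column ever
        // appears inside requiredSchema.
        System.out.println(java.util.Arrays.toString(requestedDataColIds));
    }
}
```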
Thanks.
The suggested test suite also covers this logic, @cloud-fan and @gengliangwang.
Add `OrcColumnarBatchReaderSuite`
@dongjoon-hyun Thanks for the test suite. I have merged it and updated the test case. Please review it again.
    val reader = getReader(requestedDataColIds, requestedPartitionColIds,
      Array(dataSchema.fields(0), partitionSchema.fields(0)))
    val batch = reader.columnarBatch
    assert(batch.numCols() === 2)
Here we can see that the result columns are pruned.
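The pruning this assertion checks can be sketched in isolation. All names and values below are hypothetical, not the actual `OrcColumnarBatchReader` internals: each requested output slot is served from the file, from the constant partition value, or left null:

```java
public class BatchFillSketch {
    public static void main(String[] args) {
        // One requested data column (file index 0) and one requested
        // partition column (partition index 0).
        int[] requestedDataColIds = {0, -1};
        int[] requestedPartitionColIds = {-1, 0};
        String[] fileRow = {"x"};          // values read from the ORC file
        String[] partitionValues = {"p0"}; // values derived from the path

        String[] out = new String[requestedDataColIds.length];
        for (int i = 0; i < out.length; i++) {
            if (requestedPartitionColIds[i] != -1) {
                // constant column filled from the directory-derived value
                out[i] = partitionValues[requestedPartitionColIds[i]];
            } else if (requestedDataColIds[i] != -1) {
                // column read from the physical file
                out[i] = fileRow[requestedDataColIds[i]];
            } else {
                out[i] = null; // column missing from the file
            }
        }
        // Only the two requested columns appear, matching numCols() == 2.
        System.out.println(out.length + " " + out[0] + " " + out[1]);
    }
}
```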
Test build #100489 has finished for PR 23387 at commit
Test build #100491 has finished for PR 23387 at commit
Retest this please
Test build #100493 has finished for PR 23387 at commit
Test build #100498 has finished for PR 23387 at commit
@@ -539,6 +539,25 @@ object PartitioningUtils {
    }).asNullable
  }

  def requestedPartitionColumnIds(
Can we have a more intuitive name? This function name looks weird to me because `requestedPartitionColumnIds` returns the full schema.
The returned value depends on the parameter `requiredSchema`. The parameter can be the full schema or the requested schema. Do you have a suggestion for the method name?
If you don't mind, I prefer to revert the last commit, 1b58df8 .
Sure, I have reverted it.
This reverts commit 1b58df8.
Test build #100613 has finished for PR 23387 at commit
thanks, merging to master!
[SPARK-26447][SQL] Allow OrcColumnarBatchReader to return less partition columns

Closes apache#23387 from gengliangwang/refactorOrcColumnarBatch.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Gengliang Wang <ltnwgl@gmail.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
Currently OrcColumnarBatchReader returns all the partition column values in the batch read.
In data source V2, we can improve it by returning the required partition column values only.
This PR is part of #23383. As @cloud-fan suggested, a new PR was created to make the review easier.
Also, this PR doesn't improve `OrcFileFormat`, since in the method `buildReaderWithPartitionValues`, the `requiredSchema` filters out all the partition columns, so we can't know which partition column is required.

How was this patch tested?
Unit test
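The improvement described above can be sketched as a schema-level before/after. The column names below are hypothetical, and this is only a sketch of the idea, not the actual Spark code:

```java
import java.util.ArrayList;
import java.util.List;

public class PruneSketch {
    public static void main(String[] args) {
        List<String> dataCols = List.of("a", "b");
        List<String> partitionCols = List.of("p1", "p2");
        List<String> requested = List.of("a", "p2"); // query needs only these

        // Before: the batch always carries data columns plus ALL
        // partition columns.
        List<String> before = new ArrayList<>(dataCols);
        before.addAll(partitionCols);

        // After: the batch carries only the requested data and requested
        // partition columns.
        List<String> after = new ArrayList<>();
        for (String c : requested) {
            if (dataCols.contains(c) || partitionCols.contains(c)) {
                after.add(c);
            }
        }

        System.out.println(before);
        System.out.println(after);
    }
}
```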