-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-26447][SQL]Allow OrcColumnarBatchReader to return less partition columns #23387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
799f429
49ae28b
a3a5741
1b09dae
b87ea1e
1b58df8
5ed34d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
import java.io.IOException; | ||
import java.util.stream.IntStream; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import org.apache.hadoop.conf.Configuration; | ||
import org.apache.hadoop.mapreduce.InputSplit; | ||
import org.apache.hadoop.mapreduce.RecordReader; | ||
|
@@ -58,17 +59,23 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> { | |
|
||
/** | ||
* The column IDs of the physical ORC file schema which are required by this reader. | ||
* -1 means this required column doesn't exist in the ORC file. | ||
* -1 means this required column is partition column, or it doesn't exist in the ORC file. | ||
* Ideally partition column should never appear in the physical file, and should only appear | ||
* in the directory name. However, Spark allows partition columns inside physical file, | ||
* but Spark will discard the values from the file, and use the partition value got from | ||
* directory name. The column order will be reserved though. | ||
*/ | ||
private int[] requestedColIds; | ||
@VisibleForTesting | ||
public int[] requestedDataColIds; | ||
|
||
// Record reader from ORC row batch. | ||
private org.apache.orc.RecordReader recordReader; | ||
|
||
private StructField[] requiredFields; | ||
|
||
// The result columnar batch for vectorized execution by whole-stage codegen. | ||
private ColumnarBatch columnarBatch; | ||
@VisibleForTesting | ||
public ColumnarBatch columnarBatch; | ||
|
||
// Writable column vectors of the result columnar batch. | ||
private WritableColumnVector[] columnVectors; | ||
|
@@ -143,75 +150,75 @@ public void initialize( | |
/** | ||
* Initialize columnar batch by setting required schema and partition information. | ||
* With this information, this creates ColumnarBatch with the full schema. | ||
* | ||
* @param orcSchema Schema from ORC file reader. | ||
* @param requiredFields All the fields that are required to return, including partition fields. | ||
* @param requestedDataColIds Requested column ids from orcSchema. -1 if not existed. | ||
* @param requestedPartitionColIds Requested column ids from partition schema. -1 if not existed. | ||
* @param partitionValues Values of partition columns. | ||
*/ | ||
public void initBatch( | ||
TypeDescription orcSchema, | ||
int[] requestedColIds, | ||
StructField[] requiredFields, | ||
StructType partitionSchema, | ||
int[] requestedDataColIds, | ||
int[] requestedPartitionColIds, | ||
InternalRow partitionValues) { | ||
batch = orcSchema.createRowBatch(capacity); | ||
assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`. | ||
|
||
assert(requiredFields.length == requestedDataColIds.length); | ||
assert(requiredFields.length == requestedPartitionColIds.length); | ||
// If a required column is also partition column, use partition value and don't read from file. | ||
for (int i = 0; i < requiredFields.length; i++) { | ||
if (requestedPartitionColIds[i] != -1) { | ||
requestedDataColIds[i] = -1; | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this loop work as expected? The intention seems to be clear, but here, we initialized like the following. val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length) So, logically, in this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. This is because in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L191 Now It can be easily fixed in ORC V2, but to fix There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The suggested test suite also covers this logic. |
||
this.requiredFields = requiredFields; | ||
this.requestedColIds = requestedColIds; | ||
assert(requiredFields.length == requestedColIds.length); | ||
this.requestedDataColIds = requestedDataColIds; | ||
|
||
StructType resultSchema = new StructType(requiredFields); | ||
for (StructField f : partitionSchema.fields()) { | ||
resultSchema = resultSchema.add(f); | ||
} | ||
|
||
if (copyToSpark) { | ||
if (MEMORY_MODE == MemoryMode.OFF_HEAP) { | ||
columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema); | ||
} else { | ||
columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema); | ||
} | ||
|
||
// Initialize the missing columns once. | ||
// Initialize the partition columns and missing columns once. | ||
for (int i = 0; i < requiredFields.length; i++) { | ||
if (requestedColIds[i] == -1) { | ||
if (requestedPartitionColIds[i] != -1) { | ||
ColumnVectorUtils.populate(columnVectors[i], | ||
partitionValues, requestedPartitionColIds[i]); | ||
columnVectors[i].setIsConstant(); | ||
} else if (requestedDataColIds[i] == -1) { | ||
columnVectors[i].putNulls(0, capacity); | ||
columnVectors[i].setIsConstant(); | ||
} | ||
} | ||
|
||
if (partitionValues.numFields() > 0) { | ||
int partitionIdx = requiredFields.length; | ||
for (int i = 0; i < partitionValues.numFields(); i++) { | ||
ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i); | ||
columnVectors[i + partitionIdx].setIsConstant(); | ||
} | ||
} | ||
|
||
columnarBatch = new ColumnarBatch(columnVectors); | ||
} else { | ||
// Just wrap the ORC column vector instead of copying it to Spark column vector. | ||
orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()]; | ||
|
||
for (int i = 0; i < requiredFields.length; i++) { | ||
DataType dt = requiredFields[i].dataType(); | ||
int colId = requestedColIds[i]; | ||
// Initialize the missing columns once. | ||
if (colId == -1) { | ||
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); | ||
missingCol.putNulls(0, capacity); | ||
missingCol.setIsConstant(); | ||
orcVectorWrappers[i] = missingCol; | ||
} else { | ||
orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]); | ||
} | ||
} | ||
|
||
if (partitionValues.numFields() > 0) { | ||
int partitionIdx = requiredFields.length; | ||
for (int i = 0; i < partitionValues.numFields(); i++) { | ||
DataType dt = partitionSchema.fields()[i].dataType(); | ||
if (requestedPartitionColIds[i] != -1) { | ||
OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt); | ||
ColumnVectorUtils.populate(partitionCol, partitionValues, i); | ||
ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]); | ||
partitionCol.setIsConstant(); | ||
orcVectorWrappers[partitionIdx + i] = partitionCol; | ||
orcVectorWrappers[i] = partitionCol; | ||
} else { | ||
int colId = requestedDataColIds[i]; | ||
// Initialize the missing columns once. | ||
if (colId == -1) { | ||
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt); | ||
missingCol.putNulls(0, capacity); | ||
missingCol.setIsConstant(); | ||
orcVectorWrappers[i] = missingCol; | ||
} else { | ||
orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -233,7 +240,7 @@ private boolean nextBatch() throws IOException { | |
|
||
if (!copyToSpark) { | ||
for (int i = 0; i < requiredFields.length; i++) { | ||
if (requestedColIds[i] != -1) { | ||
if (requestedDataColIds[i] != -1) { | ||
((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize); | ||
} | ||
} | ||
|
@@ -248,8 +255,8 @@ private boolean nextBatch() throws IOException { | |
StructField field = requiredFields[i]; | ||
WritableColumnVector toColumn = columnVectors[i]; | ||
|
||
if (requestedColIds[i] >= 0) { | ||
ColumnVector fromColumn = batch.cols[requestedColIds[i]]; | ||
if (requestedDataColIds[i] >= 0) { | ||
ColumnVector fromColumn = batch.cols[requestedDataColIds[i]]; | ||
|
||
if (fromColumn.isRepeating) { | ||
putRepeatingValues(batchSize, field, fromColumn, toColumn); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.execution.datasources.orc | ||
|
||
import org.apache.orc.TypeDescription | ||
|
||
import org.apache.spark.sql.QueryTest | ||
import org.apache.spark.sql.catalyst.InternalRow | ||
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector} | ||
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} | ||
import org.apache.spark.sql.types.{StructField, StructType} | ||
import org.apache.spark.unsafe.types.UTF8String.fromString | ||
|
||
class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext { | ||
private val dataSchema = StructType.fromDDL("col1 int, col2 int") | ||
private val partitionSchema = StructType.fromDDL("p1 string, p2 string") | ||
private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2")) | ||
private val orcFileSchemaList = Seq( | ||
"struct<col1:int,col2:int>", "struct<col1:int,col2:int,p1:string,p2:string>", | ||
"struct<col1:int,col2:int,p1:string>", "struct<col1:int,col2:int,p2:string>") | ||
orcFileSchemaList.foreach { case schema => | ||
val orcFileSchema = TypeDescription.fromString(schema) | ||
|
||
val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant") | ||
isConstant.setAccessible(true) | ||
|
||
def getReader( | ||
requestedDataColIds: Array[Int], | ||
requestedPartitionColIds: Array[Int], | ||
resultFields: Array[StructField]): OrcColumnarBatchReader = { | ||
val reader = new OrcColumnarBatchReader(false, false, 4096) | ||
reader.initBatch( | ||
orcFileSchema, | ||
resultFields, | ||
requestedDataColIds, | ||
requestedPartitionColIds, | ||
partitionValues) | ||
reader | ||
} | ||
|
||
test(s"all partitions are requested: $schema") { | ||
val requestedDataColIds = Array(0, 1, 0, 0) | ||
val requestedPartitionColIds = Array(-1, -1, 0, 1) | ||
val reader = getReader(requestedDataColIds, requestedPartitionColIds, | ||
dataSchema.fields ++ partitionSchema.fields) | ||
assert(reader.requestedDataColIds === Array(0, 1, -1, -1)) | ||
} | ||
|
||
test(s"initBatch should initialize requested partition columns only: $schema") { | ||
val requestedDataColIds = Array(0, -1) // only `col1` is requested, `col2` doesn't exist | ||
val requestedPartitionColIds = Array(-1, 0) // only `p1` is requested | ||
val reader = getReader(requestedDataColIds, requestedPartitionColIds, | ||
Array(dataSchema.fields(0), partitionSchema.fields(0))) | ||
val batch = reader.columnarBatch | ||
assert(batch.numCols() === 2) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we can see the result columns is pruned. |
||
|
||
assert(batch.column(0).isInstanceOf[OrcColumnVector]) | ||
assert(batch.column(1).isInstanceOf[OnHeapColumnVector]) | ||
|
||
val p1 = batch.column(1).asInstanceOf[OnHeapColumnVector] | ||
assert(isConstant.get(p1).asInstanceOf[Boolean]) // Partition column is constant. | ||
assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0)) | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need more comments here.
Ideally partition column should never appear in the physical file, and should only appear in the directory name. However, Spark is OK with partition columns inside physical file, but Spark will discard the values from the file, and use the partition value got from directory name. The column order will be reserved though.