
[SPARK-26447][SQL] Allow OrcColumnarBatchReader to return less partition columns #23387


Closed

OrcColumnarBatchReader.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.stream.IntStream;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -58,17 +59,23 @@ public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {

/**
* The column IDs of the physical ORC file schema which are required by this reader.
* -1 means this required column doesn't exist in the ORC file.
* -1 means this required column is partition column, or it doesn't exist in the ORC file.

@cloud-fan (Contributor), Dec 26, 2018:
I think we need more comments here.

Ideally a partition column should never appear in the physical file; it should only appear in the directory name. However, Spark is OK with partition columns inside the physical file: it will discard the values from the file and use the partition value obtained from the directory name. The column order will be preserved though.
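
As an illustration of this behavior (not part of this PR; the path and column names below are made up), one can place an ORC file that physically contains the partition column under a partition directory and observe that the value read back comes from the directory name:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("spark-26447-demo").master("local[*]").getOrCreate()
import spark.implicits._

// Write an ORC file that physically contains a column `p`, into a directory whose
// name encodes a different partition value (hypothetical /tmp path).
Seq((1, "fromFile"), (2, "fromFile")).toDF("id", "p")
  .write.orc("/tmp/spark_26447_demo/p=fromDir")

// Partition discovery takes `p` from the directory name, not from the file,
// so both rows read back with p = "fromDir"; the column order stays the same.
spark.read.orc("/tmp/spark_26447_demo").show()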

* Ideally a partition column should never appear in the physical file, and should only appear
* in the directory name. However, Spark allows partition columns inside the physical file,
* but it will discard the values from the file and use the partition value obtained from
* the directory name. The column order will be preserved though.
*/
private int[] requestedColIds;
@VisibleForTesting
public int[] requestedDataColIds;

// Record reader from ORC row batch.
private org.apache.orc.RecordReader recordReader;

private StructField[] requiredFields;

// The result columnar batch for vectorized execution by whole-stage codegen.
private ColumnarBatch columnarBatch;
@VisibleForTesting
public ColumnarBatch columnarBatch;

// Writable column vectors of the result columnar batch.
private WritableColumnVector[] columnVectors;
@@ -143,75 +150,75 @@ public void initialize(
/**
* Initialize columnar batch by setting required schema and partition information.
* With this information, this creates ColumnarBatch with the full schema.
*
* @param orcSchema Schema from ORC file reader.
* @param requiredFields All the fields that are required to be returned, including partition fields.
* @param requestedDataColIds Requested column ids from orcSchema. -1 if the column doesn't exist.
* @param requestedPartitionColIds Requested column ids from partition schema. -1 if the column doesn't exist.
* @param partitionValues Values of partition columns.
*/
public void initBatch(
TypeDescription orcSchema,
int[] requestedColIds,
StructField[] requiredFields,
StructType partitionSchema,
int[] requestedDataColIds,
int[] requestedPartitionColIds,
InternalRow partitionValues) {
batch = orcSchema.createRowBatch(capacity);
assert(!batch.selectedInUse); // `selectedInUse` should be initialized with `false`.

assert(requiredFields.length == requestedDataColIds.length);
assert(requiredFields.length == requestedPartitionColIds.length);
// If a required column is also a partition column, use the partition value and don't read from the file.
for (int i = 0; i < requiredFields.length; i++) {
if (requestedPartitionColIds[i] != -1) {
requestedDataColIds[i] = -1;
}
}

Member:
Does this loop work as expected? The intention seems clear, but the arrays are initialized like the following:

val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
val requestedPartitionColIds = Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)

So, logically, in this for loop the range of i satisfying requestedPartitionColIds(i) != -1 is exactly the range that was filled with Array.fill(partitionSchema.length)(-1) in requestedDataColIds? Did I understand correctly?


Member (Author):
Yes. This is because in https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L191
the required schema always filters out all the partition columns.

It can easily be fixed in ORC V2 now, but fixing FileFormat may affect the Parquet reader as well.
In this PR, I check the requestedPartitionColIds against requiredSchema, so that it will be easier if the improvement is made for FileFormat someday.


Member:
Thanks.


Member:
The suggested test suite also covers this logic.
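
To make the answer above concrete, here is a worked sketch (hypothetical two-column data schema and two partition columns) of how the two arrays line up with the construction in OrcFileFormat shown further down:

// dataSchema = "col1 int, col2 int" -> requestedColIds = Array(0, 1)
// partitionSchema = "p1 string, p2 string"
val requestedColIds = Array(0, 1)
val requiredSchemaLength = 2 // requiredSchema excludes partition columns today
val partitionSchemaLength = 2

val requestedDataColIds =
  requestedColIds ++ Array.fill(partitionSchemaLength)(-1) // Array(0, 1, -1, -1)
val requestedPartitionColIds =
  Array.fill(requiredSchemaLength)(-1) ++ Range(0, partitionSchemaLength) // Array(-1, -1, 0, 1)

// The loop in initBatch only rewrites positions where requestedPartitionColIds(i) != -1,
// i.e. the partition tail, which is already -1 in requestedDataColIds, so today it is
// effectively a no-op. It keeps initBatch correct if a caller ever passes required fields
// that still contain partition columns, e.g. after a future FileFormat improvement.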

this.requiredFields = requiredFields;
this.requestedColIds = requestedColIds;
assert(requiredFields.length == requestedColIds.length);
this.requestedDataColIds = requestedDataColIds;

StructType resultSchema = new StructType(requiredFields);
for (StructField f : partitionSchema.fields()) {
resultSchema = resultSchema.add(f);
}

if (copyToSpark) {
if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
} else {
columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
}

// Initialize the missing columns once.
// Initialize the partition columns and missing columns once.
for (int i = 0; i < requiredFields.length; i++) {
if (requestedColIds[i] == -1) {
if (requestedPartitionColIds[i] != -1) {
ColumnVectorUtils.populate(columnVectors[i],
partitionValues, requestedPartitionColIds[i]);
columnVectors[i].setIsConstant();
} else if (requestedDataColIds[i] == -1) {
columnVectors[i].putNulls(0, capacity);
columnVectors[i].setIsConstant();
}
}

if (partitionValues.numFields() > 0) {
int partitionIdx = requiredFields.length;
for (int i = 0; i < partitionValues.numFields(); i++) {
ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
columnVectors[i + partitionIdx].setIsConstant();
}
}

columnarBatch = new ColumnarBatch(columnVectors);
} else {
// Just wrap the ORC column vector instead of copying it to Spark column vector.
orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];

for (int i = 0; i < requiredFields.length; i++) {
DataType dt = requiredFields[i].dataType();
int colId = requestedColIds[i];
// Initialize the missing columns once.
if (colId == -1) {
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
missingCol.putNulls(0, capacity);
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
} else {
orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
}
}

if (partitionValues.numFields() > 0) {
int partitionIdx = requiredFields.length;
for (int i = 0; i < partitionValues.numFields(); i++) {
DataType dt = partitionSchema.fields()[i].dataType();
if (requestedPartitionColIds[i] != -1) {
OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
ColumnVectorUtils.populate(partitionCol, partitionValues, i);
ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
partitionCol.setIsConstant();
orcVectorWrappers[partitionIdx + i] = partitionCol;
orcVectorWrappers[i] = partitionCol;
} else {
int colId = requestedDataColIds[i];
// Initialize the missing columns once.
if (colId == -1) {
OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
missingCol.putNulls(0, capacity);
missingCol.setIsConstant();
orcVectorWrappers[i] = missingCol;
} else {
orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
}
}
}

@@ -233,7 +240,7 @@ private boolean nextBatch() throws IOException {

if (!copyToSpark) {
for (int i = 0; i < requiredFields.length; i++) {
if (requestedColIds[i] != -1) {
if (requestedDataColIds[i] != -1) {
((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
}
}
@@ -248,8 +255,8 @@ private boolean nextBatch() throws IOException {
StructField field = requiredFields[i];
WritableColumnVector toColumn = columnVectors[i];

if (requestedColIds[i] >= 0) {
ColumnVector fromColumn = batch.cols[requestedColIds[i]];
if (requestedDataColIds[i] >= 0) {
ColumnVector fromColumn = batch.cols[requestedDataColIds[i]];

if (fromColumn.isRepeating) {
putRepeatingValues(batchSize, field, fromColumn, toColumn);
OrcFileFormat.scala
@@ -206,13 +206,15 @@ class OrcFileFormat
// after opening a file.
val iter = new RecordReaderIterator(batchReader)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))

val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
val requestedPartitionColIds =
Array.fill(requiredSchema.length)(-1) ++ Range(0, partitionSchema.length)
batchReader.initialize(fileSplit, taskAttemptContext)
batchReader.initBatch(
reader.getSchema,
requestedColIds,
requiredSchema.fields,
partitionSchema,
resultSchema.fields,
requestedDataColIds,
requestedPartitionColIds,
file.partitionValues)

iter.asInstanceOf[Iterator[InternalRow]]
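With the new initBatch signature, a caller that prunes partition columns (for example the ORC v2 path mentioned in the discussion) could request only a subset of them. A hypothetical sketch, modeled on the new test suite below, requesting col1 plus only the second partition column p2:

import org.apache.orc.TypeDescription
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String.fromString

val dataSchema = StructType.fromDDL("col1 int, col2 int")
val partitionSchema = StructType.fromDDL("p1 string, p2 string")
val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))

// Both boolean flags false and capacity 4096, as in the test suite below.
val reader = new OrcColumnarBatchReader(false, false, 4096)
reader.initBatch(
  TypeDescription.fromString("struct<col1:int,col2:int>"),
  Array(dataSchema.fields(0), partitionSchema.fields(1)), // result fields: col1, p2
  Array(0, -1),  // col1 is column 0 of the ORC file; p2 is not read from the file
  Array(-1, 1),  // p2 is partition column 1, so its value comes from partitionValues
  partitionValues)
// reader.columnarBatch now has two columns: col1 wrapping the ORC batch and a
// constant p2 populated from partitionValues.
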
OrcColumnarBatchReaderSuite.scala
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.orc

import org.apache.orc.TypeDescription

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String.fromString

class OrcColumnarBatchReaderSuite extends QueryTest with SQLTestUtils with SharedSQLContext {
private val dataSchema = StructType.fromDDL("col1 int, col2 int")
private val partitionSchema = StructType.fromDDL("p1 string, p2 string")
private val partitionValues = InternalRow(fromString("partValue1"), fromString("partValue2"))
private val orcFileSchemaList = Seq(
"struct<col1:int,col2:int>", "struct<col1:int,col2:int,p1:string,p2:string>",
"struct<col1:int,col2:int,p1:string>", "struct<col1:int,col2:int,p2:string>")
orcFileSchemaList.foreach { case schema =>
val orcFileSchema = TypeDescription.fromString(schema)

val isConstant = classOf[WritableColumnVector].getDeclaredField("isConstant")
isConstant.setAccessible(true)

def getReader(
requestedDataColIds: Array[Int],
requestedPartitionColIds: Array[Int],
resultFields: Array[StructField]): OrcColumnarBatchReader = {
val reader = new OrcColumnarBatchReader(false, false, 4096)
reader.initBatch(
orcFileSchema,
resultFields,
requestedDataColIds,
requestedPartitionColIds,
partitionValues)
reader
}

test(s"all partitions are requested: $schema") {
val requestedDataColIds = Array(0, 1, 0, 0)
val requestedPartitionColIds = Array(-1, -1, 0, 1)
val reader = getReader(requestedDataColIds, requestedPartitionColIds,
dataSchema.fields ++ partitionSchema.fields)
assert(reader.requestedDataColIds === Array(0, 1, -1, -1))
}

test(s"initBatch should initialize requested partition columns only: $schema") {
val requestedDataColIds = Array(0, -1) // only `col1` is requested, `col2` doesn't exist
val requestedPartitionColIds = Array(-1, 0) // only `p1` is requested
val reader = getReader(requestedDataColIds, requestedPartitionColIds,
Array(dataSchema.fields(0), partitionSchema.fields(0)))
val batch = reader.columnarBatch
assert(batch.numCols() === 2)

Member (Author):
Here we can see that the result columns are pruned.


assert(batch.column(0).isInstanceOf[OrcColumnVector])
assert(batch.column(1).isInstanceOf[OnHeapColumnVector])

val p1 = batch.column(1).asInstanceOf[OnHeapColumnVector]
assert(isConstant.get(p1).asInstanceOf[Boolean]) // Partition column is constant.
assert(p1.getUTF8String(0) === partitionValues.getUTF8String(0))
}
}
}