
Partition level pruning 2 #4


Closed
wants to merge 47 commits into from
Changes from all commits

47 commits
6a34b15
improve the doc for "spark.memory.offHeap.size"
CodingCat Mar 7, 2016
3a2be8c
fix
CodingCat Mar 7, 2016
eb535d6
add configuration for partition_metadata
CodingCat Oct 27, 2017
f910b27
framework of CachedColumnarRDD
CodingCat Oct 27, 2017
846b032
code framework
CodingCat Oct 27, 2017
f511b6f
remove cachedcolumnarbatchRDD
CodingCat Oct 30, 2017
50f2612
temp
CodingCat Oct 31, 2017
2b945c9
'CachedColumnarRDD'
CodingCat Nov 1, 2017
89f0a98
change types
CodingCat Nov 1, 2017
5aa1808
fix compilation error
CodingCat Nov 1, 2017
a5adc56
update
CodingCat Nov 2, 2017
00b1642
fix storage level
CodingCat Nov 2, 2017
311fe5a
fix getOrCompute
CodingCat Nov 2, 2017
6411b82
evaluate with partition metadata
CodingCat Nov 2, 2017
41f6ad2
fix getOrCompute
CodingCat Nov 2, 2017
c50b743
add logging
CodingCat Nov 2, 2017
78f774f
add logging for skipped partition
CodingCat Nov 2, 2017
71456bd
try to print stats
CodingCat Nov 2, 2017
97544a6
add logging for skipped partition
CodingCat Nov 2, 2017
c131b2d
add logging for skipped partition
CodingCat Nov 2, 2017
d588fb0
add logging for skipped partition
CodingCat Nov 2, 2017
1ba1f80
refactor the code
CodingCat Nov 9, 2017
62f358d
fix compilation issue
CodingCat Nov 9, 2017
500d4fd
refactor the code
CodingCat Nov 9, 2017
63c5897
test
CodingCat Nov 9, 2017
9031eaf
fix compilation issue
CodingCat Nov 9, 2017
1692303
add missing filtering
CodingCat Nov 9, 2017
f9f3d20
test
CodingCat Nov 9, 2017
da5f06f
test
CodingCat Nov 9, 2017
2c0b6cd
fix redundant read
CodingCat Nov 9, 2017
e1d8c43
compact iterators
CodingCat Nov 10, 2017
c900808
update
CodingCat Nov 10, 2017
2caa7fc
add first test case
CodingCat Nov 10, 2017
a9f1256
test for remove metadata block
CodingCat Nov 10, 2017
b5d6094
generate correct results when data block is removed
CodingCat Nov 11, 2017
3b51c9a
try to avoid unnecessary tasks
CodingCat Nov 21, 2017
59b0684
collect data on the fly
CodingCat Nov 21, 2017
3e972ce
fix the compilation issue
CodingCat Nov 21, 2017
35c9361
fix the compilation issue
CodingCat Nov 21, 2017
20b72a8
fix NPE
CodingCat Nov 21, 2017
bfa357a
fix NPE
CodingCat Nov 21, 2017
0fd6b68
fix incompatibility of partition
CodingCat Nov 21, 2017
dc54be5
fix the parent inheritance mechanism
CodingCat Nov 21, 2017
1afbf13
add missing filtering for cachedBatch
CodingCat Nov 22, 2017
accd549
remove CachedColumnarIterator
CodingCat Nov 24, 2017
a853ce6
fix filtering logic
CodingCat Nov 24, 2017
9d450ad
fix the failed test
CodingCat Nov 25, 2017
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -31,7 +31,7 @@ import org.apache.spark.annotation.DeveloperApi
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
@DeveloperApi
sealed abstract class BlockId {
abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String

@@ -49,6 +49,11 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
}

@DeveloperApi
case class RDDPartitionMetadataBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex + ".metadata"
}

// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
@@ -103,6 +108,7 @@ class UnrecognizedBlockId(name: String)
@DeveloperApi
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val PARTITION_METADATA = "rdd_([0-9]+)_([0-9]+).metadata".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
@@ -116,6 +122,8 @@ object BlockId {
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
RDDBlockId(rddId.toInt, splitIndex.toInt)
case PARTITION_METADATA(rddId, splitIndex) =>
RDDPartitionMetadataBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
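A quick illustration of how the new block id is expected to behave (a reviewer-facing sketch, not part of the diff; the ids are made up). Because the Regex extractors in BlockId.apply only succeed on a full match, the trailing ".metadata" keeps the new id from being captured by the plain RDD pattern:

import org.apache.spark.storage.{BlockId, RDDBlockId, RDDPartitionMetadataBlockId}

val metaId = RDDPartitionMetadataBlockId(rddId = 42, splitIndex = 3)
assert(metaId.name == "rdd_42_3.metadata")            // naming convention of the new id
assert(BlockId("rdd_42_3.metadata") == metaId)        // round-trips through BlockId.apply
assert(BlockId("rdd_42_3") == RDDBlockId(42, 3))      // plain RDD block ids still parse as before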
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -136,7 +136,7 @@ object SQLConf {
val IN_MEMORY_PARTITION_PRUNING =
buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
.internal()
.doc("When true, enable partition pruning for in-memory columnar tables.")
.doc("When true, enable partition batch pruning for in-memory columnar tables.")
.booleanConf
.createWithDefault(true)

@@ -147,6 +147,14 @@
.booleanConf
.createWithDefault(false)

val IN_MEMORY_PARTITION_METADATA =
buildConf("spark.sql.inMemoryColumnarStorage.partitionMetadata")
.internal()
.doc("When true, spark sql will collect partition level stats for in-memory columnar" +
" tables and do coarse-grained pruning")
.booleanConf
.createWithDefault(false)

val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
.internal()
.doc("When true, prefer sort merge join over shuffle hash join.")
@@ -1219,6 +1227,8 @@ class SQLConf extends Serializable with Logging {

def offHeapColumnVectorEnabled: Boolean = getConf(COLUMN_VECTOR_OFFHEAP_ENABLED)

def inMemoryPartitionMetadata: Boolean = getConf(IN_MEMORY_PARTITION_METADATA)

def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
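Since spark.sql.inMemoryColumnarStorage.partitionMetadata is internal and defaults to false, it has to be switched on before a table is cached for the new code path to take effect. A minimal session sketch (the application name and the "events" table are hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("partition-metadata-demo").getOrCreate()
spark.conf.set("spark.sql.inMemoryColumnarStorage.partitionMetadata", "true")

// With the flag on, caching also stores one stats row per partition as a separate
// RDDPartitionMetadataBlockId block, which the scan side can consult for coarse-grained pruning.
spark.table("events").cache()
spark.table("events").count()   // materializes the cached, metadata-carrying partitions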
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.columnar

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}

private[columnar] class CachedColumnarRDD(
@transient private var _sc: SparkContext,
private var dataRDD: RDD[CachedBatch],
private[columnar] val containsPartitionMetadata: Boolean,
expectedStorageLevel: StorageLevel)
extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {

override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
firstParent.iterator(split, context)
}

override def unpersist(blocking: Boolean = true): this.type = {
CachedColumnarRDD.allMetadataFetched.remove(id)
CachedColumnarRDD.rddIdToMetadata.remove(id)
super.unpersist(blocking)
}

override protected def getPartitions: Array[Partition] = dataRDD.partitions

override private[spark] def getOrCompute(split: Partition, context: TaskContext):
Iterator[CachedBatch] = {
val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(_ =>
superGetOrCompute(split, context)
).getOrElse {
val batchIter = superGetOrCompute(split, context)
if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
val cachedBatch = batchIter.next()
SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats,
expectedStorageLevel)
new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch))
} else {
batchIter
}
}
}
}

private[columnar] object CachedColumnarRDD {

private val rddIdToMetadata = new ConcurrentHashMap[Int, mutable.ArraySeq[Option[InternalRow]]]()
private val allMetadataFetched = new ConcurrentHashMap[Int, Boolean]()

def collectStats(rdd: RDD[CachedBatch]): IndexedSeq[Option[InternalRow]] = {
if (allMetadataFetched.containsKey(rdd.id)) {
rddIdToMetadata.get(rdd.id)
} else {
val updatedMetadataBlocks = rdd.partitions.indices.map {
partitionId => {
if (!rddIdToMetadata.containsKey(rdd.id)) {
val initSeq = new mutable.ArraySeq[Option[InternalRow]](rdd.partitions.length)
initSeq.indices.foreach(idx => initSeq(idx) = None)
rddIdToMetadata.put(rdd.id, initSeq)
}
rddIdToMetadata.get(rdd.id)(partitionId).orElse{
val metadata = SparkEnv.get.blockManager.getSingle[InternalRow](
RDDPartitionMetadataBlockId(rdd.id, partitionId))
rddIdToMetadata.get(rdd.id).update(partitionId, metadata)
metadata
}
}
}
if (updatedMetadataBlocks.forall(_.isDefined)) {
allMetadataFetched.put(rdd.id, true)
}
updatedMetadataBlocks
}
}
}
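How CachedColumnarRDD.collectStats is meant to be consumed (a sketch under assumed names, since the scan-side caller is not shown in this excerpt): it returns one Option[InternalRow] per partition, and a None entry means the metadata block for that partition is not in the block manager, so the caller cannot prune that partition and keeps it conservatively.

import org.apache.spark.sql.catalyst.InternalRow

// cachedRDD: CachedColumnarRDD; statsAcceptedByFilter: InternalRow => Boolean (both placeholders)
val partitionStats: IndexedSeq[Option[InternalRow]] = CachedColumnarRDD.collectStats(cachedRDD)
val acceptedIndices: Seq[Int] = partitionStats.zipWithIndex.collect {
  case (None, idx) => idx                                         // no stats cached: keep conservatively
  case (Some(stats), idx) if statsAcceptedByFilter(stats) => idx  // stats survive the partition filter
}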
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/FilteredCachedColumnarRDD.scala
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.columnar

import org.apache.spark._
import org.apache.spark.rdd.RDD

private[columnar] class FilteredCachedColumnarPartition(
val partitionIndex: Int,
val parentPartition: Partition) extends Partition {

override def index: Int = partitionIndex
}

private class PartialDependency[T](rdd: RDD[T], partitions: Array[Partition])
extends NarrowDependency[T](rdd) {

override def getParents(partitionId: Int): Seq[Int] = {
List(partitions(partitionId).asInstanceOf[FilteredCachedColumnarPartition].
parentPartition.index)
}
}

private[columnar] class FilteredCachedColumnarRDD (
@transient private var _sc: SparkContext,
private var cachedColumnarRDD: CachedColumnarRDD,
acceptedPartitions: Seq[Partition])
extends RDD[CachedBatch](
_sc, Seq(new PartialDependency(cachedColumnarRDD, acceptedPartitions.toArray))) {

override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
val filteredCachedColumnarPartition = split.asInstanceOf[FilteredCachedColumnarPartition]
firstParent.iterator(filteredCachedColumnarPartition.parentPartition, context)
}

override protected def getPartitions: Array[Partition] = acceptedPartitions.toArray

}
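Continuing the sketch above (names remain placeholders): the surviving parent partitions are wrapped so that the pruned RDD exposes a dense 0..n-1 index range, while each wrapper still points at its original parent partition, which is what PartialDependency resolves back to.

import org.apache.spark.Partition

val acceptedPartitions: Seq[Partition] = acceptedIndices.zipWithIndex.map {
  case (parentIndex, newIndex) =>
    new FilteredCachedColumnarPartition(newIndex, cachedRDD.partitions(parentIndex))
}
val prunedRDD = new FilteredCachedColumnarRDD(cachedRDD.sparkContext, cachedRDD, acceptedPartitions)
// prunedRDD.compute reads each wrapper's parentPartition from the cached parent RDD,
// so tasks are only launched for the accepted partitions.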
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -52,6 +52,68 @@ object InMemoryRelation {
private[columnar]
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

private[columnar] class CachedBatchIterator(
rowIterator: Iterator[InternalRow],
output: Seq[Attribute],
batchSize: Int,
useCompression: Boolean,
batchStats: LongAccumulator,
singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {

def next(): CachedBatch = {
val columnBuilders = output.map { attribute =>
ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
}.toArray

var rowCount = 0
var totalSize = 0L

val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
rowCount: Int, size: Long) => {
if (!singleBatch) {
rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
} else {
rowIter.hasNext
}
}

while (terminateLoop(singleBatchPerPartition, rowIterator, rowCount, totalSize)) {
val row = rowIterator.next()

// Added for SPARK-6082. This assertion can be useful for scenarios when something
// like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
// may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
// hard to decipher.
assert(
row.numFields == columnBuilders.length,
s"Row column number mismatch, expected ${output.size} columns, " +
s"but got ${row.numFields}." +
s"\nRow content: $row")

var i = 0
totalSize = 0
while (i < row.numFields) {
columnBuilders(i).appendFrom(row, i)
totalSize += columnBuilders(i).columnStats.sizeInBytes
i += 1
}
rowCount += 1
}

batchStats.add(totalSize)

val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics)

val stats = InternalRow.fromSeq(statsInSeq)

CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}

def hasNext: Boolean = rowIterator.hasNext
}

case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
@@ -69,6 +131,8 @@ case class InMemoryRelation(

@transient val partitionStatistics = new PartitionStatistics(output)

private val usePartitionLevelMetadata = conf.inMemoryPartitionMetadata

override def computeStats(): Statistics = {
if (batchStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
@@ -87,51 +151,14 @@

private def buildBuffers(): Unit = {
val output = child.output
val cached = child.execute().mapPartitionsInternal { rowIterator =>
new Iterator[CachedBatch] {
def next(): CachedBatch = {
val columnBuilders = output.map { attribute =>
ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
}.toArray

var rowCount = 0
var totalSize = 0L
while (rowIterator.hasNext && rowCount < batchSize
&& totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
val row = rowIterator.next()

// Added for SPARK-6082. This assertion can be useful for scenarios when something
// like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
// may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
// hard to decipher.
assert(
row.numFields == columnBuilders.length,
s"Row column number mismatch, expected ${output.size} columns, " +
s"but got ${row.numFields}." +
s"\nRow content: $row")

var i = 0
totalSize = 0
while (i < row.numFields) {
columnBuilders(i).appendFrom(row, i)
totalSize += columnBuilders(i).columnStats.sizeInBytes
i += 1
}
rowCount += 1
}

batchStats.add(totalSize)

val stats = InternalRow.fromSeq(
columnBuilders.flatMap(_.columnStats.collectedStatistics))
CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}

def hasNext: Boolean = rowIterator.hasNext
}
}.persist(storageLevel)

val batchedRDD = child.execute().mapPartitionsInternal { rowIterator =>
new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats,
usePartitionLevelMetadata)
}

val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD,
usePartitionLevelMetadata, storageLevel).persist(storageLevel)

cached.setName(
tableName.map(n => s"In-memory table $n")
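The practical effect of the singleBatchPerPartition flag in CachedBatchIterator, as an illustration for review (not code from the diff; the numbers are made up): with partition metadata disabled, a 25,000-row partition and batchSize = 10,000 is cut into three CachedBatch objects, each with its own stats row. With the flag enabled, the row-count and byte-size limits are ignored, the whole partition becomes a single CachedBatch, and its one stats row describes the entire partition, which is what CachedColumnarRDD stores under the partition's RDDPartitionMetadataBlockId.

// Inside mapPartitionsInternal, with rowIterator, output and batchStats taken from the surrounding scope:
val iter = new CachedBatchIterator(rowIterator, output, batchSize = 10000,
  useCompression = true, batchStats = batchStats, singleBatchPerPartition = true)
val onlyBatch = iter.next()   // drains rowIterator completely: one batch for the whole partition
assert(!iter.hasNext)
// onlyBatch.stats is the per-partition metadata row later used for coarse-grained pruning.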