
add configuration for partition_metadata #3

Closed
wants to merge 36 commits into from

36 commits
c3f1a9b
improve the doc for "spark.memory.offHeap.size"
CodingCat Mar 7, 2016
6a2b3ca
fix
CodingCat Mar 7, 2016
6e37fa2
add configuration for partition_metadata
CodingCat Oct 27, 2017
aa70660
framework of CachedColumnarRDD
CodingCat Oct 27, 2017
d138082
code framework
CodingCat Oct 27, 2017
a72d779
remove cachedcolumnarbatchRDD
CodingCat Oct 30, 2017
0fe35f8
fix styly error
CodingCat Oct 30, 2017
9e34243
temp
CodingCat Oct 31, 2017
677ca81
'CachedColumnarRDD'
CodingCat Nov 1, 2017
df1d796
change types
CodingCat Nov 1, 2017
08fd085
fix compilation error
CodingCat Nov 1, 2017
d4fc2b7
update
CodingCat Nov 2, 2017
97a63d6
fix storage level
CodingCat Nov 2, 2017
a24b7bb
fix getOrCompute
CodingCat Nov 2, 2017
0e8e639
evaluate with partition metadata
CodingCat Nov 2, 2017
b89d58b
fix getOrCompute
CodingCat Nov 2, 2017
3f2eae7
add logging
CodingCat Nov 2, 2017
507c1a2
add logging for skipped partition
CodingCat Nov 2, 2017
40d441c
try to print stats
CodingCat Nov 2, 2017
520e5aa
add logging for skipped partition
CodingCat Nov 2, 2017
885808f
add logging for skipped partition
CodingCat Nov 2, 2017
37b5971
add logging for skipped partition
CodingCat Nov 2, 2017
4dbfe37
refactor the code
CodingCat Nov 9, 2017
6165838
fix compilation issue
CodingCat Nov 9, 2017
05f2267
refactor the code
CodingCat Nov 9, 2017
bcafe82
test
CodingCat Nov 9, 2017
5b888d3
fix compilation issue
CodingCat Nov 9, 2017
977b93f
add missing filtering
CodingCat Nov 9, 2017
9c9bcad
test
CodingCat Nov 9, 2017
56a4307
test
CodingCat Nov 9, 2017
7936033
fix rebundant read
CodingCat Nov 9, 2017
3b6bfa2
compact iterators
CodingCat Nov 10, 2017
963ca0a
update
CodingCat Nov 10, 2017
d4f12b1
add first test case
CodingCat Nov 10, 2017
46d68db
test for remove metadata block
CodingCat Nov 10, 2017
77cf789
generate correct results when data block is removed
CodingCat Nov 11, 2017
core/src/main/scala/org/apache/spark/storage/BlockId.scala (9 additions, 1 deletion)
@@ -31,7 +31,7 @@ import org.apache.spark.annotation.DeveloperApi
* If your BlockId should be serializable, be sure to add it to the BlockId.apply() method.
*/
@DeveloperApi
- sealed abstract class BlockId {
+ abstract class BlockId {
/** A globally unique identifier for this Block. Can be used for ser/de. */
def name: String

@@ -49,6 +49,11 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
}

@DeveloperApi
case class RDDPartitionMetadataBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex + ".metadata"
}

// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
@@ -103,6 +108,7 @@ class UnrecognizedBlockId(name: String)
@DeveloperApi
object BlockId {
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val PARTITION_METADATA = "rdd_([0-9]+)_([0-9]+).metadata".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
@@ -116,6 +122,8 @@ object BlockId {
def apply(name: String): BlockId = name match {
case RDD(rddId, splitIndex) =>
RDDBlockId(rddId.toInt, splitIndex.toInt)
case PARTITION_METADATA(rddId, splitIndex) =>
RDDPartitionMetadataBlockId(rddId.toInt, splitIndex.toInt)
case SHUFFLE(shuffleId, mapId, reduceId) =>
ShuffleBlockId(shuffleId.toInt, mapId.toInt, reduceId.toInt)
case SHUFFLE_DATA(shuffleId, mapId, reduceId) =>
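With the case class and the regex both registered, a metadata block id round-trips through BlockId.apply; the ".metadata" suffix keeps it from matching the plain RDD pattern, since Scala regex extractors require a full-string match. A small sketch, assuming a build that includes this patch (the wrapper object is illustrative):

```scala
import org.apache.spark.storage.{BlockId, RDDPartitionMetadataBlockId}

object MetadataBlockIdRoundTrip {
  def main(args: Array[String]): Unit = {
    val parsed = BlockId("rdd_42_3.metadata")
    // Falls through the RDD pattern and resolves to the new metadata block id.
    assert(parsed == RDDPartitionMetadataBlockId(42, 3))
    assert(parsed.name == "rdd_42_3.metadata")
    println(parsed)
  }
}
```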
org/apache/spark/sql/internal/SQLConf.scala
@@ -137,10 +137,18 @@ object SQLConf
val IN_MEMORY_PARTITION_PRUNING =
buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
.internal()
.doc("When true, enable partition pruning for in-memory columnar tables.")
.doc("When true, enable partition batch pruning for in-memory columnar tables.")
.booleanConf
.createWithDefault(true)

val IN_MEMORY_PARTITION_METADATA =
buildConf("spark.sql.inMemoryColumnarStorage.partitionMetadata")
.internal()
.doc("When true, spark sql will collect partition level stats for in-memory columnar" +
" tables and do coarse-grained pruning")
.booleanConf
.createWithDefault(false)

val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
.internal()
.doc("When true, prefer sort merge join over shuffle hash join.")
@@ -1134,6 +1142,8 @@ class SQLConf extends Serializable with Logging {

def inMemoryPartitionPruning: Boolean = getConf(IN_MEMORY_PARTITION_PRUNING)

def inMemoryPartitionMetadata: Boolean = getConf(IN_MEMORY_PARTITION_METADATA)

def columnNameOfCorruptRecord: String = getConf(COLUMN_NAME_OF_CORRUPT_RECORD)

def broadcastTimeout: Long = getConf(BROADCAST_TIMEOUT)
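For context, a hedged usage sketch of the new flag: it has to be set on the session before the table is cached, so that InMemoryRelation collects partition-level stats while building the cache. The table shape, row count, and filter below are illustrative assumptions, not taken from this PR.

```scala
import org.apache.spark.sql.SparkSession

object PartitionMetadataDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("partition-metadata-demo")
      .master("local[*]")
      // Internal flag added by this PR; it defaults to false.
      .config("spark.sql.inMemoryColumnarStorage.partitionMetadata", "true")
      .getOrCreate()
    import spark.implicits._

    val df = spark.range(0L, 1000000L).select($"id", ($"id" % 100).as("bucket"))
    df.cache().count()                  // materializes the cached columnar RDD
    df.filter($"id" > 999000L).count()  // coarse-grained pruning can skip whole partitions
    spark.stop()
  }
}
```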
org/apache/spark/sql/execution/columnar/CachedColumnarRDD.scala (new file)
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.columnar

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.storage.{RDDPartitionMetadataBlockId, StorageLevel}

class CachedColumnarRDD(
@transient private var _sc: SparkContext,
private var dataRDD: RDD[CachedBatch],
containsPartitionMetadata: Boolean,
expectedStorageLevel: StorageLevel)
extends RDD[CachedBatch](_sc, Seq(new OneToOneDependency(dataRDD))) {

override def compute(split: Partition, context: TaskContext): Iterator[CachedBatch] = {
firstParent.iterator(split, context)
}

override protected def getPartitions: Array[Partition] = dataRDD.partitions

override private[spark] def getOrCompute(split: Partition, context: TaskContext):
Iterator[CachedBatch] = {
val metadataBlockId = RDDPartitionMetadataBlockId(id, split.index)
val superGetOrCompute: (Partition, TaskContext) => Iterator[CachedBatch] = super.getOrCompute
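// If this partition's metadata block is already cached, expose its stats immediately and fetch
// the actual cached batches only if the scan decides not to skip the partition. Otherwise,
// compute the partition as usual and cache the first batch's stats row as the metadata block.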
SparkEnv.get.blockManager.getSingle[InternalRow](metadataBlockId).map(metadataBlock =>
new InterruptibleIterator[CachedBatch](context,
new CachedColumnarIterator(metadataBlock, split, context, superGetOrCompute))
).getOrElse {
val batchIter = superGetOrCompute(split, context)
if (containsPartitionMetadata && getStorageLevel != StorageLevel.NONE && batchIter.hasNext) {
val cachedBatch = batchIter.next()
SparkEnv.get.blockManager.putSingle(metadataBlockId, cachedBatch.stats,
expectedStorageLevel)
new InterruptibleIterator[CachedBatch](context, Iterator(cachedBatch))
} else {
batchIter
}
}
}
}

private[columnar] class CachedColumnarIterator(
val partitionStats: InternalRow,
partition: Partition,
context: TaskContext,
fetchRDDPartition: (Partition, TaskContext) => Iterator[CachedBatch])
extends Iterator[CachedBatch] {

private var delegate: Iterator[CachedBatch] = _

override def hasNext: Boolean = {
if (delegate == null) {
delegate = fetchRDDPartition(partition, context)
}
delegate.hasNext
}

override def next(): CachedBatch = {
delegate.next()
}
}
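The point of CachedColumnarIterator is that partitionStats is available up front while the underlying batches are fetched only on first use, so a consumer that prunes the partition never pays for fetchRDDPartition. A standalone sketch of that lazy-delegate pattern (illustrative names, not code from the PR):

```scala
// Minimal lazy-delegate iterator: the expensive fetch runs only if someone actually iterates.
class LazyFetchIterator[T](fetch: () => Iterator[T]) extends Iterator[T] {
  private var delegate: Iterator[T] = _

  override def hasNext: Boolean = {
    if (delegate == null) {
      delegate = fetch()  // fetch the partition data on first use
    }
    delegate.hasNext
  }

  override def next(): T = {
    if (delegate == null) {
      delegate = fetch()
    }
    delegate.next()
  }
}

object LazyFetchIteratorDemo {
  def main(args: Array[String]): Unit = {
    var fetched = false
    val iter = new LazyFetchIterator(() => { fetched = true; Iterator(1, 2, 3) })
    println(s"fetched before first use: $fetched")        // false: skipping costs nothing
    println(s"sum = ${iter.sum}, fetched now: $fetched")  // sum = 6, fetched now: true
  }
}
```

Unlike this sketch, the PR's CachedColumnarIterator initializes the delegate only in hasNext, so callers are expected to check hasNext before calling next().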
org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -52,6 +52,68 @@ object InMemoryRelation {
private[columnar]
case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)

private[columnar] class CachedBatchIterator(
rowIterator: Iterator[InternalRow],
output: Seq[Attribute],
batchSize: Int,
useCompression: Boolean,
batchStats: LongAccumulator,
singleBatchPerPartition: Boolean) extends Iterator[CachedBatch] {
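// When `singleBatchPerPartition` is true (partition-level metadata enabled), all rows of the
// partition are packed into a single CachedBatch, so that batch's stats row describes the
// whole partition and can be stored as its metadata block.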

def next(): CachedBatch = {
val columnBuilders = output.map { attribute =>
ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
}.toArray

var rowCount = 0
var totalSize = 0L

val terminateLoop = (singleBatch: Boolean, rowIter: Iterator[InternalRow],
rowCount: Int, size: Long) => {
if (!singleBatch) {
rowIter.hasNext && rowCount < batchSize && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE
} else {
rowIter.hasNext
}
}

while (terminateLoop(singleBatchPerPartition, rowIterator, rowCount, totalSize)) {
val row = rowIterator.next()

// Added for SPARK-6082. This assertion can be useful for scenarios when something
// like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
// may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
// hard to decipher.
assert(
row.numFields == columnBuilders.length,
s"Row column number mismatch, expected ${output.size} columns, " +
s"but got ${row.numFields}." +
s"\nRow content: $row")

var i = 0
totalSize = 0
while (i < row.numFields) {
columnBuilders(i).appendFrom(row, i)
totalSize += columnBuilders(i).columnStats.sizeInBytes
i += 1
}
rowCount += 1
}

batchStats.add(totalSize)

val statsInSeq = columnBuilders.flatMap(_.columnStats.collectedStatistics)

val stats = InternalRow.fromSeq(statsInSeq)

CachedBatch(rowCount, columnBuilders.map { builder =>
JavaUtils.bufferToArray(builder.build())
}, stats)
}

def hasNext: Boolean = rowIterator.hasNext
}

case class InMemoryRelation(
output: Seq[Attribute],
useCompression: Boolean,
@@ -69,6 +131,8 @@ case class InMemoryRelation(

@transient val partitionStatistics = new PartitionStatistics(output)

private val usePartitionLevelMetadata = conf.inMemoryPartitionMetadata

override def computeStats(): Statistics = {
if (batchStats.value == 0L) {
// Underlying columnar RDD hasn't been materialized, no useful statistics information
@@ -87,51 +151,14 @@ case class InMemoryRelation(

private def buildBuffers(): Unit = {
val output = child.output
- val cached = child.execute().mapPartitionsInternal { rowIterator =>
- new Iterator[CachedBatch] {
- def next(): CachedBatch = {
- val columnBuilders = output.map { attribute =>
- ColumnBuilder(attribute.dataType, batchSize, attribute.name, useCompression)
- }.toArray
-
- var rowCount = 0
- var totalSize = 0L
- while (rowIterator.hasNext && rowCount < batchSize
- && totalSize < ColumnBuilder.MAX_BATCH_SIZE_IN_BYTE) {
- val row = rowIterator.next()
-
- // Added for SPARK-6082. This assertion can be useful for scenarios when something
- // like Hive TRANSFORM is used. The external data generation script used in TRANSFORM
- // may result malformed rows, causing ArrayIndexOutOfBoundsException, which is somewhat
- // hard to decipher.
- assert(
- row.numFields == columnBuilders.length,
- s"Row column number mismatch, expected ${output.size} columns, " +
- s"but got ${row.numFields}." +
- s"\nRow content: $row")
-
- var i = 0
- totalSize = 0
- while (i < row.numFields) {
- columnBuilders(i).appendFrom(row, i)
- totalSize += columnBuilders(i).columnStats.sizeInBytes
- i += 1
- }
- rowCount += 1
- }
-
- batchStats.add(totalSize)
-
- val stats = InternalRow.fromSeq(
- columnBuilders.flatMap(_.columnStats.collectedStatistics))
- CachedBatch(rowCount, columnBuilders.map { builder =>
- JavaUtils.bufferToArray(builder.build())
- }, stats)
- }
-
- def hasNext: Boolean = rowIterator.hasNext
- }
- }.persist(storageLevel)

+ val batchedRDD = child.execute().mapPartitionsInternal { rowIterator =>
+ new CachedBatchIterator(rowIterator, output, batchSize, useCompression, batchStats,
+ usePartitionLevelMetadata)
+ }
+
+ val cached = new CachedColumnarRDD(batchedRDD.sparkContext, batchedRDD,
+ usePartitionLevelMetadata, storageLevel).persist(storageLevel)

cached.setName(
tableName.map(n => s"In-memory table $n")
org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -17,15 +17,19 @@

package org.apache.spark.sql.execution.columnar

import org.apache.spark.{InterruptibleIterator, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{Predicate => GenPredicate, _}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.vectorized._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.storage.RDDPartitionMetadataBlockId


case class InMemoryTableScanExec(
@@ -180,37 +184,49 @@ case class InMemoryTableScanExec(

private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

+ private def doFilterCachedBatches(
+ cachedBatchIterator: Iterator[CachedBatch],
+ partitionStatsSchema: Seq[AttributeReference],
+ partitionFilter: GenPredicate): Iterator[CachedBatch] = {
+ val schemaIndex = partitionStatsSchema.zipWithIndex
+ cachedBatchIterator.filter { cachedBatch =>
+ if (!partitionFilter.eval(cachedBatch.stats)) {
+ logDebug {
+ val statsString = schemaIndex.map { case (a, i) =>
+ val value = cachedBatch.stats.get(i, a.dataType)
+ s"${a.name}: $value"
+ }.mkString(", ")
+ s"Skipping partition based on stats $statsString"
+ }
+ false
+ } else {
+ true
+ }
+ }
+ }

private def filteredCachedBatches(): RDD[CachedBatch] = {
// Using these variables here to avoid serialization of entire objects (if referenced directly)
// within the map Partitions closure.
val schema = relation.partitionStatistics.schema
- val schemaIndex = schema.zipWithIndex
val buffers = relation.cachedColumnBuffers

buffers.mapPartitionsWithIndexInternal { (index, cachedBatchIterator) =>

val partitionFilter = newPredicate(
partitionFilters.reduceOption(And).getOrElse(Literal(true)),
schema)
partitionFilter.initialize(index)

// Do partition batch pruning if enabled
- if (inMemoryPartitionPruningEnabled) {
- cachedBatchIterator.filter { cachedBatch =>
- if (!partitionFilter.eval(cachedBatch.stats)) {
- logDebug {
- val statsString = schemaIndex.map { case (a, i) =>
- val value = cachedBatch.stats.get(i, a.dataType)
- s"${a.name}: $value"
- }.mkString(", ")
- s"Skipping partition based on stats $statsString"
- }
- false
- } else {
- true
- }
- }
- } else {
- cachedBatchIterator
+ cachedBatchIterator.asInstanceOf[InterruptibleIterator[_]].delegate match {
+ case cachedIter: CachedColumnarIterator
+ if !partitionFilter.eval(cachedIter.partitionStats) =>
+ // scalastyle:off
+ println(s"skipped partition $index")
+ Iterator()
+ case _ =>
+ doFilterCachedBatches(cachedBatchIterator, schema, partitionFilter)
+ // scalastyle:on
+ }
}
}
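The first case above prunes at partition granularity: when the predicate over a partition's cached stats row evaluates to false, the whole partition is skipped before any of its data blocks are fetched. A simplified, hypothetical model of that decision in plain Scala (not Spark code; names and numbers are illustrative):

```scala
object CoarseGrainedPruningSketch {
  // Stand-in for the per-partition stats row cached as the metadata block.
  final case class PartitionStats(min: Long, max: Long)

  // `predicate` stands in for the generated partitionFilter evaluated over partition-level stats.
  def partitionsToScan(
      stats: Map[Int, PartitionStats],
      predicate: PartitionStats => Boolean): Seq[Int] =
    stats.collect { case (index, s) if predicate(s) => index }.toSeq.sorted

  def main(args: Array[String]): Unit = {
    val stats = Map(0 -> PartitionStats(0L, 499999L), 1 -> PartitionStats(500000L, 999999L))
    // A filter like `id > 750000` can only be satisfied by rows in partition 1.
    println(partitionsToScan(stats, s => s.max > 750000L))  // List(1)
  }
}
```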