[SPARK-11088][SQL] Merges partition values using UnsafeProjection
`DataSourceStrategy.mergeWithPartitionValues` is essentially a projection, implemented in a quite inefficient way: each output field is filled in through generic `get`/`update` calls that box primitive values. This PR optimizes the method with `UnsafeProjection` to avoid the unnecessary boxing cost.

Author: Cheng Lian <lian@databricks.com>

Closes apache#9104 from liancheng/spark-11088.faster-partition-values-merging.
liancheng authored and marmbrus committed Oct 19, 2015
1 parent 16906ef commit 8b877cc
Showing 1 changed file with 24 additions and 49 deletions.
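
For context before the diff: instead of copying field values one at a time through boxing accessors, the new code glues each scanned data row to the (constant) partition values with a reused `JoinedRow`, then lets a code-generated `UnsafeProjection` lay the joined fields out as the required output. The sketch below is not part of the commit; it exercises the same Catalyst APIs in isolation, with made-up column names and values, and assumes `spark-catalyst` (plus its transitive dependencies) on the classpath:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, JoinedRow, UnsafeProjection}
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object MergePartitionValuesSketch {
  def main(args: Array[String]): Unit = {
    // One data column scanned from the files, one partition column recovered
    // from the directory path (e.g. .../part=42/). Names are hypothetical.
    val dataColumns = Seq(AttributeReference("value", IntegerType)())
    val partitionColumns = Seq(AttributeReference("part", StringType)())

    // The caller wants both columns. The projection's input schema is data
    // columns followed by partition columns, matching the JoinedRow layout.
    val requiredColumns = dataColumns ++ partitionColumns
    val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)

    // Partition values are constant for a whole partition, so one JoinedRow
    // instance is reused to attach them to every data row.
    val mutableJoinedRow = new JoinedRow()
    val partitionValues = InternalRow(UTF8String.fromString("42"))
    val dataRows = Iterator(InternalRow(1), InternalRow(2))

    dataRows.map(row => projection(mutableJoinedRow(row, partitionValues))).foreach { merged =>
      println(s"value = ${merged.getInt(0)}, part = ${merged.getUTF8String(1)}")
    }
  }
}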
@@ -140,29 +140,30 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     val sharedHadoopConf = SparkHadoopUtil.get.conf
     val confBroadcast =
       relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
+    val partitionColumnNames = partitionColumns.fieldNames.toSet
 
     // Now, we create a scan builder, which will be used by pruneFilterProject. This scan builder
     // will union all partitions and attach partition values if needed.
     val scanBuilder = {
-      (columns: Seq[Attribute], filters: Array[Filter]) => {
+      (requiredColumns: Seq[Attribute], filters: Array[Filter]) => {
+        val requiredDataColumns =
+          requiredColumns.filterNot(c => partitionColumnNames.contains(c.name))
+
         // Builds RDD[Row]s for each selected partition.
         val perPartitionRows = partitions.map { case Partition(partitionValues, dir) =>
-          val partitionColNames = partitionColumns.fieldNames
-
           // Don't scan any partition columns to save I/O. Here we are being optimistic and
           // assuming partition columns data stored in data files are always consistent with those
           // partition values encoded in partition directory paths.
-          val needed = columns.filterNot(a => partitionColNames.contains(a.name))
-          val dataRows =
-            relation.buildScan(needed.map(_.name).toArray, filters, Array(dir), confBroadcast)
+          val dataRows = relation.buildScan(
+            requiredDataColumns.map(_.name).toArray, filters, Array(dir), confBroadcast)
 
           // Merges data values with partition values.
           mergeWithPartitionValues(
-            relation.schema,
-            columns.map(_.name).toArray,
-            partitionColNames,
+            requiredColumns,
+            requiredDataColumns,
+            partitionColumns,
             partitionValues,
-            toCatalystRDD(logicalRelation, needed, dataRows))
+            toCatalystRDD(logicalRelation, requiredDataColumns, dataRows))
         }
 
         val unionedRows =
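
This first hunk also hoists the partition-column bookkeeping out of the per-partition loop: the set of partition column names and the pruned `requiredDataColumns` are now computed once per scan rather than once per partition directory. In miniature, with hypothetical plain-string names standing in for the `Attribute`s the real code uses:

// Hypothetical column names, for illustration only.
val partitionColumnNames = Set("year", "month")
val requiredColumns = Seq("value", "year")
// Partition columns never need to be read from the data files; their values
// come from the directory paths, so they are pruned from the physical scan.
val requiredDataColumns = requiredColumns.filterNot(partitionColumnNames.contains)
// requiredDataColumns == Seq("value")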
@@ -188,52 +189,27 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
     sparkPlan
   }
 
-  // TODO: refactor this thing. It is very complicated because it does projection internally.
-  // We should just put a project on top of this.
   private def mergeWithPartitionValues(
-      schema: StructType,
-      requiredColumns: Array[String],
-      partitionColumns: Array[String],
+      requiredColumns: Seq[Attribute],
+      dataColumns: Seq[Attribute],
+      partitionColumnSchema: StructType,
       partitionValues: InternalRow,
       dataRows: RDD[InternalRow]): RDD[InternalRow] = {
-    val nonPartitionColumns = requiredColumns.filterNot(partitionColumns.contains)
-
     // If output columns contain any partition column(s), we need to merge scanned data
     // columns and requested partition columns to form the final result.
-    if (!requiredColumns.sameElements(nonPartitionColumns)) {
-      val mergers = requiredColumns.zipWithIndex.map { case (name, index) =>
-        // To see whether the `index`-th column is a partition column...
-        val i = partitionColumns.indexOf(name)
-        if (i != -1) {
-          val dt = schema(partitionColumns(i)).dataType
-          // If yes, gets column value from partition values.
-          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
-            mutableRow(ordinal) = partitionValues.get(i, dt)
-          }
-        } else {
-          // Otherwise, inherits the value from scanned data.
-          val i = nonPartitionColumns.indexOf(name)
-          val dt = schema(nonPartitionColumns(i)).dataType
-          (mutableRow: MutableRow, dataRow: InternalRow, ordinal: Int) => {
-            mutableRow(ordinal) = dataRow.get(i, dt)
-          }
-        }
-      }
+    if (requiredColumns != dataColumns) {
+      // Builds `AttributeReference`s for all partition columns so that we can use them to project
+      // required partition columns. Note that if a partition column appears in `requiredColumns`,
+      // we should use the `AttributeReference` in `requiredColumns`.
+      val requiredColumnMap = requiredColumns.map(a => a.name -> a).toMap
+      val partitionColumns = partitionColumnSchema.toAttributes.map { a =>
+        requiredColumnMap.getOrElse(a.name, a)
+      }
 
       // Since we know for sure that this closure is serializable, we can avoid the overhead
       // of cleaning a closure for each RDD by creating our own MapPartitionsRDD. Functionally
       // this is equivalent to calling `dataRows.mapPartitions(mapPartitionsFunc)` (SPARK-7718).
       val mapPartitionsFunc = (_: TaskContext, _: Int, iterator: Iterator[InternalRow]) => {
-        val dataTypes = requiredColumns.map(schema(_).dataType)
-        val mutableRow = new SpecificMutableRow(dataTypes)
-        iterator.map { dataRow =>
-          var i = 0
-          while (i < mutableRow.numFields) {
-            mergers(i)(mutableRow, dataRow, i)
-            i += 1
-          }
-          mutableRow.asInstanceOf[InternalRow]
-        }
+        val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
+        val mutableJoinedRow = new JoinedRow()
+        iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
       }
 
       // This is an internal RDD whose call site the user should not be concerned with
@@ -242,7 +218,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       Utils.withDummyCallSite(dataRows.sparkContext) {
         new MapPartitionsRDD(dataRows, mapPartitionsFunc, preservesPartitioning = false)
       }
-
     } else {
       dataRows
     }
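
The `mapPartitionsFunc` comment in the second hunk references SPARK-7718: wrapping `dataRows` in a hand-built `MapPartitionsRDD` under a dummy call site skips the per-RDD closure-cleaning pass that `RDD.mapPartitions` would run. Outside Spark's own source tree, where `MapPartitionsRDD` is `private[spark]`, the functionally equivalent public-API form is the following sketch; it reuses `requiredColumns`, `dataColumns`, `partitionColumns`, `partitionValues`, and `dataRows` as defined in the patch above:

// Equivalent public-API form of the hand-built MapPartitionsRDD (a sketch;
// all names are the values defined in the patch above).
val mergedRows: RDD[InternalRow] = dataRows.mapPartitions { iterator =>
  val projection = UnsafeProjection.create(requiredColumns, dataColumns ++ partitionColumns)
  val mutableJoinedRow = new JoinedRow()
  iterator.map(dataRow => projection(mutableJoinedRow(dataRow, partitionValues)))
}

Either form compiles the projection once per partition, so the per-row work reduces to a single code-generated copy into an `UnsafeRow`, with no boxing.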