[SPARK-12286] [SPARK-12290] [SPARK-12294] [SPARK-12284] [SQL] always output UnsafeRow #10511


Closed. Wants to merge 5 commits.
@@ -904,8 +904,7 @@ class SQLContext private[sql](
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches = Seq(
Batch("Add exchange", Once, EnsureRequirements(self)),
Batch("Add row converters", Once, EnsureRowFormats)
Batch("Add exchange", Once, EnsureRequirements(self))
)
}
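
With every operator now producing UnsafeRow, the "Add row converters" batch has nothing left to insert, so prepareForExecution keeps only the exchange rule. For readers who have not seen the dropped rule, the sketch below illustrates the kind of plan rewrite EnsureRowFormats used to perform; it is an illustration only, and the converter operator name (ConvertToUnsafe) is assumed rather than taken from this diff.

    // Illustrative sketch, not the actual body of EnsureRowFormats: wrap any child
    // that still emits object-based ("safe") rows so that a parent which only
    // understands UnsafeRow receives the format it expects.
    object EnsureUnsafeRowFormat extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
        case operator if operator.canProcessUnsafeRows && !operator.canProcessSafeRows =>
          operator.withNewChildren(operator.children.map {
            case child if !child.outputsUnsafeRows => ConvertToUnsafe(child) // hypothetical converter node
            case child => child
          })
      }
    }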

@@ -28,7 +28,6 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
- import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
@@ -50,26 +49,14 @@ case class Exchange(
case None => ""
}

- val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
+ val simpleNodeName = "Exchange"
s"$simpleNodeName$extraInfo"
}

- /**
-  * Returns true iff we can support the data type, and we are not doing range partitioning.
-  */
- private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]

override def outputPartitioning: Partitioning = newPartitioning

override def output: Seq[Attribute] = child.output

- // This setting is somewhat counterintuitive:
- // If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
- // so the planner inserts a converter to convert data into UnsafeRow if needed.
- override def outputsUnsafeRows: Boolean = tungstenMode
- override def canProcessSafeRows: Boolean = !tungstenMode
- override def canProcessUnsafeRows: Boolean = tungstenMode

/**
* Determines whether records must be defensively copied before being sent to the shuffle.
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -130,15 +117,7 @@ case class Exchange(
}
}

- @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf
-
- private val serializer: Serializer = {
-   if (tungstenMode) {
-     new UnsafeRowSerializer(child.output.size)
-   } else {
-     new SparkSqlSerializer(sparkConf)
-   }
- }
+ private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
- import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
+ import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -99,10 +99,19 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
- override val outputsUnsafeRows: Boolean = false)
+ isUnsafeRow: Boolean = false)
extends LeafNode {

- protected override def doExecute(): RDD[InternalRow] = rdd
+ protected override def doExecute(): RDD[InternalRow] = {
+   if (isUnsafeRow) {
+     rdd
+   } else {
+     rdd.mapPartitionsInternal { iter =>
+       val proj = UnsafeProjection.create(schema)
+       iter.map(proj)
+     }
+   }
+ }

override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
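
With the new doExecute, a scan whose RDD still holds object-based rows is converted lazily, one partition at a time, instead of relying on a planner-inserted converter. A minimal sketch of the same pattern in isolation (the input RDD and its schema are assumed to be in scope):

    // Sketch: turn an RDD of "safe" InternalRows into UnsafeRows.
    // Assumes rows: RDD[InternalRow] and schema: StructType are available.
    val asUnsafe: RDD[InternalRow] = rows.mapPartitionsInternal { iter =>
      // One projection per partition; it reuses its output row, which is fine here
      // because each row is consumed before the next one is produced.
      val toUnsafe = UnsafeProjection.create(schema)
      iter.map(toUnsafe)
    }
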
@@ -41,20 +41,11 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)

- override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true

override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))

- private[this] val projection = {
-   if (outputsUnsafeRows) {
-     (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
-   } else {
-     (exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
-   }
- }
+ private[this] val projection =
+   (exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
@@ -64,6 +64,7 @@ case class Generate(
child.execute().mapPartitionsInternal { iter =>
val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
val joinedRow = new JoinedRow
+ val proj = UnsafeProjection.create(output, output)

iter.flatMap { row =>
// we should always set the left (child output)
@@ -77,13 +78,14 @@
} ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
// we leave the left side as the last element of its child output
// keep it the same as Hive does
- joinedRow.withRight(row)
+ proj(joinedRow.withRight(row))
}
}
} else {
child.execute().mapPartitionsInternal { iter =>
- iter.flatMap(row => boundGenerator.eval(row)) ++
-   LazyIterator(() => boundGenerator.terminate())
+ val proj = UnsafeProjection.create(output, output)
+ (iter.flatMap(row => boundGenerator.eval(row)) ++
+   LazyIterator(() => boundGenerator.terminate())).map(proj)
}
}
}
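
Note that the projection is built from the operator's output against itself, UnsafeProjection.create(output, output): it does not change any values, it only re-encodes whatever row the generator produced (safe or unsafe) as an UnsafeRow. A small hedged example with a made-up one-column schema:

    // Sketch: an identity-style unsafe projection over a hypothetical schema.
    val attrs = Seq(AttributeReference("word", StringType)())
    val toUnsafe = UnsafeProjection.create(attrs, attrs) // exprs == input schema
    val unsafe = toUnsafe(InternalRow(UTF8String.fromString("spark")))
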
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
- import org.apache.spark.sql.catalyst.expressions.Attribute
+ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}


/**
@@ -29,15 +29,20 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {

- private lazy val rdd = sqlContext.sparkContext.parallelize(rows)
+ private val unsafeRows: Array[InternalRow] = {
+   val proj = UnsafeProjection.create(output, output)
+   rows.map(r => proj(r).copy()).toArray
+ }
+
+ private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)

protected override def doExecute(): RDD[InternalRow] = rdd

override def executeCollect(): Array[InternalRow] = {
- rows.toArray
+ unsafeRows
}

override def executeTake(limit: Int): Array[InternalRow] = {
- rows.take(limit).toArray
+ unsafeRows.take(limit)
}
}
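
One detail worth calling out: proj(r).copy() is required because an UnsafeProjection reuses a single output row across calls, so storing each result without copying would leave every slot of the array pointing at the same (last) row. A minimal sketch of the difference, under the same assumptions as the code above:

    // Sketch: why the copy() matters when materializing projected rows.
    val proj = UnsafeProjection.create(output, output)
    val wrong = rows.map(r => proj(r)).toArray        // every element aliases proj's reused buffer
    val right = rows.map(r => proj(r).copy()).toArray // each element is an independent UnsafeRow
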
@@ -39,10 +39,6 @@ case class Sort(
testSpillFrequency: Int = 0)
extends UnaryNode {

- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = false

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = sortOrder
@@ -97,36 +97,13 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

- /** Specifies whether this operator outputs UnsafeRows */
- def outputsUnsafeRows: Boolean = false
-
- /** Specifies whether this operator is capable of processing UnsafeRows */
- def canProcessUnsafeRows: Boolean = false
-
- /**
-  * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
-  * that are not UnsafeRows).
-  */
- def canProcessSafeRows: Boolean = true

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
* Concrete implementations of SparkPlan should override doExecute instead.
*/
final def execute(): RDD[InternalRow] = {
- if (children.nonEmpty) {
-   val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
-   val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
-   assert(!(hasSafeInputs && hasUnsafeInputs),
-     "Child operators should output rows in the same format")
-   assert(canProcessSafeRows || canProcessUnsafeRows,
-     "Operator must be able to process at least one row format")
-   assert(!hasSafeInputs || canProcessSafeRows,
-     "Operator will receive safe rows as input but cannot process safe rows")
-   assert(!hasUnsafeInputs || canProcessUnsafeRows,
-     "Operator will receive unsafe rows as input but cannot process unsafe rows")
- }
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
doExecute()
@@ -100,8 +100,6 @@ case class Window(

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

- override def canProcessUnsafeRows: Boolean = true

/**
* Create a bound ordering object for a given frame type and offset. A bound ordering object is
* used to determine which input row lies within the frame boundaries of an output row.
@@ -259,16 +257,16 @@ case class Window(
* @return the final resulting projection.
*/
private[this] def createResultProjection(
- expressions: Seq[Expression]): MutableProjection = {
+ expressions: Seq[Expression]): UnsafeProjection = {
val references = expressions.zipWithIndex.map{ case (e, i) =>
// Results of window expressions will be on the right side of child's output
BoundReference(child.output.size + i, e.dataType, e.nullable)
}
val unboundToRefMap = expressions.zip(references).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
- newMutableProjection(
+ UnsafeProjection.create(
    projectList ++ patchedWindowExpression,
-   child.output)()
+   child.output)
}

protected override def doExecute(): RDD[InternalRow] = {
@@ -49,10 +49,6 @@ case class SortBasedAggregate(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = false
- override def canProcessSafeRows: Boolean = true

override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

override def requiredChildDistribution: List[Distribution] = {
@@ -87,6 +87,10 @@ class SortBasedAggregationIterator(
// The aggregation buffer used by the sort-based aggregation.
private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer

+ // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
+ // compared to MutableRow (aggregation buffer) directly.
+ private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))

protected def initialize(): Unit = {
if (inputIterator.hasNext) {
initializeBuffer(sortBasedAggregationBuffer)
@@ -110,7 +114,7 @@
// We create a variable to track if we see the next group.
var findNextPartition = false
// firstRowInNextGroup is the first row of this group. We first process it.
- processRow(sortBasedAggregationBuffer, firstRowInNextGroup)
+ processRow(sortBasedAggregationBuffer, safeProj(firstRowInNextGroup))

// The search will stop when we see the next group or there is no
// input row left in the iter.
@@ -122,7 +126,7 @@

// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
- processRow(sortBasedAggregationBuffer, currentRow)
+ processRow(sortBasedAggregationBuffer, safeProj(currentRow))
} else {
// We find a new group.
findNextPartition = true
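
The FromUnsafeProjection added above runs in the opposite direction from the rest of this patch: now that upstream operators hand this iterator UnsafeRows, each input row is first converted back into an object-based row so that processRow can evaluate the aggregate update expressions alongside the mutable aggregation buffer. A hedged sketch of that conversion in isolation (the attribute list and input row are assumed to be in scope):

    // Sketch: convert an UnsafeRow back to a "safe" row, with field types taken from the value schema.
    // Assumes valueAttributes: Seq[Attribute] and row: UnsafeRow are available.
    val toSafe: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
    val safeRow: InternalRow = toSafe(row) // object-based row, usable next to the MutableRow buffer
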
@@ -49,10 +49,6 @@ case class TungstenAggregate(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))

- override def outputsUnsafeRows: Boolean = true
- override def canProcessUnsafeRows: Boolean = true
- override def canProcessSafeRows: Boolean = true

override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

override def producedAttributes: AttributeSet =