[SPARK-44287][SQL] Use PartitionEvaluator API in RowToColumnarExec & ColumnarToRowExec SQL operators

### What changes were proposed in this pull request?

The SQL operators `RowToColumnarExec` and `ColumnarToRowExec` are updated to use the `PartitionEvaluator` API for execution.
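
For context, here is a minimal sketch of the `PartitionEvaluator` pattern these operators now follow. The names `UpperCaseEvaluatorFactory` and `PartitionEvaluatorExample` are illustrative only and not part of this PR; the real factories are `ColumnarToRowEvaluatorFactory` and `RowToColumnarEvaluatorFactory` in the diff below.

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.rdd.RDD

// Hypothetical factory, used here only to illustrate the pattern: a named,
// serializable class that carries exactly the state the per-partition work needs.
class UpperCaseEvaluatorFactory extends PartitionEvaluatorFactory[String, String] {
  override def createEvaluator(): PartitionEvaluator[String, String] =
    new PartitionEvaluator[String, String] {
      // `eval` receives the partition index plus one input iterator per upstream
      // RDD; a one-to-one transformation expects exactly one.
      override def eval(partitionIndex: Int, inputs: Iterator[String]*): Iterator[String] = {
        assert(inputs.length == 1)
        inputs.head.map(_.toUpperCase)
      }
    }
}

object PartitionEvaluatorExample {
  // The named factory, not an anonymous lambda, is what gets shipped to executors.
  def transform(rdd: RDD[String]): RDD[String] =
    rdd.mapPartitionsWithEvaluator(new UpperCaseEvaluatorFactory)
}
```

Because the per-partition work lives in a named factory with explicit constructor parameters, only those fields are serialized to executors, rather than an arbitrary closure and whatever it happens to capture.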

### Why are the changes needed?

To avoid shipping lambdas (and whatever state they capture from the enclosing plan) in task closures during distributed execution.
See SPARK-43061 for more details.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Updated 2 test cases. Once all the SQL operators are migrated, the flag `spark.sql.execution.useTaskEvaluator` will be enabled by default, to avoid having to run the tests both with and without the evaluator.
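
As a minimal sketch (assuming a test suite that mixes in `SQLHelper`, as the suites touched in this diff do), the updated tests toggle both code paths roughly like this:

```scala
import org.apache.spark.sql.internal.SQLConf

// Run the assertions once with and once without the PartitionEvaluator code path.
Seq(true, false).foreach { useEvaluator =>
  withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> useEvaluator.toString) {
    // query under test goes here; ColumnarToRowExec / RowToColumnarExec pick
    // the evaluator-based or the closure-based execution accordingly
  }
}
```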

Closes #41839 from vinodkc/br_refactorToEvaluatorFactory1.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
vinodkc authored and cloud-fan committed Jul 7, 2023
1 parent 7bfbeb6 commit 56b9f6c
Showing 4 changed files with 190 additions and 95 deletions.
@@ -17,12 +17,10 @@

package org.apache.spark.sql.execution

import scala.collection.JavaConverters._

import org.apache.spark.{broadcast, TaskContext}
import org.apache.spark.broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, SpecializedGetters, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder, SpecializedGetters}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
@@ -31,7 +29,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.V1WriteCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.execution.vectorized.WritableColumnVector
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.util.Utils
@@ -89,15 +87,18 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
// This avoids calling `output` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localOutput = this.output
child.executeColumnar().mapPartitionsInternal { batches =>
val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
batches.flatMap { batch =>
numInputBatches += 1
numOutputRows += batch.numRows()
batch.rowIterator().asScala.map(toUnsafe)
val evaluatorFactory =
new ColumnarToRowEvaluatorFactory(
child.output,
numOutputRows,
numInputBatches)

if (conf.usePartitionEvaluator) {
child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
child.executeColumnar().mapPartitionsInternal { batches =>
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(0, batches)
}
}
}
@@ -453,51 +454,25 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
)

override def doExecuteColumnar(): RDD[ColumnarBatch] = {
val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the configs.
val numRows = conf.columnBatchSize
// This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
// plan (this) in the closure.
val localSchema = this.schema
child.execute().mapPartitionsInternal { rowIterator =>
if (rowIterator.hasNext) {
new Iterator[ColumnarBatch] {
private val converters = new RowToColumnConverter(localSchema)
private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
OffHeapColumnVector.allocateColumns(numRows, localSchema)
} else {
OnHeapColumnVector.allocateColumns(numRows, localSchema)
}
private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)

TaskContext.get().addTaskCompletionListener[Unit] { _ =>
cb.close()
}

override def hasNext: Boolean = {
rowIterator.hasNext
}

override def next(): ColumnarBatch = {
cb.setNumRows(0)
vectors.foreach(_.reset())
var rowCount = 0
while (rowCount < numRows && rowIterator.hasNext) {
val row = rowIterator.next()
converters.convert(row, vectors.toArray)
rowCount += 1
}
cb.setNumRows(rowCount)
numInputRows += rowCount
numOutputBatches += 1
cb
}
}
} else {
Iterator.empty
val evaluatorFactory =
new RowToColumnarEvaluatorFactory(
conf.offHeapColumnVectorEnabled,
numRows,
schema,
numInputRows,
numOutputBatches)

if (conf.usePartitionEvaluator) {
child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
} else {
child.execute().mapPartitionsInternal { rowIterator =>
val evaluator = evaluatorFactory.createEvaluator()
evaluator.eval(0, rowIterator)
}
}
}
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import scala.collection.JavaConverters._

import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

class ColumnarToRowEvaluatorFactory(
    childOutput: Seq[Attribute],
    numOutputRows: SQLMetric,
    numInputBatches: SQLMetric)
  extends PartitionEvaluatorFactory[ColumnarBatch, InternalRow] {

  override def createEvaluator(): PartitionEvaluator[ColumnarBatch, InternalRow] = {
    new ColumnarToRowEvaluator
  }

  private class ColumnarToRowEvaluator extends PartitionEvaluator[ColumnarBatch, InternalRow] {
    override def eval(
        partitionIndex: Int,
        inputs: Iterator[ColumnarBatch]*): Iterator[InternalRow] = {
      assert(inputs.length == 1)
      val toUnsafe = UnsafeProjection.create(childOutput, childOutput)
      inputs.head.flatMap { input =>
        numInputBatches += 1
        numOutputRows += input.numRows()
        input.rowIterator().asScala.map(toUnsafe)
      }
    }
  }
}

class RowToColumnarEvaluatorFactory(
    enableOffHeapColumnVector: Boolean,
    numRows: Int,
    schema: StructType,
    numInputRows: SQLMetric,
    numOutputBatches: SQLMetric)
  extends PartitionEvaluatorFactory[InternalRow, ColumnarBatch] {

  override def createEvaluator(): PartitionEvaluator[InternalRow, ColumnarBatch] = {
    new RowToColumnarEvaluator
  }

  private class RowToColumnarEvaluator extends PartitionEvaluator[InternalRow, ColumnarBatch] {
    override def eval(
        partitionIndex: Int,
        inputs: Iterator[InternalRow]*): Iterator[ColumnarBatch] = {
      assert(inputs.length == 1)
      val rowIterator = inputs.head

      if (rowIterator.hasNext) {
        new Iterator[ColumnarBatch] {
          private val converters = new RowToColumnConverter(schema)
          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
            OffHeapColumnVector.allocateColumns(numRows, schema)
          } else {
            OnHeapColumnVector.allocateColumns(numRows, schema)
          }
          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)

          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
            cb.close()
          }

          override def hasNext: Boolean = {
            rowIterator.hasNext
          }

          override def next(): ColumnarBatch = {
            cb.setNumRows(0)
            vectors.foreach(_.reset())
            var rowCount = 0
            while (rowCount < numRows && rowIterator.hasNext) {
              val row = rowIterator.next()
              converters.convert(row, vectors.toArray)
              rowCount += 1
            }
            cb.setNumRows(rowCount)
            numInputRows += rowCount
            numOutputBatches += 1
            cb
          }
        }
      } else {
        Iterator.empty
      }
    }
  }
}
@@ -279,36 +279,40 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
}
withSession(extensions) { session =>
session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
import session.sqlContext.implicits._
// perform a join to inject a broadcast exchange
val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
val data = left.join(right, $"l1" === $"r1")
// repartitioning avoids having the add operation pushed up into the LocalTableScan
.repartition(1)
val df = data.selectExpr("l2 + r2")
// execute the plan so that the final adaptive plan is available when AQE is on
df.collect()
val found = collectPlanSteps(df.queryExecution.executedPlan).sum
// 1 MyBroadcastExchangeExec
// 1 MyShuffleExchangeExec
// 1 ColumnarToRowExec
// 2 ColumnarProjectExec
// 1 ReplacedRowToColumnarExec
// so 11121 is expected.
assert(found == 11121)

// Verify that we get back the expected, wrong, result
val result = df.collect()
assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
assert(result(1).getLong(0) == 201L)
assert(result(2).getLong(0) == 301L)

withTempPath { path =>
val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
assert(e.getMessage == "columnar write")
Seq(true, false).foreach { enableEvaluator =>
withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
assert(session.sessionState.columnarRules.contains(
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
import session.sqlContext.implicits._
// perform a join to inject a broadcast exchange
val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
val data = left.join(right, $"l1" === $"r1")
// repartitioning avoids having the add operation pushed up into the LocalTableScan
.repartition(1)
val df = data.selectExpr("l2 + r2")
// execute the plan so that the final adaptive plan is available when AQE is on
df.collect()
val found = collectPlanSteps(df.queryExecution.executedPlan).sum
// 1 MyBroadcastExchangeExec
// 1 MyShuffleExchangeExec
// 1 ColumnarToRowExec
// 2 ColumnarProjectExec
// 1 ReplacedRowToColumnarExec
// so 11121 is expected.
assert(found == 11121)

// Verify that we get back the expected, wrong, result
val result = df.collect()
assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
assert(result(1).getLong(0) == 201L)
assert(result(2).getLong(0) == 301L)

withTempPath { path =>
val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
assert(e.getMessage == "columnar write")
}
}
}
}
}
@@ -127,18 +127,22 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {

test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
withTempPath { path =>
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
try {
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
()
Seq(true, false).foreach { enable =>
withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enable.toString) {
withTempPath { path =>
spark.range(1).write.parquet(path.getAbsolutePath)
val df = spark.read.parquet(path.getAbsolutePath)
val columnarToRowExec =
df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
try {
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
()
}
} catch {
case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
}
}
} catch {
case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
}
}
}
