From d0a7ca4ba80c2995aac2143a940ece14be12527f Mon Sep 17 00:00:00 2001
From: Fu Chen
Date: Sun, 23 Apr 2023 17:39:59 +0800
Subject: [PATCH] [KYUUBI #4754] [ARROW] Use `KyuubiArrowConverters#toBatchIterator` instead of `ArrowConverters#toBatchIterator`

### _Why are the changes needed?_

To adapt to Spark 3.4.

The signature of `ArrowConverters#toBatchIterator` was changed in
https://github.com/apache/spark/pull/38618 (since Spark 3.4).

Before Spark 3.4:

```
private[sql] def toBatchIterator(
    rowIter: Iterator[InternalRow],
    schema: StructType,
    maxRecordsPerBatch: Int,
    timeZoneId: String,
    context: TaskContext): Iterator[Array[Byte]]
```

Spark 3.4:

```
private[sql] def toBatchIterator(
    rowIter: Iterator[InternalRow],
    schema: StructType,
    maxRecordsPerBatch: Long,
    timeZoneId: String,
    context: TaskContext): ArrowBatchIterator
```

`maxRecordsPerBatch` is changed from `Int` to `Long`, and the return type is changed from
`Iterator[Array[Byte]]` to `ArrowBatchIterator`, so this patch switches the engine to
Kyuubi's own `KyuubiArrowConverters#toBatchIterator` (illustrative sketches follow the
diff below).

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4754 from cfmcgrady/arrow-spark34.

Closes #4754

a3c58d0ad [Fu Chen] fix ci
32704c577 [Fu Chen] Revert "fix ci"
e32311a03 [Fu Chen] fix ci
a76af6209 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
453b6a6b8 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
74a9f7a9d [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
4ce5844af [Fu Chen] adapt Spark 3.4

Lead-authored-by: Fu Chen
Co-authored-by: Cheng Pan
Signed-off-by: Cheng Pan
---
 .../spark/sql/kyuubi/SparkDatasetHelper.scala | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 6bd96676fc3..285a28a60f1 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.kyuubi
 
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
@@ -28,7 +27,7 @@
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
-import org.apache.spark.sql.execution.arrow.{ArrowConverters, KyuubiArrowConverters}
+import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
@@ -73,17 +72,20 @@ object SparkDatasetHelper extends Logging {
   def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
     val schemaCaptured = plan.schema
     // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-    // drop Spark-3.1.x support.
+    // drop Spark 3.1 support.
     val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
     val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
+    // note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
+    // arguments are serialized and sent to the executor side for execution.
+    val maxBatchSizePerBatch = maxBatchSize
     plan.execute().mapPartitionsInternal { iter =>
-      val context = TaskContext.get()
-      ArrowConverters.toBatchIterator(
+      KyuubiArrowConverters.toBatchIterator(
         iter,
         schemaCaptured,
         maxRecordsPerBatch,
-        timeZoneId,
-        context)
+        maxBatchSizePerBatch,
+        -1,
+        timeZoneId)
     }
   }
 
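
Below is a minimal sketch of the facade idea behind `KyuubiArrowConverters#toBatchIterator`: keep one fixed `toBatchIterator` signature in the engine's own code so callers are insulated from upstream signature changes like the one described above. The object name, parameter names, and the placeholder byte encoding are assumptions for illustration only; this is not Kyuubi's actual implementation.

```
// Illustrative sketch only: a version-stable facade with a fixed signature, so engine
// code never calls ArrowConverters#toBatchIterator directly. The byte encoding below is
// a placeholder, not the Arrow IPC encoding a real implementation would produce.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object StableBatchConverters {

  // Signature kept identical across supported Spark versions.
  def toBatchIterator(
      rowIter: Iterator[InternalRow],
      schema: StructType,
      maxRecordsPerBatch: Long,
      maxEstimatedBatchSize: Long,
      limit: Long,
      timeZoneId: String): Iterator[Array[Byte]] = {
    // A negative limit means "no limit", mirroring the -1 passed in the diff above.
    val limited = if (limit >= 0) rowIter.take(limit.toInt) else rowIter
    val batchSize = if (maxRecordsPerBatch > 0) maxRecordsPerBatch.toInt else Int.MaxValue
    limited.grouped(batchSize).map { rows =>
      // Placeholder: a real implementation would write Arrow record batches here,
      // also honoring schema, maxEstimatedBatchSize, and timeZoneId.
      rows.map(_.toString).mkString("\n").getBytes("UTF-8")
    }
  }
}
```

A call site would then look like the one in the diff: the row iterator, the captured schema, the records-per-batch and size limits, `-1` for no limit, and the session time zone.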
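
The new comment in the diff explains that the lazy `maxBatchSize` cannot be referenced inside `mapPartitionsInternal` directly and is first copied into the local `maxBatchSizePerBatch`. The sketch below shows that capture pattern with hypothetical names and a made-up config key (not Kyuubi code): the lazy val is evaluated on the driver, and only the resulting value is captured by the closure, so executors never re-initialize it against driver-only state such as `SparkSession.active`.

```
// Minimal sketch (hypothetical names and config key, not Kyuubi code): copy a
// driver-side lazy val into a plain local before an RDD closure captures it.
import org.apache.spark.sql.SparkSession

object DriverValueCaptureSketch {

  // Driver-side only: SparkSession.active is not available on executors.
  lazy val maxBatchSize: Long =
    SparkSession.active.conf.get("spark.example.maxBatchSizeBytes", (4L << 20).toString).toLong

  def run(spark: SparkSession): Array[Long] = {
    // Evaluated here on the driver; only the resulting Long is serialized with the closure.
    val maxBatchSizeLocal = maxBatchSize
    spark.sparkContext
      .parallelize(Seq(1L, 1L << 30))
      .map(n => math.min(n, maxBatchSizeLocal)) // executors never touch SparkSession.active
      .collect()
  }
}
```

Referencing `maxBatchSize` directly inside the `map` closure would instead trigger its lazy initialization on each executor JVM, where typically no active session exists.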