[KYUUBI apache#4754] [ARROW] Use `KyuubiArrowConverters#toBatchIterator` instead of `ArrowConverters#toBatchIterator`

### _Why are the changes needed?_

To adapt to Spark 3.4.

The signature of `ArrowConverters#toBatchIterator` changed in apache/spark#38618 (since Spark 3.4).

Before Spark 3.4:

```scala
private[sql] def toBatchIterator(
    rowIter: Iterator[InternalRow],
    schema: StructType,
    maxRecordsPerBatch: Int,
    timeZoneId: String,
    context: TaskContext): Iterator[Array[Byte]]
```

Since Spark 3.4:

```scala
private[sql] def toBatchIterator(
    rowIter: Iterator[InternalRow],
    schema: StructType,
    maxRecordsPerBatch: Long,
    timeZoneId: String,
    context: TaskContext): ArrowBatchIterator
```

The return type changed from `Iterator[Array[Byte]]` to `ArrowBatchIterator`, and the `maxRecordsPerBatch` parameter changed from `Int` to `Long`.
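
One way to read this change: instead of calling the Spark-internal method directly, the engine routes Arrow serialization through a converter that Kyuubi owns, so the signature callers compile against no longer drifts between Spark releases. Below is a minimal sketch of that idea; the object name `VersionStableArrowShim` and its exact parameters are illustrative assumptions, not the actual internals of `KyuubiArrowConverters`.

```scala
// Illustrative sketch only: a hypothetical engine-owned entry point whose
// signature stays fixed even when Spark's ArrowConverters#toBatchIterator changes.
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

object VersionStableArrowShim {
  def toBatchIterator(
      rowIter: Iterator[InternalRow],
      schema: StructType,
      maxRecordsPerBatch: Long,
      timeZoneId: String): Iterator[Array[Byte]] = {
    // A real implementation would delegate to per-Spark-version code here; exposing
    // Iterator[Array[Byte]] keeps callers unaware of Spark 3.4's ArrowBatchIterator.
    throw new UnsupportedOperationException("sketch only")
  }
}
```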

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes apache#4754 from cfmcgrady/arrow-spark34.

Closes apache#4754

a3c58d0 [Fu Chen] fix ci
32704c5 [Fu Chen] Revert "fix ci"
e32311a [Fu Chen] fix ci
a76af62 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
453b6a6 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
74a9f7a [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
4ce5844 [Fu Chen] adapt Spark 3.4

Lead-authored-by: Fu Chen <cfmcgrady@gmail.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
cfmcgrady and pan3793 committed Apr 23, 2023
1 parent 4ca025b commit d0a7ca4
Showing 1 changed file with 9 additions and 7 deletions.
@@ -19,7 +19,6 @@ package org.apache.spark.sql.kyuubi

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
@@ -28,7 +27,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.{CollectLimitExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.{ArrowConverters, KyuubiArrowConverters}
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

@@ -73,17 +72,20 @@ object SparkDatasetHelper extends Logging {
def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
val schemaCaptured = plan.schema
// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark-3.1.x support.
// drop Spark 3.1 support.
val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
// note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
// arguments are serialized and sent to the executor side for execution.
val maxBatchSizePerBatch = maxBatchSize
plan.execute().mapPartitionsInternal { iter =>
val context = TaskContext.get()
ArrowConverters.toBatchIterator(
KyuubiArrowConverters.toBatchIterator(
iter,
schemaCaptured,
maxRecordsPerBatch,
timeZoneId,
context)
maxBatchSizePerBatch,
-1,
timeZoneId)
}
}

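For readability, this is roughly how the updated `toArrowBatchRdd` reads once the diff above is applied (reassembled from the rendered diff; treating the two new arguments to `KyuubiArrowConverters.toBatchIterator` as a per-batch size cap and `-1` as an unlimited row count is an inference from their names and values, not something the commit message states):

```scala
def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
  val schemaCaptured = plan.schema
  // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
  //   drop Spark 3.1 support.
  val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
  val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
  // note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
  // arguments are serialized and sent to the executor side for execution.
  val maxBatchSizePerBatch = maxBatchSize
  plan.execute().mapPartitionsInternal { iter =>
    KyuubiArrowConverters.toBatchIterator(
      iter,
      schemaCaptured,
      maxRecordsPerBatch,
      maxBatchSizePerBatch,
      -1,
      timeZoneId)
  }
}
```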
