
[ARROW] Arrow serialization should not introduce extra shuffle for outermost limit #4662

Closed
cfmcgrady wants to merge 28 commits into master from cfmcgrady:arrow-collect-limit-exec-2

Conversation

cfmcgrady (Contributor)

Why are the changes needed?

The fundamental idea is to execute the job in much the same way that `CollectLimitExec.executeCollect()` does.

```sql
select * from parquet.`parquet/tpcds/sf1000/catalog_sales` limit 20;
```
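
For context, `CollectLimitExec.executeCollect()` takes the first `limit` rows by scanning partitions incrementally on the driver, growing the scanned range each round, so no final shuffle is needed. Below is a minimal sketch of that pattern over a plain RDD; the function name `takeIncrementally` and the scale-up factor are illustrative, not the PR's code.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD

// Illustrative only: collect up to `limit` rows by scanning partitions
// incrementally, instead of shuffling everything into a single partition first.
def takeIncrementally[T: ClassTag](rdd: RDD[T], limit: Int): Array[T] = {
  val results = new ArrayBuffer[T]
  val totalParts = rdd.getNumPartitions
  var partsScanned = 0
  var numPartsToTry = 1
  while (results.length < limit && partsScanned < totalParts) {
    val partitions = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
    val remaining = limit - results.length
    // Run a job only over the next slice of partitions, taking at most the
    // rows still missing from each of them.
    val collected = rdd.sparkContext.runJob(
      rdd,
      (it: Iterator[T]) => it.take(remaining).toArray,
      partitions)
    collected.foreach(results ++= _)
    partsScanned += partitions.size
    numPartsToTry *= 4 // grow the range, mirroring spark.sql.limit.scaleUpFactor
  }
  results.take(limit).toArray
}
```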

Before this PR:
[two screenshots]

After this PR:

[two screenshots]

How was this patch tested?

  • Add some test cases that thoroughly check the changes, including negative and positive cases where possible

  • Add screenshots for manual tests if appropriate

  • Run tests locally before making a pull request


```scala
def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = {
  case adaptiveSparkPlan: AdaptiveSparkPlanExec =>
    executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan)
```

cfmcgrady (Contributor, Author):

The AdaptiveSparkPlanExec.finalPhysicalPlan method was introduced in SPARK-41914, so it may cause compatibility issues if the underlying Spark runtime lacks the corresponding patch.

@ulysses-you (Contributor), Apr 4, 2023:

Shall we call the related private method via reflection as a workaround? It's unacceptable if we break compatibility.

cfmcgrady (Contributor, Author):

Changed to call adaptiveSparkPlan.finalPhysicalPlan reflectively.
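
A minimal sketch of what such a reflective call could look like, assuming the method exists in the running Spark version; the helper name is hypothetical, and a real implementation would also need a fallback for Spark versions without SPARK-41914.

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch only: look up finalPhysicalPlan via reflection so the module still
// links against Spark versions that predate SPARK-41914.
def finalPhysicalPlanReflectively(plan: AdaptiveSparkPlanExec): SparkPlan = {
  val method = classOf[AdaptiveSparkPlanExec].getDeclaredMethod("finalPhysicalPlan")
  method.setAccessible(true)
  method.invoke(plan).asInstanceOf[SparkPlan]
}
```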

```scala
  }
  i += 1
}
result.toArray
```

cfmcgrady (Contributor, Author):

Offset support will be added in a separate PR to adapt to Spark 3.4.0.

@cfmcgrady (Contributor, Author):

cc @yaooqinn @ulysses-you @pan3793 @turboFei @cxzl25

@ulysses-you (Contributor) left a comment:

Can we unify the class names a bit more? I don't see the difference between xxxUtils and xxxHelper. Logically, most of these methods should be private.

```scala
estimatedBatchSize += (row match {
  case ur: UnsafeRow => ur.getSizeInBytes
  // Trying to estimate the size of the current row, assuming 16 bytes per value.
  case ir: InternalRow => ir.numFields * 16
```

Contributor:

In general, we can infer the row size from schema.defaultSize.
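
For illustration, the per-row estimate could look roughly like this (a sketch of the suggestion, not the exact code that landed):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types.StructType

// Sketch: UnsafeRow carries its exact size in bytes; for other InternalRow
// implementations, fall back to the schema-derived default size rather than
// a flat 16 bytes per field.
def estimateRowSize(row: InternalRow, schema: StructType): Long = row match {
  case ur: UnsafeRow => ur.getSizeInBytes.toLong
  case _: InternalRow => schema.defaultSize.toLong
}
```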

cfmcgrady (Contributor, Author):

Sorry for the lack of documentation. The ArrowBatchIterator class is derived from org.apache.spark.sql.execution.arrow.ArrowConverters.ArrowBatchWithSchemaIterator, with two key differences:

  1. there is no requirement to write the schema at the batch header;
  2. iteration halts when rowCount reaches limit.

cfmcgrady (Contributor, Author):

Here is the diff compared with the latest Spark master branch: https://github.com/apache/spark/blob/3c189abd73afa998e8573cbfdaf0f72445284314/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala

```diff
-  private[sql] class ArrowBatchWithSchemaIterator(
+  private[sql] class ArrowBatchIterator(
       rowIter: Iterator[InternalRow],
       schema: StructType,
       maxRecordsPerBatch: Long,
       maxEstimatedBatchSize: Long,
+      limit: Long,
       timeZoneId: String,
       context: TaskContext)
-    extends ArrowBatchIterator(
-      rowIter, schema, maxRecordsPerBatch, timeZoneId, context) {
+    extends Iterator[Array[Byte]] {
+

-    private val arrowSchemaSize = SizeEstimator.estimate(arrowSchema)
     var rowCountInLastBatch: Long = 0
+    var rowCount: Long = 0

     override def next(): Array[Byte] = {
       val out = new ByteArrayOutputStream()
       val writeChannel = new WriteChannel(Channels.newChannel(out))

       rowCountInLastBatch = 0
-      var estimatedBatchSize = arrowSchemaSize
+      var estimatedBatchSize = 0
       Utils.tryWithSafeFinally {
-        // Always write the schema.
-        MessageSerializer.serialize(writeChannel, arrowSchema)

         // Always write the first row.
         while (rowIter.hasNext && (
@@ -31,15 +30,17 @@
             estimatedBatchSize < maxEstimatedBatchSize ||
             // If the size of rows are 0 or negative, unlimit it.
             maxRecordsPerBatch <= 0 ||
-            rowCountInLastBatch < maxRecordsPerBatch)) {
+            rowCountInLastBatch < maxRecordsPerBatch ||
+            rowCount < limit)) {
           val row = rowIter.next()
           arrowWriter.write(row)
           estimatedBatchSize += (row match {
             case ur: UnsafeRow => ur.getSizeInBytes
-            // Trying to estimate the size of the current row, assuming 16 bytes per value.
-            case ir: InternalRow => ir.numFields * 16
+            // Trying to estimate the size of the current row
+            case _: InternalRow => schema.defaultSize
           })
           rowCountInLastBatch += 1
+          rowCount += 1
         }
         arrowWriter.finish()
        val batch = unloader.getRecordBatch()
```


@codecov-commenter commented Apr 6, 2023:

Codecov Report

Merging #4662 (82c912e) into master (d9e14f2) will increase coverage by 0.25%.
The diff coverage is 85.91%.

```
@@             Coverage Diff              @@
##             master    #4662      +/-   ##
============================================
+ Coverage     57.60%   57.85%   +0.25%
  Complexity       13       13
============================================
  Files           579      580       +1
  Lines         31951    32212     +261
  Branches       4269     4304      +35
============================================
+ Hits          18404    18635     +231
+ Misses        11785    11780       -5
- Partials       1762     1797      +35
```
| Impacted Files | Coverage | Δ |
| --- | --- | --- |
| ...uubi/engine/spark/operation/ExecuteStatement.scala | 82.60% <50.00%> | +1.80% ⬆️ |
| ...g/apache/spark/sql/kyuubi/SparkDatasetHelper.scala | 80.00% <82.60%> | +1.42% ⬆️ |
| ...rk/sql/execution/arrow/KyuubiArrowConverters.scala | 88.57% <88.57%> | ø |

... and 23 files with indirect coverage changes


@cfmcgrady (Contributor, Author):

> Can we unify the class names a bit more? I don't see the difference between xxxUtils and xxxHelper. Logically, most of these methods should be private.

I have completed the refactoring; please take another look when you have time. Thank you, @ulysses-you.

@ulysses-you (Contributor):

Thank you @cfmcgrady, LGTM if tests pass.

```scala
val in = new ByteArrayInputStream(bytes)
val out = new ByteArrayOutputStream(bytes.length)

val rootAllocator = ArrowUtils.rootAllocator.newChildAllocator(
```

@pan3793 (Member), Apr 7, 2023:

The name "rootAllocator" is not suitable then, and why should we create a child allocator?

cfmcgrady (Contributor, Author):

I think it's for debugging.

```
java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (128)
Allocator(slice) 0/128/128/9223372036854775807 (res/actual/peak/limit)

	at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:437)
	at org.apache.spark.sql.execution.arrow.KyuubiArrowConverters$.slice(KyuubiArrowConverters.scala:91)
	at org.apache.spark.sql.kyuubi.SparkDatasetHelper$.doCollectLimit(SparkDatasetHelper.scala:170)
	at org.apache.spark.sql.kyuubi.SparkDatasetHelper$.$anonfun$executeArrowBatchCollect$1(SparkDatasetHelper.scala:51)
	at org.apache.spark.sql.kyuubi.SparkDatasetHelper$
```
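
For reference, a sketch of the rationale: a named child allocator scopes the allocations of one slice operation, so closing it reports any leak against that name instead of the shared root allocator. The surrounding code and the reservation/limit values below are illustrative, and ArrowUtils here is Spark's private[sql] org.apache.spark.sql.util.ArrowUtils, visible only to code placed under org.apache.spark.sql.

```scala
// Sketch, not the PR's exact code.
val allocator = ArrowUtils.rootAllocator.newChildAllocator(
  "slice",       // the name reported in "Memory was leaked by query" errors
  0,             // initial reservation in bytes
  Long.MaxValue) // maximum allocation
try {
  // ... deserialize the batch, slice the requested rows, re-serialize ...
} finally {
  allocator.close() // throws IllegalStateException if any buffer was leaked
}
```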


@pan3793 (Member) left a comment:

LGTM if CI passes.

@turboFei (Member) left a comment:

Great work, thanks

@ulysses-you (Contributor) commented Apr 10, 2023:

Thanks, merging to master/branch-1.7.

ulysses-you pushed a commit that referenced this pull request on Apr 10, 2023:

[ARROW] Arrow serialization should not introduce extra shuffle for outermost limit

### _Why are the changes needed?_

The fundamental concept is to execute a job similar to the way in which `CollectLimitExec.executeCollect()` operates.

```sql
select * from parquet.`parquet/tpcds/sf1000/catalog_sales` limit 20;
```

Before this PR:
![截屏2023-04-04 下午3 20 34](https://user-images.githubusercontent.com/8537877/229717946-87c480c6-9550-4d00-bc96-14d59d7ce9f7.png)

![截屏2023-04-04 下午3 20 54](https://user-images.githubusercontent.com/8537877/229717973-bf6da5af-74e7-422a-b9fa-8b7bebd43320.png)

After this PR:

![截屏2023-04-04 下午3 17 05](https://user-images.githubusercontent.com/8537877/229718016-6218d019-b223-4deb-b596-6f0431d33d2a.png)

![截屏2023-04-04 下午3 17 16](https://user-images.githubusercontent.com/8537877/229718046-ea07cd1f-5ffc-42ba-87d5-08085feb4b16.png)

### _How was this patch tested?_
- [ ] Add some test cases that thoroughly check the changes, including negative and positive cases where possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run tests](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4662 from cfmcgrady/arrow-collect-limit-exec-2.

Closes #4662

82c912e [Fu Chen] close vector
130bcb1 [Fu Chen] finally close
facc13f [Fu Chen] exclude rule OptimizeLimitZero
3700839 [Fu Chen] SparkArrowbasedOperationSuite adapt Spark-3.1.x
6064ab9 [Fu Chen] limit = 0 test case
6d596fc [Fu Chen] address comment
8280783 [Fu Chen] add `isStaticConfigKey` to adapt Spark-3.1.x
22cc70f [Fu Chen] add ut
b72bc6f [Fu Chen] add offset support to adapt Spark-3.4.x
9ffb44f [Fu Chen] make toBatchIterator private
c83cf3f [Fu Chen] SparkArrowbasedOperationSuite adapt Spark-3.1.x
573a262 [Fu Chen] fix
4cef204 [Fu Chen] SparkArrowbasedOperationSuite adapt Spark-3.1.x
d70aee3 [Fu Chen] SparkPlan.session -> SparkSession.active to adapt Spark-3.1.x
e3bf84c [Fu Chen] refactor
81886f0 [Fu Chen] address comment
2286afc [Fu Chen] reflective calla AdaptiveSparkPlanExec.finalPhysicalPlan
03d0747 [Fu Chen] address comment
25e4f05 [Fu Chen] add docs
885cf2c [Fu Chen] infer row size by schema.defaultSize
4e7ca54 [Fu Chen] unnecessarily changes
ee5a756 [Fu Chen] revert unnecessarily changes
6c5b1eb [Fu Chen] add ut
4212a89 [Fu Chen] refactor and add ut
ed8c692 [Fu Chen] refactor
0088671 [Fu Chen] refine
8593d85 [Fu Chen] driver slice last batch
a584943 [Fu Chen] arrow take

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
(cherry picked from commit 1a65125)
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
@ulysses-you ulysses-you added this to the v1.7.1 milestone Apr 10, 2023
@cfmcgrady cfmcgrady deleted the arrow-collect-limit-exec-2 branch April 10, 2023 02:00