Skip to content

Commit

Permalink
[CELEBORN-912][FOLLOWUP] Support columnar shuffle for Spark 3.5
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?

Introduce `spark-3.5-columnar-shuffle` module to support columnar shuffle for Spark 3.5.

Follow up apache#2710, apache#2609.

### Why are the changes needed?

Tests of `CelebornColumnarShuffleReaderSuite` are failed for the changes of apache#2609.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`CelebornColumnarShuffleReaderSuite`

Closes apache#2726 from SteNicholas/CELEBORN-912.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: xiyu.zk <xiyu.zk@alibaba-inc.com>
  • Loading branch information
SteNicholas authored and s0nskar committed Sep 16, 2024
1 parent b203193 commit 1cb9dd2
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.shuffle.celeborn

import org.apache.spark.{ShuffleDependency, SparkConf}
import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.serializer.{KryoSerializer, KryoSerializerInstance}
import org.apache.spark.sql.execution.UnsafeRowSerializer
import org.apache.spark.sql.execution.columnar.CelebornColumnarBatchSerializerInstance
Expand Down Expand Up @@ -45,14 +45,17 @@ class CelebornColumnarShuffleReaderSuite {

var shuffleClient: MockedStatic[ShuffleClient] = null
try {
val taskContext = Mockito.mock(classOf[TaskContext])
Mockito.when(taskContext.stageAttemptNumber).thenReturn(0)
Mockito.when(taskContext.attemptNumber).thenReturn(0)
shuffleClient = Mockito.mockStatic(classOf[ShuffleClient])
val shuffleReader = SparkUtils.createColumnarShuffleReader(
handle,
0,
10,
0,
10,
null,
taskContext,
new CelebornConf(),
null,
new ExecutorShuffleIdTracker())
Expand All @@ -68,6 +71,9 @@ class CelebornColumnarShuffleReaderSuite {
def columnarShuffleReaderNewSerializerInstance(): Unit = {
var shuffleClient: MockedStatic[ShuffleClient] = null
try {
val taskContext = Mockito.mock(classOf[TaskContext])
Mockito.when(taskContext.stageAttemptNumber).thenReturn(0)
Mockito.when(taskContext.attemptNumber).thenReturn(0)
shuffleClient = Mockito.mockStatic(classOf[ShuffleClient])
val shuffleReader = SparkUtils.createColumnarShuffleReader(
new CelebornShuffleHandle[Int, String, String](
Expand All @@ -83,7 +89,7 @@ class CelebornColumnarShuffleReaderSuite {
10,
0,
10,
null,
taskContext,
new CelebornConf(),
null,
new ExecutorShuffleIdTracker())
Expand Down

0 comments on commit 1cb9dd2

Please sign in to comment.