Commit 14d4dec

[SPARK-35879][CORE][SHUFFLE] Fix performance regression caused by collectFetchRequests
### What changes were proposed in this pull request?

This PR fixes a performance regression on the executor side when creating fetch requests for shuffles with a large number of initial partitions.

![image](https://user-images.githubusercontent.com/8326978/123270865-dd21e800-d532-11eb-8447-ad80e47b034f.png)

At NetEase, we had an online job that took `45min` to "fetch" about 100MB of shuffle data; it turned out the executor was merely collecting fetch requests, and doing so slowly. Normally, such a task should finish in seconds. See the `DEBUG` log:

```
21/06/22 11:52:26 DEBUG BlockManagerStorageEndpoint: Sent response: 0 to kyuubi.163.org:
21/06/22 11:53:05 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3941440 at BlockManagerId(12, .., 43559, None) with 19 blocks
21/06/22 11:53:44 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3716400 at BlockManagerId(20, .., 38287, None) with 18 blocks
21/06/22 11:54:41 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 4559280 at BlockManagerId(6, .., 39689, None) with 22 blocks
21/06/22 11:55:08 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3120160 at BlockManagerId(33, .., 39449, None) with 15 blocks
```

I also created a reproducible test case in a local laptop Docker environment:

```
bin/spark-sql --conf spark.kubernetes.file.upload.path=./ --master k8s://https://kubernetes.docker.internal:6443 --conf spark.kubernetes.container.image=yaooqinn/spark:v20210624-5 -c spark.kubernetes.context=docker-for-desktop_1 --num-executors 5 --driver-memory 5g --conf spark.kubernetes.executor.podNamePrefix=sparksql
```

```sql
SET spark.sql.adaptive.enabled=true;
SET spark.sql.shuffle.partitions=3000;
SELECT /*+ REPARTITION */ 1 as pid, id from range(1, 1000000, 1, 500);
SELECT /*+ REPARTITION(pid, id) */ 1 as pid, id from range(1, 1000000, 1, 500);
```

### Why are the changes needed?

This fixes a performance regression introduced by SPARK-29292 (3ad4863) in v3.1.0. 3ad4863 added support for compilation with Scala 2.13, but it replaced in-place `ArrayBuffer` appends with immutable `Seq` concatenation, which copies the accumulated blocks on every append, and the resulting performance loss is huge. We should consider backporting this PR to branch-3.1.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.

#### Before

```log
21/06/23 13:54:22 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/23 13:54:38 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2314708 at BlockManagerId(2, 10.1.3.114, 36423, None) with 86 blocks
21/06/23 13:54:59 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2636612 at BlockManagerId(3, 10.1.3.115, 34293, None) with 87 blocks
21/06/23 13:55:18 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2508706 at BlockManagerId(4, 10.1.3.116, 41869, None) with 90 blocks
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2350854 at BlockManagerId(5, 10.1.3.117, 45787, None) with 85 blocks
21/06/23 13:55:34 INFO ShuffleBlockFetcherIterator: Getting 438 (11.8 MiB) non-empty blocks including 90 (2.5 MiB) local and 0 (0.0 B) host-local and 348 (9.4 MiB) remote blocks
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 87 blocks (2.5 MiB) from 10.1.3.115:34293
21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.115:34293 after 1 ms (0 ms spent in bootstraps)
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 90 blocks (2.4 MiB) from 10.1.3.116:41869
21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.116:41869 after 2 ms (0 ms spent in bootstraps)
21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 85 blocks (2.2 MiB) from 10.1.3.117:45787
```

```log
21/06/23 14:00:45 INFO MapOutputTracker: Broadcast outputstatuses size = 411, actual size = 828997
21/06/23 14:00:45 INFO MapOutputTrackerWorker: Got the map output locations
21/06/23 14:00:45 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/23 14:00:55 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1894389 at BlockManagerId(2, 10.1.3.114, 36423, None) with 99 blocks
21/06/23 14:01:04 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1919993 at BlockManagerId(3, 10.1.3.115, 34293, None) with 100 blocks
21/06/23 14:01:14 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1977186 at BlockManagerId(5, 10.1.3.117, 45787, None) with 103 blocks
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1938336 at BlockManagerId(4, 10.1.3.116, 41869, None) with 101 blocks
21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Getting 500 (9.1 MiB) non-empty blocks including 97 (1820.3 KiB) local and 0 (0.0 B) host-local and 403 (7.4 MiB) remote blocks
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 101 blocks (1892.9 KiB) from 10.1.3.116:41869
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 103 blocks (1930.8 KiB) from 10.1.3.117:45787
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 99 blocks (1850.0 KiB) from 10.1.3.114:36423
21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 100 blocks (1875.0 KiB) from 10.1.3.115:34293
21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 37889 ms
```

#### After

```log
21/06/24 13:01:16 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call blockInfos.map(_._2).sum: 40 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_9_2990_2997/9: 0 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_15_2395_2997/15: 0 ms
```

Closes apache#33063 from yaooqinn/SPARK-35879.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
1 parent a9ebfc5 commit 14d4dec
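The core of the fix is swapping per-element immutable `Seq` concatenation for in-place `ArrayBuffer` appends. The following standalone sketch (illustrative only, not code from the patch) contrasts the two patterns: building a collection with `cur = cur ++ Seq(i)` copies the accumulated elements on every append, which is quadratic overall, while `ArrayBuffer += ` is amortized constant time per append.

```scala
import scala.collection.mutable.ArrayBuffer

object AppendCost {
  // Run a block and return its result together with elapsed milliseconds.
  def time[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val n = 20000

    // Pattern the fix removes: each `++ Seq(i)` copies the whole accumulated
    // sequence, so the loop does O(n^2) work in total.
    val (seqSize, seqMs) = time {
      var cur = Seq.empty[Int]
      (0 until n).foreach(i => cur = cur ++ Seq(i))
      cur.size
    }

    // Pattern the fix introduces: in-place append, O(n) work in total.
    val (bufSize, bufMs) = time {
      val cur = new ArrayBuffer[Int]
      (0 until n).foreach(cur += _)
      cur.size
    }

    assert(seqSize == bufSize) // same contents either way; only the cost differs
    println(s"Seq ++: ${seqMs} ms, ArrayBuffer +=: ${bufMs} ms")
  }
}
```

With thousands of blocks per remote address (the repro above uses 3000 shuffle partitions), the quadratic variant dominates fetch-request collection time, which matches the minutes-long gaps between the `Creating fetch request` log lines.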

File tree

1 file changed: +12 −11 lines

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 12 additions & 11 deletions
```diff
@@ -380,7 +380,10 @@ final class ShuffleBlockFetcherIterator(
         hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
       } else {
         remoteBlockBytes += blockInfos.map(_._2).sum
-        collectFetchRequests(address, blockInfos, collectedRemoteRequests)
+        val (_, timeCost) = Utils.timeTakenMs[Unit] {
+          collectFetchRequests(address, blockInfos, collectedRemoteRequests)
+        }
+        logDebug(s"Collected remote fetch requests for $address in $timeCost ms")
       }
     }
     val numRemoteBlocks = collectedRemoteRequests.map(_.blocks.size).sum
@@ -408,10 +411,10 @@ final class ShuffleBlockFetcherIterator(
       curBlocks: Seq[FetchBlockInfo],
       address: BlockManagerId,
       isLast: Boolean,
-      collectedRemoteRequests: ArrayBuffer[FetchRequest]): Seq[FetchBlockInfo] = {
+      collectedRemoteRequests: ArrayBuffer[FetchRequest]): ArrayBuffer[FetchBlockInfo] = {
     val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks, doBatchFetch)
     numBlocksToFetch += mergedBlocks.size
-    var retBlocks = Seq.empty[FetchBlockInfo]
+    val retBlocks = new ArrayBuffer[FetchBlockInfo]
     if (mergedBlocks.length <= maxBlocksInFlightPerAddress) {
       collectedRemoteRequests += createFetchRequest(mergedBlocks, address)
     } else {
@@ -421,7 +424,7 @@ final class ShuffleBlockFetcherIterator(
       } else {
         // The last group does not exceed `maxBlocksInFlightPerAddress`. Put it back
         // to `curBlocks`.
-        retBlocks = blocks
+        retBlocks ++= blocks
         numBlocksToFetch -= blocks.size
       }
     }
@@ -435,26 +438,24 @@ final class ShuffleBlockFetcherIterator(
       collectedRemoteRequests: ArrayBuffer[FetchRequest]): Unit = {
     val iterator = blockInfos.iterator
     var curRequestSize = 0L
-    var curBlocks = Seq.empty[FetchBlockInfo]
+    var curBlocks = new ArrayBuffer[FetchBlockInfo]()
 
     while (iterator.hasNext) {
       val (blockId, size, mapIndex) = iterator.next()
       assertPositiveBlockSize(blockId, size)
-      curBlocks = curBlocks ++ Seq(FetchBlockInfo(blockId, size, mapIndex))
+      curBlocks += FetchBlockInfo(blockId, size, mapIndex)
       curRequestSize += size
       // For batch fetch, the actual block in flight should count for merged block.
       val mayExceedsMaxBlocks = !doBatchFetch && curBlocks.size >= maxBlocksInFlightPerAddress
       if (curRequestSize >= targetRemoteRequestSize || mayExceedsMaxBlocks) {
-        curBlocks = createFetchRequests(curBlocks, address, isLast = false,
+        curBlocks = createFetchRequests(curBlocks.toSeq, address, isLast = false,
           collectedRemoteRequests)
         curRequestSize = curBlocks.map(_.size).sum
       }
     }
     // Add in the final request
     if (curBlocks.nonEmpty) {
-      curBlocks = createFetchRequests(curBlocks, address, isLast = true,
-        collectedRemoteRequests)
-      curRequestSize = curBlocks.map(_.size).sum
+      createFetchRequests(curBlocks.toSeq, address, isLast = true, collectedRemoteRequests)
     }
   }
 
@@ -994,7 +995,7 @@ object ShuffleBlockFetcherIterator {
       blocks: Seq[FetchBlockInfo],
       doBatchFetch: Boolean): Seq[FetchBlockInfo] = {
     val result = if (doBatchFetch) {
-      var curBlocks = new ArrayBuffer[FetchBlockInfo]
+      val curBlocks = new ArrayBuffer[FetchBlockInfo]
       val mergedBlockInfo = new ArrayBuffer[FetchBlockInfo]
 
       def mergeFetchBlockInfo(toBeMerged: ArrayBuffer[FetchBlockInfo]): FetchBlockInfo = {
```
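The patch also wraps `collectFetchRequests` in `Utils.timeTakenMs` and logs the elapsed time at `DEBUG`, so slow request collection shows up directly in executor logs. A minimal sketch of that timing pattern (an approximation of Spark's helper, not the Spark source itself):

```scala
object TimingSketch {
  // Approximation of Utils.timeTakenMs: evaluate the by-name body once and
  // return its result paired with the elapsed wall-clock milliseconds.
  def timeTakenMs[T](body: => T): (T, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    val (sum, ms) = timeTakenMs { (1 to 1000).sum }
    println(s"computed $sum in $ms ms")
  }
}
```

Because the body is a by-name parameter, any block can be wrapped without refactoring, which is exactly how the patch instruments the existing `collectFetchRequests(address, blockInfos, collectedRemoteRequests)` call.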
