From 29f454b6bf60c56e9904601dddca0c14ef0fb771 Mon Sep 17 00:00:00 2001
From: maheshbehera
Date: Wed, 3 Apr 2024 12:22:38 +0530
Subject: [PATCH] [SPARK-47702][CORE] Remove Shuffle service endpoint from the
 locations list when RDD block is removed from a node

---
 .../apache/spark/storage/BlockManager.scala   |  6 ++--
 .../BlockManagerReplicationSuite.scala        | 34 +++++++++++++++++--
 2 files changed, 36 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 89b3914e94af2..a461bfc3403c4 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -2112,8 +2112,10 @@ private[spark] class BlockManager(
       hasRemoveBlock = true
       if (tellMaster) {
         // Only update storage level from the captured block status before deleting, so that
-        // memory size and disk size are being kept for calculating delta.
-        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+        // memory size and disk size are being kept for calculating delta. Reset the replica
+        // count to 0 in the storage level to signal that this is a remove operation.
+        val storageLevel = StorageLevel(blockStatus.get.storageLevel.toInt, 0)
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = storageLevel))
       }
     } finally {
       if (!hasRemoveBlock) {
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 1fbc900727c4c..8bd6eddc7c706 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,6 +38,8 @@ import org.apache.spark.internal.config.Tests._
 import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.shuffle.ExternalBlockStoreClient
+import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
@@ -100,12 +102,20 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     // to make cached peers refresh frequently
     conf.set(STORAGE_CACHED_PEERS_TTL, 10)
 
+    conf.set(SHUFFLE_SERVICE_ENABLED, true)
+    conf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    conf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
+
+    val shuffleClient = Some(new ExternalBlockStoreClient(
+      new TransportConf("shuffle", MapConfigProvider.EMPTY),
+      null, false, 5000))
+
     sc = new SparkContext("local", "test", conf)
     val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
       new BlockManagerMasterEndpoint(rpcEnv, true, conf,
-        new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
-        isDriver = true)),
+        new LiveListenerBus(conf), shuffleClient, blockManagerInfo, mapOutputTracker,
+        sc.env.shuffleManager, isDriver = true)),
       rpcEnv.setupEndpoint("blockmanagerHeartbeat",
         new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)
     allStores.clear()
@@ -296,6 +306,26 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
     }
   }
 
+  test("block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
+    val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT)
+    val store1 = makeBlockManager(10000, "host-1")
+    val store2 = makeBlockManager(10000, "host-2")
+    assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId))
+
+    val blockId = RDDBlockId(1, 2)
+    val message = new Array[Byte](1000)
+
+    // With SHUFFLE_SERVICE_FETCH_RDD_ENABLED, the shuffle service port should be registered.
+    store1.putSingle(blockId, message, StorageLevel.DISK_ONLY)
+    assert(master.getLocations(blockId).contains(
+      BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
+
+    // After the block is removed, the shuffle service location should be removed as well.
+    store1.removeBlock(blockId, true)
+    assert(!master.getLocations(blockId).contains(
+      BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
+  }
+
   test("block replication - addition and deletion of block managers") {
     val blockSize = 1000
     val storeSize = 10000
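
For illustration only (not part of the patch, and not Spark's actual BlockManagerMasterEndpoint
code): the fix relies on the removal report keeping the block's original storage media while
resetting the replica count to 0, so the master can tell a removal apart from an ordinary status
update and drop the external shuffle service endpoint from the block's location list. The
standalone Scala sketch below mirrors that idea with simplified stand-in types (Level, Location,
MasterSketch are hypothetical names introduced here for the example).

import scala.collection.mutable

// Simplified stand-in for StorageLevel: media flags plus a replica count.
final case class Level(useDisk: Boolean, useMemory: Boolean, replication: Int) {
  // A block is still stored only if it occupies some medium with at least one replica.
  def isValid: Boolean = (useDisk || useMemory) && replication > 0
}

final case class Location(host: String, port: Int)

// Simplified stand-in for the master-side bookkeeping of block locations.
final class MasterSketch(shuffleServicePort: Int) {
  private val locations = mutable.Map.empty[String, Set[Location]]

  // Mirrors handling of a block status report from a block manager.
  def updateBlockInfo(blockId: String, bm: Location, level: Level): Unit = {
    val ess = Location(bm.host, shuffleServicePort)
    val current = locations.getOrElse(blockId, Set.empty[Location])
    if (level.isValid) {
      // Block stored: track the block manager and, for RDD blocks served by the
      // external shuffle service, the shuffle service endpoint as well.
      locations(blockId) = current + bm + ess
    } else {
      // replication == 0 signals a removal: drop both endpoints for this host.
      locations(blockId) = current - bm - ess
    }
  }

  def getLocations(blockId: String): Set[Location] =
    locations.getOrElse(blockId, Set.empty[Location])
}

object MasterSketchDemo {
  def main(args: Array[String]): Unit = {
    val master = new MasterSketch(shuffleServicePort = 7337)
    val bm = Location("host-1", 50010)

    // Storing a disk-only RDD block registers the shuffle service endpoint as a location.
    master.updateBlockInfo("rdd_1_2", bm, Level(useDisk = true, useMemory = false, replication = 1))
    assert(master.getLocations("rdd_1_2").contains(Location("host-1", 7337)))

    // A removal report keeps the media bits but carries replication = 0, so the
    // shuffle service endpoint is dropped along with the block manager location.
    master.updateBlockInfo("rdd_1_2", bm, Level(useDisk = true, useMemory = false, replication = 0))
    assert(!master.getLocations("rdd_1_2").contains(Location("host-1", 7337)))
    println("sketch ok")
  }
}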