[SPARK-47702][CORE] Remove Shuffle service endpoint from the locations list when RDD block is removed from a node
maheshbehera committed Apr 3, 2024
1 parent 2c2a2ad commit 29f454b
Showing 2 changed files with 36 additions and 4 deletions.
@@ -2112,8 +2112,10 @@ private[spark] class BlockManager(
hasRemoveBlock = true
if (tellMaster) {
// Only update storage level from the captured block status before deleting, so that
// memory size and disk size are being kept for calculating delta.
reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
// memory size and disk size are being kept for calculating delta. Reset the replica
// count to 0 in the storage level to signal that this is a remove operation.
val storageLevel = StorageLevel(blockStatus.get.storageLevel.toInt, 0)
reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = storageLevel))
}
} finally {
if (!hasRemoveBlock) {
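For context, here is a minimal sketch (not part of the patch) of what the replacement encoding achieves. It assumes the public StorageLevel.apply(flags: Int, replication: Int) developer API and toInt, which pack the useDisk/useMemory/useOffHeap/deserialized flags into an Int: the captured flags survive, while replication 0 makes the level invalid, so the master can tell both that the block is gone and that it had been on disk. StorageLevel.NONE carried neither piece of information.

import org.apache.spark.storage.StorageLevel

// Level captured from the block status just before deletion, e.g. DISK_ONLY.
val captured = StorageLevel.DISK_ONLY

// Rebuild with the same flags but replication = 0.
val reported = StorageLevel(captured.toInt, 0)

assert(reported.useDisk)   // disk flag is preserved, unlike with StorageLevel.NONE
assert(!reported.isValid)  // replication == 0 => invalid => treated as a removal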
@@ -38,6 +38,8 @@ import org.apache.spark.internal.config.Tests._
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.network.shuffle.ExternalBlockStoreClient
import org.apache.spark.network.util.{MapConfigProvider, TransportConf}
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
@@ -100,12 +102,20 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// to make cached peers refresh frequently
conf.set(STORAGE_CACHED_PEERS_TTL, 10)

conf.set(SHUFFLE_SERVICE_ENABLED, true)
conf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
conf.set(Tests.TEST_SKIP_ESS_REGISTER, true)

val shuffleClient = Some(new ExternalBlockStoreClient(
new TransportConf("shuffle", MapConfigProvider.EMPTY),
null, false, 5000))

sc = new SparkContext("local", "test", conf)
val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
isDriver = true)),
new LiveListenerBus(conf), shuffleClient, blockManagerInfo, mapOutputTracker,
sc.env.shuffleManager, isDriver = true)),
rpcEnv.setupEndpoint("blockmanagerHeartbeat",
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)
allStores.clear()
@@ -296,6 +306,26 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
}
}

test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT)
val store1 = makeBlockManager(10000, "host-1")
val store2 = makeBlockManager(10000, "host-2")
assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId))

val blockId = RDDBlockId(1, 2)
val message = new Array[Byte](1000)

// With SHUFFLE_SERVICE_FETCH_RDD_ENABLED set, the shuffle service port should appear
// in the block's locations.
store1.putSingle(blockId, message, StorageLevel.DISK_ONLY)
assert(master.getLocations(blockId).contains(
BlockManagerId("host-1", "localhost", shuffleServicePort, None)))

// After the block is removed, the shuffle service port should no longer be in the locations.
store1.removeBlock(blockId, true)
assert(!master.getLocations(blockId).contains(
BlockManagerId("host-1", "localhost", shuffleServicePort, None)))
}

test("block replication - addition and deletion of block managers") {
val blockSize = 1000
val storeSize = 10000
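For readers wondering where the shuffle-service endpoint is actually dropped: that logic lives on the master side and is not part of this diff. Below is a simplified, hypothetical paraphrase of the handling in BlockManagerMasterEndpoint#updateBlockInfo; the function name, signature, and stand-in parameters are illustrative, not Spark's exact code.

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockManagerId, StorageLevel}

// Illustrative stand-in: `locations` is the master's location set for one block.
def updateRddLocations(
    blockId: BlockId,
    bmId: BlockManagerId,
    storageLevel: StorageLevel,
    locations: mutable.HashSet[BlockManagerId],
    shuffleServicePort: Int,
    rddFetchEnabled: Boolean): Unit = {
  if (blockId.isRDD && storageLevel.useDisk && rddFetchEnabled) {
    // Shuffle-service endpoint on the block's host (same host, service port).
    val shuffleServiceId = BlockManagerId(bmId.executorId, bmId.host, shuffleServicePort)
    if (storageLevel.isValid) locations += shuffleServiceId // block stored on disk: add
    else locations -= shuffleServiceId                      // removal report: drop it
  }
}

The guard requires storageLevel.useDisk, which is why the old report of StorageLevel.NONE (useDisk = false) never reached the removal branch and left a stale shuffle-service location behind; preserving the captured flags with replication 0 lets the else branch fire.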
