Skip to content

Commit

Permalink
Rebase and applying review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
attilapiros committed Oct 4, 2024
1 parent 2b395d6 commit 915d3e5
Showing 1 changed file with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,20 +102,12 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// to make cached peers refresh frequently
conf.set(STORAGE_CACHED_PEERS_TTL, 10)

conf.set(SHUFFLE_SERVICE_ENABLED, true)
conf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
conf.set(Tests.TEST_SKIP_ESS_REGISTER, true)

val shuffleClient = Some(new ExternalBlockStoreClient(
new TransportConf("shuffle", MapConfigProvider.EMPTY),
null, false, 5000))

sc = new SparkContext("local", "test", conf)
val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
new BlockManagerMasterEndpoint(rpcEnv, true, conf,
new LiveListenerBus(conf), shuffleClient, blockManagerInfo, mapOutputTracker,
sc.env.shuffleManager, isDriver = true)),
new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager,
isDriver = true)),
rpcEnv.setupEndpoint("blockmanagerHeartbeat",
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true)
allStores.clear()
Expand Down Expand Up @@ -307,7 +299,22 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
}

test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT)
val newConf = conf.clone()
newConf.set(SHUFFLE_SERVICE_ENABLED, true)
newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
val shuffleClient = Some(new ExternalBlockStoreClient(
new TransportConf("shuffle", MapConfigProvider.EMPTY),
null, false, 5000))
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2",
new BlockManagerMasterEndpoint(rpcEnv, true, newConf,
new LiveListenerBus(newConf), shuffleClient, blockManagerInfo, mapOutputTracker,
sc.env.shuffleManager, isDriver = true)),
rpcEnv.setupEndpoint("blockmanagerHeartbeat-2",
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), newConf, true)

val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT)
val store1 = makeBlockManager(10000, "host-1")
val store2 = makeBlockManager(10000, "host-2")
assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId))
Expand Down

0 comments on commit 915d3e5

Please sign in to comment.