Skip to content

Commit

Permalink
Rebase and applying review comment
Browse files Browse the repository at this point in the history
  • Loading branch information
attilapiros committed Oct 4, 2024
1 parent 9b40ac4 commit 6ba9559
Showing 1 changed file with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
with LocalSparkContext {

val conf: SparkConf = createConf()
var shuffleClient: Option[ExternalBlockStoreClient] = null
protected def createConf(): SparkConf

protected var rpcEnv: RpcEnv = null
Expand Down Expand Up @@ -102,11 +103,9 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// to make cached peers refresh frequently
conf.set(STORAGE_CACHED_PEERS_TTL, 10)

conf.set(SHUFFLE_SERVICE_ENABLED, true)
conf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
conf.set(Tests.TEST_SKIP_ESS_REGISTER, true)

val shuffleClient = Some(new ExternalBlockStoreClient(
shuffleClient = Some(new ExternalBlockStoreClient(
new TransportConf("shuffle", MapConfigProvider.EMPTY),
null, false, 5000))

Expand Down Expand Up @@ -307,7 +306,18 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
}

test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") {
val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT)
val newConf = conf.clone()
newConf.set(SHUFFLE_SERVICE_ENABLED, true)
newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]()
master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2",
new BlockManagerMasterEndpoint(rpcEnv, true, newConf,
new LiveListenerBus(newConf), shuffleClient, blockManagerInfo, mapOutputTracker,
sc.env.shuffleManager, isDriver = true)),
rpcEnv.setupEndpoint("blockmanagerHeartbeat-2",
new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), newConf, true)

val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT)
val store1 = makeBlockManager(10000, "host-1")
val store2 = makeBlockManager(10000, "host-2")
assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId))
Expand Down

0 comments on commit 6ba9559

Please sign in to comment.