From 915d3e5404c1c315baae91c51575612af7148ddc Mon Sep 17 00:00:00 2001 From: attilapiros Date: Thu, 3 Oct 2024 19:07:00 -0700 Subject: [PATCH] Rebase and applying review comment --- .../BlockManagerReplicationSuite.scala | 29 ++++++++++++------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 8bd6eddc7c706..f5fca56e5ef77 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -102,20 +102,12 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite // to make cached peers refresh frequently conf.set(STORAGE_CACHED_PEERS_TTL, 10) - conf.set(SHUFFLE_SERVICE_ENABLED, true) - conf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) - conf.set(Tests.TEST_SKIP_ESS_REGISTER, true) - - val shuffleClient = Some(new ExternalBlockStoreClient( - new TransportConf("shuffle", MapConfigProvider.EMPTY), - null, false, 5000)) - sc = new SparkContext("local", "test", conf) val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", new BlockManagerMasterEndpoint(rpcEnv, true, conf, - new LiveListenerBus(conf), shuffleClient, blockManagerInfo, mapOutputTracker, - sc.env.shuffleManager, isDriver = true)), + new LiveListenerBus(conf), None, blockManagerInfo, mapOutputTracker, sc.env.shuffleManager, + isDriver = true)), rpcEnv.setupEndpoint("blockmanagerHeartbeat", new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), conf, true) allStores.clear() @@ -307,7 +299,22 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite } test("Test block location after replication with SHUFFLE_SERVICE_FETCH_RDD_ENABLED enabled") { - val shuffleServicePort = conf.get(SHUFFLE_SERVICE_PORT) + val newConf = conf.clone() + newConf.set(SHUFFLE_SERVICE_ENABLED, true) + newConf.set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true) + newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true) + val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]() + val shuffleClient = Some(new ExternalBlockStoreClient( + new TransportConf("shuffle", MapConfigProvider.EMPTY), + null, false, 5000)) + master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager-2", + new BlockManagerMasterEndpoint(rpcEnv, true, newConf, + new LiveListenerBus(newConf), shuffleClient, blockManagerInfo, mapOutputTracker, + sc.env.shuffleManager, isDriver = true)), + rpcEnv.setupEndpoint("blockmanagerHeartbeat-2", + new BlockManagerMasterHeartbeatEndpoint(rpcEnv, true, blockManagerInfo)), newConf, true) + + val shuffleServicePort = newConf.get(SHUFFLE_SERVICE_PORT) val store1 = makeBlockManager(10000, "host-1") val store2 = makeBlockManager(10000, "host-2") assert(master.getPeers(store1.blockManagerId).toSet === Set(store2.blockManagerId))