Skip to content

Commit 4cfeb8e

Browse files
committed
We don't need the to set operations, also sleepyRdd isn't always sleepy so lets call it baseRdd, and test both small blocksize to mem and not
1 parent 4cb0458 commit 4cfeb8e

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionSuite.scala

+14-8
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,21 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
4141
runDecomTest(true, false, true)
4242
}
4343

44-
test(s"verify that shuffle blocks are migrated.") {
44+
test(s"verify that shuffle blocks are migrated with force to disk") {
45+
runDecomTest(false, true, false, remoteBlockSize = "1")
46+
}
47+
48+
test(s"verify that shuffle blocks are migrated") {
4549
runDecomTest(false, true, false)
4650
}
4751

4852
test(s"verify that both migrations can work at the same time.") {
4953
runDecomTest(true, true, false)
5054
}
5155

52-
private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
56+
private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean,
57+
remoteBlockSize: String = "100000") = {
58+
5359
val master = s"local-cluster[${numExecs}, 1, 1024]"
5460
val conf = new SparkConf().setAppName("test").setMaster(master)
5561
.set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
@@ -75,16 +81,16 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
7581

7682
// Create a new RDD where we have sleep in each partition, we are also increasing
7783
// the value of accumulator in each partition
78-
val sleepyRdd = input.mapPartitions { x =>
84+
val baseRdd = input.mapPartitions { x =>
7985
if (migrateDuring) {
8086
Thread.sleep(500)
8187
}
8288
accum.add(1)
8389
x.map(y => (y, y))
8490
}
8591
val testRdd = shuffle match {
86-
case true => sleepyRdd.reduceByKey(_ + _)
87-
case false => sleepyRdd
92+
case true => baseRdd.reduceByKey(_ + _)
93+
case false => baseRdd
8894
}
8995

9096
// Listen for the job & block updates
@@ -182,7 +188,7 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
182188
val blockLocs = rddUpdates.map{ update =>
183189
(update.blockUpdatedInfo.blockId.name,
184190
update.blockUpdatedInfo.blockManagerId)}
185-
val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.toSet.size)
191+
val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
186192
assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
187193
s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
188194
s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
@@ -193,11 +199,11 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
193199
val numDataLocs = blocksUpdated.filter{ update =>
194200
val blockId = update.blockUpdatedInfo.blockId
195201
blockId.isInstanceOf[ShuffleDataBlockId]
196-
}.toSet.size
202+
}.size
197203
val numIndexLocs = blocksUpdated.filter{ update =>
198204
val blockId = update.blockUpdatedInfo.blockId
199205
blockId.isInstanceOf[ShuffleIndexBlockId]
200-
}.toSet.size
206+
}.size
201207
assert(numDataLocs >= 1, s"Expect shuffle data block updates in ${blocksUpdated}")
202208
assert(numIndexLocs >= 1, s"Expect shuffle index block updates in ${blocksUpdated}")
203209
}

0 commit comments

Comments
 (0)