@@ -24,8 +24,7 @@ import scala.concurrent.duration._
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, Success,
-  TestUtils}
+import org.apache.spark._
 import org.apache.spark.internal.config
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
@@ -35,41 +34,51 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
     with ResetSystemProperties with Eventually {
 
   val numExecs = 3
+  val numParts = 3
 
   test(s"verify that an already running task which is going to cache data succeeds " +
     s"on a decommissioned executor") {
-    runDecomTest(true, false)
+    runDecomTest(true, false, true)
   }
 
   test(s"verify that shuffle blocks are migrated.") {
-    runDecomTest(false, true)
+    runDecomTest(false, true, false)
   }
 
   test(s"verify that both migrations can work at the same time.") {
-    runDecomTest(true, true)
+    runDecomTest(true, true, false)
   }
 
-  private def runDecomTest(persist: Boolean, shuffle: Boolean) = {
+  private def runDecomTest(persist: Boolean, shuffle: Boolean, migrateDuring: Boolean) = {
     val master = s"local-cluster[${numExecs}, 1, 1024]"
     val conf = new SparkConf().setAppName("test").setMaster(master)
       .set(config.Worker.WORKER_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_DECOMMISSION_ENABLED, true)
       .set(config.STORAGE_RDD_DECOMMISSION_ENABLED, persist)
       .set(config.STORAGE_SHUFFLE_DECOMMISSION_ENABLED, shuffle)
+      // Just replicate blocks as fast as we can during testing, there isn't another
+      // workload we need to worry about.
       .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
 
+    // Force fetching to local disk
+    if (shuffle) {
+      conf.set("spark.network.maxRemoteBlockSizeFetchToMem", "1")
+    }
+
     sc = new SparkContext(master, "test", conf)
 
     // Create input RDD with 10 partitions
-    val input = sc.parallelize(1 to 10, 10)
+    val input = sc.parallelize(1 to numParts, numParts)
     val accum = sc.longAccumulator("mapperRunAccumulator")
     // Do a count to wait for the executors to be registered.
     input.count()
 
     // Create a new RDD where we have sleep in each partition, we are also increasing
     // the value of accumulator in each partition
     val sleepyRdd = input.mapPartitions { x =>
-      Thread.sleep(250)
+      if (migrateDuring) {
+        Thread.sleep(500)
+      }
       accum.add(1)
       x.map(y => (y, y))
     }
@@ -79,19 +88,26 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
     }
 
     // Listen for the job & block updates
-    val sem = new Semaphore(0)
+    val taskStartSem = new Semaphore(0)
+    val broadcastSem = new Semaphore(0)
     val taskEndEvents = ArrayBuffer.empty[SparkListenerTaskEnd]
     val blocksUpdated = ArrayBuffer.empty[SparkListenerBlockUpdated]
     sc.addSparkListener(new SparkListener {
+
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
-        sem.release()
+        taskStartSem.release()
       }
 
       override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
         taskEndEvents.append(taskEnd)
       }
 
       override def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated): Unit = {
+        // Once broadcasts start landing on the executors we're good to proceed.
+        // We don't only use task start as it can occur before the work is on the executor.
+        if (blockUpdated.blockUpdatedInfo.blockId.isBroadcast) {
+          broadcastSem.release()
+        }
         blocksUpdated.append(blockUpdated)
       }
     })
@@ -102,19 +118,32 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
       testRdd.persist()
     }
 
-    // Wait for all of the executors to start
+    // Wait for the first executor to start
     TestUtils.waitUntilExecutorsUp(sc = sc,
-      numExecutors = numExecs,
+      numExecutors = 1,
       timeout = 10000) // 10s
 
     // Start the computation of RDD - this step will also cache the RDD
     val asyncCount = testRdd.countAsync()
 
-    // Wait for the job to have started
-    sem.acquire(1)
+    // Wait for all of the executors to start
+    TestUtils.waitUntilExecutorsUp(sc = sc,
+      numExecutors = numExecs,
+      timeout = 10000) // 10s
 
-    // Give Spark a tiny bit to start the tasks after the listener says hello
-    Thread.sleep(50)
+    // Wait for the job to have started.
+    taskStartSem.acquire(1)
+    // Wait for each executor + driver to have its broadcast info delivered.
+    broadcastSem.acquire((numExecs + 1))
+
+    // Make sure the job is either mid run or otherwise has data to migrate.
+    if (migrateDuring) {
+      // Give Spark a tiny bit to start executing after the broadcast blocks land.
+      // For me this works at 100, set to 300 for system variance.
+      Thread.sleep(300)
+    } else {
+      ThreadUtils.awaitResult(asyncCount, 15.seconds)
+    }
 
     // Decommission one of the executor
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
@@ -127,49 +156,58 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
 
     // Wait for job to finish
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 15.seconds)
-    assert(asyncCountResult === 10)
-    // All 10 tasks finished, so accum should have been increased 10 times
-    assert(accum.value === 10)
+    assert(asyncCountResult === numParts)
+    // All tasks finished, so accum should have been increased numParts times
+    assert(accum.value === numParts)
 
     // All tasks should be successful, nothing should have failed
     sc.listenerBus.waitUntilEmpty()
     if (shuffle) {
-      // 10 mappers & 10 reducers which succeeded
-      assert(taskEndEvents.count(_.reason == Success) === 20,
-        s"Expected 20 tasks got ${taskEndEvents.size} (${taskEndEvents})")
+      // mappers & reducers which succeeded
+      assert(taskEndEvents.count(_.reason == Success) === 2 * numParts,
+        s"Expected ${2 * numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
     } else {
-      // 10 mappers which executed successfully
-      assert(taskEndEvents.count(_.reason == Success) === 10,
-        s"Expected 10 tasks got ${taskEndEvents.size} (${taskEndEvents})")
+      // only mappers which executed successfully
+      assert(taskEndEvents.count(_.reason == Success) === numParts,
+        s"Expected ${numParts} tasks got ${taskEndEvents.size} (${taskEndEvents})")
     }
 
     // Wait for our respective blocks to have migrated
     eventually(timeout(15.seconds), interval(10.milliseconds)) {
       if (persist) {
         // One of our blocks should have moved.
-        val blockLocs = blocksUpdated.map{ update =>
+        val rddUpdates = blocksUpdated.filter{update =>
+          val blockId = update.blockUpdatedInfo.blockId
+          blockId.isRDD}
+        val blockLocs = rddUpdates.map{ update =>
          (update.blockUpdatedInfo.blockId.name,
           update.blockUpdatedInfo.blockManagerId)}
         val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.toSet.size)
         assert(!blocksToManagers.filter(_._2 > 1).isEmpty,
-          s"We should have a block that has been on multiple BMs in ${blocksUpdated}")
+          s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" +
+          s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
       }
       // If we're migrating shuffles we look for any shuffle block updates
       // as there is no block update on the initial shuffle block write.
       if (shuffle) {
-        val numLocs = blocksUpdated.filter{ update =>
+        val numDataLocs = blocksUpdated.filter{ update =>
+          val blockId = update.blockUpdatedInfo.blockId
+          blockId.isInstanceOf[ShuffleDataBlockId]
+        }.toSet.size
+        val numIndexLocs = blocksUpdated.filter{ update =>
          val blockId = update.blockUpdatedInfo.blockId
-          blockId.isShuffle || blockId.isInternalShuffle
+          blockId.isInstanceOf[ShuffleIndexBlockId]
         }.toSet.size
-        assert(numLocs > 0, s"No shuffle block updates in ${blocksUpdated}")
+        assert(numDataLocs >= 1, s"Expect shuffle data block updates in ${blocksUpdated}")
+        assert(numIndexLocs >= 1, s"Expect shuffle index block updates in ${blocksUpdated}")
      }
     }
 
     // Since the RDD is cached or shuffled so further usage of same RDD should use the
     // cached data. Original RDD partitions should not be recomputed i.e. accum
     // should have same value like before
-    assert(testRdd.count() === 10)
-    assert(accum.value === 10)
+    assert(testRdd.count() === numParts)
+    assert(accum.value === numParts)
 
     val storageStatus = sc.env.blockManager.master.getStorageStatus
     val execIdToBlocksMapping = storageStatus.map(
@@ -178,8 +216,8 @@ class BlockManagerDecommissionSuite extends SparkFunSuite with LocalSparkContext
     assert(execIdToBlocksMapping(execToDecommission).keys.filter(_.isRDD).toSeq === Seq(),
       "Cache blocks should be migrated")
     if (persist) {
-      // There should still be all 10 RDD blocks cached
-      assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === 10)
+      // There should still be all the RDD blocks cached
+      assert(execIdToBlocksMapping.values.flatMap(_.keys).count(_.isRDD) === numParts)
     }
   }
 }