17
17
18
18
package org .apache .spark .scheduler .mesos
19
19
20
- import org .apache .spark .executor .MesosExecutorBackend
21
- import org .scalatest .FunSuite
22
- import org .apache .spark .{SparkConf , SparkContext , LocalSparkContext }
23
- import org .apache .spark .scheduler .{SparkListenerExecutorAdded , LiveListenerBus ,
24
- TaskDescription , WorkerOffer , TaskSchedulerImpl }
25
- import org .apache .spark .scheduler .cluster .ExecutorInfo
26
- import org .apache .spark .scheduler .cluster .mesos .{MemoryUtils , MesosSchedulerBackend }
27
- import org .apache .mesos .SchedulerDriver
28
- import org .apache .mesos .Protos .{ExecutorInfo => MesosExecutorInfo , _ }
29
- import org .apache .mesos .Protos .Value .Scalar
30
- import org .easymock .{Capture , EasyMock }
31
20
import java .nio .ByteBuffer
32
- import java .util .Collections
33
21
import java .util
34
- import org . scalatest . mock . EasyMockSugar
22
+ import java . util . Collections
35
23
36
24
import scala .collection .mutable
37
25
import scala .collection .mutable .ArrayBuffer
38
26
39
- class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar {
27
+ import org .apache .mesos .SchedulerDriver
28
+ import org .apache .mesos .Protos ._
29
+ import org .apache .mesos .Protos .Value .Scalar
30
+ import org .mockito .Mockito ._
31
+ import org .mockito .Matchers ._
32
+ import org .mockito .{ArgumentCaptor , Matchers }
33
+ import org .scalatest .FunSuite
34
+ import org .scalatest .mock .MockitoSugar
35
+
36
+ import org .apache .spark .{LocalSparkContext , SparkConf , SparkContext }
37
+ import org .apache .spark .executor .MesosExecutorBackend
38
+ import org .apache .spark .scheduler ._
39
+ import org .apache .spark .scheduler .cluster .ExecutorInfo
40
+ import org .apache .spark .scheduler .cluster .mesos .{MesosSchedulerBackend , MemoryUtils }
41
+
42
+ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {
40
43
41
44
test(" check spark-class location correctly" ) {
42
45
val conf = new SparkConf
43
46
conf.set(" spark.mesos.executor.home" , " /mesos-home" )
44
47
45
- val listenerBus = EasyMock .createMock(classOf [LiveListenerBus ])
46
- listenerBus.post(SparkListenerExecutorAdded (EasyMock .anyLong, " s1" , new ExecutorInfo (" host1" , 2 , Map .empty)))
47
- EasyMock .replay(listenerBus)
48
-
49
- val sc = EasyMock .createMock(classOf [SparkContext ])
50
- EasyMock .expect(sc.getSparkHome()).andReturn(Option (" /spark-home" )).anyTimes()
51
- EasyMock .expect(sc.conf).andReturn(conf).anyTimes()
52
- EasyMock .expect(sc.executorEnvs).andReturn(new mutable.HashMap ).anyTimes()
53
- EasyMock .expect(sc.executorMemory).andReturn(100 ).anyTimes()
54
- EasyMock .expect(sc.listenerBus).andReturn(listenerBus)
55
- EasyMock .replay(sc)
56
- val taskScheduler = EasyMock .createMock(classOf [TaskSchedulerImpl ])
57
- EasyMock .expect(taskScheduler.CPUS_PER_TASK ).andReturn(2 ).anyTimes()
58
- EasyMock .replay(taskScheduler)
48
+ val listenerBus = mock[LiveListenerBus ]
49
+ listenerBus.post(
50
+ SparkListenerExecutorAdded (anyLong, " s1" , new ExecutorInfo (" host1" , 2 , Map .empty)))
51
+
52
+ val sc = mock[SparkContext ]
53
+ when(sc.getSparkHome()).thenReturn(Option (" /spark-home" ))
54
+
55
+ when(sc.conf).thenReturn(conf)
56
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap [String , String ])
57
+ when(sc.executorMemory).thenReturn(100 )
58
+ when(sc.listenerBus).thenReturn(listenerBus)
59
+ val taskScheduler = mock[TaskSchedulerImpl ]
60
+ when(taskScheduler.CPUS_PER_TASK ).thenReturn(2 )
59
61
60
62
val mesosSchedulerBackend = new MesosSchedulerBackend (taskScheduler, sc, " master" )
61
63
@@ -84,20 +86,19 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
84
86
.setSlaveId(SlaveID .newBuilder().setValue(s " s ${id.toString}" )).setHostname(s " host ${id.toString}" ).build()
85
87
}
86
88
87
- val driver = EasyMock .createMock( classOf [SchedulerDriver ])
88
- val taskScheduler = EasyMock .createMock( classOf [TaskSchedulerImpl ])
89
+ val driver = mock [SchedulerDriver ]
90
+ val taskScheduler = mock [TaskSchedulerImpl ]
89
91
90
- val listenerBus = EasyMock .createMock( classOf [LiveListenerBus ])
91
- listenerBus.post(SparkListenerExecutorAdded ( EasyMock .anyLong, " s1 " , new ExecutorInfo ( " host1 " , 2 , Map .empty)))
92
- EasyMock .replay(listenerBus )
92
+ val listenerBus = mock [LiveListenerBus ]
93
+ listenerBus.post(
94
+ SparkListenerExecutorAdded (anyLong, " s1 " , new ExecutorInfo ( " host1 " , 2 , Map .empty)) )
93
95
94
- val sc = EasyMock .createMock(classOf [SparkContext ])
95
- EasyMock .expect(sc.executorMemory).andReturn(100 ).anyTimes()
96
- EasyMock .expect(sc.getSparkHome()).andReturn(Option (" /path" )).anyTimes()
97
- EasyMock .expect(sc.executorEnvs).andReturn(new mutable.HashMap ).anyTimes()
98
- EasyMock .expect(sc.conf).andReturn(new SparkConf ).anyTimes()
99
- EasyMock .expect(sc.listenerBus).andReturn(listenerBus)
100
- EasyMock .replay(sc)
96
+ val sc = mock[SparkContext ]
97
+ when(sc.executorMemory).thenReturn(100 )
98
+ when(sc.getSparkHome()).thenReturn(Option (" /path" ))
99
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap [String , String ])
100
+ when(sc.conf).thenReturn(new SparkConf )
101
+ when(sc.listenerBus).thenReturn(listenerBus)
101
102
102
103
val minMem = MemoryUtils .calculateTotalMemory(sc).toInt
103
104
val minCpu = 4
@@ -121,25 +122,29 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
121
122
2
122
123
))
123
124
val taskDesc = new TaskDescription (1L , 0 , " s1" , " n1" , 0 , ByteBuffer .wrap(new Array [Byte ](0 )))
124
- EasyMock .expect(taskScheduler.resourceOffers(EasyMock .eq(expectedWorkerOffers))).andReturn(Seq (Seq (taskDesc)))
125
- EasyMock .expect(taskScheduler.CPUS_PER_TASK ).andReturn(2 ).anyTimes()
126
- EasyMock .replay(taskScheduler)
125
+ when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq (Seq (taskDesc)))
126
+ when(taskScheduler.CPUS_PER_TASK ).thenReturn(2 )
127
127
128
- val capture = new Capture [util.Collection [TaskInfo ]]
129
- EasyMock .expect (
128
+ val capture = ArgumentCaptor .forClass( classOf [util.Collection [TaskInfo ]])
129
+ when (
130
130
driver.launchTasks(
131
- EasyMock .eq(Collections .singleton(mesosOffers.get(0 ).getId)),
132
- EasyMock .capture(capture ),
133
- EasyMock .anyObject (classOf [Filters ])
131
+ Matchers .eq(Collections .singleton(mesosOffers.get(0 ).getId)),
132
+ capture .capture(),
133
+ any (classOf [Filters ])
134
134
)
135
- ).andReturn(Status .valueOf(1 )).once
136
- EasyMock .expect(driver.declineOffer(mesosOffers.get(1 ).getId)).andReturn(Status .valueOf(1 )).times(1 )
137
- EasyMock .expect(driver.declineOffer(mesosOffers.get(2 ).getId)).andReturn(Status .valueOf(1 )).times(1 )
138
- EasyMock .replay(driver)
135
+ ).thenReturn(Status .valueOf(1 ))
136
+ when(driver.declineOffer(mesosOffers.get(1 ).getId)).thenReturn(Status .valueOf(1 ))
137
+ when(driver.declineOffer(mesosOffers.get(2 ).getId)).thenReturn(Status .valueOf(1 ))
139
138
140
139
backend.resourceOffers(driver, mesosOffers)
141
140
142
- EasyMock .verify(driver)
141
+ verify(driver, times(1 )).launchTasks(
142
+ Matchers .eq(Collections .singleton(mesosOffers.get(0 ).getId)),
143
+ capture.capture(),
144
+ any(classOf [Filters ])
145
+ )
146
+ verify(driver, times(1 )).declineOffer(mesosOffers.get(1 ).getId)
147
+ verify(driver, times(1 )).declineOffer(mesosOffers.get(2 ).getId)
143
148
assert(capture.getValue.size() == 1 )
144
149
val taskInfo = capture.getValue.iterator().next()
145
150
assert(taskInfo.getName.equals(" n1" ))
@@ -151,15 +156,13 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
151
156
// Unwanted resources offered on an existing node. Make sure they are declined
152
157
val mesosOffers2 = new java.util.ArrayList [Offer ]
153
158
mesosOffers2.add(createOffer(1 , minMem, minCpu))
154
- EasyMock .reset(taskScheduler)
155
- EasyMock .reset(driver)
156
- EasyMock .expect(taskScheduler.resourceOffers(EasyMock .anyObject(classOf [Seq [WorkerOffer ]])).andReturn(Seq (Seq ())))
157
- EasyMock .expect(taskScheduler.CPUS_PER_TASK ).andReturn(2 ).anyTimes()
158
- EasyMock .replay(taskScheduler)
159
- EasyMock .expect(driver.declineOffer(mesosOffers2.get(0 ).getId)).andReturn(Status .valueOf(1 )).times(1 )
160
- EasyMock .replay(driver)
159
+ reset(taskScheduler)
160
+ reset(driver)
161
+ when(taskScheduler.resourceOffers(any(classOf [Seq [WorkerOffer ]]))).thenReturn(Seq (Seq ()))
162
+ when(taskScheduler.CPUS_PER_TASK ).thenReturn(2 )
163
+ when(driver.declineOffer(mesosOffers2.get(0 ).getId)).thenReturn(Status .valueOf(1 ))
161
164
162
165
backend.resourceOffers(driver, mesosOffers2)
163
- EasyMock . verify(driver)
166
+ verify(driver, times( 1 )).declineOffer(mesosOffers2.get( 0 ).getId )
164
167
}
165
168
}
0 commit comments