2525import java .util .List ;
2626import java .util .concurrent .ExecutorService ;
2727import java .util .concurrent .Executors ;
28+ import java .util .concurrent .Semaphore ;
2829import java .util .concurrent .atomic .AtomicBoolean ;
2930import org .junit .After ;
3031import org .junit .Before ;
3132import org .junit .Test ;
3233import org .junit .runner .RunWith ;
3334import org .mockito .Mock ;
35+ import org .mockito .stubbing .Answer ;
3436import org .mockito .junit .MockitoJUnitRunner ;
3537
3638@ RunWith (MockitoJUnitRunner .class )
@@ -40,16 +42,6 @@ public class DefaultControllerTest {
4042 private RateLimitingQueue <Request > workQueue =
4143 new DefaultRateLimitingQueue <>(Executors .newSingleThreadExecutor ());
4244
43- private final int stepCooldownIntervalInMillis = 500 ;
44-
45- private void cooldown () {
46- try {
47- Thread .sleep (stepCooldownIntervalInMillis );
48- } catch (InterruptedException e ) {
49- e .printStackTrace ();
50- }
51- }
52-
5345 @ Before
5446 public void setUp () throws Exception {}
5547
@@ -67,33 +59,53 @@ public void testStartingStoppingController() throws InterruptedException {
6759 testController .setWorkerThreadPool (Executors .newScheduledThreadPool (1 ));
6860
6961 Request request1 = new Request ("test1" );
70- when (mockReconciler .reconcile (request1 )).thenReturn (new Result (false ));
62+ final Semaphore latch = new Semaphore (1 );
63+ latch .acquire ();
64+ when (mockReconciler .reconcile (request1 ))
65+ .thenAnswer (
66+ new Answer () {
67+ public Object answer (InvocationOnMock invocation ) {
68+ latch .release ();
69+ return new Result (false );
70+ }
71+ });
7172
7273 // emit an event when the controller hasn't started
7374 workQueue .add (request1 );
74- cooldown ();
75- verify (mockReconciler , times (0 )).reconcile (request1 );
75+ latch .acquire ();
7676
77+ verify (mockReconciler , times (0 )).reconcile (request1 );
7778 controllerThead .submit (testController ::run );
7879
79- cooldown ();
80+ latch . acquire ();
8081 verify (mockReconciler , times (1 )).reconcile (request1 );
8182
8283 testController .shutdown ();
8384 Request request2 = new Request ("test2" );
8485
8586 // emit an event after the controller has shutdown
8687 workQueue .add (request2 );
87- cooldown ();
88+ latch .acquire ();
89+
8890 verify (mockReconciler , times (0 )).reconcile (request2 );
8991 }
9092
9193 @ Test
9294 public void testControllerWontStartBeforeReady () throws InterruptedException {
9395
9496 Request request1 = new Request ("test1" );
95- when (mockReconciler .reconcile (request1 )).thenReturn (new Result (false ));
97+ final Semaphore latch = new Semaphore (1 );
98+
99+ when (mockReconciler .reconcile (request1 ))
100+ .thenAnswer (
101+ new Answer () {
102+ public Object answer (InvocationOnMock invocation ) {
103+ latch .release ();
104+ return new Result (false );
105+ }
106+ });
96107
108+ latch .acquire ();
97109 AtomicBoolean ready = new AtomicBoolean (false );
98110 DefaultController testController =
99111 new DefaultController ("" , mockReconciler , workQueue , () -> ready .get ());
@@ -105,12 +117,12 @@ public void testControllerWontStartBeforeReady() throws InterruptedException {
105117
106118 // emit an event when the controller hasn't been ready
107119 workQueue .add (request1 );
108- cooldown ();
120+ latch . acquire ();
109121
110122 verify (mockReconciler , times (0 )).reconcile (request1 );
111123
112124 ready .set (true );
113- cooldown ();
125+ latch . acquire ();
114126 verify (mockReconciler , times (1 )).reconcile (request1 );
115127 }
116128
@@ -120,18 +132,23 @@ public void testControllerKeepsWorkingWhenReconcilerAbortsWithRuntimeException()
120132 AtomicBoolean aborts = new AtomicBoolean (true );
121133 AtomicBoolean resumed = new AtomicBoolean (false );
122134 List <Request > finishedRequests = new ArrayList <>();
135+ final Semaphore latch = new Semaphore (1 );
123136 DefaultController testController =
124137 new DefaultController (
125138 "" ,
126139 new Reconciler () {
127140 @ Override
128141 public Result reconcile (Request request ) {
129- if (aborts .get ()) {
130- throw new RuntimeException ("Oops!!" );
142+ try {
143+ if (aborts .get ()) {
144+ throw new RuntimeException ("Oops!!" );
145+ }
146+ resumed .set (true );
147+ finishedRequests .add (request );
148+ return new Result (false );
149+ } finally {
150+ latch .release ();
131151 }
132- resumed .set (true );
133- finishedRequests .add (request );
134- return new Result (false );
135152 }
136153 },
137154 workQueue );
@@ -142,13 +159,13 @@ public Result reconcile(Request request) {
142159
143160 Request request1 = new Request ("test1" );
144161 workQueue .add (request1 );
145- cooldown ();
162+ latch . acquire ();
146163
147164 aborts .set (false );
148165 // emit another event, the previous one has been backoff'd
149166 Request request2 = new Request ("test2" );
150167 workQueue .add (request2 );
151- cooldown ();
168+ latch . acquire ();
152169 testController .shutdown ();
153170
154171 assertTrue (resumed .get ());
0 commit comments