Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;

@RunWith(MockitoJUnitRunner.class)
public class DefaultControllerTest {
Expand Down Expand Up @@ -67,23 +70,34 @@ public void testStartingStoppingController() throws InterruptedException {
testController.setWorkerThreadPool(Executors.newScheduledThreadPool(1));

Request request1 = new Request("test1");
when(mockReconciler.reconcile(request1)).thenReturn(new Result(false));
final Semaphore latch = new Semaphore(1);
latch.acquire();
when(mockReconciler.reconcile(request1))
.thenAnswer(
new Answer() {
public Object answer(InvocationOnMock invocation) {
latch.release();
return new Result(false);
}
});

// emit an event when the controller hasn't started
workQueue.add(request1);
// I don't love sleeping here, but we're waiting for something we don't expect
// to happen, so there's no good way to do it other than sleep (that I can think of)
cooldown();
verify(mockReconciler, times(0)).reconcile(request1);

controllerThead.submit(testController::run);

cooldown();
latch.acquire();
verify(mockReconciler, times(1)).reconcile(request1);

testController.shutdown();
Request request2 = new Request("test2");

// emit an event after the controller has shutdown
workQueue.add(request2);
// as above wrt sleep
cooldown();
verify(mockReconciler, times(0)).reconcile(request2);
}
Expand All @@ -92,8 +106,18 @@ public void testStartingStoppingController() throws InterruptedException {
public void testControllerWontStartBeforeReady() throws InterruptedException {

Request request1 = new Request("test1");
when(mockReconciler.reconcile(request1)).thenReturn(new Result(false));
final Semaphore latch = new Semaphore(1);

when(mockReconciler.reconcile(request1))
.thenAnswer(
new Answer() {
public Object answer(InvocationOnMock invocation) {
latch.release();
return new Result(false);
}
});

latch.acquire();
AtomicBoolean ready = new AtomicBoolean(false);
DefaultController testController =
new DefaultController("", mockReconciler, workQueue, () -> ready.get());
Expand All @@ -105,12 +129,13 @@ public void testControllerWontStartBeforeReady() throws InterruptedException {

// emit an event when the controller hasn't been ready
workQueue.add(request1);
// As above wrt sleep
cooldown();

verify(mockReconciler, times(0)).reconcile(request1);

ready.set(true);
cooldown();
latch.acquire();
verify(mockReconciler, times(1)).reconcile(request1);
}

Expand All @@ -120,18 +145,23 @@ public void testControllerKeepsWorkingWhenReconcilerAbortsWithRuntimeException()
AtomicBoolean aborts = new AtomicBoolean(true);
AtomicBoolean resumed = new AtomicBoolean(false);
List<Request> finishedRequests = new ArrayList<>();
final Semaphore latch = new Semaphore(1);
DefaultController testController =
new DefaultController(
"",
new Reconciler() {
@Override
public Result reconcile(Request request) {
if (aborts.get()) {
throw new RuntimeException("Oops!!");
try {
if (aborts.get()) {
throw new RuntimeException("Oops!!");
}
resumed.set(true);
finishedRequests.add(request);
return new Result(false);
} finally {
latch.release();
}
resumed.set(true);
finishedRequests.add(request);
return new Result(false);
}
},
workQueue);
Expand All @@ -142,13 +172,13 @@ public Result reconcile(Request request) {

Request request1 = new Request("test1");
workQueue.add(request1);
cooldown();
latch.acquire();

aborts.set(false);
// emit another event, the previous one has been backoff'd
Request request2 = new Request("test2");
workQueue.add(request2);
cooldown();
latch.acquire();
testController.shutdown();

assertTrue(resumed.get());
Expand Down