Skip to content

Commit ebb4034

Browse files
committed
Fix racy test
1 parent 38c553a commit ebb4034

File tree

3 files changed

+16
-1
lines changed

3 files changed

+16
-1
lines changed

extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueue.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.Executors;
2626
import java.util.concurrent.LinkedBlockingQueue;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.function.Supplier;
2829

2930
/** The default delaying queue implementation. */
3031
public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements DelayingQueue<T> {
@@ -34,8 +35,10 @@ public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements Dela
3435
private DelayQueue<WaitForEntry<T>> delayQueue;
3536
private ConcurrentMap<T, WaitForEntry<T>> waitingEntryByData;
3637
protected BlockingQueue<WaitForEntry<T>> waitingForAddQueue;
38+
private Supplier<Instant> timeSource;
3739

3840
public DefaultDelayingQueue(ExecutorService waitingWorker) {
41+
this.timeSource = () -> { return Instant.now(); };
3942
this.delayQueue = new DelayQueue<>();
4043
this.waitingEntryByData = new ConcurrentHashMap<>();
4144
this.waitingForAddQueue = new LinkedBlockingQueue<>(1000);
@@ -61,6 +64,11 @@ public void addAfter(T item, Duration duration) {
6164
this.waitingForAddQueue.offer(entry);
6265
}
6366

67+
// Visible for testing
68+
protected void injectTimeSource(Supplier<Instant> fn) {
69+
this.timeSource = fn;
70+
}
71+
6472
private void waitingLoop() {
6573
try {
6674
while (true) {

extended/src/main/java/io/kubernetes/client/extended/workqueue/DefaultRateLimitingQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.kubernetes.client.extended.workqueue.ratelimiter.RateLimiter;
1717
import java.util.concurrent.ExecutorService;
1818
import java.util.concurrent.Executors;
19+
import java.time.Instant;
1920

2021
/** The default rate limiting queue implementation. */
2122
public class DefaultRateLimitingQueue<T> extends DefaultDelayingQueue<T>

extended/src/test/java/io/kubernetes/client/extended/workqueue/DefaultDelayingQueueTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,25 @@
1717

1818
import io.kubernetes.client.extended.wait.Wait;
1919
import java.time.Duration;
20+
import java.time.Instant;
2021
import org.junit.Test;
2122

2223
public class DefaultDelayingQueueTest {
2324

2425
@Test
2526
public void testSimpleDelayingQueue() throws Exception {
27+
final Instant staticTime = Instant.now();
2628
DefaultDelayingQueue<String> queue = new DefaultDelayingQueue<>();
29+
// Hold time still
30+
queue.injectTimeSource(() -> { return staticTime; });
2731
queue.addAfter("foo", Duration.ofMillis(50));
2832

33+
// Verify that we haven't released it
2934
assertTrue(waitForWaitingQueueToFill(queue));
3035
assertTrue(queue.length() == 0);
3136

32-
Thread.sleep(60L);
37+
// Advance time
38+
queue.injectTimeSource(() -> { return staticTime.plusMillis(100); });
3339
assertTrue(waitForAdded(queue, 1));
3440
String item = queue.get();
3541
queue.done(item);

0 commit comments

Comments
 (0)