2525import java .util .concurrent .Executors ;
2626import java .util .concurrent .LinkedBlockingQueue ;
2727import java .util .concurrent .TimeUnit ;
28+ import java .util .function .Supplier ;
2829
2930/** The default delaying queue implementation. */
3031public class DefaultDelayingQueue <T > extends DefaultWorkQueue <T > implements DelayingQueue <T > {
@@ -34,8 +35,10 @@ public class DefaultDelayingQueue<T> extends DefaultWorkQueue<T> implements Dela
3435 private DelayQueue <WaitForEntry <T >> delayQueue ;
3536 private ConcurrentMap <T , WaitForEntry <T >> waitingEntryByData ;
3637 protected BlockingQueue <WaitForEntry <T >> waitingForAddQueue ;
38+ private Supplier <Instant > timeSource ;
3739
3840 public DefaultDelayingQueue (ExecutorService waitingWorker ) {
41+ this .timeSource = Instant ::now ;
3942 this .delayQueue = new DelayQueue <>();
4043 this .waitingEntryByData = new ConcurrentHashMap <>();
4144 this .waitingForAddQueue = new LinkedBlockingQueue <>(1000 );
@@ -57,10 +60,16 @@ public void addAfter(T item, Duration duration) {
5760 super .add (item );
5861 return ;
5962 }
60- WaitForEntry <T > entry = new WaitForEntry <>(item , duration .addTo (Instant .now ()));
63+ WaitForEntry <T > entry =
64+ new WaitForEntry <>(item , duration .addTo (this .timeSource .get ()), this .timeSource );
6165 this .waitingForAddQueue .offer (entry );
6266 }
6367
68+ // Visible for testing
69+ protected void injectTimeSource (Supplier <Instant > fn ) {
70+ this .timeSource = fn ;
71+ }
72+
6473 private void waitingLoop () {
6574 try {
6675 while (true ) {
@@ -78,7 +87,7 @@ private void waitingLoop() {
7887 // a. if ready, remove it from the delay-queue and push it into underlying
7988 // work-queue
8089 // b. if not, refresh the next ready-at time.
81- Instant now = Instant . now ();
90+ Instant now = this . timeSource . get ();
8291 if (!Duration .between (entry .readyAtMillis , now ).isNegative ()) {
8392 delayQueue .remove (entry );
8493 super .add (entry .data );
@@ -92,7 +101,7 @@ private void waitingLoop() {
92101 WaitForEntry <T > waitForEntry =
93102 waitingForAddQueue .poll (nextReadyAt .toMillis (), TimeUnit .MILLISECONDS );
94103 if (waitForEntry != null ) {
95- if (Duration .between (waitForEntry .readyAtMillis , Instant . now ()).isNegative ()) {
104+ if (Duration .between (waitForEntry .readyAtMillis , this . timeSource . get ()).isNegative ()) {
96105 // the item is not yet ready, insert it to the delay-queue
97106 insert (this .delayQueue , this .waitingEntryByData , waitForEntry );
98107 } else {
@@ -126,17 +135,19 @@ private void insert(
126135 // WaitForEntry holds the data to add and the time it should be added.
127136 private static class WaitForEntry <T > implements Delayed {
128137
129- private WaitForEntry (T data , Temporal readyAtMillis ) {
138+ private WaitForEntry (T data , Temporal readyAtMillis , Supplier < Instant > timeSource ) {
130139 this .data = data ;
131140 this .readyAtMillis = readyAtMillis ;
141+ this .timeSource = timeSource ;
132142 }
133143
134144 private T data ;
135145 private Temporal readyAtMillis ;
146+ private Supplier <Instant > timeSource ;
136147
137148 @ Override
138149 public long getDelay (TimeUnit unit ) {
139- Duration duration = Duration .between (Instant . now (), readyAtMillis );
150+ Duration duration = Duration .between (this . timeSource . get (), readyAtMillis );
140151 return unit .convert (duration .toMillis (), TimeUnit .MILLISECONDS );
141152 }
142153
0 commit comments