diff --git a/api/src/main/java/io/grpc/SynchronizationContext.java b/api/src/main/java/io/grpc/SynchronizationContext.java index 5a7677ac15f..046988b4779 100644 --- a/api/src/main/java/io/grpc/SynchronizationContext.java +++ b/api/src/main/java/io/grpc/SynchronizationContext.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import java.lang.Thread.UncaughtExceptionHandler; +import java.time.Duration; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; @@ -194,6 +195,38 @@ public String toString() { } + public final ScheduledHandle scheduleWithFixedDelay( + final Runnable task, Duration initialDelay, Duration delay, TimeUnit unit, + ScheduledExecutorService timerService) { + final ManagedRunnable runnable = new ManagedRunnable(task); + System.out.println("Inside Durationcall"); + ScheduledFuture future = timerService.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + execute(runnable); + } + + @Override + public String toString() { + return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay + + ")"; + } + }, toNanosSaturated(initialDelay), toNanosSaturated(delay), unit); + return new ScheduledHandle(runnable, future); + } + static long toNanosSaturated(Duration duration) { + // Using a try/catch seems lazy, but the catch block will rarely get invoked (except for + // durations longer than approximately +/- 292 years). + try { + //long delay = TimeUnit.MILLISECONDS.convert(500, TimeUnit.SECONDS); // Converts 500 seconds to milliseconds + return duration.toNanos(); + //return TimeUnit.NANOSECONDS.convert(duration); + + } catch (ArithmeticException tooBig) { + return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE; + } + } + private static class ManagedRunnable implements Runnable { final Runnable task; boolean isCancelled; diff --git a/api/src/test/java/io/grpc/SynchronizationContextTest.java b/api/src/test/java/io/grpc/SynchronizationContextTest.java index 3d5e7fa42b9..a4d9e80837f 100644 --- a/api/src/test/java/io/grpc/SynchronizationContextTest.java +++ b/api/src/test/java/io/grpc/SynchronizationContextTest.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.testing.TestingExecutors; import io.grpc.SynchronizationContext.ScheduledHandle; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; @@ -52,6 +53,7 @@ */ @RunWith(JUnit4.class) public class SynchronizationContextTest { + private final BlockingQueue uncaughtErrors = new LinkedBlockingQueue<>(); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -72,8 +74,9 @@ public void uncaughtException(Thread t, Throwable e) { @Mock private Runnable task3; - - @After public void tearDown() { + + @After + public void tearDown() { assertThat(uncaughtErrors).isEmpty(); } @@ -105,36 +108,36 @@ public void multiThread() throws Exception { final AtomicReference task2Thread = new AtomicReference<>(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Thread.set(Thread.currentThread()); - task1Running.countDown(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Thread.set(Thread.currentThread()); + task1Running.countDown(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + return null; + } + }).when(task1).run(); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task2Thread.set(Thread.currentThread()); - return null; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + task2Thread.set(Thread.currentThread()); + return null; + } + }).when(task2).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.executeLater(task1); - task1Added.countDown(); - syncContext.drain(); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.executeLater(task1); + task1Added.countDown(); + syncContext.drain(); + sideThreadDone.countDown(); + } + }; sideThread.start(); assertTrue(task1Added.await(5, TimeUnit.SECONDS)); @@ -162,26 +165,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception { final CountDownLatch task1Proceed = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - syncContext.throwIfNotInThisSynchronizationContext(); - try { - assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - taskSuccess.set(true); - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + syncContext.throwIfNotInThisSynchronizationContext(); + try { + assertTrue(task1Proceed.await(5, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + taskSuccess.set(true); + return null; + } + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - } - }; + @Override + public void run() { + syncContext.execute(task1); + } + }; sideThread.start(); assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue(); @@ -215,11 +218,11 @@ public void taskThrows() { InOrder inOrder = inOrder(task1, task2, task3); final RuntimeException e = new RuntimeException("Simulated"); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - throw e; - } - }).when(task2).run(); + @Override + public Void answer(InvocationOnMock invocation) { + throw e; + } + }).when(task2).run(); syncContext.executeLater(task1); syncContext.executeLater(task2); syncContext.executeLater(task3); @@ -246,6 +249,24 @@ public void schedule() { verify(task1).run(); } + @Test + public void testScheduleWithFixedDelay() { + MockScheduledExecutorService executorService = new MockScheduledExecutorService(); + + ScheduledHandle handle = + syncContext.scheduleWithFixedDelay(task1, Duration.ofNanos(110), Duration.ofNanos(110), + TimeUnit.NANOSECONDS, executorService); + + assertThat(executorService.delay) + .isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS)); + assertThat(handle.isPending()).isTrue(); + verify(task1, never()).run(); + + executorService.command.run(); + assertThat(handle.isPending()).isFalse(); + verify(task1).run(); + } + @Test public void scheduleDueImmediately() { MockScheduledExecutorService executorService = new MockScheduledExecutorService(); @@ -288,28 +309,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception { final CountDownLatch sideThreadDone = new CountDownLatch(1); doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) { - task1Running.countDown(); - try { - ScheduledHandle task2Handle; - assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); - task2Handle.cancel(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - task1Done.set(true); - return null; + @Override + public Void answer(InvocationOnMock invocation) { + task1Running.countDown(); + try { + ScheduledHandle task2Handle; + assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull(); + task2Handle.cancel(); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - }).when(task1).run(); + task1Done.set(true); + return null; + } + }).when(task1).run(); Thread sideThread = new Thread() { - @Override - public void run() { - syncContext.execute(task1); - sideThreadDone.countDown(); - } - }; + @Override + public void run() { + syncContext.execute(task1); + sideThreadDone.countDown(); + } + }; ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService); // This will execute and block in task1 @@ -340,6 +361,7 @@ public void run() { } static class MockScheduledExecutorService extends ForwardingScheduledExecutorService { + private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor(); Runnable command; @@ -347,15 +369,25 @@ static class MockScheduledExecutorService extends ForwardingScheduledExecutorSer TimeUnit unit; ScheduledFuture future; - @Override public ScheduledExecutorService delegate() { + @Override + public ScheduledExecutorService delegate() { return delegate; } - @Override public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { this.command = command; this.delay = delay; this.unit = unit; return future = super.schedule(command, delay, unit); } + @Override + public ScheduledFuture scheduleWithFixedDelay(Runnable command, long intialDelay, long delay, + TimeUnit unit) { + this.command = command; + this.delay = delay; + this.unit = unit; + return future = super.scheduleWithFixedDelay(command, intialDelay, delay, unit); + } } }