Skip to content

Commit

Permalink
SynchronizationContextTest changes for scheduleFixedDelay with Duration
Browse files Browse the repository at this point in the history
  • Loading branch information
SreeramdasLavanya committed Sep 20, 2024
1 parent 782a44a commit ab97045
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 70 deletions.
33 changes: 33 additions & 0 deletions api/src/main/java/io/grpc/SynchronizationContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import java.lang.Thread.UncaughtExceptionHandler;
import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -194,6 +195,38 @@ public String toString() {
}


public final ScheduledHandle scheduleWithFixedDelay(
final Runnable task, Duration initialDelay, Duration delay, TimeUnit unit,
ScheduledExecutorService timerService) {
final ManagedRunnable runnable = new ManagedRunnable(task);
System.out.println("Inside Durationcall");
ScheduledFuture<?> future = timerService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
execute(runnable);
}

@Override
public String toString() {
return task.toString() + "(scheduled in SynchronizationContext with delay of " + delay
+ ")";
}
}, toNanosSaturated(initialDelay), toNanosSaturated(delay), unit);
return new ScheduledHandle(runnable, future);
}
static long toNanosSaturated(Duration duration) {
// Using a try/catch seems lazy, but the catch block will rarely get invoked (except for
// durations longer than approximately +/- 292 years).
try {
//long delay = TimeUnit.MILLISECONDS.convert(500, TimeUnit.SECONDS); // Converts 500 seconds to milliseconds
return duration.toNanos();
//return TimeUnit.NANOSECONDS.convert(duration);

} catch (ArithmeticException tooBig) {
return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
}
}

private static class ManagedRunnable implements Runnable {
final Runnable task;
boolean isCancelled;
Expand Down
172 changes: 102 additions & 70 deletions api/src/test/java/io/grpc/SynchronizationContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import com.google.common.util.concurrent.testing.TestingExecutors;
import io.grpc.SynchronizationContext.ScheduledHandle;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -52,6 +53,7 @@
*/
@RunWith(JUnit4.class)
public class SynchronizationContextTest {

private final BlockingQueue<Throwable> uncaughtErrors = new LinkedBlockingQueue<>();
private final SynchronizationContext syncContext = new SynchronizationContext(
new Thread.UncaughtExceptionHandler() {
Expand All @@ -72,8 +74,9 @@ public void uncaughtException(Thread t, Throwable e) {

@Mock
private Runnable task3;

@After public void tearDown() {

@After
public void tearDown() {
assertThat(uncaughtErrors).isEmpty();
}

Expand Down Expand Up @@ -105,36 +108,36 @@ public void multiThread() throws Exception {
final AtomicReference<Thread> task2Thread = new AtomicReference<>();

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
task1Thread.set(Thread.currentThread());
task1Running.countDown();
try {
assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
@Override
public Void answer(InvocationOnMock invocation) {
task1Thread.set(Thread.currentThread());
task1Running.countDown();
try {
assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).when(task1).run();
return null;
}
}).when(task1).run();

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
task2Thread.set(Thread.currentThread());
return null;
}
}).when(task2).run();
@Override
public Void answer(InvocationOnMock invocation) {
task2Thread.set(Thread.currentThread());
return null;
}
}).when(task2).run();

Thread sideThread = new Thread() {
@Override
public void run() {
syncContext.executeLater(task1);
task1Added.countDown();
syncContext.drain();
sideThreadDone.countDown();
}
};
@Override
public void run() {
syncContext.executeLater(task1);
task1Added.countDown();
syncContext.drain();
sideThreadDone.countDown();
}
};
sideThread.start();

assertTrue(task1Added.await(5, TimeUnit.SECONDS));
Expand Down Expand Up @@ -162,26 +165,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception {
final CountDownLatch task1Proceed = new CountDownLatch(1);

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
task1Running.countDown();
syncContext.throwIfNotInThisSynchronizationContext();
try {
assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
taskSuccess.set(true);
return null;
@Override
public Void answer(InvocationOnMock invocation) {
task1Running.countDown();
syncContext.throwIfNotInThisSynchronizationContext();
try {
assertTrue(task1Proceed.await(5, TimeUnit.SECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).when(task1).run();
taskSuccess.set(true);
return null;
}
}).when(task1).run();

Thread sideThread = new Thread() {
@Override
public void run() {
syncContext.execute(task1);
}
};
@Override
public void run() {
syncContext.execute(task1);
}
};
sideThread.start();

assertThat(task1Running.await(5, TimeUnit.SECONDS)).isTrue();
Expand Down Expand Up @@ -215,11 +218,11 @@ public void taskThrows() {
InOrder inOrder = inOrder(task1, task2, task3);
final RuntimeException e = new RuntimeException("Simulated");
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
throw e;
}
}).when(task2).run();
@Override
public Void answer(InvocationOnMock invocation) {
throw e;
}
}).when(task2).run();
syncContext.executeLater(task1);
syncContext.executeLater(task2);
syncContext.executeLater(task3);
Expand All @@ -246,6 +249,24 @@ public void schedule() {
verify(task1).run();
}

@Test
public void testScheduleWithFixedDelay() {
MockScheduledExecutorService executorService = new MockScheduledExecutorService();

ScheduledHandle handle =
syncContext.scheduleWithFixedDelay(task1, Duration.ofNanos(110), Duration.ofNanos(110),
TimeUnit.NANOSECONDS, executorService);

assertThat(executorService.delay)
.isEqualTo(executorService.unit.convert(110, TimeUnit.NANOSECONDS));
assertThat(handle.isPending()).isTrue();
verify(task1, never()).run();

executorService.command.run();
assertThat(handle.isPending()).isFalse();
verify(task1).run();
}

@Test
public void scheduleDueImmediately() {
MockScheduledExecutorService executorService = new MockScheduledExecutorService();
Expand Down Expand Up @@ -288,28 +309,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception {
final CountDownLatch sideThreadDone = new CountDownLatch(1);

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) {
task1Running.countDown();
try {
ScheduledHandle task2Handle;
assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull();
task2Handle.cancel();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
task1Done.set(true);
return null;
@Override
public Void answer(InvocationOnMock invocation) {
task1Running.countDown();
try {
ScheduledHandle task2Handle;
assertThat(task2Handle = task2HandleQueue.poll(5, TimeUnit.SECONDS)).isNotNull();
task2Handle.cancel();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).when(task1).run();
task1Done.set(true);
return null;
}
}).when(task1).run();

Thread sideThread = new Thread() {
@Override
public void run() {
syncContext.execute(task1);
sideThreadDone.countDown();
}
};
@Override
public void run() {
syncContext.execute(task1);
sideThreadDone.countDown();
}
};

ScheduledHandle handle = syncContext.schedule(task2, 10, TimeUnit.NANOSECONDS, executorService);
// This will execute and block in task1
Expand Down Expand Up @@ -340,22 +361,33 @@ public void run() {
}

static class MockScheduledExecutorService extends ForwardingScheduledExecutorService {

private ScheduledExecutorService delegate = TestingExecutors.noOpScheduledExecutor();

Runnable command;
long delay;
TimeUnit unit;
ScheduledFuture<?> future;

@Override public ScheduledExecutorService delegate() {
@Override
public ScheduledExecutorService delegate() {
return delegate;
}

@Override public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
this.command = command;
this.delay = delay;
this.unit = unit;
return future = super.schedule(command, delay, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long intialDelay, long delay,
TimeUnit unit) {
this.command = command;
this.delay = delay;
this.unit = unit;
return future = super.scheduleWithFixedDelay(command, intialDelay, delay, unit);
}
}
}

0 comments on commit ab97045

Please sign in to comment.