Skip to content

Commit f2134b2

Browse files
authored
Avoid a race condition in RetryingExecutorService (rosjava#278)
This fixes rosjava#274, which occurs when the task completes before the `Future` is added to the `callables` map (because the corresponding submit() is still executing). In that case, `callable` is null, which causes latches.get(callable) to throw an NPE. The bug can be reproduced with the added test by adding `Thread.sleep(1000)` below the `completionService.submit` call.
1 parent 1dd68e3 commit f2134b2

File tree

2 files changed

+74
-3
lines changed

2 files changed

+74
-3
lines changed

rosjava/src/main/java/org/ros/concurrent/RetryingExecutorService.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,13 @@ private class RetryLoop extends CancellableLoop {
6363
@Override
6464
public void loop() throws InterruptedException {
6565
Future<Boolean> future = completionService.take();
66-
final Callable<Boolean> callable = callables.remove(future);
66+
Callable<Boolean> callable;
67+
CountDownLatch latch;
68+
// Grab the mutex to make sure submit() of the future that we took is finished.
69+
synchronized (mutex) {
70+
callable = callables.remove(future);
71+
latch = latches.get(callable);
72+
}
6773
boolean retry;
6874
try {
6975
retry = future.get();
@@ -74,14 +80,15 @@ public void loop() throws InterruptedException {
7480
if (DEBUG) {
7581
log.info("Retry requested.");
7682
}
83+
final Callable<Boolean> finalCallable = callable;
7784
scheduledExecutorService.schedule(new Runnable() {
7885
@Override
7986
public void run() {
80-
submit(callable);
87+
submit(finalCallable);
8188
}
8289
}, retryDelay, retryTimeUnit);
8390
} else {
84-
latches.get(callable).countDown();
91+
latch.countDown();
8592
}
8693
}
8794
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (C) 2012 Google Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package org.ros.concurrent;
18+
19+
import static org.mockito.Mockito.*;
20+
21+
22+
import java.util.concurrent.Callable;
23+
import java.util.concurrent.Executors;
24+
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.TimeUnit;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
/**
30+
* @author rodrigoq@google.com (Rodrigo Queiro)
31+
*/
32+
public class RetryingExecutorServiceTest {
33+
34+
private ScheduledExecutorService executorService;
35+
36+
@Before
37+
public void before() {
38+
executorService = Executors.newScheduledThreadPool(4);
39+
}
40+
41+
@Test
42+
public void testNoRetry_calledOnce() throws Exception {
43+
RetryingExecutorService service = new RetryingExecutorService(executorService);
44+
Callable<Boolean> callable = mock(Callable.class);
45+
when(callable.call()).thenReturn(false);
46+
service.submit(callable);
47+
service.shutdown(10, TimeUnit.SECONDS);
48+
verify(callable, times(1)).call();
49+
}
50+
51+
@Test
52+
public void testOneRetry_calledTwice() throws Exception {
53+
RetryingExecutorService service = new RetryingExecutorService(executorService);
54+
service.setRetryDelay(0, TimeUnit.SECONDS);
55+
Callable<Boolean> callable = mock(Callable.class);
56+
when(callable.call()).thenReturn(true).thenReturn(false);
57+
service.submit(callable);
58+
59+
// Call verify() with a timeout before calling shutdown, as shutdown() will prevent further
60+
// retries.
61+
verify(callable, timeout(10000).times(2)).call();
62+
service.shutdown(10, TimeUnit.SECONDS);
63+
}
64+
}

0 commit comments

Comments
 (0)