Skip to content

Commit d09b88e

Browse files
wplong11velo
andauthored
Support retry cancel feature fully for AsyncFeign (#1801)
* Support retry cancel feature for AsyncFeign * Support retry cancel feature fully for AsyncFeign Co-authored-by: Marvin Froeder <velo@users.noreply.github.com>
1 parent 950935d commit d09b88e

File tree

3 files changed

+101
-14
lines changed

3 files changed

+101
-14
lines changed

core/src/main/java/feign/AsynchronousMethodHandler.java

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,18 @@ public Object invoke(Object[] argv) throws Throwable {
8888
private CompletableFuture<Object> executeAndDecode(RequestTemplate template,
8989
Options options,
9090
Retryer retryer) {
91-
CompletableFuture<Object> resultFuture = new CompletableFuture<>();
91+
CancellableFuture<Object> resultFuture = new CancellableFuture<>();
9292

9393
executeAndDecode(template, options)
9494
.whenComplete((response, throwable) -> {
9595
if (throwable != null) {
96-
if (shouldRetry(retryer, throwable, resultFuture)) {
96+
if (!resultFuture.isDone() && shouldRetry(retryer, throwable, resultFuture)) {
9797
if (logLevel != Logger.Level.NONE) {
9898
logger.logRetry(metadata.configKey(), logLevel);
9999
}
100100

101-
executeAndDecode(template, options, retryer)
102-
.whenComplete(pipeTo(resultFuture));
101+
resultFuture.setInner(
102+
executeAndDecode(template, options, retryer));
103103
}
104104
} else {
105105
resultFuture.complete(response);
@@ -109,6 +109,38 @@ private CompletableFuture<Object> executeAndDecode(RequestTemplate template,
109109
return resultFuture;
110110
}
111111

112+
private static class CancellableFuture<T> extends CompletableFuture<T> {
113+
private CompletableFuture<T> inner = null;
114+
115+
public void setInner(CompletableFuture<T> value) {
116+
inner = value;
117+
inner.whenComplete(pipeTo(this));
118+
}
119+
120+
@Override
121+
public boolean cancel(boolean mayInterruptIfRunning) {
122+
final boolean result = super.cancel(mayInterruptIfRunning);
123+
if (inner != null) {
124+
inner.cancel(mayInterruptIfRunning);
125+
}
126+
return result;
127+
}
128+
129+
private static <T> BiConsumer<? super T, ? super Throwable> pipeTo(CompletableFuture<T> completableFuture) {
130+
return (value, throwable) -> {
131+
if (completableFuture.isDone()) {
132+
return;
133+
}
134+
135+
if (throwable != null) {
136+
completableFuture.completeExceptionally(throwable);
137+
} else {
138+
completableFuture.complete(value);
139+
}
140+
};
141+
}
142+
}
143+
112144
private boolean shouldRetry(Retryer retryer,
113145
Throwable throwable,
114146
CompletableFuture<Object> resultFuture) {
@@ -136,16 +168,6 @@ private boolean shouldRetry(Retryer retryer,
136168
}
137169
}
138170

139-
private static <T> BiConsumer<? super T, ? super Throwable> pipeTo(CompletableFuture<T> completableFuture) {
140-
return (value, throwable) -> {
141-
if (throwable != null) {
142-
completableFuture.completeExceptionally(throwable);
143-
} else {
144-
completableFuture.complete(value);
145-
}
146-
};
147-
}
148-
149171
private CompletableFuture<Object> executeAndDecode(RequestTemplate template, Options options) {
150172
Request request = targetRequest(template);
151173

core/src/test/java/feign/AsyncFeignTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,21 @@
4848
import java.util.Map;
4949
import java.util.NoSuchElementException;
5050
import java.util.concurrent.CompletableFuture;
51+
import java.util.concurrent.CompletionException;
5152
import java.util.concurrent.ExecutionException;
5253
import java.util.concurrent.ExecutorService;
5354
import java.util.concurrent.Executors;
5455
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.atomic.AtomicBoolean;
57+
import java.util.concurrent.atomic.AtomicInteger;
5558
import java.util.concurrent.atomic.AtomicReference;
5659
import okhttp3.mockwebserver.MockResponse;
5760
import okhttp3.mockwebserver.MockWebServer;
5861
import okio.Buffer;
5962
import org.junit.Rule;
6063
import org.junit.Test;
64+
import org.junit.jupiter.params.ParameterizedTest;
65+
import org.junit.jupiter.params.provider.ValueSource;
6166
import org.junit.rules.ExpectedException;
6267

6368
public class AsyncFeignTest {
@@ -656,6 +661,53 @@ public void whenReturnTypeIsResponseNoErrorHandling() throws Throwable {
656661
execs.shutdown();
657662
}
658663

664+
@ParameterizedTest
665+
@ValueSource(ints = {1, 5, 10, 100, 1000})
666+
public void cancelRetry(final int expectedTryCount) throws Throwable {
667+
// Arrange
668+
final CompletableFuture<Boolean> maximumTryCompleted = new CompletableFuture<>();
669+
final AtomicInteger actualTryCount = new AtomicInteger();
670+
final AtomicBoolean isCancelled = new AtomicBoolean(true);
671+
672+
final int RUNNING_TIME_MILLIS = 100;
673+
final ExecutorService execs = Executors.newSingleThreadExecutor();
674+
final AsyncClient<Void> clientMock =
675+
(request, options, requestContext) -> CompletableFuture.supplyAsync(() -> {
676+
final int tryCount = actualTryCount.addAndGet(1);
677+
if (tryCount < expectedTryCount) {
678+
throw new CompletionException(new IOException());
679+
}
680+
681+
if (tryCount > expectedTryCount) {
682+
isCancelled.set(false);
683+
throw new CompletionException(new IOException());
684+
}
685+
686+
maximumTryCompleted.complete(true);
687+
try {
688+
Thread.sleep(RUNNING_TIME_MILLIS);
689+
throw new IOException();
690+
} catch (Throwable e) {
691+
throw new CompletionException(e);
692+
}
693+
}, execs);
694+
final TestInterfaceAsync sut = AsyncFeign.<Void>builder()
695+
.client(clientMock)
696+
.retryer(new Retryer.Default(0, Long.MAX_VALUE, expectedTryCount * 2))
697+
.target(TestInterfaceAsync.class, "http://localhost:" + server.getPort());
698+
699+
// Act
700+
final CompletableFuture<String> actual = sut.post();
701+
maximumTryCompleted.join();
702+
actual.cancel(true);
703+
Thread.sleep(RUNNING_TIME_MILLIS * 5);
704+
705+
// Assert
706+
assertThat(actualTryCount.get()).isEqualTo(expectedTryCount);
707+
assertThat(isCancelled.get()).isTrue();
708+
execs.shutdown();
709+
}
710+
659711
private static class MockRetryer implements Retryer {
660712

661713
boolean tripped;

pom.xml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@
8686
<json.version>20220924</json.version>
8787

8888
<junit.version>4.13.2</junit.version>
89+
<junit5.version>5.9.1</junit5.version>
8990
<jackson.version>2.14.0-rc3</jackson.version>
9091
<jackson-databind.version>2.14.0-rc3</jackson-databind.version>
9192
<assertj.version>3.23.1</assertj.version>
@@ -297,6 +298,12 @@
297298
<version>${junit.version}</version>
298299
</dependency>
299300

301+
<dependency>
302+
<groupId>org.junit.jupiter</groupId>
303+
<artifactId>junit-jupiter-params</artifactId>
304+
<version>${junit5.version}</version>
305+
</dependency>
306+
300307
<dependency>
301308
<groupId>org.hamcrest</groupId>
302309
<artifactId>hamcrest</artifactId>
@@ -399,6 +406,12 @@
399406
<scope>test</scope>
400407
</dependency>
401408

409+
<dependency>
410+
<groupId>org.junit.jupiter</groupId>
411+
<artifactId>junit-jupiter-params</artifactId>
412+
<scope>test</scope>
413+
</dependency>
414+
402415
<dependency>
403416
<groupId>org.assertj</groupId>
404417
<artifactId>assertj-core</artifactId>

0 commit comments

Comments
 (0)