Skip to content

Commit d49ca33

Browse files
SimY4kdavisk6
authored andcommitted
Fixes 1003: Do not wrap exceptions in RuntimeException (#1004)
1 parent cab6eed commit d49ca33

File tree

6 files changed

+166
-75
lines changed

6 files changed

+166
-75
lines changed

reactive/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@
7171
<version>${project.version}</version>
7272
<scope>test</scope>
7373
</dependency>
74+
<dependency>
75+
<groupId>io.projectreactor</groupId>
76+
<artifactId>reactor-test</artifactId>
77+
<version>${reactor.version}</version>
78+
<scope>test</scope>
79+
</dependency>
7480
<dependency>
7581
<groupId>io.github.openfeign</groupId>
7682
<artifactId>feign-okhttp</artifactId>

reactive/src/main/java/feign/reactive/ReactiveInvocationHandler.java

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,16 @@
1313
*/
1414
package feign.reactive;
1515

16-
import feign.FeignException;
1716
import feign.InvocationHandlerFactory.MethodHandler;
1817
import feign.Target;
1918
import java.lang.reflect.InvocationHandler;
2019
import java.lang.reflect.Method;
2120
import java.lang.reflect.Proxy;
22-
import java.lang.reflect.Type;
2321
import java.util.Map;
24-
import java.util.concurrent.Callable;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
2524
import org.reactivestreams.Publisher;
25+
import org.reactivestreams.Subscription;
2626

2727
public abstract class ReactiveInvocationHandler implements InvocationHandler {
2828

@@ -90,22 +90,50 @@ protected abstract Publisher invoke(Method method,
9090
Object[] arguments);
9191

9292
/**
93-
* Invoke the Method Handler as a Callable.
93+
* Invoke the Method Handler as a Publisher.
9494
*
9595
* @param methodHandler to invoke
9696
* @param arguments for the method
97-
* @return a Callable wrapper for the invocation.
97+
* @return a Publisher wrapper for the invocation.
9898
*/
99-
Callable<?> invokeMethod(MethodHandler methodHandler, Object[] arguments) {
100-
return () -> {
101-
try {
102-
return methodHandler.invoke(arguments);
103-
} catch (Throwable th) {
104-
if (th instanceof FeignException) {
105-
throw (FeignException) th;
99+
Publisher<?> invokeMethod(MethodHandler methodHandler, Object[] arguments) {
100+
return subscriber -> subscriber.onSubscribe(new Subscription() {
101+
private final AtomicBoolean isTerminated = new AtomicBoolean(false);
102+
103+
@Override
104+
public void request(long n) {
105+
if (n <= 0 && !terminated()) {
106+
subscriber.onError(new IllegalArgumentException("negative subscription request"));
106107
}
107-
throw new RuntimeException(th);
108+
if (!isTerminated()) {
109+
try {
110+
Object result = methodHandler.invoke(arguments);
111+
if (null != result) {
112+
subscriber.onNext(result);
113+
}
114+
} catch (Throwable th) {
115+
if (!terminated()) {
116+
subscriber.onError(th);
117+
}
118+
}
119+
}
120+
if (!terminated()) {
121+
subscriber.onComplete();
122+
}
123+
}
124+
125+
@Override
126+
public void cancel() {
127+
isTerminated.set(true);
128+
}
129+
130+
private boolean isTerminated() {
131+
return isTerminated.get();
132+
}
133+
134+
private boolean terminated() {
135+
return isTerminated.getAndSet(true);
108136
}
109-
};
137+
});
110138
}
111139
}

reactive/src/main/java/feign/reactive/ReactorInvocationHandler.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import feign.Target;
1818
import java.lang.reflect.Method;
1919
import java.util.Map;
20-
import java.util.concurrent.Callable;
2120
import org.reactivestreams.Publisher;
2221
import reactor.core.publisher.Flux;
2322
import reactor.core.publisher.Mono;
@@ -32,11 +31,11 @@ public class ReactorInvocationHandler extends ReactiveInvocationHandler {
3231

3332
@Override
3433
protected Publisher invoke(Method method, MethodHandler methodHandler, Object[] arguments) {
35-
Callable<?> invocation = this.invokeMethod(methodHandler, arguments);
34+
Publisher<?> invocation = this.invokeMethod(methodHandler, arguments);
3635
if (Flux.class.isAssignableFrom(method.getReturnType())) {
37-
return Flux.from(Mono.fromCallable(invocation)).subscribeOn(Schedulers.elastic());
36+
return Flux.from(invocation).subscribeOn(Schedulers.elastic());
3837
} else if (Mono.class.isAssignableFrom(method.getReturnType())) {
39-
return Mono.fromCallable(invocation).subscribeOn(Schedulers.elastic());
38+
return Mono.from(invocation).subscribeOn(Schedulers.elastic());
4039
}
4140
throw new IllegalArgumentException(
4241
"Return type " + method.getReturnType().getName() + " is not supported");

reactive/src/main/java/feign/reactive/RxJavaInvocationHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class RxJavaInvocationHandler extends ReactiveInvocationHandler {
3030

3131
@Override
3232
protected Publisher invoke(Method method, MethodHandler methodHandler, Object[] arguments) {
33-
return Flowable.fromCallable(this.invokeMethod(methodHandler, arguments))
33+
return Flowable.fromPublisher(this.invokeMethod(methodHandler, arguments))
3434
.observeOn(Schedulers.trampoline());
3535
}
3636
}

reactive/src/test/java/feign/reactive/ReactiveFeignIntegrationTest.java

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.mockito.Mockito.verify;
2424
import static org.mockito.Mockito.when;
2525
import feign.Client;
26-
import feign.InvocationHandlerFactory;
2726
import feign.Logger;
2827
import feign.Logger.Level;
2928
import feign.Param;
@@ -38,20 +37,16 @@
3837
import feign.ResponseMapper;
3938
import feign.RetryableException;
4039
import feign.Retryer;
41-
import feign.Target;
4240
import feign.codec.Decoder;
4341
import feign.codec.ErrorDecoder;
4442
import feign.jackson.JacksonDecoder;
4543
import feign.jackson.JacksonEncoder;
4644
import feign.jaxrs.JAXRSContract;
4745
import io.reactivex.Flowable;
48-
import java.lang.reflect.InvocationHandler;
49-
import java.lang.reflect.Method;
5046
import java.lang.reflect.Type;
5147
import java.nio.charset.Charset;
5248
import java.util.Arrays;
5349
import java.util.Collections;
54-
import java.util.Map;
5550
import javax.ws.rs.GET;
5651
import javax.ws.rs.Path;
5752
import okhttp3.mockwebserver.MockResponse;
@@ -63,6 +58,7 @@
6358
import org.mockito.stubbing.Answer;
6459
import reactor.core.publisher.Flux;
6560
import reactor.core.publisher.Mono;
61+
import reactor.test.StepVerifier;
6662

6763
public class ReactiveFeignIntegrationTest {
6864

@@ -100,16 +96,18 @@ public void testReactorTargetFull() throws Exception {
10096
.target(TestReactorService.class, this.getServerUrl());
10197
assertThat(service).isNotNull();
10298

103-
String version = service.version()
104-
.block();
105-
assertThat(version).isNotNull();
99+
StepVerifier.create(service.version())
100+
.expectNext("1.0")
101+
.expectComplete()
102+
.verify();
106103
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/version");
107104

108105

109106
/* test encoding and decoding */
110-
User user = service.user("test")
111-
.blockFirst();
112-
assertThat(user).isNotNull().hasFieldOrPropertyWithValue("username", "test");
107+
StepVerifier.create(service.user("test"))
108+
.assertNext(user -> assertThat(user).hasFieldOrPropertyWithValue("username", "test"))
109+
.expectComplete()
110+
.verify();
113111
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/users/test");
114112

115113
}
@@ -127,15 +125,17 @@ public void testRxJavaTarget() throws Exception {
127125
.target(TestReactiveXService.class, this.getServerUrl());
128126
assertThat(service).isNotNull();
129127

130-
String version = service.version()
131-
.firstElement().blockingGet();
132-
assertThat(version).isNotNull();
128+
StepVerifier.create(service.version())
129+
.expectNext("1.0")
130+
.expectComplete()
131+
.verify();
133132
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/version");
134133

135134
/* test encoding and decoding */
136-
User user = service.user("test")
137-
.firstElement().blockingGet();
138-
assertThat(user).isNotNull().hasFieldOrPropertyWithValue("username", "test");
135+
StepVerifier.create(service.user("test"))
136+
.assertNext(user -> assertThat(user).hasFieldOrPropertyWithValue("username", "test"))
137+
.expectComplete()
138+
.verify();
139139
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/users/test");
140140
}
141141

@@ -164,7 +164,10 @@ public void testRequestInterceptor() {
164164
TestReactorService service = ReactorFeign.builder()
165165
.requestInterceptor(mockInterceptor)
166166
.target(TestReactorService.class, this.getServerUrl());
167-
service.version().block();
167+
StepVerifier.create(service.version())
168+
.expectNext("1.0")
169+
.expectComplete()
170+
.verify();
168171
verify(mockInterceptor, times(1)).apply(any(RequestTemplate.class));
169172
}
170173

@@ -176,7 +179,10 @@ public void testRequestInterceptors() {
176179
TestReactorService service = ReactorFeign.builder()
177180
.requestInterceptors(Arrays.asList(mockInterceptor, mockInterceptor))
178181
.target(TestReactorService.class, this.getServerUrl());
179-
service.version().block();
182+
StepVerifier.create(service.version())
183+
.expectNext("1.0")
184+
.expectComplete()
185+
.verify();
180186
verify(mockInterceptor, times(2)).apply(any(RequestTemplate.class));
181187
}
182188

@@ -193,7 +199,10 @@ public void testResponseMappers() throws Exception {
193199
TestReactorService service = ReactorFeign.builder()
194200
.mapAndDecode(responseMapper, decoder)
195201
.target(TestReactorService.class, this.getServerUrl());
196-
service.version().block();
202+
StepVerifier.create(service.version())
203+
.expectNext("1.0")
204+
.expectComplete()
205+
.verify();
197206
verify(responseMapper, times(1))
198207
.map(any(Response.class), any(Type.class));
199208
verify(decoder, times(1)).decode(any(Response.class), any(Type.class));
@@ -208,16 +217,16 @@ public void testQueryMapEncoders() {
208217
TestReactiveXService service = RxJavaFeign.builder()
209218
.queryMapEncoder(encoder)
210219
.target(TestReactiveXService.class, this.getServerUrl());
211-
String results = service.search(new SearchQuery())
212-
.blockingSingle();
213-
assertThat(results).isNotEmpty();
220+
StepVerifier.create(service.search(new SearchQuery()))
221+
.expectNext("No Results Found")
222+
.expectComplete()
223+
.verify();
214224
verify(encoder, times(1)).encode(any(Object.class));
215225
}
216226

217-
@SuppressWarnings({"ResultOfMethodCallIgnored", "ThrowableNotThrown"})
227+
@SuppressWarnings({"ThrowableNotThrown"})
218228
@Test
219229
public void testErrorDecoder() {
220-
this.thrown.expect(RuntimeException.class);
221230
this.webServer.enqueue(new MockResponse().setBody("Bad Request").setResponseCode(400));
222231

223232
ErrorDecoder errorDecoder = mock(ErrorDecoder.class);
@@ -227,8 +236,11 @@ public void testErrorDecoder() {
227236
TestReactiveXService service = RxJavaFeign.builder()
228237
.errorDecoder(errorDecoder)
229238
.target(TestReactiveXService.class, this.getServerUrl());
230-
service.search(new SearchQuery())
231-
.blockingSingle();
239+
StepVerifier.create(service.search(new SearchQuery()))
240+
.expectErrorSatisfies(ex -> assertThat(ex)
241+
.isInstanceOf(IllegalStateException.class)
242+
.hasMessage("bad request"))
243+
.verify();
232244
verify(errorDecoder, times(1)).decode(anyString(), any(Response.class));
233245
}
234246

@@ -243,7 +255,10 @@ public void testRetryer() {
243255
TestReactorService service = ReactorFeign.builder()
244256
.retryer(spy)
245257
.target(TestReactorService.class, this.getServerUrl());
246-
service.version().log().block();
258+
StepVerifier.create(service.version())
259+
.expectNext("1.0")
260+
.expectComplete()
261+
.verify();
247262
verify(spy, times(1)).continueOrPropagate(any(RetryableException.class));
248263
}
249264

@@ -261,7 +276,10 @@ public void testClient() throws Exception {
261276
TestReactorService service = ReactorFeign.builder()
262277
.client(client)
263278
.target(TestReactorService.class, this.getServerUrl());
264-
service.version().block();
279+
StepVerifier.create(service.version())
280+
.expectNext("1.0")
281+
.expectComplete()
282+
.verify();
265283
verify(client, times(1)).execute(any(Request.class), any(Options.class));
266284
}
267285

@@ -272,8 +290,10 @@ public void testDifferentContract() throws Exception {
272290
TestJaxRSReactorService service = ReactorFeign.builder()
273291
.contract(new JAXRSContract())
274292
.target(TestJaxRSReactorService.class, this.getServerUrl());
275-
String version = service.version().block();
276-
assertThat(version).isNotNull();
293+
StepVerifier.create(service.version())
294+
.expectNext("1.0")
295+
.expectComplete()
296+
.verify();
277297
assertThat(webServer.takeRequest().getPath()).isEqualToIgnoringCase("/version");
278298
}
279299

0 commit comments

Comments
 (0)