Skip to content

Commit dd1087a

Browse files
authored
Merge pull request #1524 from dmgcodevil/iss1403
iss1403: Support @HystrixCommand for rx.Single and rx.Completable sim…
2 parents 251e5d9 + e79acc9 commit dd1087a

File tree

7 files changed

+207
-8
lines changed

7 files changed

+207
-8
lines changed

hystrix-contrib/hystrix-javanica/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,8 @@ To perform "Reactive Execution" you should return an instance of `Observable` in
125125
});
126126
}
127127
```
128-
129-
The return type of command method should be `Observable`.
128+
In addition to `Observable` Javanica supports the following RX types: `Single` and `Completable`.
129+
Hystrix core supports only one RX type which is `Observable`, `HystrixObservableCommand` requires to return `Observable` therefore javanica transforms `Single` or `Completable` to `Observable` using `toObservable()` method for appropriate type and before returning the result to caller it translates `Observable` to either `Single` or `Completable` using `toSingle()` or `toCompletable()` correspondingly.
130130

131131
HystrixObservable interface provides two methods: ```observe()``` - eagerly starts execution of the command the same as ``` HystrixCommand#queue()``` and ```HystrixCommand#execute()```; ```toObservable()``` - lazily starts execution of the command only once the Observable is subscribed to. To control this behaviour and swith between two modes ```@HystrixCommand``` provides specific parameter called ```observableExecutionMode```.
132132
```@HystrixCommand(observableExecutionMode = EAGER)``` indicates that ```observe()``` method should be used to execute observable command

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/aop/aspectj/HystrixCommandAspect.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@
4141
import org.aspectj.lang.annotation.Aspect;
4242
import org.aspectj.lang.annotation.Pointcut;
4343
import org.aspectj.lang.reflect.MethodSignature;
44+
import rx.Completable;
4445
import rx.Observable;
46+
import rx.Single;
4547
import rx.functions.Func1;
4648

4749
import java.lang.reflect.Method;
@@ -110,8 +112,8 @@ public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinP
110112
return result;
111113
}
112114

113-
private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
114-
return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
115+
private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
116+
return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
115117
.onErrorResumeNext(new Func1<Throwable, Observable>() {
116118
@Override
117119
public Observable call(Throwable throwable) {
@@ -123,7 +125,16 @@ public Observable call(Throwable throwable) {
123125
}
124126
return Observable.error(throwable);
125127
}
126-
});
128+
}), metaHolder);
129+
}
130+
131+
private Object mapObservable(Observable observable, final MetaHolder metaHolder) {
132+
if (Completable.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
133+
return observable.toCompletable();
134+
} else if (Single.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
135+
return observable.toSingle();
136+
}
137+
return observable;
127138
}
128139

129140
private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) {

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/ExecutionType.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
*/
1616
package com.netflix.hystrix.contrib.javanica.command;
1717

18+
import com.google.common.collect.ImmutableSet;
19+
import rx.Completable;
1820
import rx.Observable;
21+
import rx.Single;
1922

23+
import java.util.Set;
2024
import java.util.concurrent.Future;
2125

2226
/**
@@ -39,6 +43,9 @@ public enum ExecutionType {
3943
*/
4044
OBSERVABLE;
4145

46+
// RX types
47+
private static final Set<? extends Class> RX_TYPES = ImmutableSet.of(Observable.class, Single.class, Completable.class);
48+
4249
/**
4350
* Gets execution type for specified class type.
4451
* @param type the type
@@ -47,11 +54,19 @@ public enum ExecutionType {
4754
public static ExecutionType getExecutionType(Class<?> type) {
4855
if (Future.class.isAssignableFrom(type)) {
4956
return ExecutionType.ASYNCHRONOUS;
50-
} else if (Observable.class.isAssignableFrom(type)) {
57+
} else if (isRxType(type)) {
5158
return ExecutionType.OBSERVABLE;
5259
} else {
5360
return ExecutionType.SYNCHRONOUS;
5461
}
5562
}
5663

64+
private static boolean isRxType(Class<?> cl) {
65+
for (Class<?> rxType : RX_TYPES) {
66+
if (rxType.isAssignableFrom(cl)) {
67+
return true;
68+
}
69+
}
70+
return false;
71+
}
5772
}

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/command/GenericObservableCommand.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import com.netflix.hystrix.exception.HystrixBadRequestException;
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
31+
import rx.Completable;
3132
import rx.Observable;
33+
import rx.Single;
3234
import rx.functions.Func1;
3335

3436
import javax.annotation.concurrent.ThreadSafe;
@@ -67,7 +69,8 @@ public GenericObservableCommand(HystrixCommandBuilder builder) {
6769
protected Observable construct() {
6870
Observable result;
6971
try {
70-
result = ((Observable) commandActions.getCommandAction().execute(executionType))
72+
Observable observable = toObservable(commandActions.getCommandAction().execute(executionType));
73+
result = observable
7174
.onErrorResumeNext(new Func1<Throwable, Observable>() {
7275
@Override
7376
public Observable call(Throwable throwable) {
@@ -105,6 +108,10 @@ protected Observable resumeWithFallback() {
105108
Object res = commandActions.getFallbackAction().executeWithArgs(executionType, args);
106109
if (res instanceof Observable) {
107110
return (Observable) res;
111+
} else if (res instanceof Single) {
112+
return ((Single) res).toObservable();
113+
} else if (res instanceof Completable) {
114+
return ((Completable) res).toObservable();
108115
} else {
109116
return Observable.just(res);
110117
}
@@ -157,4 +164,16 @@ boolean isIgnorable(Throwable throwable) {
157164
}
158165
return false;
159166
}
167+
168+
private Observable toObservable(Object obj) {
169+
if (Observable.class.isAssignableFrom(obj.getClass())) {
170+
return (Observable) obj;
171+
} else if (Completable.class.isAssignableFrom(obj.getClass())) {
172+
return ((Completable) obj).toObservable();
173+
} else if (Single.class.isAssignableFrom(obj.getClass())) {
174+
return ((Single) obj).toObservable();
175+
} else {
176+
throw new IllegalStateException("unsupported rx type: " + obj.getClass());
177+
}
178+
}
160179
}

hystrix-contrib/hystrix-javanica/src/main/java/com/netflix/hystrix/contrib/javanica/utils/FallbackMethod.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.netflix.hystrix.contrib.javanica.exception.FallbackDefinitionException;
2525
import org.apache.commons.lang3.StringUtils;
2626
import org.apache.commons.lang3.Validate;
27+
import rx.Completable;
2728

2829
import javax.annotation.Nonnull;
2930
import javax.annotation.Nullable;
@@ -97,6 +98,13 @@ public void validateReturnType(Method commandMethod) throws FallbackDefinitionEx
9798
if (ExecutionType.OBSERVABLE == ExecutionType.getExecutionType(commandReturnType)) {
9899
if (ExecutionType.OBSERVABLE != getExecutionType()) {
99100
Type commandParametrizedType = commandMethod.getGenericReturnType();
101+
102+
// basically any object can be wrapped into Completable, Completable itself ins't parametrized
103+
if(Completable.class.isAssignableFrom(commandMethod.getReturnType())) {
104+
validateCompletableReturnType(commandMethod, method.getReturnType());
105+
return;
106+
}
107+
100108
if (isReturnTypeParametrized(commandMethod)) {
101109
commandParametrizedType = getFirstParametrizedType(commandMethod);
102110
}
@@ -142,6 +150,13 @@ private Type getFirstParametrizedType(Method m) {
142150
return null;
143151
}
144152

153+
// everything can be wrapped into completable except 'void'
154+
private void validateCompletableReturnType(Method commandMethod, Class<?> callbackReturnType) {
155+
if (Void.TYPE == callbackReturnType) {
156+
throw new FallbackDefinitionException(createErrorMsg(commandMethod, method, "fallback cannot return 'void' if command return type is " + Completable.class.getSimpleName()));
157+
}
158+
}
159+
145160
private void validateReturnType(Method commandMethod, Method fallbackMethod) {
146161
if (isGenericReturnType(commandMethod)) {
147162
List<Type> commandParametrizedTypes = flattenTypeVariables(commandMethod.getGenericReturnType());

hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/fallback/BasicGenericFallbackTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.slf4j.helpers.MessageFormatter;
1515
import org.springframework.test.context.junit4.rules.SpringClassRule;
1616
import org.springframework.test.context.junit4.rules.SpringMethodRule;
17+
import rx.Completable;
1718

1819
import java.io.Serializable;
1920
import java.lang.reflect.InvocationTargetException;
@@ -64,6 +65,7 @@ public Object[] methodGenericDefinitionFailure() {
6465
new Object[]{MethodGenericDefinitionFailureCase8.class},
6566
new Object[]{MethodGenericDefinitionFailureCase9.class},
6667
new Object[]{MethodGenericDefinitionFailureCase10.class},
68+
new Object[]{MethodGenericDefinitionFailureCase11.class},
6769

6870
};
6971
}
@@ -247,6 +249,12 @@ public static class MethodGenericDefinitionFailureCase10 {
247249
private GenericEntity<? super Comparable> fallback() { return null; }
248250
}
249251

252+
public static class MethodGenericDefinitionFailureCase11 {
253+
@HystrixCommand(fallbackMethod = "fallback")
254+
public Completable command() { throw new IllegalStateException(); }
255+
private void fallback() { return; }
256+
}
257+
250258
/* ====================================================================== */
251259
/* ===================== GENERIC CLASS DEFINITIONS =====+================ */
252260
/* =========================== SUCCESS CASES ============================ */

hystrix-contrib/hystrix-javanica/src/test/java/com/netflix/hystrix/contrib/javanica/test/common/observable/BasicObservableTest.java

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,13 @@
2424
import org.apache.commons.lang3.StringUtils;
2525
import org.junit.Before;
2626
import org.junit.Test;
27+
import rx.Completable;
2728
import rx.Observable;
2829
import rx.Observer;
30+
import rx.Single;
2931
import rx.Subscriber;
3032
import rx.functions.Action1;
31-
import rx.subjects.ReplaySubject;
33+
import rx.functions.Func0;
3234

3335
import static com.netflix.hystrix.contrib.javanica.test.common.CommonUtils.getHystrixCommandByKey;
3436
import static org.junit.Assert.assertEquals;
@@ -90,6 +92,82 @@ public void call(User user) {
9092
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
9193
}
9294

95+
@Test
96+
public void testGetCompletableUser(){
97+
userService.getCompletableUser("1", "name: ");
98+
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUser");
99+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
100+
}
101+
102+
@Test
103+
public void testGetCompletableUserWithRegularFallback() {
104+
Completable completable = userService.getCompletableUserWithRegularFallback(null, "name: ");
105+
completable.<User>toObservable().subscribe(new Action1<User>() {
106+
@Override
107+
public void call(User user) {
108+
assertEquals("default_id", user.getId());
109+
}
110+
});
111+
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRegularFallback");
112+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
113+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
114+
}
115+
116+
@Test
117+
public void testGetCompletableUserWithRxFallback() {
118+
Completable completable = userService.getCompletableUserWithRxFallback(null, "name: ");
119+
completable.<User>toObservable().subscribe(new Action1<User>() {
120+
@Override
121+
public void call(User user) {
122+
assertEquals("default_id", user.getId());
123+
}
124+
});
125+
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getCompletableUserWithRxFallback");
126+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
127+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
128+
}
129+
130+
@Test
131+
public void testGetSingleUser() {
132+
final String id = "1";
133+
Single<User> user = userService.getSingleUser(id, "name: ");
134+
user.subscribe(new Action1<User>() {
135+
@Override
136+
public void call(User user) {
137+
assertEquals(id, user.getId());
138+
}
139+
});
140+
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUser");
141+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
142+
}
143+
144+
@Test
145+
public void testGetSingleUserWithRegularFallback(){
146+
Single<User> user = userService.getSingleUserWithRegularFallback(null, "name: ");
147+
user.subscribe(new Action1<User>() {
148+
@Override
149+
public void call(User user) {
150+
assertEquals("default_id", user.getId());
151+
}
152+
});
153+
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRegularFallback");
154+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
155+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
156+
}
157+
158+
@Test
159+
public void testGetSingleUserWithRxFallback(){
160+
Single<User> user = userService.getSingleUserWithRxFallback(null, "name: ");
161+
user.subscribe(new Action1<User>() {
162+
@Override
163+
public void call(User user) {
164+
assertEquals("default_id", user.getId());
165+
}
166+
});
167+
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getSingleUserWithRxFallback");
168+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FAILURE));
169+
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.FALLBACK_SUCCESS));
170+
}
93171

94172
@Test
95173
public void testGetUserWithRegularFallback() {
@@ -163,6 +241,59 @@ public Observable<User> getUser(final String id, final String name) {
163241
return createObservable(id, name);
164242
}
165243

244+
@HystrixCommand
245+
public Completable getCompletableUser(final String id, final String name) {
246+
validate(id, name, "getCompletableUser has failed");
247+
return createObservable(id, name).toCompletable();
248+
}
249+
250+
@HystrixCommand(fallbackMethod = "completableUserRegularFallback")
251+
public Completable getCompletableUserWithRegularFallback(final String id, final String name) {
252+
return getCompletableUser(id, name);
253+
}
254+
255+
@HystrixCommand(fallbackMethod = "completableUserRxFallback")
256+
public Completable getCompletableUserWithRxFallback(final String id, final String name) {
257+
return getCompletableUser(id, name);
258+
}
259+
260+
public User completableUserRegularFallback(final String id, final String name) {
261+
return new User("default_id", "default_name");
262+
}
263+
264+
public Completable completableUserRxFallback(final String id, final String name) {
265+
return Completable.fromCallable(new Func0<User>() {
266+
@Override
267+
public User call() {
268+
return new User("default_id", "default_name");
269+
}
270+
});
271+
}
272+
273+
@HystrixCommand
274+
public Single<User> getSingleUser(final String id, final String name) {
275+
validate(id, name, "getSingleUser has failed");
276+
return createObservable(id, name).toSingle();
277+
}
278+
279+
@HystrixCommand(fallbackMethod = "singleUserRegularFallback")
280+
public Single<User> getSingleUserWithRegularFallback(final String id, final String name) {
281+
return getSingleUser(id, name);
282+
}
283+
284+
@HystrixCommand(fallbackMethod = "singleUserRxFallback")
285+
public Single<User> getSingleUserWithRxFallback(final String id, final String name) {
286+
return getSingleUser(id, name);
287+
}
288+
289+
User singleUserRegularFallback(final String id, final String name) {
290+
return new User("default_id", "default_name");
291+
}
292+
293+
Single<User> singleUserRxFallback(final String id, final String name) {
294+
return createObservable("default_id", "default_name").toSingle();
295+
}
296+
166297
@HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY)
167298
public Observable<User> getUserRegularFallback(final String id, final String name) {
168299
validate(id, name, "getUser has failed");

0 commit comments

Comments
 (0)