Skip to content

Commit

Permalink
Support Asynchronous Callbacks with RxJava Integration
Browse files Browse the repository at this point in the history
Netflix#123

- refactored HystrixCommand to use Observable inside the implementation to be non-blocking and callback driven
- observe() and toObservable() public methods added
- HystrixCollapser not yet changed
  • Loading branch information
benjchristensen committed May 10, 2013
1 parent 0fef200 commit c51bd7e
Show file tree
Hide file tree
Showing 6 changed files with 1,145 additions and 754 deletions.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=http\://services.gradle.org/distributions/gradle-1.1-bin.zip
distributionUrl=http\://services.gradle.org/distributions/gradle-1.3-bin.zip
1 change: 1 addition & 0 deletions hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ apply plugin: 'idea'

dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'com.netflix.rxjava:rxjava-core:0.8.2'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
provided 'junit:junit-dep:4.10'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.subjects.ReplaySubject;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

import com.netflix.hystrix.HystrixCommand.UnitTest.TestHystrixCommand;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
Expand Down Expand Up @@ -317,26 +326,28 @@ public ResponseType execute() {
public Future<ResponseType> queue() {
RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> collapser = null;

if (Scope.REQUEST == getScope()) {
collapser = getCollapserForUserRequest();
} else if (Scope.GLOBAL == getScope()) {
collapser = getCollapserForGlobalScope();
} else {
logger.warn("Invalid Scope: " + getScope() + " Defaulting to REQUEST scope.");
collapser = getCollapserForUserRequest();
}
/* try from cache first */
if (properties.requestCachingEnabled().get()) {
Future<ResponseType> fromCache = requestCache.get(getCacheKey());
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
/* mark that we received this response from cache */
// TODO Add collapser metrics so we can capture this information
// we can't add it to the command metrics because the command can change each time (dynamic key for example)
// and we don't have access to it when responding from cache
// collapserMetrics.markResponseFromCache();
return fromCache;
return fromCache.toFuture();
}
}

if (Scope.REQUEST == getScope()) {
collapser = getCollapserForUserRequest();
} else if (Scope.GLOBAL == getScope()) {
collapser = getCollapserForGlobalScope();
} else {
logger.warn("Invalid Scope: " + getScope() + " Defaulting to REQUEST scope.");
collapser = getCollapserForUserRequest();
}

Future<ResponseType> response = collapser.submitRequest(getRequestArgument());
if (properties.requestCachingEnabled().get()) {
/*
Expand All @@ -348,11 +359,39 @@ public Future<ResponseType> queue() {
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
requestCache.putIfAbsent(getCacheKey(), response);
requestCache.putIfAbsent(getCacheKey(), Observable.toObservable(response));
}

return response;
}

// public Observable<ResponseType> observe() {
// // us a ReplaySubject to buffer the eagerly subscribed-to Observable
// ReplaySubject<ResponseType> subject = ReplaySubject.create();
// // eagerly kick off subscription
// toObservable().subscribe(subject);
// // return the subject that can be subscribed to later while the execution has already started
// return subject;
// }
//
// public Observable<ResponseType> toObservable(Scheduler scheduler) {
//
// // create an Observable that will lazily execute when subscribed to
// Observable<ResponseType> o = Observable.create(new Func1<Observer<ResponseType>, Subscription>() {
//
// @Override
// public Subscription call(Observer<ResponseType> observer) {
// }
//
// });
//
// return o;
// }
//
// public Observable<ResponseType> toObservable() {
// return toObservable(Schedulers.threadPoolForComputation());
// }

/**
* Static global cache of RequestCollapsers for Scope.GLOBAL
*/
Expand Down
Loading

0 comments on commit c51bd7e

Please sign in to comment.