Support Asynchronous Callbacks with RxJava Integration #123
Comments
This begs the question of when |
Here's one way of doing it:
observableOfIterable.mapMany({ iterable -> Observable.from(iterable)});
Full context to show where that statement fits:
// simulate an Observable of Iterable<String>
ArrayList<String> s = new ArrayList<String>();
s.add("one")
s.add("two")
s.add("three")
Observable<Iterable<String>> o = Observable.from(s);
// transform it into Observable<String>
Observable<String> os = o.mapMany({ iterable -> Observable.from(iterable)});
os.subscribe({stringValue -> println(stringValue)})
If this was being done with a HystrixCommand it could look like this:
Observable<Iterable<Data>> o = new HystrixCommandToFetchListOfData(args);
Observable<Data> d = o.mapMany({ iterable -> Observable.from(iterable)});
d.subscribe({data -> println(data)}) |
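For readers without RxJava at hand, the flattening step can be illustrated with a JDK-only analogy. This is a sketch, not RxJava code: `Stream.flatMap` plays the role that `mapMany` plays in the snippet above, and the class name `FlattenSketch` is made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// JDK-only analogy, not RxJava: Stream.flatMap stands in for mapMany.
class FlattenSketch {
    // Turns a stream of lists into a single stream of their elements.
    static <T> Stream<T> flatten(Stream<List<T>> nested) {
        return nested.flatMap(List::stream);
    }

    public static void main(String[] args) {
        // One outer element that is itself a list, like Observable<Iterable<String>>.
        Stream<List<String>> nested = Stream.of(Arrays.asList("one", "two", "three"));
        flatten(nested).forEach(System.out::println);
    }
}
```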
The idea would be to be able to consume the items as they are produced, not to have to collect the entire iteration before consuming any of it. This would require a streaming counterpart to |
Agreed. I too want a progressive/streaming model like that and it will likely evolve out of work on this issue and #11. |
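The difference between collecting everything first and consuming items as they are produced could be sketched roughly as follows. All names here (`StreamingSketch`, `fetchAll`, `fetchStreaming`) are hypothetical and use only the JDK; nothing in it is Hystrix or RxJava API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration; none of these names come from Hystrix or RxJava.
class StreamingSketch {
    // Batch style: the caller sees nothing until every item has been collected.
    static List<Integer> fetchAll(int n) {
        List<Integer> items = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            items.add(i);
        }
        return items;
    }

    // Streaming style: each item is pushed to the callback as soon as it is produced,
    // and onCompleted is signalled once after the last item.
    static void fetchStreaming(int n, Consumer<Integer> onNext, Runnable onCompleted) {
        for (int i = 1; i <= n; i++) {
            onNext.accept(i);
        }
        onCompleted.run();
    }

    public static void main(String[] args) {
        fetchStreaming(3,
                item -> System.out.println("got " + item),
                () -> System.out.println("done"));
    }
}
```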
+1 (subscribing) |
Schedulers (ReactiveX/RxJava#19) are almost ready which will unblock progression on this. |
I am working on this at https://github.com/benjchristensen/Hystrix/tree/observe-rxjava Unit tests are passing for the HystrixCommand changes. Now I'm doing performance testing and will put it through canary and production traffic at Netflix before merging this into the main project and releasing, since it is a non-trivial change. Once I'm happy with that, I'll do HystrixCollapser, go through similar testing and then release. |
Is this still on the roadmap? |
Yes, in fact I got all unit tests passing on it today and am canary |
I think I've found and fixed the bug. I have version 1.3 running in production canaries again. |
Version 1.3 was released with this: https://github.com/Netflix/Hystrix/releases/tag/1.3.0 It's been running in Netflix production for a couple weeks now. |
Netflix#123
- refactored HystrixCommand to use Observable inside the implementation to be non-blocking and callback driven
- observe() and toObservable() public methods added
- HystrixCollapser not yet changed
- found a concurrency bug while working on Netflix/Hystrix#123
- the following code would lock up occasionally due to onCompleted not being delivered:
```java
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
// Action1 is RxJava's single-argument callback interface.
// TestService1 and TestService2 are not shown in this commit message.

public class RunTest {

    public static void main(String[] args) {
        System.out.println("Starting test...");
        final ArrayList<String> strings = new ArrayList<String>(200000);
        int num = 10000;
        // Loops forever; each pass runs num executions of each service.
        while (true) {
            long start = System.currentTimeMillis();
            final AtomicInteger count = new AtomicInteger();
            for (int i = 0; i < num; i++) {
                new TestService1(2, 5).toObservable().forEach(new Action1<Integer>() {
                    @Override
                    public void call(Integer v) {
                        count.addAndGet(v);
                    }
                });
                new TestService2("hello").toObservable().forEach(new Action1<String>() {
                    @Override
                    public void call(String v) {
                        strings.add(v);
                    }
                });
            }
            long time = (System.currentTimeMillis() - start);
            long executions = num * 2;
            System.out.println("Time: " + time + "ms for " + executions + " executions ("
                    + (time * 1000) / executions + " microseconds)");
            System.out.println(" Count: " + count);
            System.out.println(" Strings: " + strings.size());
            strings.clear();
        }
    }
}
```
- Also made OperationObserveOn not use ScheduledObserver if the `ImmediateScheduler` is chosen, to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()); all we do is emit data to the Observer on a scheduler, and if we know it's Immediate we can go direct and skip the enqueuing step. This shaves a noticeable number of microseconds off each execution in the loop above.
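The shape of that fast path can be sketched with JDK types only. This is an assumption-laden illustration, not the actual OperationObserveOn code: `ObserveOnSketch`, `IMMEDIATE` and `observeOn` are hypothetical names, with an `Executor` standing in for a Scheduler and a `Consumer` standing in for an Observer.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch: Executor stands in for Scheduler, Consumer for Observer.
class ObserveOnSketch {
    // Stand-in for an "immediate" scheduler: runs the task on the calling thread.
    static final Executor IMMEDIATE = Runnable::run;

    // Returns a consumer that delivers each item to downstream on the given executor.
    static <T> Consumer<T> observeOn(Consumer<T> downstream, Executor executor) {
        if (executor == IMMEDIATE) {
            // Fast path: nothing to hand off, so skip the wrapper entirely.
            return downstream;
        }
        // General path: hand every item to the executor for delivery.
        return item -> executor.execute(() -> downstream.accept(item));
    }

    public static void main(String[] args) {
        Consumer<String> printer = s -> System.out.println(Thread.currentThread().getName() + ": " + s);
        observeOn(printer, IMMEDIATE).accept("direct");

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            observeOn(printer, pool).accept("enqueued");
        } finally {
            pool.shutdown(); // already-submitted tasks still run before the pool exits
        }
    }
}
```

The saving in this sketch comes from returning the downstream consumer unchanged, so the immediate case pays for no extra lambda allocation or executor hand-off per item.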
Add an observe() method that returns an Observable (https://github.com/Netflix/RxJava) to allow callbacks and composition.
This will be in addition to the existing execute() and queue() methods.
Thus the 3 invocation methods will be execute(), queue() and observe().
(This has been planned for a while - just documenting it. I had hoped to have this in when RxJava was released but have had other competing priorities. It is near the top of my list.)
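To make the three invocation styles concrete, here is a minimal JDK-only sketch. It is not the HystrixCommand API: `CommandSketch` is a hypothetical stand-in, and `CompletableFuture` is used in place of an RxJava Observable purely to show callback-style composition.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical stand-in for a command; it is not the HystrixCommand API.
class CommandSketch {
    private final String name;

    CommandSketch(String name) {
        this.name = name;
    }

    // The work the command wraps.
    private String run() {
        return "Hello " + name;
    }

    // Synchronous: block the caller until the result is available.
    String execute() {
        return run();
    }

    // Asynchronous: return a Future that the caller reads later.
    Future<String> queue(ExecutorService pool) {
        Callable<String> task = this::run;
        return pool.submit(task);
    }

    // Callback-driven: CompletableFuture stands in for an Observable here,
    // so callers can attach callbacks and compose results without blocking.
    CompletableFuture<String> observe(ExecutorService pool) {
        return CompletableFuture.supplyAsync(this::run, pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CommandSketch command = new CommandSketch("World");
            System.out.println(command.execute());
            System.out.println(command.queue(pool).get());
            // join() waits so the callback has run before the pool shuts down.
            command.observe(pool).thenAccept(System.out::println).join();
        } finally {
            pool.shutdown();
        }
    }
}
```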