Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Asynchronous Callbacks with RxJava Integration #123

Closed
benjchristensen opened this issue Mar 4, 2013 · 11 comments
Closed

Support Asynchronous Callbacks with RxJava Integration #123

benjchristensen opened this issue Mar 4, 2013 · 11 comments
Milestone

Comments

@benjchristensen
Copy link
Contributor

Add an observe() method that returns an Observable (https://github.com/Netflix/RxJava) to allow callbacks and composition.

This will be in addition to the existing execute() and queue() methods.

Thus the 3 invocation methods will be:

T execute()
Future<T> queue()
Observable<T> observe()

(This has been planned for a while - just documenting it. I had hoped to have this in when RxJava was released but have had other competing priorities. It is near the top of my list.)

@ghost ghost assigned benjchristensen Mar 4, 2013
@johngmyers
Copy link

This begs the question of when T is an Iterable<A> how one could get an Observable<A>.

@benjchristensen
Copy link
Contributor Author

Here's one way of doing it:

observableOfIterable.mapMany({ iterable -> Observable.from(iterable)});

Full context to show where that statement fits:

// simulate an Observable of Iterable<String>
ArrayList<String> s = new ArrayList<String>();
s.add("one")
s.add("two")
s.add("three")
Observable<Iterable<String>> o = Observable.from(s);

// transform it into Observable<String>
Observable<String> os = o.mapMany({ iterable -> Observable.from(iterable)});
os.subscribe({stringValue -> println(stringValue)})

If this was being done with a HystrixCommand it could look like this:

Observable<Iterable<Data>> o = new HystrixCommandToFetchListOfData(args);
Observable<Data> d = o.mapMany({ iterable -> Observable.from(iterable)});
d.subscribe({data -> println(data)})

@johngmyers
Copy link

The idea would be to be able to consume the items as they are produced, not to have to collect the entire iteration before consuming any of it. This would require a streaming counterpart to HystrixCommand with a different interface. This is a separate issue with lower priority.

@benjchristensen
Copy link
Contributor Author

Agreed. I too want a progressive/streaming model like that and it will likely evolve out of work on this issue and #11.

@rore
Copy link

rore commented Mar 14, 2013

+1 (subscribing)

@benjchristensen
Copy link
Contributor Author

Schedulers (ReactiveX/RxJava#19) are almost ready which will unblock progression on this.

@benjchristensen
Copy link
Contributor Author

I am working on this at https://github.com/benjchristensen/Hystrix/tree/observe-rxjava

Unit tests are passing for changing HystrixCommand, now I'm doing performance testing and will put it through canary and production traffic at Netflix before merging this into the main project and releasing since it is non-trival change. Once I'm happy with that then I'll do HystrixCollapser, go through similar testing and then release.

@samhendley
Copy link

Is this still on the roadmap?

@benjchristensen
Copy link
Contributor Author

Yes, in fact I got all unit tests passing on it today and am canary
testing in our Netflix environments. I found a bug I'm trying to track
down.

@benjchristensen
Copy link
Contributor Author

I think I've found and fixed the bug. I have version 1.3 running in production canaries again.

@benjchristensen
Copy link
Contributor Author

Version 1.3 was released with this: https://github.com/Netflix/Hystrix/releases/tag/1.3.0

It's been running in Netflix production for a couple weeks now.

neerajrj pushed a commit to neerajrj/Hystrix that referenced this issue Nov 2, 2013
Netflix#123

- refactored HystrixCommand to use Observable inside the implementation to be non-blocking and callback driven
- observe() and toObservable() public methods added
- HystrixCollapser not yet changed
rickbw pushed a commit to rickbw/RxJava that referenced this issue Jan 9, 2014
- found a concurrency bug while working on Netflix/Hystrix#123
- the following code would lock up occasionally due to onCompleted not being delivered:

```java
public class RunTest {
    public static void main(String[] args) {
        System.out.println("Starting test...");

        final ArrayList<String> strings = new ArrayList<String>(200000);

        int num = 10000;
        while (true) {
            long start = System.currentTimeMillis();
            final AtomicInteger count = new AtomicInteger();
            for (int i = 0; i < num; i++) {
                new TestService1(2, 5).toObservable().forEach(new Action1<Integer>() {

                    @OverRide
                    public void call(Integer v) {
                        count.addAndGet(v);
                    }
                });

                new TestService2("hello").toObservable().forEach(new Action1<String>() {

                    @OverRide
                    public void call(String v) {
                        strings.add(v);
                    }

                });
            }
            long time = (System.currentTimeMillis() - start);
            long executions = num * 2;
            System.out.println("Time: " + time + "ms for " + executions + " executions (" + (time * 1000) / executions + " microseconds)");
            System.out.println("   Count: " + count);
            System.out.println("   Strings: " + strings.size());
            strings.clear();
        }
    }
}
```

- Also made OperationObserveOn not use ScheduledObserver if the `ImmediateScheduler` is chosen to allow an optimization. I believe this optimization is safe because ScheduledObserver does not require knowledge of a Scheduler (such as for now()) and all we do is emit data to the Observer on a scheduler and if we know it's Immediate we can go direct and skip the enqueuing step. This allows shaving off a noticable number of microseconds per execution in the loop above.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants