Skip to content

Commit bc52c00

Browse files
committed
Cleanup of RxRatpack API.
- Rename asPromise* methods to promise* - Rename forkAndJoin to bindExec - Rename forkOnNext to forkEach Cleaned up implementations, by making ExecControl implicit and orienting API towards Observable.compose().
1 parent 3984b17 commit bc52c00

File tree

6 files changed

+115
-117
lines changed

6 files changed

+115
-117
lines changed

ratpack-core/src/main/java/ratpack/util/ExceptionUtils.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import ratpack.func.Action;
2121
import ratpack.func.BiAction;
2222
import ratpack.func.Factory;
23+
import ratpack.func.NoArgAction;
2324

2425
/**
2526
* Utility methods for dealing with exceptions.
@@ -100,6 +101,14 @@ public static <T> T uncheck(Factory<T> factory) {
100101
}
101102
}
102103

104+
public static void uncheck(NoArgAction action) {
105+
try {
106+
action.execute();
107+
} catch (Exception e) {
108+
throw uncheck(e);
109+
}
110+
}
111+
103112
/**
104113
* Executes the given action with the provided input argument, unchecking any exceptions it throws.
105114
* <p>

ratpack-manual/src/content/chapters/34-hystrix.md

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,27 @@ want to use these features however, then your application should depend on the `
2323

2424
Hystrix provides three types of Command execution, [synchronous](https://github.com/Netflix/Hystrix/wiki/How-To-Use#Synchronous-Execution), [asynchronous](https://github.com/Netflix/Hystrix/wiki/How-To-Use#asynchronous-execution) and [reactive](https://github.com/Netflix/Hystrix/wiki/How-To-Use#reactive-execution).
2525
Out of the three only reactive is actually non-blocking. Synchronous and asynchronous command execution will work with Ratpack, as is demonstrated in the [integration tests](https://github.com/ratpack/ratpack/blob/master/ratpack-hystrix/src/test/groovy/ratpack/hystrix/HystrixRequestCachingSpec.groovy#L128),
26-
but for maximum performance only reactive execution is recommended. If you do not wish to use Observables however, then you can convert them to Ratpack Promises instead using [RxRatpack#asPromise](api/ratpack/rx/RxRatpack.html#asPromise-rx.Observable-)
27-
or [RxRatpack#asPromiseSingle](api/ratpack/rx/RxRatpack.html#asPromiseSingle-rx.Observable-).
26+
but for maximum performance only reactive execution is recommended. If you do not wish to use Observables however, then you can convert them to Ratpack Promises instead using [RxRatpack#promise](api/ratpack/rx/RxRatpack.html#promise-rx.Observable-)
27+
or [RxRatpack#promiseSingle](api/ratpack/rx/RxRatpack.html#promiseSingle-rx.Observable-).
2828

2929
```language-java
3030
import com.netflix.hystrix.HystrixCommandGroupKey;
3131
import com.netflix.hystrix.HystrixObservableCommand;
3232
import ratpack.exec.Promise;
33+
import ratpack.rx.RxRatpack;
3334
import rx.Observable;
34-
35-
import static ratpack.rx.RxRatpack.asPromiseSingle;
36-
3735
public class CommandFactory {
3836
3937
public static Promise<String> fooCommand() {
4038
Observable<String> command = new HystrixObservableCommand<String>(HystrixCommandGroupKey.Factory.asKey("foo-command")) {
4139
@Override
42-
protected rx.Observable<String> run() {
43-
return rx.Observable.just("foo");
40+
protected Observable<String> run() {
41+
return Observable.just("foo");
4442
}
4543
}.toObservable();
4644
47-
return asPromiseSingle(command);
45+
return RxRatpack.promiseSingle(command);
4846
}
49-
5047
}
5148
```
5249

ratpack-rx/src/main/java/ratpack/rx/RxRatpack.java

Lines changed: 73 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
* When using observables for asynchronous actions, it is generally required to wrap promises created by an {@link ExecControl} in order to integrate with Ratpack's execution model.
5050
* This typically means using {@link ExecControl#promise(Action)} or {@link ExecControl#blocking(java.util.concurrent.Callable)} to initiate the operation and then wrapping with {@link #observe(Promise)} or similar.
5151
* <p>
52-
* To test observable based services that use Ratpack's execution semantics, use the {@code ExecHarness} and convert the observable back to a promise with {@link #asPromise(Observable)}.
52+
* To test observable based services that use Ratpack's execution semantics, use the {@code ExecHarness} and convert the observable back to a promise with {@link #promise(Observable)}.
5353
* <p>
5454
* The methods in this class are also provided as <a href="http://docs.groovy-lang.org/latest/html/documentation/#_extension_modules">Groovy Extensions</a>.
5555
* When using Groovy, each static method in this class is able to act as an instance-level method against the {@link Observable} type.
@@ -124,39 +124,47 @@ public static Scheduler scheduler(final ExecController execController) {
124124
}
125125

126126
/**
127-
* Forks the current execution in order to subscribe to the given source, then joining the original execution with the source values.
127+
* Binds the given observable to the current execution, allowing integration of third-party asynchronous observables with Ratpack's execution model.
128128
* <p>
129-
* This method supports parallelism in the observable stream.
129+
* This method is useful when you want to consume an asynchronous observable within a Ratpack execution, as an observable.
130+
* It is just a combination of {@link #promise(Observable)} and {@link #observeEach(Promise)}.
131+
* <p>
132+
* <pre class="java">{@code
133+
* import rx.Observable;
134+
* import ratpack.test.exec.ExecHarness;
135+
* import ratpack.rx.RxRatpack;
136+
* import java.util.Arrays;
137+
* import java.util.List;
138+
* import static org.junit.Assert.*;
139+
*
140+
* public class Example {
141+
* public static void main(String... args) throws Exception {
142+
* Observable<String> asyncObservable = Observable.create(subscriber ->
143+
* new Thread(() -> {
144+
* subscriber.onNext("foo");
145+
* subscriber.onNext("bar");
146+
* subscriber.onCompleted();
147+
* }).start()
148+
* );
149+
*
150+
* List<String> strings = ExecHarness.yieldSingle(e ->
151+
* RxRatpack.promise(asyncObservable.compose(RxRatpack::bindExec))
152+
* ).getValue();
153+
*
154+
* assertEquals(Arrays.asList("foo", "bar"), strings);
155+
* }
156+
* }
157+
* }</pre>
130158
* <p>
131-
* This method uses {@link rx.Observable#toList()} on the given source to collect all values before returning control to the original execution.
132-
* As such, {@code source} should not be an infinite or extremely large stream.
133159
*
134-
* @param execControl the execution control
135160
* @param source the observable source
136161
* @param <T> the type of item observed
137162
* @return an observable stream equivalent to the given source
163+
* @see #observeEach(Promise)
164+
* @see #promise(Observable)
138165
*/
139-
public static <T> Observable<T> forkAndJoin(final ExecControl execControl, final Observable<T> source) {
140-
Promise<List<T>> promise = execControl.promise(fulfiller -> execControl.exec().start(execution -> source
141-
.toList()
142-
.subscribe(new Subscriber<List<T>>() {
143-
@Override
144-
public void onCompleted() {
145-
146-
}
147-
148-
@Override
149-
public void onError(Throwable e) {
150-
fulfiller.error(e);
151-
}
152-
153-
@Override
154-
public void onNext(List<T> ts) {
155-
fulfiller.success(ts);
156-
}
157-
})));
158-
159-
return observeEach(promise);
166+
public static <T> Observable<T> bindExec(Observable<T> source) {
167+
return ExceptionUtils.uncheck(() -> promise(source).to(RxRatpack::observeEach));
160168
}
161169

162170
/**
@@ -173,7 +181,7 @@ public void onNext(List<T> ts) {
173181
* public static String value;
174182
* public static void main(String... args) throws Exception {
175183
* ExecHarness.runSingle(e ->
176-
* e.blocking(() -> "hello world")
184+
* e.promiseOf("hello world")
177185
* .to(RxRatpack::observe)
178186
* .map(String::toUpperCase)
179187
* .subscribe(s -> value = s)
@@ -200,10 +208,10 @@ public static <T> Observable<T> observe(Promise<T> promise) {
200208
* import ratpack.test.exec.ExecHarness;
201209
* import ratpack.exec.ExecResult;
202210
* import rx.Observable;
211+
* import ratpack.rx.RxRatpack;
203212
*
204213
* import java.util.List;
205214
*
206-
* import static ratpack.rx.RxRatpack.asPromise;
207215
*
208216
* import static org.junit.Assert.assertEquals;
209217
*
@@ -224,7 +232,7 @@ public static <T> Observable<T> observe(Promise<T> promise) {
224232
* final AsyncService service = new AsyncService();
225233
*
226234
* // exercise the async code using the harness, blocking until the promised value is available
227-
* ExecResult<List<String>> result = ExecHarness.yieldSingle(execution -> asPromise(service.observe("foo")));
235+
* ExecResult<List<String>> result = ExecHarness.yieldSingle(execution -> RxRatpack.promise(service.observe("foo")));
228236
*
229237
* List<String> results = result.getValue();
230238
* assertEquals(1, results.size());
@@ -236,16 +244,16 @@ public static <T> Observable<T> observe(Promise<T> promise) {
236244
* @param observable the observable
237245
* @param <T> the type of the value observed
238246
* @return a promise that returns all values from the observable
239-
* @see #asPromiseSingle(Observable)
247+
* @see #promiseSingle(Observable)
240248
*/
241-
public static <T> Promise<List<T>> asPromise(Observable<T> observable) {
249+
public static <T> Promise<List<T>> promise(Observable<T> observable) {
242250
return ExecControl.current().promise(f -> observable.toList().subscribe(f::success, f::error));
243251
}
244252

245253
/**
246254
* Convenience for converting an {@link Observable} to a {@link Promise} when it is known that the observable sequence is of zero or one elements.
247255
* <p>
248-
* Has the same behavior as {@link #asPromise(Observable)}, except that the list representation of the sequence is “unpacked”.
256+
* Has the same behavior as {@link #promise(Observable)}, except that the list representation of the sequence is “unpacked”.
249257
* <p>
250258
* If the observable sequence produces no elements, the promised value will be {@code null}.
251259
* If the observable sequence produces one element, the promised value will be that element.
@@ -254,9 +262,9 @@ public static <T> Promise<List<T>> asPromise(Observable<T> observable) {
254262
* @param observable the observable
255263
* @param <T> the type of the value observed
256264
* @return a promise that returns the sole value from the observable
257-
* @see #asPromise(Observable)
265+
* @see #promise(Observable)
258266
*/
259-
public static <T> Promise<T> asPromiseSingle(Observable<T> observable) {
267+
public static <T> Promise<T> promiseSingle(Observable<T> observable) {
260268
return ExecControl.current().promise(f -> observable.single().subscribe(f::success, f::error));
261269
}
262270

@@ -306,77 +314,57 @@ public static <T, I extends Iterable<T>> Observable<T> observeEach(Promise<I> pr
306314
}
307315

308316
/**
309-
* Alternative method for forking the execution to process each observable element.
310-
* <p>
311-
* This method is alternative to {@link #forkOnNext(ExecControl)} and is functionally equivalent.
312-
*
313-
* @param execControl the execution control to use to fork executions
314-
* @param observable the observable sequence to process each element of in a forked execution
315-
* @param <T> the element type
316-
* @return an observable
317-
*/
318-
public static <T> Observable<T> forkOnNext(ExecControl execControl, Observable<T> observable) {
319-
return observable.lift(RxRatpack.<T>forkOnNext(execControl));
320-
}
321-
322-
/**
323-
* An operator to parallelize an observable stream by forking a new execution for each omitted item.
324-
* This allows downstream processing to occur in concurrent executions.
325-
* <p>
326-
* To be used with the {@link Observable#lift(Observable.Operator)} method.
317+
* Parallelize an observable by creating a new Ratpack execution for each element.
327318
* <p>
328-
* The {@code onCompleted()} or {@code onError()} downstream methods are guaranteed to be called <strong>after</strong> the last item has been given to the downstream {@code onNext()} method.
329-
* That is, the last invocation of the downstream {@code onNext()} will have returned before {@code onCompleted()} or {@code onError()} are invoked.
330-
* <p>
331-
* This is generally a more performant alternative to using plain Rx parallelization due to Ratpack's {@link ratpack.exec.Execution} semantics and use of Netty's event loop to schedule work.
332319
* <pre class="java">{@code
333320
* import ratpack.rx.RxRatpack;
321+
* import ratpack.util.ExceptionUtils;
334322
* import ratpack.test.exec.ExecHarness;
335323
*
336324
* import rx.Observable;
337325
*
338326
* import java.util.List;
339327
* import java.util.Arrays;
328+
* import java.util.LinkedList;
329+
* import java.util.Collection;
330+
* import java.util.Collections;
340331
* import java.util.concurrent.CyclicBarrier;
341-
* import java.util.concurrent.BrokenBarrierException;
342332
*
343333
* import static org.junit.Assert.assertEquals;
344334
*
345335
* public class Example {
346-
*
347336
* public static void main(String[] args) throws Exception {
348337
* RxRatpack.initialize();
338+
*
339+
* CyclicBarrier barrier = new CyclicBarrier(5);
340+
*
349341
* try (ExecHarness execHarness = ExecHarness.harness(6)) {
350-
* CyclicBarrier barrier = new CyclicBarrier(5);
351-
* Integer[] myArray = {1, 2, 3, 4, 5};
352-
* Observable<Integer> source = Observable.from(myArray);
353-
* List<Integer> doubledAndSorted = source
354-
* .lift(RxRatpack.<Integer>forkOnNext(execHarness))
355-
* .map(integer -> {
356-
* try {
357-
* barrier.await(); // prove stream is processed concurrently
358-
* } catch (InterruptedException | BrokenBarrierException e) {
359-
* throw new RuntimeException(e);
360-
* }
361-
* return integer.intValue() * 2;
362-
* })
363-
* .serialize()
364-
* .toSortedList()
365-
* .toBlocking()
366-
* .single();
367-
*
368-
* assertEquals(Arrays.asList(2, 4, 6, 8, 10), doubledAndSorted);
342+
* List<Integer> values = execHarness.yield(execution ->
343+
* RxRatpack.promise(
344+
* Observable.just(1, 2, 3, 4, 5)
345+
* .compose(RxRatpack::forkEach) // parallelize
346+
* .doOnNext(value -> ExceptionUtils.uncheck(() -> barrier.await())) // wait for all values
347+
* .map(integer -> integer.intValue() * 2)
348+
* .serialize()
349+
* )
350+
* ).getValue();
351+
*
352+
* List<Integer> sortedValues = new LinkedList<>(values);
353+
* Collections.sort(sortedValues);
354+
* assertEquals(Arrays.asList(2, 4, 6, 8, 10), sortedValues);
369355
* }
370356
* }
371357
* }
372358
* }</pre>
373359
*
374-
* @param execControl the execution control to use to fork executions
375-
* @param <T> the type of item in the stream
376-
* @return an observable operator
360+
* @param observable the observable sequence to process each element of in a forked execution
361+
* @param <T> the element type
362+
* @return an observable
377363
*/
378-
public static <T> Observable.Operator<T, T> forkOnNext(final ExecControl execControl) {
379-
return downstream -> new Subscriber<T>(downstream) {
364+
public static <T> Observable<T> forkEach(Observable<T> observable) {
365+
ExecControl current = ExecControl.current();
366+
367+
return observable.<T>lift(downstream -> new Subscriber<T>(downstream) {
380368

381369
private final AtomicInteger wip = new AtomicInteger(1);
382370
private final AtomicBoolean closed = new AtomicBoolean();
@@ -411,7 +399,7 @@ public void onNext(final T t) {
411399
}
412400

413401
wip.incrementAndGet();
414-
execControl.exec()
402+
current.exec()
415403
.onComplete(e -> this.maybeDone())
416404
.onError(this::onError)
417405
.start(e -> {
@@ -420,7 +408,7 @@ public void onNext(final T t) {
420408
}
421409
});
422410
}
423-
};
411+
});
424412
}
425413

426414
private static class PromiseSubscribe<T, S> implements Observable.OnSubscribe<S> {

ratpack-rx/src/test/groovy/ratpack/rx/RxAsPromiseSpec.groovy

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ class RxAsPromiseSpec extends Specification {
4646

4747
def "can test async service"() {
4848
when:
49-
def result = harness.yield { service.observe("foo").asPromise() }
49+
def result = harness.yield { service.observe("foo").promise() }
5050

5151
then:
5252
result.valueOrThrow == ["foo"]
5353
}
5454

5555
def "failed observable causes exception to be thrown"() {
5656
when:
57-
harness.yield { service.fail().asPromise() }.valueOrThrow
57+
harness.yield { service.fail().promise() }.valueOrThrow
5858

5959
then:
6060
def e = thrown RuntimeException
@@ -63,7 +63,7 @@ class RxAsPromiseSpec extends Specification {
6363

6464
def "can unpack single"() {
6565
when:
66-
def result = harness.yield { service.observe("foo").asPromiseSingle() }
66+
def result = harness.yield { service.observe("foo").promiseSingle() }
6767

6868
then:
6969
result.valueOrThrow == "foo"

0 commit comments

Comments
 (0)