-
Notifications
You must be signed in to change notification settings - Fork 8
Promises.all
We have all
functionality with promises. You can create a promise that waits on all promise
s passed to it to be async returned.
Promises.all(promise1, promise2, promise3).catchError(returnPromise::reject)
.then(v -> returnPromise.resolve(true)).invoke()
If every promise you pass to an all promise is invokeable then the all promise is invokeable as well, and calling invoke on it will invoke all of the children promises.
The Promise.all(list or array) method returns a promise that resolves when all of the promises have resolved, or rejects with the reason of the first passed promise that rejects.
Promises.all(
//Call to save Todo item in two table, don't respond until
// both calls come back from Cassandra.
// First call to cassandra.
futureToPromise(
session.executeAsync(insertInto("Todo")
.value("id", todo.getId())
.value("updatedTime", todo.getUpdatedTime())
.value("createdTime", todo.getCreatedTime())
.value("name", todo.getName())
.value("description", todo.getDescription()))
).catchError(error -> recordCassandraError("add.todo", error))
.thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.todo")),
// Second call to cassandra.
futureToPromise(
session.executeAsync(insertInto("TodoLookup")
.value("id", todo.getId())
.value("updatedTime", todo.getUpdatedTime()))
).catchError(error -> recordCassandraError("add.lookup", error))
.thenSafe(resultSet -> handleResultFromAdd(resultSet, "add.lookup")
).catchError(returnPromise::reject)
.then(v -> returnPromise.resolve(true)).invoke()
/** Employee service. */
EmployeeService employeeService = ...
/* Promise that expects an employee. */
Promise<Employee> promise1 = Promises.promise();
Promise<Employee> promise2 = Promises.promise();
/* Promise that returns when all employees are returned. */
final Promise<Void> allPromise = Promises.all(promise1, promise2);
allPromise.then(nil -> System.out.println("All DONE!"));
assertFalse("Not done yet", allPromise.complete());
/** Call service. */
employeeService.loadEmployee("1", promise1);
/** Still not done because only one service has been called. */
assertFalse("Still not done yet", allPromise.complete());
/** Ok now second service is called. */
employeeService.loadEmployee("2", promise2);
/** Wait some time. */
//...
assertTrue(allPromise.complete());
assertTrue(allPromise.success());
We have three types of all
promises: callback, blocking and replay callback. (A replay callback is a callback that gets replayed in the caller's thread).
...
public interface Promises ...{
/**
* All promises must complete.
* @param promises promises
* @return return containing promise
*/
static Promise<Void> all(Promise<?>... promises) {
return new AllPromise(promises);
}
/**
* All promises must complete.
* @param promises promises
* @return return containing promise that is blocking.
*/
static Promise<Void> allBlocking(Promise<?>... promises) {
return new AllBlockingPromise(promises);
}
/**
* All promises must complete.
* @param timeout timeout
* @param time time
* @param promises promises
* @return returns replay promise so promise can be replayed in caller's thread.
*/
static ReplayPromise<Void> allReplay(final Duration timeout, long time, Promise<?>... promises) {
return new AllReplayPromise(timeout, time, promises);
}
/**
*
* All promises must complete.
* @param timeout timeout
* @param promises promises
* @return returns replay promise so promise can be replayed in caller's thread.
*/
static ReplayPromise<Void> allReplay(final Duration timeout, Promise<?>... promises) {
return Promises.allReplay(timeout, System.currentTimeMillis(), promises);
}
...
Given this test service.
public static class TestService {
public void simple(Callback<Employee> callback) {
callback.reply(new Employee("Rick"));
}
public void async(final Callback<Employee> callback) {
new Thread(() -> {
callback.reply(new Employee("Rick"));
}).start();
}
public void asyncTimeout(final Callback<Employee> callback) {
new Thread(() -> {
try {
Thread.sleep(10_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.reply(new Employee("Rick"));
}).start();
}
public void asyncError(final Callback<Employee> callback) {
new Thread(() -> {
callback.reject("Rick");
}).start();
}
public void error(Callback<Employee> callback) {
callback.reject("Error");
}
public void exception(Callback<Employee> callback) {
callback.reject(new IllegalStateException("Error"));
}
}
We can have these three example tests.
import io.advantageous.reakt.Callback;
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.Promises.promise;
@Test
public void testAllBlocking() throws Exception {
TestService testService = new TestService();
Promise<Employee> promise1 = Promises.promise();
Promise<Employee> promise2 = Promises.promise();
final Promise<Void> promise = Promises.allBlocking(promise1, promise2);
assertFalse(promise.complete());
testService.async(promise1);
assertFalse(promise.complete());
testService.async(promise2);
assertTrue(promise.success());
}
@Test
public void testAll() throws Exception {
/** Test service. */
TestService testService = new TestService();
/* Promise that expects an employee. */
Promise<Employee> promise1 = Promises.promise();
Promise<Employee> promise2 = Promises.promise();
/* Promise that returns when all employees are returned. */
final Promise<Void> promise = Promises.all(promise1, promise2);
promise.then(nil -> System.out.println("DONE!"));
assertFalse("Not done yet", promise.complete());
/** Call service. */
testService.simple(promise1);
/** Still not done because only one service has been called. */
assertFalse(promise.complete());
/** Ok now second service is called. */
testService.simple(promise2);
/** Wait some time. */
//...
assertTrue(promise.complete());
assertTrue(promise.success());
}
@Test
public void testAllReplay() throws Exception {
TestService testService = new TestService();
Promise<Employee> promise1 = Promises.promise();
Promise<Employee> promise2 = Promises.promise();
final ReplayPromise<Void> promise = Promises.allReplay(Duration.ofMillis(1000),
promise1, promise2);
assertFalse(promise.complete());
testService.async(promise1);
assertFalse(promise.complete());
testService.async(promise2);
for (int index=0; index < 10; index++) {
promise.check(System.currentTimeMillis());
if (promise.complete()) break;
Thread.sleep(10);
}
assertTrue(promise.complete());
assertTrue(promise.success());
}
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Reactor, Stream, Results
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Elekt Consul Leadership election
- Elekt Leadership election
- Reactive Microservices
What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Java Promises
- Promise
- Promise then*() and catchError()
- Promise thenMap()
- Promise all()
- Promise any()
- Blocking Promise
- Invokable Promise
- Reactor Replay Promises
Callback, and async Results
Reactor, Stream and Stream Result
Expected & Circuit Breaker
Scala Akka and Reakt