-
Notifications
You must be signed in to change notification settings - Fork 140
[Z Dev Notes] Reactive programming for QBit Java microservices library
I get a lot of questions about RxJava and QBit. The first thing that I always say is that you can use RxJava and QBit.
I feel that RxJava is good at handling many calls to services, and coordinating the results.
The best description I heard about RxJava was what if you wanted to call ServiceA, ServiceB and you wanted to take what ServiceB returns and call ServiceC, but you need to return the results from ServiceA and ServiceC to a client. This is a great example of the power of RxJava.
QBit is async from the ground up. This is there from the get go. QBit does not have a lot of utilities to coordinate complex calls to/from local and out of process services. QBit has a callback. The rest is up to you, rolling your own stuff or using a lib like RxJava. Until now....
Before we begin: QBit is a microservices Java lib. It has a fast queue (100M to 200M messages a second), and it has a fast event bus that can be replicated to other nodes and can integrate with Consul to be clustered. It also has a fast JSON parser and support for HTTP calls over JSON (REST) using Spring MVC style annotations and WebSockets.
I have been on projects where I had to coordinate calls to more than one service. I wanted a good way to do this in QBit. I have always rolled one-off solutions. RxJava is a great library, but I also wanted a QBit way to do things.
Since QBit relies on Java 8, I can use what comes with Java 8 and backwards compatibility to Java 6 and Java 7 is not an issue.
Let's show an example, please note that this is from working code, but it is still in early phases.
It will become part of QBit in short order. You can roll your own like I have been doing with Runnables and such and you can see examples of this in the wiki. Or you can use RxJava. Or you can use this new feature (once I finish adding it to QBit, this is a preview).
public static class PretendService {
private final String name;
PretendService(final String name) {
this.name = name;
}
public void serviceCall(final Callback<String> callback, final int seconds, String message) {
Thread thread = new Thread(() -> {
Sys.sleep(seconds * 1000);
callback.accept(name + "::" + message);
});
thread.start();
}
}
The pretend service is nice in that it just waits as long as I tell it, and then returns its name as a return value. It is a great service for demonstrating the concept of Callback coordination.
final PretendService serviceA = new PretendService("SERVICE A");
final PretendService serviceB = new PretendService("SERVICE B");
final PretendService serviceC = new PretendService("SERVICE C");
//Inside Z Service method
final AtomicReference<String> serviceAReturn = new AtomicReference<>();
final AtomicReference<String> serviceCReturn = new AtomicReference<>();
/* Create a callbackWithTimeout for service A to demonstrate
a callbackWithTimeout to show that it can be cancelled. */
final AsyncFutureCallback<String> serviceACallback =
reactor.callback(String.class, serviceAReturn::set);
/* Register a coordinator that checks for return values from service A and C */
final CallbackCoordinator coordinator = reactor.coordinateWithTimeout(() -> {
/* If service A and service C are done, then we are done.
* Let the client know.
*/
if (serviceACallback.isDone() && serviceCReturn.get() != null) {
sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
return true; //true means we are done
}
return false;
}, Timer.timer().now(), 5, TimeUnit.SECONDS, RunnableCallbackTest::sendTimeoutBackToClient);
/* Call service A using the A callbackWithTimeout. */
serviceA.serviceCall(serviceACallback, 1, " from main");
/* Create service C callbackWithTimeout. */
final AsyncFutureCallback<String> serviceCCallback = reactor.callback(String.class,
returnValueFromC -> {
serviceCReturn.set(returnValueFromC);
handleReturnFromC(serviceAReturn, coordinator);
}
);
/* Call Service B, register a callback
which call service C on service b completion. */
serviceB.serviceCall(
reactor.callback(returnValue ->
serviceC.serviceCall(serviceCCallback, 1, " from " + returnValue)
), 1, " from main");
...
...
private void sendTimeoutBackToClient() {
System.out.println("You have timed out");
}
private void sendResponseBackToClient(String a, String ab) {
System.out.println(a + "::" + ab);
}
public void handleReturnFromC(AtomicReference<String> serviceAReturn,
CallbackCoordinator coordinator) {
if (serviceAReturn.get()!=null) {
coordinator.finished();
}
}
By letting the coordinator decide how to handle things and making the state of the coordination be local variables that are captured by the lambda, we can handle the coordination in the context of one method call. This simplifies the code as we use regular imperative Java to do the coordination instead of tons of small callbacks.
One more version.... This one has an async latch. This seems to be the cleanest yet. That last countdown triggers the runnable, which completes the job.
final AtomicReference<String> serviceAReturn = new AtomicReference<>();
final AtomicReference<String> serviceCReturn = new AtomicReference<>();
/* Register a coordinator that checks for return values from service A and C */
final CallbackCoordinator coordinator = reactor.coordinateWithTimeout(() -> {
/* If service A and service C are done, then we are done.
* Let the client know.
*/
if (serviceAReturn.get()!=null && serviceCReturn.get() != null) {
sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
return true; //true means we are done
}
return false;
}, Timer.timer().now(), 5, TimeUnit.SECONDS, RunnableCallbackTest::sendTimeoutBackToClient);
final CountDownAsyncLatch latch = countDownLatch(2,
() -> {
System.out.println("From Latch");
coordinator.finished();
}
);
/* Create a callbackWithTimeout for service A to demonstrate
a callbackWithTimeout to show that it can be cancelled. */
final AsyncFutureCallback<String> serviceACallback =
reactor.callback(String.class, (t) -> {
serviceAReturn.set(t);
latch.countDown();
});
/* Call service A using the A callbackWithTimeout. */
serviceA.serviceCall(serviceACallback, 1, " from main");
/* Create service C callbackWithTimeout. */
final AsyncFutureCallback<String> serviceCCallback = reactor.callback(String.class,
returnValueFromC -> {
serviceCReturn.set(returnValueFromC);
latch.countDown();
}
);
/* Call Service B, register a callback
which call service C on service b completion. */
serviceB.serviceCall(
reactor.callback(returnValue ->
serviceC.serviceCall(serviceCCallback, 1,
" from " + returnValue)
),
1, " from main");
Tried a few more things. Added builders so it was a little more clear what we were working with.
final AtomicReference<String> serviceAReturn = new AtomicReference<>();
final AtomicReference<String> serviceCReturn = new AtomicReference<>();
final CallbackBuilder callbackBuilder = reactor.callbackBuilder();
/* Register a coordinator that checks for return values from service A and C */
final CallbackCoordinator coordinator = reactor.coordinatorBuilder()
.setCoordinator(
() -> {
/* If service A and service C are done, then we are done.
* Let the client know.
*/
if (serviceAReturn.get() != null && serviceCReturn.get() != null) {
sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
return true; //true means we are done
}
return false;
})
.setTimeoutDuration(5)
.setTimeoutTimeUnit(TimeUnit.SECONDS)
.setTimeOutHandler(() -> {
System.out.println("Coordinator timed out");
sendTimeoutBackToClient();
})
.build();
final CountDownAsyncLatch latch = countDownLatch(2,
() -> {
System.out.println("From Latch");
coordinator.finished();
}
);
/* Create a callbackWithTimeout for service A to demonstrate
a callbackWithTimeout to show that it can be cancelled. */
final Callback<String> serviceACallback =
callbackBuilder
.setCallback(String.class, returnValueFromA -> {
serviceAReturn.set(returnValueFromA);
latch.countDown();
})
.setOnTimeout(() -> {
System.out.println("Service A timed out");
sendTimeoutBackToClient();
coordinator.cancel();
})
.setTimeoutDuration(4).setTimeoutTimeUnit(TimeUnit.SECONDS)
.build();
/* Call service A using the serviceACallback. */
serviceA.serviceCall(serviceACallback, 1, " from main");
/* Create service C callback. */
final Callback<String> serviceCCallback =
callbackBuilder
.setCallback(String.class, returnValueFromC -> {
serviceCReturn.set(returnValueFromC);
latch.countDown();
})
.setOnTimeout(() -> {
System.out.println("Service C timed out");
sendTimeoutBackToClient();
coordinator.cancel();
})
.setTimeoutDuration(4).setTimeoutTimeUnit(TimeUnit.SECONDS)
.build();
/* Call Service B, register a callback
which call service C on service b completion. */
serviceB.serviceCall(
reactor.callback(returnValue ->
serviceC.serviceCall(serviceCCallback, 1,
" from " + returnValue)
),
1, " from main");
The builder has reasonable defaults.
final AtomicReference<String> serviceAReturn = new AtomicReference<>();
final AtomicReference<String> serviceCReturn = new AtomicReference<>();
final CallbackBuilder callbackBuilder = reactor.callbackBuilder();
/* Register a coordinator that checks for return values from service A and C */
final CallbackCoordinator coordinator = reactor.coordinatorBuilder()
.setCoordinator(
() -> {
/* If service A and service C are done, then we are done.
* Let the client know.
*/
if (serviceAReturn.get() != null && serviceCReturn.get() != null) {
sendResponseBackToClient(serviceAReturn.get(), serviceCReturn.get());
return true; //true means we are done
}
return false;
})
.setTimeOutHandler(() -> {
System.out.println("Coordinator timed out");
sendTimeoutBackToClient();
})
.build();
final CountDownAsyncLatch latch = countDownLatch(2,
() -> {
System.out.println("From Latch");
coordinator.finished();
}
);
/* Create a callbackWithTimeout for service A to demonstrate
a callbackWithTimeout to show that it can be cancelled. */
final Callback<String> serviceACallback =
callbackBuilder
.setCallback(String.class, returnValueFromA -> {
serviceAReturn.set(returnValueFromA);
latch.countDown();
})
.setOnTimeout(() -> {
System.out.println("Service A timed out");
sendTimeoutBackToClient();
coordinator.cancel();
})
.setTimeoutDuration(4)
.build();
/* Call service A using the serviceACallback. */
serviceA.serviceCall(serviceACallback, 1, " from main");
/* Create service C callback. */
final Callback<String> serviceCCallback =
callbackBuilder
.setCallback(String.class, returnValueFromC -> {
serviceCReturn.set(returnValueFromC);
latch.countDown();
})
.setOnTimeout(() -> {
System.out.println("Service C timed out");
sendTimeoutBackToClient();
coordinator.cancel();
})
.setTimeoutDuration(4)
.build();
/* Call Service B, register a callback
which call service C on service b completion. */
serviceB.serviceCall(
reactor.callbackBuilder()
.setCallback(
returnValue ->
serviceC.serviceCall(serviceCCallback, 1,
" from " + returnValue)
).build(),
1, " from main");
More will be done. The early work seems to work really well.
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting