[Document] CallbackBuilder and generics for Reactive Java Microservices
The CallbackBuilder is used to create callbacks. Callbacks have error handlers, timeout handlers and return handlers.
callbackBuilder
.setCallback(ResultSet.class, resultSet ->
statusCallback.accept(resultSet!=null))
.setOnTimeout(() -> statusCallback.accept(false))
.setOnError(error -> statusCallback.onError(error))
.build(ResultSet.class);
this.addEventStorageRecordAsync(callbackBuilder.build(), storageRec);
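To make the three handler kinds concrete, here is a minimal, self-contained sketch of a callback with a return handler, an error handler and a timeout handler, plus a small builder that assembles them. The names SketchCallback, Builder and forType are hypothetical stand-ins invented for illustration; they are not QBit's actual classes or signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CallbackSketch {

    // Hypothetical stand-in for a callback with three outcomes (not QBit's Callback).
    interface SketchCallback<T> {
        void onReturn(T value);
        void onError(Throwable error);
        void onTimeout();
    }

    // Hypothetical builder that assembles three handlers into one callback.
    static final class Builder<T> {
        private Consumer<T> returnFn = value -> { };
        private Consumer<Throwable> errorFn = error -> { };
        private Runnable timeoutFn = () -> { };

        // The Class token only pins T for the compiler; it is not stored.
        static <T> Builder<T> forType(final Class<T> type) {
            return new Builder<>();
        }

        Builder<T> setOnReturn(final Consumer<T> fn) { this.returnFn = fn; return this; }
        Builder<T> setOnError(final Consumer<Throwable> fn) { this.errorFn = fn; return this; }
        Builder<T> setOnTimeout(final Runnable fn) { this.timeoutFn = fn; return this; }

        SketchCallback<T> build() {
            // Snapshot the handlers so later builder changes do not affect this callback.
            final Consumer<T> r = returnFn;
            final Consumer<Throwable> e = errorFn;
            final Runnable t = timeoutFn;
            return new SketchCallback<T>() {
                @Override public void onReturn(final T value) { r.accept(value); }
                @Override public void onError(final Throwable error) { e.accept(error); }
                @Override public void onTimeout() { t.run(); }
            };
        }
    }

    // Builds one callback and triggers each of its three outcomes in turn.
    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final SketchCallback<String> callback = Builder.forType(String.class)
                .setOnReturn(value -> events.add("return:" + value))
                .setOnError(error -> events.add("error:" + error.getMessage()))
                .setOnTimeout(() -> events.add("timeout"))
                .build();
        callback.onReturn("ok");
        callback.onError(new IllegalStateException("boom"));
        callback.onTimeout();
        return events;
    }

    public static void main(final String... args) {
        System.out.println(run());
    }
}
```

Passing a Class token to the builder is one way to let the compiler infer the lambda parameter types without spelling out the generic arguments by hand.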
The CallbackBuilder has many helper methods to help you deal with common Java types like Optional, Map, List, Collection, Set, String, primitive types and wrappers.
This allows you to quickly build callbacks without navigating the complexity of generics. Let's cover a small example.
First, let's define a basic service that uses lists, maps and optionals.
package io.advantageous.qbit.example.callback;
import io.advantageous.boon.core.Lists;
import io.advantageous.boon.core.Maps;
import io.advantageous.qbit.reactive.Callback;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class EmployeeServiceImpl implements EmployeeService {
@Override
public void getEmployeesAsMap(final Callback<Map<String, Employee>> empMapCallback) {
empMapCallback.returnThis(Maps.map("rick", new Employee("Rick")));
}
@Override
public void getEmployeesAsList(final Callback<List<Employee>> empListCallback) {
empListCallback.returnThis(Lists.list(new Employee("Rick")));
}
@Override
public void findEmployeeByName(final Callback<Optional<Employee>> employeeCallback,
final String name) {
if (name.equals("Rick")) {
employeeCallback.returnThis(Optional.of(new Employee("Rick")));
} else {
employeeCallback.returnThis(Optional.empty());
}
}
}
The interface for the above looks like this:
package io.advantageous.qbit.example.callback;
import io.advantageous.qbit.reactive.Callback;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public interface EmployeeService {
void getEmployeesAsMap(Callback<Map<String, Employee>> empMapCallback);
void getEmployeesAsList(Callback<List<Employee>> empListCallback);
void findEmployeeByName(Callback<Optional<Employee>> employeeCallback,
String name);
}
If you are familiar with QBit, all of the above should already make sense. If not, I suggest skimming the home page of the wiki and then coming back here.
To show how to use the CallbackBuilder, we will define a basic REST service called CompanyRestService.
/**
* To access this service
* curl http://localhost:8080/emap
{"rick":{"name":"Rick"}}
*/
@RequestMapping("/")
public class CompanyRestService {
private final Logger logger = LoggerFactory.getLogger(CompanyRestService.class);
private final EmployeeService employeeService;
public CompanyRestService(EmployeeService employeeService) {
this.employeeService = employeeService;
}
...
@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.LIMIT})
public void process(){
ServiceProxyUtils.flushServiceProxy(employeeService);
}
QBit uses micro-batching, which helps optimize message passing between service queues (service actors).
The QueueCallback annotation allows us to capture when our request queue is empty or when it has hit its limit.
The limit is usually the batch size but could be other things, like hitting an important message.
Whenever we hit our limit or our request queue is empty, we flush things to the downstream service by calling ServiceProxyUtils.flushServiceProxy. This should be mostly review.
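The flushing rule described above (flush when the batch limit is hit, or when the inbound queue goes empty) can be sketched in plain Java. This is a simplified model under stated assumptions, not QBit's queue implementation; BatchingSketch, Sender, send and onQueueEmpty are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class BatchingSketch {

    // Hypothetical sender that buffers outgoing messages into micro-batches.
    static final class Sender {
        private final int batchLimit;
        private final List<String> pending = new ArrayList<>();
        private final List<List<String>> flushed = new ArrayList<>();

        Sender(final int batchLimit) {
            this.batchLimit = batchLimit;
        }

        // Buffer a message; flush as soon as the batch limit is reached.
        void send(final String message) {
            pending.add(message);
            if (pending.size() >= batchLimit) {
                flush();
            }
        }

        // Called when the inbound queue is empty, so nothing sits in the buffer while idle.
        void onQueueEmpty() {
            flush();
        }

        // Hand the buffered messages downstream as one batch; no-op if the buffer is empty.
        void flush() {
            if (pending.isEmpty()) {
                return;
            }
            flushed.add(new ArrayList<>(pending));
            pending.clear();
        }

        List<List<String>> flushedBatches() {
            return flushed;
        }
    }

    static List<List<String>> run() {
        final Sender sender = new Sender(2);
        sender.send("a");
        sender.send("b");      // limit of 2 reached: first batch is flushed
        sender.send("c");      // stays buffered
        sender.onQueueEmpty(); // idle: second batch is flushed
        sender.onQueueEmpty(); // nothing pending: no empty batch is produced
        return sender.flushedBatches();
    }

    public static void main(final String... args) {
        System.out.println(run());
    }
}
```

Without the flush on empty, a message sent just before the queue goes idle could wait in the buffer until enough later messages arrive to fill the batch.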
As you can see, the EmployeeService has a lot of methods that take generic types like Optional, List and Map. When we want to call a downstream service that is going to return a map, list or optional, we have helper methods to make the construction of the callback easier.
@RequestMapping("/")
public class CompanyRestService {
...
@RequestMapping("/emap")
public void employeeMap(final Callback<Map<String, Employee>> empMapCallback) {
final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
callbackBuilder.wrap(empMapCallback); //Forward to error handling, timeout, and callback defined in empMapCallback
employeeService.getEmployeesAsMap(callbackBuilder.build());
}
In this case, we use the wrap method. This will forward errors, timeouts and the callback return to the empMapCallback.
To run this, we need to start up the application.
public static void main(final String... args) throws Exception {
/** Create a ManagedServiceBuilder which simplifies QBit wiring. */
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder().setRootURI("/");
managedServiceBuilder.enableLoggingMappedDiagnosticContext();
/** Create a service queue for the employee service. */
final ServiceQueue employeeServiceQueue = managedServiceBuilder.createServiceBuilderForServiceObject(
new EmployeeServiceImpl()).buildAndStartAll();
/** Add a CompanyRestService passing it a client proxy to the employee service. */
managedServiceBuilder.addEndpointService(
new CompanyRestService(employeeServiceQueue.createProxy(EmployeeService.class)));
/** Start the server. */
managedServiceBuilder.startApplication();
}
If we wanted to copy and mutate the map before we serialized it, we could use withMapCallback to capture the async return (i.e., the Callback) from the employeeService.
@RequestMapping("/")
public class CompanyRestService {
...
@RequestMapping("/emap2")
public void employeeMap2(final Callback<Map<String, Employee>> empMapCallback) {
final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
callbackBuilder.delegate(empMapCallback); //Forward to error handling and timeout defined in empMapCallback
callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
logger.info("GET MAP {}", employeeMap);
empMapCallback.returnThis(employeeMap);
});
employeeService.getEmployeesAsMap(callbackBuilder.build());
}
In this case, we forward just the error handling and timeout handling to the callback that we are creating, and then we create a custom return handler using withMapCallback.
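The difference between forwarding everything and forwarding only errors and timeouts can be modeled in a few lines. The sketch below uses a hypothetical ReplyCallback interface and two hypothetical static helpers named after the wrap and delegate ideas; it illustrates the forwarding pattern only and makes no claim about how QBit implements these methods.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ForwardingSketch {

    // Hypothetical stand-in for a callback with three outcomes (not QBit's Callback).
    interface ReplyCallback<T> {
        void onReturn(T value);
        void onError(Throwable error);
        void onTimeout();
    }

    // "wrap" idea: forward return, error and timeout to the target unchanged.
    static <T> ReplyCallback<T> wrap(final ReplyCallback<T> target) {
        return delegate(target, target::onReturn);
    }

    // "delegate" idea: forward error and timeout, but use a custom return handler.
    static <T> ReplyCallback<T> delegate(final ReplyCallback<T> target,
                                         final Consumer<T> returnHandler) {
        return new ReplyCallback<T>() {
            @Override public void onReturn(final T value) { returnHandler.accept(value); }
            @Override public void onError(final Throwable error) { target.onError(error); }
            @Override public void onTimeout() { target.onTimeout(); }
        };
    }

    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final ReplyCallback<String> client = new ReplyCallback<String>() {
            @Override public void onReturn(final String value) { events.add("client-return:" + value); }
            @Override public void onError(final Throwable error) { events.add("client-error:" + error.getMessage()); }
            @Override public void onTimeout() { events.add("client-timeout"); }
        };

        // Intercept the return value, change it, then pass it on to the client.
        final ReplyCallback<String> delegated = delegate(client, value -> {
            events.add("intercepted:" + value);
            client.onReturn(value.toUpperCase());
        });
        delegated.onReturn("rick");
        delegated.onTimeout(); // forwarded to the client untouched

        wrap(client).onError(new RuntimeException("down")); // forwarded untouched
        return events;
    }

    public static void main(final String... args) {
        System.out.println(run());
    }
}
```

Note that with the delegate style the custom return handler is responsible for eventually calling the client's return path; if it does not, the client never gets an answer.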
@RequestMapping("/")
public class CompanyRestService {
...
@RequestMapping("/emap3")
public void employeeMap3(final Callback<Map<String, Employee>> empMapCallback) {
final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
// Forward to error handling and timeout defined in empMapCallback, but install some additional logging for
// timeout and error handling that associates the error and timeout handling with this call.
callbackBuilder.delegateWithLogging(empMapCallback, logger, "employeeMap3");
callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
logger.info("GET MAP {}", employeeMap);
empMapCallback.returnThis(employeeMap);
});
employeeService.getEmployeesAsMap(callbackBuilder.build());
}
If you want to handle error logging and timeout logging in the context of this service's log, you can simply use the delegateWithLogging method. This will set up some basic logging for error handling and timeouts.
We of course also have methods that work with Lists, Collections, Sets and so on.
@RequestMapping("/")
public class CompanyRestService {
...
@RequestMapping("/elist")
public void employeeList(final Callback<List<Employee>> empListCallback) {
final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
// Forward to error handling and timeout defined in empListCallback, but install some additional logging for
// timeout and error handling that associates the error and timeout handling with this call.
callbackBuilder.delegateWithLogging(empListCallback, logger, "employeeList");
callbackBuilder.withListCallback(Employee.class, employeeList -> {
logger.info("GET List {}", employeeList);
empListCallback.returnThis(employeeList);
});
employeeService.getEmployeesAsList(callbackBuilder.build());
}
The above works as you would expect. Let's mix things up a bit. We will call findEmployeeByName, which may or may not return an employee.
@RequestMapping("/")
public class CompanyRestService {
...
@RequestMapping("/find")
public void findEmployee(final Callback<Employee> employeeCallback,
@RequestParam("name") final String name) {
final CallbackBuilder callbackBuilder = CallbackBuilder.newCallbackBuilder();
// Forward to error handling and timeout defined in employeeCallback,
// but install some additional logging for
// timeout and error handling that associates the error and timeout handling with this call.
callbackBuilder.delegateWithLogging(employeeCallback, logger, "findEmployee");
callbackBuilder.withOptionalCallback(Employee.class, employeeOptional -> {
if (employeeOptional.isPresent()) {
employeeCallback.returnThis(employeeOptional.get());
} else {
employeeCallback.onError(new Exception("Employee not found"));
}
});
employeeService.findEmployeeByName(callbackBuilder.build(), name);
}
To work with Optionals, we use withOptionalCallback. Here we return an error if the employee is not found, and we return the Employee object if it is found.
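The "value if present, error if absent" rule can be factored into a small adapter. The sketch below uses only java.util.Optional and java.util.function.Consumer; the helper name unwrapping, the find method and the choice of NoSuchElementException are assumptions made for illustration and are not part of QBit.

```java
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Consumer;

final class OptionalSketch {

    // Adapts a value handler and an error handler into a single handler of Optional<T>.
    static <T> Consumer<Optional<T>> unwrapping(final Consumer<T> onValue,
                                                final Consumer<Throwable> onError,
                                                final String notFoundMessage) {
        return optional -> {
            if (optional.isPresent()) {
                onValue.accept(optional.get());
            } else {
                onError.accept(new NoSuchElementException(notFoundMessage));
            }
        };
    }

    // Hypothetical lookup standing in for the downstream service.
    static Optional<String> find(final String name) {
        if ("Rick".equals(name)) {
            return Optional.of("Rick");
        }
        return Optional.empty();
    }

    // Runs one lookup and reports which handler fired.
    static String lookup(final String name) {
        final StringBuilder out = new StringBuilder();
        final Consumer<Optional<String>> handler = unwrapping(
                value -> out.append("found:").append(value),
                error -> out.append("error:").append(error.getMessage()),
                "Employee not found");
        handler.accept(find(name));
        return out.toString();
    }

    public static void main(final String... args) {
        System.out.println(lookup("Rick"));
        System.out.println(lookup("Bob"));
    }
}
```

Whether a missing employee should be an error at all is a design choice; a REST endpoint could equally map the empty case to a "not found" response instead of an exception.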
/**
* You need this if you want to do error handling (Exception) from a callback.
* Callback Builder
* created by rhightower on 3/23/15.
*/
@SuppressWarnings("UnusedReturnValue")
public class CallbackBuilder {
/**
* Builder method to set callback handler that takes a list
* @param componentClass componentClass
* @param callback callback
* @param <T> T
* @return this
*/
public <T> CallbackBuilder withListCallback(final Class<T> componentClass,
final Callback<List<T>> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes a set
* @param componentClass componentClass
* @param callback callback
* @param <T> T
* @return this
*/
public <T> CallbackBuilder withSetCallback(final Class<T> componentClass,
final Callback<Set<T>> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes a collection
* @param componentClass componentClass
* @param callback callback
* @param <T> T
* @return this
*/
public <T> CallbackBuilder withCollectionCallback(final Class<T> componentClass,
final Callback<Collection<T>> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes a map
* @param keyClass keyClass
* @param valueClass valueClass
* @param callback callback
* @param <K> key type
* @param <V> value type
* @return this
*/
public <K, V> CallbackBuilder withMapCallback(final Class<K> keyClass,
final Class<V> valueClass,
final Callback<Map<K, V>> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes a boolean
* @param callback callback
* @return this
*/
public CallbackBuilder withBooleanCallback(final Callback<Boolean> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes an integer
* @param callback callback
* @return this
*/
public CallbackBuilder withIntCallback(final Callback<Integer> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes a long
* @param callback callback
* @return this
*/
public CallbackBuilder withLongCallback(final Callback<Long> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes a string
* @param callback callback
* @return this
*/
public CallbackBuilder withStringCallback(final Callback<String> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes an optional string
* @param callback callback
* @return this
*/
public CallbackBuilder withOptionalStringCallback(final Callback<Optional<String>> callback) {
this.callback = callback;
return this;
}
/**
* Builder method to set callback handler that takes an optional
* @param cls cls
* @param callback callback
* @param <T> T
* @return this
*/
public <T> CallbackBuilder withOptionalCallback(final Class<T> cls, final Callback<Optional<T>> callback) {
this.callback = callback;
return this;
}
Read more about callback builders and how to handle errors, timeouts and downstream calls.
##Reactor
Let's say that EmployeeService is really talking to some downstream remote services, or perhaps to Cassandra and/or Redis. Let's also say that you want to add a timeout for this downstream system, say 10 seconds.
Our example will then use the QBit Reactor, and the easiest way to do that is to subclass BaseService.
package io.advantageous.qbit.example.callback;
import io.advantageous.qbit.admin.ManagedServiceBuilder;
import io.advantageous.qbit.annotation.RequestMapping;
import io.advantageous.qbit.annotation.RequestParam;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reactive.CallbackBuilder;
import io.advantageous.qbit.reactive.Reactor;
import io.advantageous.qbit.reactive.ReactorBuilder;
import io.advantageous.qbit.service.BaseService;
import io.advantageous.qbit.service.ServiceQueue;
import io.advantageous.qbit.service.stats.StatsCollector;
import io.advantageous.qbit.util.Timer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@RequestMapping("/")
public class CompanyRestServiceUsingReactor extends BaseService {
private final Logger logger = LoggerFactory.getLogger(CompanyRestServiceUsingReactor.class);
private final EmployeeService employeeService;
public CompanyRestServiceUsingReactor(Reactor reactor,
Timer timer,
StatsCollector statsCollector,
EmployeeService employeeService) {
super(reactor, timer, statsCollector);
this.employeeService = employeeService;
reactor.addServiceToFlush(employeeService);
}
@RequestMapping("/emap")
public void employeeMap(final Callback<Map<String, Employee>> empMapCallback) {
final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
callbackBuilder.wrap(empMapCallback); //Forward to error handling, timeout, and callback defined in empMapCallback
employeeService.getEmployeesAsMap(callbackBuilder.build());
}
@RequestMapping("/emap2")
public void employeeMap2(final Callback<Map<String, Employee>> empMapCallback) {
final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
callbackBuilder.delegate(empMapCallback); //Forward to error handling and timeout defined in empMapCallback
callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
logger.info("GET MAP {}", employeeMap);
empMapCallback.returnThis(employeeMap);
});
employeeService.getEmployeesAsMap(callbackBuilder.build());
}
@RequestMapping("/emap3")
public void employeeMap3(final Callback<Map<String, Employee>> empMapCallback) {
final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
// Forward to error handling and timeout defined in empMapCallback, but install some additional logging for
// timeout and error handling that associates the error and timeout handling with this call.
callbackBuilder.delegateWithLogging(empMapCallback, logger, "employeeMap3");
callbackBuilder.withMapCallback(String.class, Employee.class, employeeMap -> {
logger.info("GET MAP {}", employeeMap);
empMapCallback.returnThis(employeeMap);
});
employeeService.getEmployeesAsMap(callbackBuilder.build());
}
@RequestMapping("/elist")
public void employeeList(final Callback<List<Employee>> empListCallback) {
final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
// Forward to error handling and timeout defined in empListCallback, but install some additional logging for
// timeout and error handling that associates the error and timeout handling with this call.
callbackBuilder.delegateWithLogging(empListCallback, logger, "employeeList");
callbackBuilder.withListCallback(Employee.class, employeeList -> {
logger.info("GET List {}", employeeList);
empListCallback.returnThis(employeeList);
});
employeeService.getEmployeesAsList(callbackBuilder.build());
}
@RequestMapping("/find")
public void findEmployee(final Callback<Employee> employeeCallback,
@RequestParam("name") final String name) {
final long startTime = super.time;
final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
// Forward to error handling and timeout defined in employeeCallback, but install some additional logging for
// timeout and error handling that associates the error and timeout handling with this call.
callbackBuilder.delegateWithLogging(employeeCallback, logger, "findEmployee");
callbackBuilder.withOptionalCallback(Employee.class, employeeOptional -> {
super.recordTiming("findEmployee", time - startTime);
if (employeeOptional.isPresent()) {
employeeCallback.returnThis(employeeOptional.get());
} else {
employeeCallback.onError(new Exception("Employee not found"));
}
});
employeeService.findEmployeeByName(callbackBuilder.build(), name);
}
public static void main(final String... args) throws Exception {
/** Create a ManagedServiceBuilder which simplifies QBit wiring. */
final ManagedServiceBuilder managedServiceBuilder = ManagedServiceBuilder.managedServiceBuilder().setRootURI("/");
managedServiceBuilder.enableLoggingMappedDiagnosticContext();
/** Create a service queue for the employee service. */
final ServiceQueue employeeServiceQueue = managedServiceBuilder.createServiceBuilderForServiceObject(
new EmployeeServiceImpl()).buildAndStartAll();
/** Add a CompanyRestService passing it a client proxy to the employee service. */
managedServiceBuilder.addEndpointService(
new CompanyRestServiceUsingReactor(
ReactorBuilder.reactorBuilder().setDefaultTimeOut(10).setTimeUnit(TimeUnit.SECONDS).build(),
Timer.timer(),
managedServiceBuilder.getStatServiceBuilder().buildStatsCollector(),
employeeServiceQueue.createProxy(EmployeeService.class)));
/** Start the server. */
managedServiceBuilder.startApplication();
}
}
Notice that the callbackBuilder is now constructed from the reactor (final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();).
To learn more about the Reactor, please read Reactively handling async calls with QBit Reactive Microservices.
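As a rough mental model of why callbacks are created through a coordinating object when timeouts are needed, the sketch below tracks outstanding calls and fires a timeout handler for any call that has not completed within a deadline. It is a simplified illustration with hypothetical names (Tracker, track, process) and an explicit clock value passed in by the caller; it is not the QBit Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class TimeoutSketch {

    // Hypothetical tracker of outstanding calls; something must call process() periodically.
    static final class Tracker {
        private static final class Pending {
            final long startMillis;
            final Runnable onTimeout;
            boolean done;

            Pending(final long startMillis, final Runnable onTimeout) {
                this.startMillis = startMillis;
                this.onTimeout = onTimeout;
            }
        }

        private final long timeoutMillis;
        private final List<Pending> pending = new ArrayList<>();

        Tracker(final long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        // Register a call; the returned Runnable marks it complete when the reply arrives.
        Runnable track(final long nowMillis, final Runnable onTimeout) {
            final Pending entry = new Pending(nowMillis, onTimeout);
            pending.add(entry);
            return () -> { entry.done = true; };
        }

        // Drop completed calls and fire the timeout handler of overdue ones.
        // Returns the number of timeouts fired during this pass.
        int process(final long nowMillis) {
            int fired = 0;
            final Iterator<Pending> it = pending.iterator();
            while (it.hasNext()) {
                final Pending entry = it.next();
                if (entry.done) {
                    it.remove();
                } else if (nowMillis - entry.startMillis >= timeoutMillis) {
                    it.remove();
                    entry.onTimeout.run();
                    fired++;
                }
            }
            return fired;
        }
    }

    static List<String> run() {
        final List<String> events = new ArrayList<>();
        final Tracker tracker = new Tracker(10_000); // 10 second timeout

        final Runnable fastDone = tracker.track(0, () -> events.add("fast timed out"));
        tracker.track(0, () -> events.add("slow timed out"));

        fastDone.run(); // the fast call replies in time
        events.add("fired at 5s: " + tracker.process(5_000));
        events.add("fired at 10s: " + tracker.process(10_000));
        return events;
    }

    public static void main(final String... args) {
        System.out.println(run());
    }
}
```

Passing the clock value in explicitly keeps the sketch deterministic; a real service would read the time from its own timer on each processing pass.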
##Stats
When you use the BaseService, you also have access to the stats system.
@RequestMapping("/find")
public void findEmployee(final Callback<Employee> employeeCallback,
@RequestParam("name") final String name) {
final long startTime = super.time;
final CallbackBuilder callbackBuilder = super.reactor.callbackBuilder();
callbackBuilder.delegateWithLogging(employeeCallback, logger, "findEmployee");
callbackBuilder.withOptionalCallback(Employee.class, employeeOptional -> {
/** Record timing. */
super.recordTiming("findEmployee", time - startTime);
if (employeeOptional.isPresent()) {
/* Increment count of employees found. */
super.incrementCount("employeeFound");
employeeCallback.returnThis(employeeOptional.get());
} else {
/* Increment count of employees not found. */
super.incrementCount("employeeNotFound");
employeeCallback.onError(new Exception("Employee not found"));
}
});
employeeService.findEmployeeByName(callbackBuilder.build(), name);
}
To learn more about stats and the ManagedServiceBuilder read this.
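To show what recording a timing and incrementing a count amount to, here is a tiny in-memory collector. It is a hypothetical stand-in (the Stats class and its accessor methods are invented for this sketch) and says nothing about how QBit's stats system stores or reports data.

```java
import java.util.HashMap;
import java.util.Map;

final class StatsSketch {

    // Hypothetical in-memory collector keyed by stat name.
    static final class Stats {
        private final Map<String, Long> counts = new HashMap<>();
        private final Map<String, Long> totalTimings = new HashMap<>();

        // Add one to the named counter, creating it on first use.
        void incrementCount(final String name) {
            counts.merge(name, 1L, Long::sum);
        }

        // Accumulate the elapsed time recorded under the given name.
        void recordTiming(final String name, final long durationMillis) {
            totalTimings.merge(name, durationMillis, Long::sum);
        }

        long count(final String name) {
            return counts.getOrDefault(name, 0L);
        }

        long totalTiming(final String name) {
            return totalTimings.getOrDefault(name, 0L);
        }
    }

    // Simulates two lookups: one that finds the employee and one that does not.
    static Stats run() {
        final Stats stats = new Stats();

        long startTime = 100;
        long replyTime = 130;
        stats.recordTiming("findEmployee", replyTime - startTime);
        stats.incrementCount("employeeFound");

        startTime = 200;
        replyTime = 220;
        stats.recordTiming("findEmployee", replyTime - startTime);
        stats.incrementCount("employeeNotFound");

        return stats;
    }

    public static void main(final String... args) {
        final Stats stats = run();
        System.out.println(stats.count("employeeFound") + " found, "
                + stats.count("employeeNotFound") + " not found, "
                + stats.totalTiming("findEmployee") + " ms total");
    }
}
```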