-
Notifications
You must be signed in to change notification settings - Fork 140
[Detailed Tutorial] Building a simple recommendation engine with QBit (CallBack nonBlocking)
##overview
To really grasp QBit, one must grasp the concepts of a CallBack and queues. A CallBack is a way to get an async response in QBit from a microservice. You call a service method and it calls you back. There are two golden rules to the Queue club:
-Don't block.
-Use a callback if you are not ready to handle events/methods, and continue handling events/methods that you are ready for.
This wiki will walk you through the process of building a simple recommendation engine with QBit, in the [previous example ] (https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Building-a-simple-recommendation-engine-with-QBit-(CallBack-Blocking)) we talked about how loadUser
is blocking which might result in blocking threads that handle all the messages. In this example the blocking issue will be fixed and things are going to be very simple as well.
You will build a simple recommendation engine with QBit; that will give a set of recommendations to users. When you run it you will get the following:
Recommendations for: Bob
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Joe
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Scott
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: William
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
In order to complete this example successfully you will need the following installed on your machine:
- Gradle; if you need help installing it, visit Installing Gradle.
- Your favorite IDE or text editor (we recommend [Intellig IDEA ] (https://www.jetbrains.com/idea/) latest version).
- [JDK ] (http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) 1.8 or later.
- Build and install QBit on your machine click [Building QBit ] (https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Building-QBit-the-microservice-lib-for-Java) for instrutions.
Now that your machine is all ready let's get started:
- [Download ] (https://github.com/fadihub/worker-callback-nonblocking/archive/master.zip) and unzip the source repository for this guide, or clone it using Git:
https://github.com/fadihub/worker-callback-nonblocking.git
Once this is done you can test the service, let's first explain the process:
Here is the User
object or domain object.
~/src/main/java/io.advantageous.qbit.example.recommendationengine/User
package io.advantageous.qbit.example.recommendationengine;
/* Domain object. */
public class User {
private final String userName;
public User(String userName){
this.userName = userName;
}
public String getUserName() {
return userName;
}
}
Here is the Recommendation
object or domain object.
package io.advantageous.qbit.example.recommendationengine;
/* Domain object. */
public class Recommendation {
private final String recommendation;
public Recommendation(String recommendation) {
this.recommendation = recommendation;
}
@Override
public String toString() {
return "Recommendation{" +
"recommendation='" + recommendation + '\'' +
'}';
}
}
Going back to the [previews example ] (https://github.com/advantageous/qbit/wiki/%5BDetailed-Tutorial%5D-Building-a-simple-recommendation-engine-with-QBit-(CallBack-Blocking)) we mentioned that if we get a lot of cache hits for user loads, perhaps the block will not be that long, but it will be there and every time we have to fault in a user, the whole system is gummed up. So in this example every time we can't handle the recommendation request, we go ahead and make an async call to the UserDataService
. When that async callback comes back, then we handle that request. In the mean time, we handle recommendation lists requests as quickly as we can. This way We will never block.
let's show how this is done; we added a CallBack to the RecommendationService
as follows:
public void recommend(final Callback<List<Recommendation>> recommendationsCallback,
final String userName) {
Now we are taking a callback and we can decide when we want to handle this recommendation request. We can do it right away if the user data is available in-memory or we can delay it.
if the User is found in memory call the callback right away for the RecommendationService
in memory
public void recommend(final Callback<List<Recommendation>> recommendationsCallback,
final String userName) {
User user = users.get(userName);
if (user == null) {
.....
} else {
recommendationsCallback.accept(runRulesEngineAgainstUser(user));
}
}
If the user is not found in memory load him from the UserDataService
, but still don't block.
ublic void recommend(final Callback<List<Recommendation>> recommendationsCallback,
final String userName) {
User user = users.get(userName);
if (user == null) {
userDataService.loadUser(
loadedUser -> {
handleLoadFromUserDataService(loadedUser, recommendationsCallback);
}, userName);
} else {
.....
}
}
Here we use a CallBack to load the user, and when the user is loaded, we call handleLoadFromUserDataService
which adds some management for handling the callback so we can still handle this call in the future.
After the user service system loads the user from its store, we want to handle the request for the recommendations.
public class RecommendationService {
............
/**
* Handle deferred recommendations based on user loads.
*/
private void handleLoadFromUserDataService(final User loadedUser,
final Callback<List<Recommendation>> recommendationsCallback) {
/** Add a runnable to the callbacks list. */
callbacks.add(() -> {
List<Recommendation> recommendations = runRulesEngineAgainstUser(loadedUser);
recommendationsCallback.accept(recommendations);
});
}
Every time we get a callback call from UserDataService
, we then perform the recommendation rules and callback our caller. We do this by enqueueing a runnable onto the callback queue, and later we will iterate through those.
A good time to handle callbacks from UserDataService
is when the RecommendationService
is notified when its queue is empty, it has started a new batch and when it has reached a batch limit.
@QueueCallback({
QueueCallbackType.EMPTY,
QueueCallbackType.START_BATCH,
QueueCallbackType.LIMIT})
private void handleCallbacks() {
flushServiceProxy(userDataService);
Runnable runnable = callbacks.poll();
while (runnable != null) {
runnable.run();
runnable = callbacks.poll();
}
}
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.boon.Lists;
import io.advantageous.boon.cache.SimpleLRUCache;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
public class RecommendationService {
private final SimpleLRUCache<String, User> users =
new SimpleLRUCache<>(10_000);
private BlockingQueue<Runnable> callbacks = new ArrayBlockingQueue<Runnable>(10_000);
private UserDataServiceClient userDataService;
public RecommendationService(UserDataServiceClient userDataService) {
this.userDataService = userDataService;
}
public void recommend(final Callback<List<Recommendation>> recommendationsCallback,
final String userName) {
System.out.println("recommend called");
User user = users.get(userName);
if (user == null) {
userDataService.loadUser(
loadedUser -> {
handleLoadFromUserDataService(loadedUser, recommendationsCallback);
}, userName);
} else {
recommendationsCallback.accept(runRulesEngineAgainstUser(user));
}
}
/**
* Handle defered recommendations based on user loads.
*/
private void handleLoadFromUserDataService(final User loadedUser,
final Callback<List<Recommendation>> recommendationsCallback) {
/** Add a runnable to the callbacks list. */
callbacks.add(() -> {
List<Recommendation> recommendations = runRulesEngineAgainstUser(loadedUser);
recommendationsCallback.accept(recommendations);
});
}
@QueueCallback({
QueueCallbackType.EMPTY,
QueueCallbackType.START_BATCH,
QueueCallbackType.LIMIT})
private void handleCallbacks() {
flushServiceProxy(userDataService);
Runnable runnable = callbacks.poll();
while (runnable != null) {
runnable.run();
runnable = callbacks.poll();
}
}
/* Fake CPU intensive operation. */
private List<Recommendation> runRulesEngineAgainstUser(final User user) {
return Lists.list(new Recommendation("Take a walk"), new Recommendation("Read a book"),
new Recommendation("Love more, complain less"));
}
}
UserDataService
manages editing, backup, syncing user data, keeps most users in-memory and also manages replicating and storing user data.
When the user is not found in memory UserDataService
will load that particular user and make it thread ready (runnable) then RecommendationService
will handle the callback from UserDataService
when the queue is empty, has started a new batch and when it has reached a batch limit; these are the best times to handle such callbacks.
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.qbit.annotation.QueueCallback;
import io.advantageous.qbit.annotation.QueueCallbackType;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.boon.core.Sys;
import java.util.ArrayList;
import java.util.List;
import static io.advantageous.boon.Boon.puts;
public class UserDataService {
private final List<Runnable> userLoadCallBacks = new ArrayList<>(1_000);
public void loadUser(final Callback<User> callBack, final String userId) {
puts("UserDataService :: loadUser called", userId);
userLoadCallBacks.add(
new Runnable() {
@Override
public void run() {
callBack.accept(new User(userId));
}
});
}
@QueueCallback({QueueCallbackType.EMPTY, QueueCallbackType.LIMIT})
public void pretendToDoIO() {
Sys.sleep(100);
if (userLoadCallBacks.size()==0) {
return;
}
for (Runnable runnable : userLoadCallBacks) {
runnable.run();
}
userLoadCallBacks.clear();
}
}
The client interface is your interface to calling the service. Calling methods on the client interface enqueues those method calls onto the service queue for the service.
The ServiceQueue
manages threads/queues for a Service implementation so the service can be thread safe and fast.
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.qbit.reactive.Callback;
public interface UserDataServiceClient {
void loadUser(Callback<User> callBack, String userId);
}
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.qbit.reactive.Callback;
import java.util.List;
/**
* @author rhightower
* on 2/20/15.
*/
public interface RecommendationServiceClient {
void recommend(final Callback<List<Recommendation>> recommendationsCallback,
final String userName);
}
This is Main to run the program; we create UserDataService
and its client proxy, then create RecommendationService
and its client proxy, then use RecommendationServiceClient
to give a set of recommendations for the four fake users we created.
package io.advantageous.qbit.example.recommendationengine;
import io.advantageous.boon.core.Sys;
import io.advantageous.qbit.service.ServiceQueue;
import java.util.List;
import static io.advantageous.boon.core.Lists.list;
import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;
/**
* Created by rhightower on 2/20/15.
*/
public class PrototypeMain {
public static void main(String... args) {
/* Create user data service and client proxy. */
ServiceQueue userDataService = serviceBuilder()
.setServiceObject(new UserDataService())
.build().startServiceQueue();
userDataService.startCallBackHandler();
UserDataServiceClient userDataServiceClient = userDataService
.createProxy(UserDataServiceClient.class);
/* Create recommendation service and client proxy. */
RecommendationService recommendationServiceImpl =
new RecommendationService(userDataServiceClient);
ServiceQueue recommendationServiceQueue = serviceBuilder()
.setServiceObject(recommendationServiceImpl)
.build().startServiceQueue().startCallBackHandler();
RecommendationServiceClient recommendationServiceClient =
recommendationServiceQueue.createProxy(RecommendationServiceClient.class);
/* Use recommendationServiceClient for 4 recommendations for
Bob, Joe, Scott and William. */
List<String> userNames = list("Bob", "Joe", "Scott", "William");
userNames.forEach( userName->
recommendationServiceClient.recommend(recommendations -> {
System.out.println("Recommendations for: " + userName);
recommendations.forEach(recommendation->
System.out.println("\t" + recommendation));
}, userName)
);
flushServiceProxy(recommendationServiceClient);
Sys.sleep(1000);
}
}
With your terminal cd worker-callback-nonblocking
then gradle clean build
and finally gradle run
you should get the following:
Recommendations for: Bob
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Joe
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: Scott
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
Recommendations for: William
Recommendation{recommendation='Take a walk'}
Recommendation{recommendation='Read a book'}
Recommendation{recommendation='Love more, complain less'}
The two golden rules for the Queue club are as follows: The first rule - don't block. The second rule - if you are not ready to handle events, use a callback and continue handling stuff you are ready for. In this example we followed these very important rules and we showed how to fix the [previous example ] (https://github.com/advantageous/qbit/wiki/%5BDetailed-Tutorial%5D-Building-a-simple-recommendation-engine-with-QBit-(CallBack-Blocking)) where we had a blocking issue. SO in this example you have built and tested the non blocking version of the recommendation engine with QBit, see you in the next tutorial!
QBit Website What is Microservices Architecture?
QBit Java Micorservices lib tutorials
The Java microservice lib. QBit is a reactive programming lib for building microservices - JSON, HTTP, WebSocket, and REST. QBit uses reactive programming to build elastic REST, and WebSockets based cloud friendly, web services. SOA evolved for mobile and cloud. ServiceDiscovery, Health, reactive StatService, events, Java idiomatic reactive programming for Microservices.
Reactive Programming, Java Microservices, Rick Hightower
Java Microservices Architecture
[Microservice Service Discovery with Consul] (http://www.mammatustech.com/Microservice-Service-Discovery-with-Consul)
Microservices Service Discovery Tutorial with Consul
[Reactive Microservices] (http://www.mammatustech.com/reactive-microservices)
[High Speed Microservices] (http://www.mammatustech.com/high-speed-microservices)
Reactive Microservices Tutorial, using the Reactor
QBit is mentioned in the Restlet blog
All code is written using JetBrains Idea - the best IDE ever!
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting
Tutorials
- QBit tutorials
- Microservices Intro
- Microservice KPI Monitoring
- Microservice Batteries Included
- RESTful APIs
- QBit and Reakt Promises
- Resourceful REST
- Microservices Reactor
- Working with JSON maps and lists
__
Docs
Getting Started
- First REST Microservice
- REST Microservice Part 2
- ServiceQueue
- ServiceBundle
- ServiceEndpointServer
- REST with URI Params
- Simple Single Page App
Basics
- What is QBit?
- Detailed Overview of QBit
- High level overview
- Low-level HTTP and WebSocket
- Low level WebSocket
- HttpClient
- HTTP Request filter
- HTTP Proxy
- Queues and flushing
- Local Proxies
- ServiceQueue remote and local
- ManagedServiceBuilder, consul, StatsD, Swagger support
- Working with Service Pools
- Callback Builders
- Error Handling
- Health System
- Stats System
- Reactor callback coordination
- Early Service Examples
Concepts
REST
Callbacks and Reactor
Event Bus
Advanced
Integration
- Using QBit in Vert.x
- Reactor-Integrating with Cassandra
- Using QBit with Spring Boot
- SolrJ and service pools
- Swagger support
- MDC Support
- Reactive Streams
- Mesos, Docker, Heroku
- DNS SRV
QBit case studies
QBit 2 Roadmap
-- Related Projects
- QBit Reactive Microservices
- Reakt Reactive Java
- Reakt Guava Bridge
- QBit Extensions
- Reactive Microservices
Kafka training, Kafka consulting, Cassandra training, Cassandra consulting, Spark training, Spark consulting