[Detailed Tutorial] Using QBit's Event Bus System (The Employee example)
## Overview
QBit has an event bus. The advantage of using the event bus with QBit services is that events arrive on the same queue that handles method calls, so event handlers are thread safe in the same way method calls are: everything, events and methods alike, comes in on the same thread. The event bus is very fast; expect throughput between 10M and 100M messages per second. The event bus is also a great way to add services without disrupting existing ones. With QBit's event bus you can send strongly typed objects, JSON, Maps, etc.
You can wire QBit services into many event bus systems. This makes it easy to wire a service to listen to events coming from Kafka, RabbitMQ or something else.
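To make the "same queue, same thread" idea concrete, here is a minimal plain-Java sketch. It does not use QBit at all: `SingleQueueSketch`, `HiringState` and `run` are hypothetical names, and a single-threaded `ExecutorService` stands in for a service queue. It only illustrates why unsynchronized service state stays consistent when calls and events are funneled through one worker thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch, not QBit code: one worker thread drains one queue,
// so method calls and events never run concurrently against the service.
class SingleQueueSketch {

    // Plain, unsynchronized state: safe only because one thread ever touches it.
    static class HiringState {
        private final List<String> handled = new ArrayList<>();
        void hire(String name) { handled.add("call:" + name); }        // a "method call"
        void onNewHire(String name) { handled.add("event:" + name); }  // an "event"
        int handledCount() { return handled.size(); }
    }

    // Two producer threads enqueue calls and events; returns how many were handled.
    static int run(int perProducer) {
        HiringState service = new HiringState();
        ExecutorService serviceQueue = Executors.newSingleThreadExecutor();
        try {
            Thread callers = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    String name = "emp" + i;
                    serviceQueue.execute(() -> service.hire(name));
                }
            });
            Thread publishers = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    String name = "emp" + i;
                    serviceQueue.execute(() -> service.onNewHire(name));
                }
            });
            callers.start();
            publishers.start();
            callers.join();
            publishers.join();
            // Reading the state is just one more task on the same queue.
            Callable<Integer> read = service::handledCount;
            return serviceQueue.submit(read).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            serviceQueue.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1000)); // prints 2000
    }
}
```

If the two producer threads called `hire` and `onNewHire` directly, the unsynchronized `ArrayList` could lose entries or fail; routing both through one queue avoids that without any locks in the service itself.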
This wiki walks you through a simple "employee example" that demonstrates how to use QBit's event bus system.
You will build an example with four services that together handle a new hire: when a new employee is hired, the example adds the employee to the payroll system, enrolls the employee in the benefits system, and invites them to our community outreach program.
In this example the first service does not know about the other services, and we can add more services later that listen to events and take part when a new employee is hired. When you run this example you will get the following:

    Hired employee Employee{firstName='Rick', employeeId=1}
    Employee added to payroll Rick 1 100
    Employee enrolled into benefits system employee Rick 1
    Employee will be invited to the community outreach program Rick 1

In order to complete this example successfully you will need the following installed on your machine:
- Gradle; if you need help installing it, visit Installing Gradle.
- Your favorite IDE or text editor (we recommend the latest version of [IntelliJ IDEA](https://www.jetbrains.com/idea/)).
- [JDK](http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html) 1.8 or later.
- QBit, built and installed on your machine; see [Building QBit](https://github.com/advantageous/qbit/wiki/%5BQuick-Start%5D-Building-QBit-the-microservice-lib-for-Java) for instructions.
Now that your machine is ready, let's get started:
- [Download](https://github.com/fadihub/event-bus-system-qbit/archive/master.zip) and unzip the source repository for this guide, or clone it using Git:

    git clone https://github.com/fadihub/event-bus-system-qbit.git

Once this is done you can test the service. First, let's walk through how it works:
As mentioned before, this example has four services: EmployeeHiringService, BenefitsService, VolunteerService, and PayrollService. These are all inproc (in-process) services. QBit supports WebSocket, HTTP and REST remote services as well, but for now let's focus on inproc services; if you understand inproc, you will understand remote. All four services do some kind of work on an employee object whose constructor looks like this:

    public Employee(String firstName, int employeeId) {
        this.firstName = firstName;
        this.employeeId = employeeId;
    }

Here are the getters and the toString method for this employee object:

    public String getFirstName() {
        return firstName;
    }

    public int getEmployeeId() {
        return employeeId;
    }

    @Override
    public String toString() {
        return "Employee{" +
                "firstName='" + firstName + '\'' +
                ", employeeId=" + employeeId +
                '}';
    }

This example has two channels:

    public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
    public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

The first channel, NEW_HIRE_CHANNEL, is where we send new employee objects when they are hired. A whole slew of services could be listening to this channel; in this example two services listen on it: BenefitsService and VolunteerService. The second channel, PAYROLL_ADJUSTMENT_CHANNEL, carries the employee together with a salary, and PayrollService listens on it.
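If channels are new to you, the following plain-Java sketch shows the basic idea of one channel fanning out to several listeners. It is not the QBit EventManager: `ChannelSketch`, `register` and `send` are hypothetical names chosen for illustration, and delivery here is a direct synchronous call rather than an enqueue onto a service queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch, not the QBit EventManager: a channel is just a name
// that maps to every listener registered for it.
class ChannelSketch {
    static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";

    private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

    void register(String channel, Consumer<Object> listener) {
        listeners.computeIfAbsent(channel, key -> new ArrayList<>()).add(listener);
    }

    // Delivers the event to every listener on the channel and returns how many got it.
    int send(String channel, Object event) {
        List<Consumer<Object>> targets =
                listeners.getOrDefault(channel, Collections.emptyList());
        for (Consumer<Object> listener : targets) {
            listener.accept(event);
        }
        return targets.size();
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        ChannelSketch bus = new ChannelSketch();
        bus.register(NEW_HIRE_CHANNEL, event -> seen.add("benefits:" + event));
        bus.register(NEW_HIRE_CHANNEL, event -> seen.add("volunteer:" + event));
        bus.send(NEW_HIRE_CHANNEL, "Rick");
        bus.send("some.other.channel", "ignored"); // no listeners, so nothing happens
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [benefits:Rick, volunteer:Rick]
    }
}
```

The sender only knows the channel name, which is why a third listener can be registered later without touching the code that calls `send`.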
The EmployeeHiringService fires off the events to the other services:

    public static class EmployeeHiringService {

        public void hireEmployee(final Employee employee) {
            int salary = 100;
            System.out.printf("Hired employee %s\n", employee);

            // Does stuff to hire employee

            // Sends events
            final EventManager eventManager = serviceContext().eventManager();
            eventManager.send(NEW_HIRE_CHANNEL, employee);
            eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);
        }
    }

When working inside a QBit service, you can access the event manager using serviceContext().eventManager(). The eventManager() method returns a client proxy, and a client proxy normally has to be flushed after you use it; flushing messages to other services in batches helps performance. When you obtain the event manager this way inside a QBit service, you do not have to flush: QBit does it for you at the point where you get the most performance out of the system. This is what allows the event manager not only to send so many messages in such a short period of time but also to enqueue them on other service queues.
Notice that we call sendArray so we can send both the employee and their salary. The listener for PAYROLL_ADJUSTMENT_CHANNEL (PayrollService) has to handle both an employee and an int that represents the new employee's salary. Here are the other three services that are listening:

    public static class BenefitsService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void enroll(final Employee employee) {
            System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());
        }
    }

    public static class VolunteerService {

        @OnEvent(NEW_HIRE_CHANNEL)
        public void invite(final Employee employee) {
            System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                    employee.getFirstName(), employee.getEmployeeId());
        }
    }

    public static class PayrollService {

        @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
        public void addEmployeeToPayroll(final Employee employee, int salary) {
            System.out.printf("Employee added to payroll %s %d %d\n",
                    employee.getFirstName(), employee.getEmployeeId(), salary);
        }
    }

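The two-argument listener in PayrollService pairs up with the sendArray call in EmployeeHiringService. The following plain-Java sketch shows one way an array event body can be spread across a listener method's parameters using reflection. It is only an illustration under assumptions: `SendArraySketch`, `PayrollListener` and `deliver` are hypothetical names, and nothing here claims to be how QBit dispatches internally.

```java
import java.lang.reflect.Method;

// Hypothetical sketch of the idea behind sendArray: the event body is an
// Object[] whose elements become the listener method's arguments.
class SendArraySketch {

    public static class PayrollListener {
        String lastEntry;

        public void addEmployeeToPayroll(String firstName, int salary) {
            lastEntry = firstName + " " + salary;
        }
    }

    // Finds a public method by name and argument count, then invokes it
    // with the elements of the event body as its arguments.
    static void deliver(Object listener, String methodName, Object... body) {
        for (Method method : listener.getClass().getMethods()) {
            if (method.getName().equals(methodName)
                    && method.getParameterCount() == body.length) {
                try {
                    method.invoke(listener, body); // Integer is unboxed to int here
                    return;
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        throw new IllegalArgumentException("No listener method named " + methodName);
    }

    static String demo() {
        PayrollListener payroll = new PayrollListener();
        deliver(payroll, "addEmployeeToPayroll", "Rick", 100);
        return payroll.lastEntry;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints Rick 100
    }
}
```

The important point for the tutorial is the contract: the number and order of values passed to sendArray must match the parameters of every listener on that channel.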
The @OnEvent annotation is an alternative to @Listen. We also have @Consume, @Subscribe, and @Hear. You do not have to use our annotations. If you want to write a service that is not tied to QBit at all, i.e., with no compile-time dependencies, you can define your own annotation called OnEvent, Listen, Consume, Subscribe or Hear. We believe in no compile-time dependencies for your services, and no class-loader discovery magic that would tie you to Boon or QBit. There is an API, and your implementation can be divorced from QBit as much as possible. Here is the OnEvent annotation:

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target({ ElementType.METHOD, ElementType.FIELD })
    @Retention(RetentionPolicy.RUNTIME)
    public @interface OnEvent {

        /* The channel you want to listen to. */
        String value() default "";

        /* The consumer is the last object listening to this event.
           An event channel can have many subscribers but only one consumer.
         */
        boolean consume() default false;
    }

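To see why a user-defined annotation with the same name can work without a compile-time dependency, here is a plain-Java sketch that matches annotations by simple name and reads `value()` reflectively. This is an assumption about how such matching could be done, not a description of QBit's internals; `AnnotationByNameSketch` and `findListeners` are hypothetical names.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: discover listener methods by the annotation's simple
// name, so the service never has to import a framework annotation.
class AnnotationByNameSketch {

    // A user-defined annotation; nothing here imports a framework type.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface OnEvent {
        String value() default "";
    }

    static class BenefitsService {
        @OnEvent("com.mycompnay.employee.new")
        public void enroll(String firstName) { }

        public void notAListener() { }
    }

    // Maps channel name to listener method name for every method carrying an
    // annotation whose simple name is "OnEvent", whatever package it lives in.
    static Map<String, String> findListeners(Class<?> serviceClass) {
        Map<String, String> channels = new TreeMap<>();
        for (Method method : serviceClass.getDeclaredMethods()) {
            for (Annotation annotation : method.getAnnotations()) {
                Class<? extends Annotation> type = annotation.annotationType();
                if (!type.getSimpleName().equals("OnEvent")) {
                    continue;
                }
                try {
                    Method valueMethod = type.getDeclaredMethod("value");
                    valueMethod.setAccessible(true);
                    channels.put((String) valueMethod.invoke(annotation), method.getName());
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return channels;
    }

    static String demo() {
        return findListeners(BenefitsService.class).toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {com.mycompnay.employee.new=enroll}
    }
}
```

Note that the annotation must have `RetentionPolicy.RUNTIME`; otherwise it is not visible to reflection and no listener would be found.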
Now we have all the services. Remember that the employee is the Employee object passed to EmployeeHiringService. To start things off, we need a client proxy to the EmployeeHiringService, which we get from employeeHiringServiceQueue. But first let's wire all four services into the QBit queuing apparatus. Here is the wiring process:
Create the POJOs:

    EmployeeHiringService employeeHiring = new EmployeeHiringService();
    PayrollService payroll = new PayrollService();
    BenefitsService benefits = new BenefitsService();
    VolunteerService volunteering = new VolunteerService();

Wire EmployeeHiringService, PayrollService, BenefitsService, and VolunteerService into QBit:

    ServiceQueue employeeHiringServiceQueue = serviceBuilder()
            .setServiceObject(employeeHiring)
            .setInvokeDynamic(false).build().startServiceQueue();

    ServiceQueue payrollServiceQueue = serviceBuilder()
            .setServiceObject(payroll)
            .setInvokeDynamic(false).build().startServiceQueue();

    ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
            .setServiceObject(benefits)
            .setInvokeDynamic(false).build().startServiceQueue();

    ServiceQueue volunteeringServiceQueue = serviceBuilder()
            .setServiceObject(volunteering)
            .setInvokeDynamic(false).build().startServiceQueue();

The objects employeeHiringServiceQueue, payrollServiceQueue, employeeBenefitsServiceQueue, and volunteeringServiceQueue are QBit services (each is a ServiceQueue wrapping one of the POJOs).
To invoke a method on a QBit service, you need a client proxy. A client proxy sends messages to a service, and the service receives those messages as method calls.
Every call is sent over a high-speed internal inproc queue. You can also use a client proxy to talk to QBit over WebSockets.
To create a proxy, use the createProxy method of ServiceQueue:

    EmployeeHiringServiceClient employeeHiringServiceClientProxy =
            employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);

Now that we have created the proxy, we can use it to send messages to the service:

    employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));

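The phrase "a client proxy sends messages, the service receives method calls" can be illustrated with the JDK's own dynamic proxies. The sketch below is plain Java, not QBit's proxy implementation: `ClientProxySketch`, `HiringClient`, `createProxy` and `drain` are hypothetical names, and the queue is drained on the calling thread to keep the example deterministic, whereas a real service would drain it on its own thread.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: an interface call is turned into a queued message,
// and the message is later turned back into a method call on the service.
class ClientProxySketch {

    interface HiringClient {
        void hireEmployee(String firstName);
    }

    public static class HiringService {
        final List<String> hired = new ArrayList<>();

        public void hireEmployee(String firstName) {
            hired.add(firstName);
        }
    }

    // Builds a proxy whose method calls become {methodName, args} messages.
    static <T> T createProxy(Class<T> clientInterface, Queue<Object[]> queue) {
        Object proxy = Proxy.newProxyInstance(
                clientInterface.getClassLoader(),
                new Class<?>[] {clientInterface},
                (self, method, args) -> {
                    queue.add(new Object[] {method.getName(), args});
                    return null; // this sketch only supports void methods
                });
        return clientInterface.cast(proxy);
    }

    // The service side: take messages off the queue and invoke the matching method.
    static void drain(Queue<Object[]> queue, Object service) {
        Object[] message;
        while ((message = queue.poll()) != null) {
            String methodName = (String) message[0];
            Object[] args = (Object[]) message[1];
            int argCount = args == null ? 0 : args.length;
            for (Method method : service.getClass().getMethods()) {
                if (method.getName().equals(methodName)
                        && method.getParameterCount() == argCount) {
                    try {
                        method.invoke(service, args);
                    } catch (ReflectiveOperationException e) {
                        throw new IllegalStateException(e);
                    }
                    break;
                }
            }
        }
    }

    static String demo() {
        Queue<Object[]> queue = new ArrayDeque<>();
        HiringService service = new HiringService();
        HiringClient client = createProxy(HiringClient.class, queue);
        client.hireEmployee("Rick");              // only enqueues a message
        int hiredBeforeDrain = service.hired.size();
        drain(queue, service);                    // now the service method runs
        return hiredBeforeDrain + ":" + service.hired;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 0:[Rick]
    }
}
```

The `0` in the output is the point of the example: calling the proxy returns immediately, and the service method has not run yet when the call returns.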
Every so often, we have to flush calls to the client proxy.
The client proxy flushes calls every time the queue batch size is met. So if the queue batch size is set to 5, it flushes every five calls. Regardless of the batch size, when you are done making calls, you should flush them as follows:

    flushServiceProxy(employeeHiringServiceClientProxy);

If you are making calls to a service in a tight loop, you may want to flush every ten calls or every 100 calls, or flush related calls together.
If you set the batch size to 1, then every method call is flushed, but this hampers performance.
If you use the event manager service, it is flushed automatically for you, in an extremely performant way. We may provide similar support for client proxies injected into a service.
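The batch size and flush behavior described above can be modeled in a few lines of plain Java. This is a simplified sketch, not QBit's queue implementation: `BatchingSketch`, `call` and `flush` are hypothetical names, and "delivering" a batch just means moving it to a list so the batch boundaries are visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of batch-size flushing: calls pile up in a pending
// buffer and are handed over in one go when the buffer reaches batchSize
// or when flush() is called explicitly.
class BatchingSketch {
    private final int batchSize;
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> delivered = new ArrayList<>();

    BatchingSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    void call(String message) {
        pending.add(message);
        if (pending.size() >= batchSize) {
            flush(); // automatic flush when the batch size is met
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        delivered.add(new ArrayList<>(pending));
        pending.clear();
    }

    // Makes `calls` calls with the given batch size, flushes once at the end,
    // and returns the size of each delivered batch.
    static List<Integer> batchSizes(int batchSize, int calls) {
        BatchingSketch proxy = new BatchingSketch(batchSize);
        for (int i = 1; i <= calls; i++) {
            proxy.call("call" + i);
        }
        proxy.flush(); // without this, a partial last batch would stay pending
        List<Integer> sizes = new ArrayList<>();
        for (List<String> batch : proxy.delivered) {
            sizes.add(batch.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(5, 7)); // prints [5, 2]
        System.out.println(batchSizes(1, 3)); // prints [1, 1, 1]
    }
}
```

With a batch size of 5 and seven calls, the last two calls are only delivered because of the explicit flush at the end. With a batch size of 1 every call is its own hand-off, which is the case the text warns about.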
Now we are all done. Note that we can add services to this example at any time by following the same procedure: create your service as a POJO, wire it into QBit, and make it listen to the new hire event channel or whatever channel you choose. That is it. Very simple.
## Full code listing
src/main/java/io.advantageous.qbit.example/EmployeeEventExampleUsingSystemEventBus

    /*
     * Copyright (c) 2015. Rick Hightower, Geoff Chandler
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     *
     * QBit - The Microservice lib for Java : JSON, WebSocket, REST. Be The Web!
     */
    package io.advantageous.qbit.example;

    import io.advantageous.qbit.annotation.OnEvent;
    import io.advantageous.qbit.events.EventManager;
    import io.advantageous.qbit.service.ServiceQueue;
    import io.advantageous.boon.core.Sys;

    import static io.advantageous.qbit.service.ServiceBuilder.serviceBuilder;
    import static io.advantageous.qbit.service.ServiceContext.serviceContext;
    import static io.advantageous.qbit.service.ServiceProxyUtils.flushServiceProxy;

    /**
     * Created by rhightower on 2/4/15.
     */
    public class EmployeeEventExampleUsingSystemEventBus {

        public static final String NEW_HIRE_CHANNEL = "com.mycompnay.employee.new";
        public static final String PAYROLL_ADJUSTMENT_CHANNEL = "com.mycompnay.employee.payroll";

        public static void main(String... args) {
            EmployeeHiringService employeeHiring = new EmployeeHiringService();
            PayrollService payroll = new PayrollService();
            BenefitsService benefits = new BenefitsService();
            VolunteerService volunteering = new VolunteerService();

            ServiceQueue employeeHiringServiceQueue = serviceBuilder()
                    .setServiceObject(employeeHiring)
                    .setInvokeDynamic(false).build().startServiceQueue();
            ServiceQueue payrollServiceQueue = serviceBuilder()
                    .setServiceObject(payroll)
                    .setInvokeDynamic(false).build().startServiceQueue();
            ServiceQueue employeeBenefitsServiceQueue = serviceBuilder()
                    .setServiceObject(benefits)
                    .setInvokeDynamic(false).build().startServiceQueue();
            ServiceQueue volunteeringServiceQueue = serviceBuilder()
                    .setServiceObject(volunteering)
                    .setInvokeDynamic(false).build().startServiceQueue();

            EmployeeHiringServiceClient employeeHiringServiceClientProxy =
                    employeeHiringServiceQueue.createProxy(EmployeeHiringServiceClient.class);
            employeeHiringServiceClientProxy.hireEmployee(new Employee("Rick", 1));
            flushServiceProxy(employeeHiringServiceClientProxy);
            Sys.sleep(5_000);
        }

        interface EmployeeHiringServiceClient {
            void hireEmployee(final Employee employee);
        }

        public static class Employee {
            final String firstName;
            final int employeeId;

            public Employee(String firstName, int employeeId) {
                this.firstName = firstName;
                this.employeeId = employeeId;
            }

            public String getFirstName() {
                return firstName;
            }

            public int getEmployeeId() {
                return employeeId;
            }

            @Override
            public String toString() {
                return "Employee{" +
                        "firstName='" + firstName + '\'' +
                        ", employeeId=" + employeeId +
                        '}';
            }
        }

        public static class EmployeeHiringService {

            public void hireEmployee(final Employee employee) {
                int salary = 100;
                System.out.printf("Hired employee %s\n", employee);

                // Does stuff to hire employee

                // Sends events
                final EventManager eventManager = serviceContext().eventManager();
                eventManager.send(NEW_HIRE_CHANNEL, employee);
                eventManager.sendArray(PAYROLL_ADJUSTMENT_CHANNEL, employee, salary);
            }
        }

        public static class BenefitsService {

            @OnEvent(NEW_HIRE_CHANNEL)
            public void enroll(final Employee employee) {
                System.out.printf("Employee enrolled into benefits system employee %s %d\n",
                        employee.getFirstName(), employee.getEmployeeId());
            }
        }

        public static class VolunteerService {

            @OnEvent(NEW_HIRE_CHANNEL)
            public void invite(final Employee employee) {
                System.out.printf("Employee will be invited to the community outreach program %s %d\n",
                        employee.getFirstName(), employee.getEmployeeId());
            }
        }

        public static class PayrollService {

            @OnEvent(PAYROLL_ADJUSTMENT_CHANNEL)
            public void addEmployeeToPayroll(final Employee employee, int salary) {
                System.out.printf("Employee added to payroll %s %d %d\n",
                        employee.getFirstName(), employee.getEmployeeId(), salary);
            }
        }
    }

In your terminal, run the following commands:

    cd event-bus-system-qbit
    gradle clean build
    gradle run

You should get the following:

    Hired employee Employee{firstName='Rick', employeeId=1}
    Employee added to payroll Rick 1 100
    Employee enrolled into benefits system employee Rick 1
    Employee will be invited to the community outreach program Rick 1

With QBit, Java objects that know nothing about the other services running in the same JVM can still communicate via the event bus. We can keep adding services that listen.
You can also have more than one event bus. An event bus is just another QBit service, and it takes three lines of code to create a high-speed event bus. So each module could have its own event bus and then use the system event bus to send messages between modules. You have now built and tested "The Employee example" and learned about QBit's event bus system. See you in the next tutorial!
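The "one bus per module plus a system bus" layout can be sketched in plain Java as follows. This is only an illustration of the topology: `BusBridgeSketch`, `Bus` and the channel names are hypothetical, and it does not show the QBit calls for creating a private event bus.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: a module keeps most events on its own bus and
// forwards only selected channels to a shared system bus.
class BusBridgeSketch {

    static class Bus {
        private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

        void register(String channel, Consumer<Object> listener) {
            listeners.computeIfAbsent(channel, key -> new ArrayList<>()).add(listener);
        }

        void send(String channel, Object event) {
            List<Consumer<Object>> targets =
                    listeners.getOrDefault(channel, Collections.emptyList());
            for (Consumer<Object> listener : targets) {
                listener.accept(event);
            }
        }
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Bus systemBus = new Bus();
        Bus hrModuleBus = new Bus();

        // A listener that only exists inside the HR module.
        hrModuleBus.register("hr.internal.audit", e -> seen.add("hr-internal:" + e));
        // The bridge: new-hire events are the only ones that leave the HR module.
        hrModuleBus.register("hr.employee.new", e -> systemBus.send("hr.employee.new", e));
        // A listener in another module, attached to the system bus.
        systemBus.register("hr.employee.new", e -> seen.add("payroll-module:" + e));
        // This listener never fires, because audit events are not bridged.
        systemBus.register("hr.internal.audit", e -> seen.add("leaked:" + e));

        hrModuleBus.send("hr.internal.audit", "Rick");
        hrModuleBus.send("hr.employee.new", "Rick");
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [hr-internal:Rick, payroll-module:Rick]
    }
}
```

The design choice here is that the bridge is just another listener on the module bus, so deciding which events cross module boundaries is a matter of which channels you register a forwarder for.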